1 ///////////////////////////////////////////////////////////////////////////////
4 /// \brief Compresses or uncompresses a file
6 // Copyright (C) 2007 Lasse Collin
8 // This program is free software; you can redistribute it and/or
9 // modify it under the terms of the GNU Lesser General Public
10 // License as published by the Free Software Foundation; either
11 // version 2.1 of the License, or (at your option) any later version.
13 // This program is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 // Lesser General Public License for more details.
18 ///////////////////////////////////////////////////////////////////////////////
29 /// We don't need this for *anything*, but it seems that at least with
30 /// glibc, pthread_create() doesn't allow NULL.
38 /// Number of available threads
39 static size_t free_threads;
41 /// Thread-specific data
42 static thread_data *threads;
44 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
45 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
47 /// Attributes of new coder threads. They are created in detached state.
48 /// Coder threads signal to the service thread themselves when they are done.
49 static pthread_attr_t thread_attr;
59 threads = malloc(sizeof(thread_data) * opt_threads);
60 if (threads == NULL) {
65 for (size_t i = 0; i < opt_threads; ++i)
66 threads[i] = (thread_data){
67 .strm = LZMA_STREAM_INIT_VAR,
73 if (pthread_attr_init(&thread_attr)
74 || pthread_attr_setdetachstate(
75 &thread_attr, PTHREAD_CREATE_DETACHED)) {
80 free_threads = opt_threads;
86 //////////////////////////
87 // Thread-specific data //
88 //////////////////////////
93 pthread_mutex_lock(&mutex);
95 while (free_threads == 0) {
96 pthread_cond_wait(&cond, &mutex);
99 pthread_cond_signal(&cond);
100 pthread_mutex_unlock(&mutex);
105 thread_data *t = threads;
112 pthread_mutex_unlock(&mutex);
119 release_thread_data(thread_data *t)
121 pthread_mutex_lock(&mutex);
126 pthread_cond_signal(&cond);
127 pthread_mutex_unlock(&mutex);
134 create_thread(void *(*func)(thread_data *t), thread_data *t)
136 if (opt_threads == 1) {
139 const int err = pthread_create(&t->thread, &thread_attr,
140 (void *(*)(void *))(func), t);
142 errmsg(V_ERROR, _("Cannot create a thread: %s"),
153 /////////////////////////
154 // One thread per file //
155 /////////////////////////
158 single_init(thread_data *t)
162 if (opt_mode == MODE_COMPRESS) {
163 if (opt_header == HEADER_ALONE) {
164 ret = lzma_alone_encoder(&t->strm,
165 opt_filters[0].options);
167 ret = lzma_stream_encoder(&t->strm,
168 opt_filters, opt_check);
171 // TODO Restrict file format if requested on the command line.
172 ret = lzma_auto_decoder(&t->strm);
175 if (ret != LZMA_OK) {
176 if (ret == LZMA_MEM_ERROR)
189 single_skip_padding(thread_data *t, uint8_t *in_buf)
191 // Handle decoding of concatenated Streams. There can be arbitrary
192 // number of nul-byte padding between the Streams, which must be
195 // NOTE: Concatenating LZMA_Alone files works only if at least
196 // one of lc, lp, and pb is non-zero. Using the concatenation
197 // on LZMA_Alone files is strongly discouraged.
199 while (t->strm.avail_in > 0) {
200 if (*t->strm.next_in != '\0')
207 if (t->pair->src_eof)
208 return LZMA_STREAM_END;
210 t->strm.next_in = in_buf;
211 t->strm.avail_in = io_read(t->pair, in_buf, BUFSIZ);
212 if (t->strm.avail_in == SIZE_MAX)
213 return LZMA_DATA_ERROR;
219 single(thread_data *t)
221 if (single_init(t)) {
222 io_close(t->pair, false);
223 release_thread_data(t);
227 uint8_t in_buf[BUFSIZ];
228 uint8_t out_buf[BUFSIZ];
229 lzma_action action = LZMA_RUN;
231 bool success = false;
233 t->strm.avail_in = 0;
235 while (!user_abort) {
236 if (t->strm.avail_in == 0 && !t->pair->src_eof) {
237 t->strm.next_in = in_buf;
238 t->strm.avail_in = io_read(t->pair, in_buf, BUFSIZ);
240 if (t->strm.avail_in == SIZE_MAX)
242 else if (t->pair->src_eof
243 && opt_mode == MODE_COMPRESS)
244 action = LZMA_FINISH;
247 t->strm.next_out = out_buf;
248 t->strm.avail_out = BUFSIZ;
250 ret = lzma_code(&t->strm, action);
252 if (opt_mode != MODE_TEST)
253 if (io_write(t->pair, out_buf,
254 BUFSIZ - t->strm.avail_out))
257 if (ret != LZMA_OK) {
258 if (ret == LZMA_STREAM_END) {
259 if (opt_mode == MODE_COMPRESS) {
260 assert(t->pair->src_eof);
265 // Support decoding concatenated .lzma files.
266 ret = single_skip_padding(t, in_buf);
268 if (ret == LZMA_STREAM_END) {
269 assert(t->pair->src_eof);
274 if (ret == LZMA_OK && !single_init(t))
280 errmsg(V_ERROR, "%s: %s", t->pair->src_name,
281 str_strm_error(ret));
287 io_close(t->pair, success);
288 release_thread_data(t);
294 ///////////////////////////////
295 // Multiple threads per file //
296 ///////////////////////////////
300 // I'm not sure what the best way to implement this would be. Here's one
302 // - Reader thread would read the input data and control the coder threads.
303 // - Every coder thread is associated with input and output buffer pools.
304 // The input buffer pool is filled by reader thread, and the output buffer
305 // pool is emptied by the writer thread.
306 // - Writer thread writes the output data of the oldest living coder thread.
308 // The per-file thread started by the application's main thread is used as
309 // the reader thread. In the beginning, it starts the writer thread and the
310 // first coder thread. The coder thread would be left waiting for input from
311 // the reader thread, and the writer thread would be waiting for input from
314 // The reader thread reads the input data into a ring buffer, whose size
315 // depends on the value returned by lzma_chunk_size(). If the ring buffer
316 // gets full, the buffer is marked "to be finished", which indicates to
317 // the coder thread that no more input is coming. Then a new coder thread
325 uint8_t (*buffers)[BUFSIZ];
327 /// Number of buffers
330 /// buffers[read_pos] is the buffer currently being read. Once finish
331 /// is true and read_pos == write_pos, end of input has been reached.
334 /// buffers[write_pos] is the buffer into which data is currently
338 /// This variable matters only when read_pos == write_pos && finish.
339 /// In that case, this variable will contain the size of the
340 /// buffers[read_pos].
343 /// True once no more data is being written to the buffer. When this
344 /// is set, the last_size variable must have been set too.
347 /// Mutex to protect access to the variables in this structure
348 pthread_mutex_t mutex;
350 /// Condition to indicate when another thread can continue
356 multi_reader(thread_data *t)
361 const size_t size = io_read(t->pair,
362 m->buffers + m->write_pos, BUFSIZ);
363 if (size == SIZE_MAX) {
365 } else if (t->pair->src_eof) {
369 pthread_mutex_lock(&m->mutex);
371 if (++m->write_pos == m->buffer_count)
374 if (m->write_pos == m->read_pos || t->pair->src_eof)
377 pthread_cond_signal(&m->cond);
378 pthread_mutex_unlock(&m->mutex);
380 } while (!m->finish);
382 return done ? 0 : -1;
389 lzma_action = LZMA_RUN;
392 pthread_mutex_lock(&m->mutex);
394 while (m->read_pos == m->write_pos && !m->finish)
395 pthread_cond_wait(&m->cond, &m->mutex);
397 pthread_mutex_unlock(&m->mutex);
400 t->strm.avail_in = m->last_size;
401 if (opt_mode == MODE_COMPRESS)
402 action = LZMA_FINISH;
404 t->strm.avail_in = BUFSIZ;
407 t->strm.next_in = m->buffers + m->read_pos;
409 const lzma_ret ret = lzma_code(&t->strm, action);
417 ///////////////////////
418 // Starting new file //
419 ///////////////////////
422 process_file(const char *filename)
424 thread_data *t = get_thread_data();
426 return; // User abort
428 // If this fails, it shows appropriate error messages too.
429 t->pair = io_open(filename);
430 if (t->pair == NULL) {
431 release_thread_data(t);
435 // TODO Currently only one-thread-per-file mode is implemented.
437 if (create_thread(&single, t)) {
438 io_close(t->pair, false);
439 release_thread_data(t);