1 ///////////////////////////////////////////////////////////////////////////////
4 /// \brief Compresses or uncompresses a file
6 // Copyright (C) 2007 Lasse Collin
8 // This program is free software; you can redistribute it and/or
9 // modify it under the terms of the GNU Lesser General Public
10 // License as published by the Free Software Foundation; either
11 // version 2.1 of the License, or (at your option) any later version.
13 // This program is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 // Lesser General Public License for more details.
18 ///////////////////////////////////////////////////////////////////////////////
29 /// We don't need this for *anything*, but it seems that at least with
30 /// glibc, pthread_create() doesn't allow NULL.
38 /// Number of available threads
/// Set to opt_threads during initialization and tested (for zero) while
/// holding `mutex` before a thread_data slot is handed out.
39 static size_t free_threads;
41 /// Thread-specific data
/// Array of opt_threads elements allocated with malloc() during
/// initialization.
42 static thread_data *threads;
/// Mutex held around the checks of free_threads in this file.
44 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/// Condition variable paired with `mutex`: waited on while
/// free_threads == 0 and signaled when a slot is released.
45 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
47 /// Attributes of new coder threads. They are created in detached state.
48 /// Coder threads signal to the service thread themselves when they are done.
49 static pthread_attr_t thread_attr;
// Allocate one thread_data slot per configured thread.
// NOTE(review): the multiplication sizeof(thread_data) * opt_threads is not
// checked for overflow here - confirm that opt_threads is bounded elsewhere.
59 threads = malloc(sizeof(thread_data) * opt_threads);
// Allocation failure is handled in this branch.
60 if (threads == NULL) {
// Put every slot into a known initial state using a compound literal.
65 for (size_t i = 0; i < opt_threads; ++i)
66 threads[i] = (thread_data){
67 .strm = LZMA_STREAM_INIT_VAR,
// Prepare the shared attribute object so that threads created with it are
// detached. A non-zero result from either call takes the error branch.
73 if (pthread_attr_init(&thread_attr)
74 || pthread_attr_setdetachstate(
75 &thread_attr, PTHREAD_CREATE_DETACHED)) {
// Initially every slot is available.
80 free_threads = opt_threads;
86 //////////////////////////
87 // Thread-specific data //
88 //////////////////////////
// Hold the global mutex while inspecting free_threads.
93 pthread_mutex_lock(&mutex);
// Sleep on the condition variable for as long as no slot is free.
// pthread_cond_wait() releases the mutex while blocked and re-acquires it
// before returning, so the predicate is always re-tested under the lock.
95 while (free_threads == 0) {
96 pthread_cond_wait(&cond, &mutex);
// Signal the condition variable and release the mutex.
// NOTE(review): presumably an early-exit path out of the wait loop (the
// caller has a "User abort" return) - confirm.
99 pthread_cond_signal(&cond);
100 pthread_mutex_unlock(&mutex);
// Start from the first element of the threads array.
105 thread_data *t = threads;
// Finished with the shared state; release the mutex.
112 pthread_mutex_unlock(&mutex);
/// Releases the thread_data slot \a t.
///
/// The global mutex is held for the duration of the update, and the
/// condition variable is signaled before unlocking so that a thread waiting
/// for a free slot can continue.
///
/// \param t Slot being released (presumably one obtained from
/// get_thread_data() - confirm against callers).
119 release_thread_data(thread_data *t)
121 pthread_mutex_lock(&mutex);
// Wake one waiter, then drop the lock.
126 pthread_cond_signal(&cond);
127 pthread_mutex_unlock(&mutex);
/// Runs \a func with \a t as its argument, using pthread_create() with the
/// shared thread_attr attributes.
///
/// \param func Thread entry point taking the thread_data slot
/// \param t Slot passed to \a func; t->thread receives the thread ID
///
/// Callers in this file treat a non-zero return value as failure.
134 create_thread(void *(*func)(thread_data *t), thread_data *t)
// The single-thread configuration is handled separately in this branch.
136 if (opt_threads == 1) {
// func is cast to the void *(*)(void *) signature that pthread_create()
// expects.
// NOTE(review): calling a function through a pointer to an incompatible
// function type is undefined behavior in ISO C, even though it works on
// common ABIs - consider a void * trampoline.
139 const int err = pthread_create(&t->thread, &thread_attr,
140 (void *(*)(void *))(func), t);
// Report the thread creation failure to the user.
142 errmsg(V_ERROR, _("Cannot create a thread: %s"),
153 /////////////////////////
154 // One thread per file //
155 /////////////////////////
/// Initializes t->strm as an encoder or a decoder depending on opt_mode.
///
/// \param t Slot whose strm is initialized; t->pair describes the input
///
/// Callers in this file treat a non-zero return value as failure.
158 single_init(thread_data *t)
162 if (opt_mode == MODE_COMPRESS) {
// The input size comes from the stat data unless the source is stdin, in
// which case it is marked as unknown.
163 const lzma_vli uncompressed_size
164 = t->pair->src_fd != STDIN_FILENO
165 ? (lzma_vli)(t->pair->src_st.st_size)
166 : LZMA_VLI_VALUE_UNKNOWN;
168 // TODO Support Multi-Block Streams to store Extra.
// HEADER_ALONE selects lzma_alone_encoder().
169 if (opt_header == HEADER_ALONE) {
170 lzma_options_alone alone;
171 alone.uncompressed_size = uncompressed_size;
// Take the options of the first filter in opt_filters as alone.lzma.
172 memcpy(&alone.lzma, opt_filters[0].options,
174 ret = lzma_alone_encoder(&t->strm, &alone);
// Stream header path: build lzma_options_stream and use
// lzma_stream_encoder_single().
176 lzma_options_stream stream = {
// has_crc32 is enabled for every check type except LZMA_CHECK_NONE.
178 .has_crc32 = opt_check != LZMA_CHECK_NONE,
179 .uncompressed_size = uncompressed_size,
// Copy the whole configured filter array into the Stream options.
182 memcpy(stream.filters, opt_filters,
183 sizeof(stream.filters));
184 ret = lzma_stream_encoder_single(&t->strm, &stream);
187 // TODO Restrict file format if requested on the command line.
// Decoder setup goes through lzma_auto_decoder().
188 ret = lzma_auto_decoder(&t->strm, NULL, NULL);
// Any result other than LZMA_OK is an initialization failure;
// LZMA_MEM_ERROR is distinguished from the other error codes.
191 if (ret != LZMA_OK) {
192 if (ret == LZMA_MEM_ERROR)
/// Skips nul bytes in the input, refilling \a in_buf from t->pair as needed.
///
/// \param t Slot whose strm.next_in / strm.avail_in track the input
/// \param in_buf Input buffer of at least BUFSIZ bytes used for refills
///
/// Returns LZMA_STREAM_END once the source has reached end of file, and
/// LZMA_DATA_ERROR when io_read() reports SIZE_MAX. The caller treats
/// LZMA_OK as "another Stream follows".
205 single_skip_padding(thread_data *t, uint8_t *in_buf)
207 // Handle decoding of concatenated Streams. There can be an arbitrary
208 // amount of nul-byte padding between the Streams, which must be
211 // NOTE: Concatenating LZMA_Alone files works only if at least
212 // one of lc, lp, and pb is non-zero. Using the concatenation
213 // on LZMA_Alone files is strongly discouraged.
// Scan the bytes still left in the input buffer.
215 while (t->strm.avail_in > 0) {
// A byte other than nul ends the padding (presumably the start of the
// next Stream - confirm).
216 if (*t->strm.next_in != '\0')
// No more input is available from the source.
223 if (t->pair->src_eof)
224 return LZMA_STREAM_END;
// Refill in_buf from the source file.
226 t->strm.next_in = in_buf;
227 t->strm.avail_in = io_read(t->pair, in_buf, BUFSIZ);
// SIZE_MAX from io_read() is treated as a read failure.
228 if (t->strm.avail_in == SIZE_MAX)
229 return LZMA_DATA_ERROR;
/// Thread entry point for one-thread-per-file mode: codes the file pair in
/// t->pair through t->strm, then closes the pair and releases the slot.
///
/// \param t Slot with an opened file pair
235 single(thread_data *t)
// Coder setup failed: close the pair with success == false and give the
// slot back.
237 if (single_init(t)) {
238 io_close(t->pair, false);
239 release_thread_data(t);
// Stack buffers for input and output, BUFSIZ bytes each.
243 uint8_t in_buf[BUFSIZ];
244 uint8_t out_buf[BUFSIZ];
// Starts as LZMA_RUN; switched to LZMA_FINISH below once the input of a
// compression job has ended.
245 lzma_action action = LZMA_RUN;
// Passed to io_close() at the end.
247 bool success = false;
// Nothing buffered yet, so the first loop iteration reads input.
249 t->strm.avail_in = 0;
// Main coding loop; it stops when user_abort is set.
251 while (!user_abort) {
// Refill the input buffer only when it is empty and the source has not
// reached end of file.
252 if (t->strm.avail_in == 0 && !t->pair->src_eof) {
253 t->strm.next_in = in_buf;
254 t->strm.avail_in = io_read(t->pair, in_buf, BUFSIZ);
// SIZE_MAX from io_read() is treated as a read failure.
256 if (t->strm.avail_in == SIZE_MAX)
// When compressing, end of input means the encoder has to be finished.
258 else if (t->pair->src_eof
259 && opt_mode == MODE_COMPRESS)
260 action = LZMA_FINISH;
// Hand the coder an empty output buffer for this round.
263 t->strm.next_out = out_buf;
264 t->strm.avail_out = BUFSIZ;
266 ret = lzma_code(&t->strm, action);
// Write the bytes produced in this round (BUFSIZ minus the space left);
// nothing is written in MODE_TEST. A non-zero io_write() result is
// handled in the body.
268 if (opt_mode != MODE_TEST)
269 if (io_write(t->pair, out_buf,
270 BUFSIZ - t->strm.avail_out))
// Anything other than LZMA_OK either finishes the current Stream or is
// an error.
273 if (ret != LZMA_OK) {
274 if (ret == LZMA_STREAM_END) {
// When compressing, the Stream can end only after all input was read.
275 if (opt_mode == MODE_COMPRESS) {
276 assert(t->pair->src_eof);
281 // Support decoding concatenated .lzma files.
282 ret = single_skip_padding(t, in_buf);
// The padding skipper reached end of input.
284 if (ret == LZMA_STREAM_END) {
285 assert(t->pair->src_eof);
// More data follows and the coder was re-initialized successfully for
// the next Stream.
290 if (ret == LZMA_OK && !single_init(t))
// Report the liblzma result, prefixed with the source file name.
296 errmsg(V_ERROR, "%s: %s", t->pair->src_name,
297 str_strm_error(ret));
// Common exit: close the file pair with the final status and release the
// slot.
303 io_close(t->pair, success);
304 release_thread_data(t);
310 ///////////////////////////////
311 // Multiple threads per file //
312 ///////////////////////////////
316 // I'm not sure what would be the best way to implement this. Here's one
318 // - Reader thread would read the input data and control the coder threads.
319 // - Every coder thread is associated with input and output buffer pools.
320 // The input buffer pool is filled by reader thread, and the output buffer
321 // pool is emptied by the writer thread.
322 // - Writer thread writes the output data of the oldest living coder thread.
324 // The per-file thread started by the application's main thread is used as
325 // the reader thread. In the beginning, it starts the writer thread and the
326 // first coder thread. The coder thread would be left waiting for input from
327 // the reader thread, and the writer thread would be waiting for input from
330 // The reader thread reads the input data into a ring buffer, whose size
331 // depends on the value returned by lzma_chunk_size(). If the ring buffer
332 // gets full, the buffer is marked "to be finished", which indicates to
333 // the coder thread that no more input is coming. Then a new coder thread
/// Pointer to an array of buffers, each BUFSIZ bytes in size
341 uint8_t (*buffers)[BUFSIZ];
343 /// Number of buffers
346 /// buffers[read_pos] is the buffer currently being read. Once finish
347 /// is true and read_pos == write_pos, end of input has been reached.
350 /// buffers[write_pos] is the buffer into which data is currently
354 /// This variable matters only when read_pos == write_pos && finish.
355 /// In that case, this variable will contain the size of
356 /// buffers[read_pos].
359 /// True once no more data is being written to the buffer. When this
360 /// is set, the last_size variable must have been set too.
363 /// Mutex to protect access to the variables in this structure
364 pthread_mutex_t mutex;
366 /// Condition to indicate when another thread can continue
/// Reader loop for multi-thread mode: reads input from t->pair into the
/// buffer ring and signals the condition variable after each buffer.
///
/// \param t Slot whose file pair is read
///
/// Returns 0 when `done` is true, otherwise -1.
372 multi_reader(thread_data *t)
// Read up to BUFSIZ bytes into the buffer at index write_pos.
// NOTE(review): m->buffers + m->write_pos has type uint8_t (*)[BUFSIZ];
// it has the same address as m->buffers[m->write_pos] but a different
// type - confirm that io_read()'s parameter accepts it.
377 const size_t size = io_read(t->pair,
378 m->buffers + m->write_pos, BUFSIZ);
// SIZE_MAX is treated as a read failure; end of input is handled in its
// own branch.
379 if (size == SIZE_MAX) {
381 } else if (t->pair->src_eof) {
// Update the ring state under the structure's mutex.
385 pthread_mutex_lock(&m->mutex);
// Advance write_pos; reaching buffer_count is the wrap-around case.
387 if (++m->write_pos == m->buffer_count)
// Either write_pos has caught up with read_pos or the input has ended.
390 if (m->write_pos == m->read_pos || t->pair->src_eof)
// Wake a thread waiting on the condition, then release the mutex.
393 pthread_cond_signal(&m->cond);
394 pthread_mutex_unlock(&m->mutex);
// Keep reading until finish has been set.
// NOTE(review): m->finish is read here after the mutex was released -
// confirm that only this thread writes it.
396 } while (!m->finish);
398 return done ? 0 : -1;
// NOTE(review): this declaration has no identifier, while `action` is
// assigned and passed to lzma_code() below. It looks like
// `lzma_action action = LZMA_RUN;` was intended - confirm; as written this
// does not compile if the code is enabled.
405 lzma_action = LZMA_RUN;
// Wait under the mutex until the ring has data to read or finish is set.
408 pthread_mutex_lock(&m->mutex);
410 while (m->read_pos == m->write_pos && !m->finish)
411 pthread_cond_wait(&m->cond, &m->mutex);
413 pthread_mutex_unlock(&m->mutex);
// Use last_size as the input length and, when compressing, switch to
// LZMA_FINISH.
416 t->strm.avail_in = m->last_size;
417 if (opt_mode == MODE_COMPRESS)
418 action = LZMA_FINISH;
// Full-buffer case: BUFSIZ bytes of input.
420 t->strm.avail_in = BUFSIZ;
// Point the coder at the buffer at index read_pos.
// NOTE(review): m->buffers + m->read_pos is a pointer to an array, not a
// uint8_t pointer - m->buffers[m->read_pos] may have been intended.
423 t->strm.next_in = m->buffers + m->read_pos;
// Run the coder on this buffer.
425 const lzma_ret ret = lzma_code(&t->strm, action);
433 ///////////////////////
434 // Starting new file //
435 ///////////////////////
/// Starts processing \a filename: obtains a thread_data slot, opens the
/// file pair with io_open(), and runs single() on it via create_thread().
/// If io_open() or create_thread() fails, the slot is released again.
///
/// \param filename Name of the file passed to io_open()
438 process_file(const char *filename)
// Obtain a thread_data slot for this file.
440 thread_data *t = get_thread_data();
442 return; // User abort
444 // If this fails, it shows appropriate error messages too.
445 t->pair = io_open(filename);
// Opening failed: give the slot back.
446 if (t->pair == NULL) {
447 release_thread_data(t);
451 // TODO Currently only one-thread-per-file mode is implemented.
// A non-zero result means single() could not be started: close the pair
// with success == false and release the slot.
453 if (create_thread(&single, t)) {
454 io_close(t->pair, false);
455 release_thread_data(t);