1 ///////////////////////////////////////////////////////////////////////////////
4 /// \brief Compresses or uncompresses a file
6 // Copyright (C) 2007 Lasse Collin
8 // This program is free software; you can redistribute it and/or
9 // modify it under the terms of the GNU Lesser General Public
10 // License as published by the Free Software Foundation; either
11 // version 2.1 of the License, or (at your option) any later version.
13 // This program is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 // Lesser General Public License for more details.
18 ///////////////////////////////////////////////////////////////////////////////
29 /// We don't need this for *anything* but seems that at least with
30 /// glibc pthread_create() doesn't allow NULL.
38 /// Number of available threads
39 static size_t free_threads;
41 /// Thread-specific data
42 static thread_data *threads;
44 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
45 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
47 /// Attributes of new coder threads. They are created in detached state.
48 /// Coder threads signal to the service thread themselves when they are done.
49 static pthread_attr_t thread_attr;
59 threads = malloc(sizeof(thread_data) * opt_threads);
60 if (threads == NULL) {
65 for (size_t i = 0; i < opt_threads; ++i)
66 memzero(&threads[i], sizeof(threads[0]));
68 if (pthread_attr_init(&thread_attr)
69 || pthread_attr_setdetachstate(
70 &thread_attr, PTHREAD_CREATE_DETACHED)) {
75 free_threads = opt_threads;
81 //////////////////////////
82 // Thread-specific data //
83 //////////////////////////
88 pthread_mutex_lock(&mutex);
90 while (free_threads == 0) {
91 pthread_cond_wait(&cond, &mutex);
94 pthread_cond_signal(&cond);
95 pthread_mutex_unlock(&mutex);
100 thread_data *t = threads;
107 pthread_mutex_unlock(&mutex);
114 release_thread_data(thread_data *t)
116 pthread_mutex_lock(&mutex);
121 pthread_cond_signal(&cond);
122 pthread_mutex_unlock(&mutex);
129 create_thread(void *(*func)(thread_data *t), thread_data *t)
131 if (opt_threads == 1) {
134 const int err = pthread_create(&t->thread, &thread_attr,
135 (void *(*)(void *))(func), t);
137 errmsg(V_ERROR, _("Cannot create a thread: %s"),
148 /////////////////////////
149 // One thread per file //
150 /////////////////////////
153 single_init(thread_data *t)
155 lzma_ret ret = LZMA_PROG_ERROR;
157 if (opt_mode == MODE_COMPRESS) {
158 switch (opt_format) {
160 // args.c ensures this.
165 ret = lzma_stream_encoder(&t->strm,
166 opt_filters, opt_check);
170 ret = lzma_alone_encoder(&t->strm,
171 opt_filters[0].options);
175 ret = lzma_raw_encoder(&t->strm, opt_filters);
179 const uint32_t flags = LZMA_TELL_UNSUPPORTED_CHECK
182 switch (opt_format) {
184 ret = lzma_auto_decoder(&t->strm, opt_memory, flags);
188 ret = lzma_stream_decoder(&t->strm, opt_memory, flags);
192 ret = lzma_alone_decoder(&t->strm, opt_memory);
196 // Memory usage has already been checked in args.c.
197 ret = lzma_raw_decoder(&t->strm, opt_filters);
202 if (ret != LZMA_OK) {
203 if (ret == LZMA_MEM_ERROR)
216 single(thread_data *t)
218 if (single_init(t)) {
219 io_close(t->pair, false);
220 release_thread_data(t);
224 uint8_t in_buf[BUFSIZ];
225 uint8_t out_buf[BUFSIZ];
226 lzma_action action = LZMA_RUN;
228 bool success = false;
230 t->strm.avail_in = 0;
231 t->strm.next_out = out_buf;
232 t->strm.avail_out = BUFSIZ;
234 while (!user_abort) {
235 if (t->strm.avail_in == 0 && !t->pair->src_eof) {
236 t->strm.next_in = in_buf;
237 t->strm.avail_in = io_read(t->pair, in_buf, BUFSIZ);
239 if (t->strm.avail_in == SIZE_MAX)
242 if (t->pair->src_eof)
243 action = LZMA_FINISH;
246 ret = lzma_code(&t->strm, action);
248 if ((t->strm.avail_out == 0 || ret != LZMA_OK)
249 && opt_mode != MODE_TEST) {
250 if (io_write(t->pair, out_buf,
251 BUFSIZ - t->strm.avail_out))
254 t->strm.next_out = out_buf;
255 t->strm.avail_out = BUFSIZ;
258 if (ret != LZMA_OK) {
259 // Check that there is no trailing garbage. This is
260 // needed for LZMA_Alone and raw streams.
261 if (ret == LZMA_STREAM_END && (t->strm.avail_in != 0
262 || (!t->pair->src_eof && io_read(
263 t->pair, in_buf, 1) != 0)))
264 ret = LZMA_DATA_ERROR;
266 if (ret != LZMA_STREAM_END) {
267 errmsg(V_ERROR, "%s: %s", t->pair->src_name,
268 str_strm_error(ret));
272 assert(t->pair->src_eof);
278 io_close(t->pair, success);
279 release_thread_data(t);
285 ///////////////////////////////
286 // Multiple threads per file //
287 ///////////////////////////////
291 // I'm not sure what would the best way to implement this. Here's one
293 // - Reader thread would read the input data and control the coders threads.
294 // - Every coder thread is associated with input and output buffer pools.
295 // The input buffer pool is filled by reader thread, and the output buffer
296 // pool is emptied by the writer thread.
297 // - Writer thread writes the output data of the oldest living coder thread.
299 // The per-file thread started by the application's main thread is used as
300 // the reader thread. In the beginning, it starts the writer thread and the
301 // first coder thread. The coder thread would be left waiting for input from
302 // the reader thread, and the writer thread would be waiting for input from
305 // The reader thread reads the input data into a ring buffer, whose size
306 // depends on the value returned by lzma_chunk_size(). If the ring buffer
307 // gets full, the buffer is marked "to be finished", which indicates to
308 // the coder thread that no more input is coming. Then a new coder thread
316 uint8_t (*buffers)[BUFSIZ];
318 /// Number of buffers
321 /// buffers[read_pos] is the buffer currently being read. Once finish
322 /// is true and read_pos == write_pos, end of input has been reached.
325 /// buffers[write_pos] is the buffer into which data is currently
329 /// This variable matters only when read_pos == write_pos && finish.
330 /// In that case, this variable will contain the size of the
331 /// buffers[read_pos].
334 /// True once no more data is being written to the buffer. When this
335 /// is set, the last_size variable must have been set too.
338 /// Mutex to protect access to the variables in this structure
339 pthread_mutex_t mutex;
341 /// Condition to indicate when another thread can continue
347 multi_reader(thread_data *t)
352 const size_t size = io_read(t->pair,
353 m->buffers + m->write_pos, BUFSIZ);
354 if (size == SIZE_MAX) {
356 } else if (t->pair->src_eof) {
360 pthread_mutex_lock(&m->mutex);
362 if (++m->write_pos == m->buffer_count)
365 if (m->write_pos == m->read_pos || t->pair->src_eof)
368 pthread_cond_signal(&m->cond);
369 pthread_mutex_unlock(&m->mutex);
371 } while (!m->finish);
373 return done ? 0 : -1;
380 lzma_action = LZMA_RUN;
383 pthread_mutex_lock(&m->mutex);
385 while (m->read_pos == m->write_pos && !m->finish)
386 pthread_cond_wait(&m->cond, &m->mutex);
388 pthread_mutex_unlock(&m->mutex);
391 t->strm.avail_in = m->last_size;
392 if (opt_mode == MODE_COMPRESS)
393 action = LZMA_FINISH;
395 t->strm.avail_in = BUFSIZ;
398 t->strm.next_in = m->buffers + m->read_pos;
400 const lzma_ret ret = lzma_code(&t->strm, action);
408 ///////////////////////
409 // Starting new file //
410 ///////////////////////
413 process_file(const char *filename)
415 thread_data *t = get_thread_data();
417 return; // User abort
419 // If this fails, it shows appropriate error messages too.
420 t->pair = io_open(filename);
421 if (t->pair == NULL) {
422 release_thread_data(t);
426 // TODO Currently only one-thread-per-file mode is implemented.
428 if (create_thread(&single, t)) {
429 io_close(t->pair, false);
430 release_thread_data(t);