1 ///////////////////////////////////////////////////////////////////////////////
4 /// \brief Compresses or uncompresses a file
6 // Copyright (C) 2007 Lasse Collin
8 // This program is free software; you can redistribute it and/or
9 // modify it under the terms of the GNU Lesser General Public
10 // License as published by the Free Software Foundation; either
11 // version 2.1 of the License, or (at your option) any later version.
13 // This program is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 // Lesser General Public License for more details.
18 ///////////////////////////////////////////////////////////////////////////////
29 /// We don't need this for *anything*, but it seems that, at least with
30 /// glibc, pthread_create() doesn't allow NULL.
38 /// Number of available threads
39 static size_t free_threads;
41 /// Thread-specific data
// Array with one thread_data element per thread; it is allocated with
// opt_threads elements and every element is zeroed before use.
42 static thread_data *threads;
// Lock and condition variable used together in this file: a thread that
// finds free_threads == 0 sleeps on `cond` with `mutex` held, and code that
// changes the pool state signals `cond` before unlocking `mutex`.
44 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
45 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
47 /// Attributes of new coder threads. They are created in detached state.
48 /// Coder threads signal to the service thread themselves when they are done.
49 static pthread_attr_t thread_attr;
// Allocate one thread_data element per requested thread.
// NOTE(review): the size multiplication is not checked for overflow, and the
// array is cleared by a separate loop below. calloc(opt_threads,
// sizeof(thread_data)) would cover both -- confirm that memzero() has no
// extra semantics before changing this.
59 threads = malloc(sizeof(thread_data) * opt_threads);
60 if (threads == NULL) {
// Clear every element so that no field starts out with garbage.
65 for (size_t i = 0; i < opt_threads; ++i)
66 memzero(&threads[i], sizeof(threads[0]));
// Both pthread calls return non-zero on failure. The attribute object is
// set up so that threads created with it start in detached state.
68 if (pthread_attr_init(&thread_attr)
69 || pthread_attr_setdetachstate(
70 &thread_attr, PTHREAD_CREATE_DETACHED)) {
// Nothing has been handed out yet, so every thread counts as available.
75 free_threads = opt_threads;
81 //////////////////////////
82 // Thread-specific data //
83 //////////////////////////
// All accesses to free_threads below happen with `mutex` held.
88 pthread_mutex_lock(&mutex);
// Sleep until at least one thread is available. pthread_cond_wait()
// releases `mutex` while blocked and reacquires it before returning, so
// free_threads is always re-checked with the lock held.
90 while (free_threads == 0) {
91 pthread_cond_wait(&cond, &mutex);
// Wake one other thread waiting on `cond`, then drop the lock.
94 pthread_cond_signal(&cond);
95 pthread_mutex_unlock(&mutex);
// Begin at the first element of the threads array.
100 thread_data *t = threads;
107 pthread_mutex_unlock(&mutex);
/// Called in this file once a thread_data is no longer needed by its user:
/// after io_close(), after a failed io_open(), and after a failed
/// create_thread(). Presumably this is also where t is marked unused and
/// free_threads is incremented -- TODO confirm.
114 release_thread_data(thread_data *t)
// The pool state may only be touched with `mutex` held.
116 pthread_mutex_lock(&mutex);
// Wake one thread that is sleeping on `cond`, then drop the lock so that
// it can proceed.
121 pthread_cond_signal(&cond);
122 pthread_mutex_unlock(&mutex);
/// Starts func with t as its argument in a new thread that is created with
/// thread_attr. The opt_threads == 1 case is handled separately before
/// pthread_create() is reached. The caller in this file treats a non-zero
/// return value as failure.
///
/// \param      func    Function to run
/// \param      t       Thread-specific data; t->thread receives the thread ID
129 create_thread(void *(*func)(thread_data *t), thread_data *t)
131 if (opt_threads == 1) {
// NOTE(review): func is declared to take thread_data *, so it is cast to
// the void *(*)(void *) type that pthread_create() expects. Calling a
// function through an incompatible function pointer type is undefined
// behavior in ISO C even though it works on common ABIs; a small
// void * trampoline would avoid the cast.
134 const int err = pthread_create(&t->thread, &thread_attr,
135 (void *(*)(void *))(func), t);
// pthread_create() reports failure through its return value (an error
// number), not through errno.
137 errmsg(V_ERROR, _("Cannot create a thread: %s"),
148 /////////////////////////
149 // One thread per file //
150 /////////////////////////
/// Initializes t->strm with the encoder or decoder selected by the global
/// options (opt_mode, opt_header, opt_filters, opt_check, opt_memory).
/// single() treats a non-zero return value as failure.
153 single_init(thread_data *t)
157 if (opt_mode == MODE_COMPRESS) {
158 if (opt_header == HEADER_ALONE) {
// The LZMA_Alone encoder is given only the options of the first filter.
159 ret = lzma_alone_encoder(&t->strm,
160 opt_filters[0].options);
// The Stream encoder is given the whole filter array and the integrity
// check type.
162 ret = lzma_stream_encoder(&t->strm,
163 opt_filters, opt_check);
166 // TODO Restrict file format if requested on the command line.
// The auto decoder is given opt_memory as its memory usage limit, and
// flags asking it to warn about unsupported integrity checks and to
// accept concatenated input.
167 ret = lzma_auto_decoder(&t->strm, opt_memory,
168 LZMA_WARN_UNSUPPORTED_CHECK
169 | LZMA_CONCATENATED);
// Initialization failed; running out of memory is distinguished from the
// other error codes.
172 if (ret != LZMA_OK) {
173 if (ret == LZMA_MEM_ERROR)
/// Codes one file with t->strm: input is read with io_read(), passed
/// through lzma_code(), and the output is written with io_write(). Both on
/// initialization failure and at the end of coding, t->pair is closed and
/// t is released.
186 single(thread_data *t)
// If the coder can't be initialized, close the file pair as unsuccessful
// and give the thread_data back.
188 if (single_init(t)) {
189 io_close(t->pair, false);
190 release_thread_data(t);
// Fixed-size input and output buffers local to this function.
194 uint8_t in_buf[BUFSIZ];
195 uint8_t out_buf[BUFSIZ];
196 lzma_action action = LZMA_RUN;
// Passed to io_close() at the end; stays false unless set elsewhere.
197 bool success = false;
// No input has been read yet, and the whole output buffer is free.
199 t->strm.avail_in = 0;
200 t->strm.next_out = out_buf;
201 t->strm.avail_out = BUFSIZ;
// Main coding loop; it is left early if the user aborts.
203 while (!user_abort) {
// Refill the input buffer only once the coder has consumed all of it and
// the source hasn't reached end of file.
204 if (t->strm.avail_in == 0 && !t->pair->src_eof) {
205 t->strm.next_in = in_buf;
206 t->strm.avail_in = io_read(t->pair, in_buf, BUFSIZ);
// SIZE_MAX is a special return value of io_read() -- presumably its error
// indicator; TODO confirm against io_read().
208 if (t->strm.avail_in == SIZE_MAX)
// Once the end of the input has been seen, ask the coder to finish.
211 if (t->pair->src_eof)
212 action = LZMA_FINISH;
215 const lzma_ret ret = lzma_code(&t->strm, action);
// Write the output buffer when it is full, or when lzma_code() returned
// anything other than LZMA_OK so that pending output isn't lost. Nothing
// is written in test mode. BUFSIZ - avail_out is the number of bytes the
// coder has produced into out_buf.
217 if ((t->strm.avail_out == 0 || ret != LZMA_OK)
218 && opt_mode != MODE_TEST) {
219 if (io_write(t->pair, out_buf,
220 BUFSIZ - t->strm.avail_out))
// Start filling the output buffer from the beginning again.
223 t->strm.next_out = out_buf;
224 t->strm.avail_out = BUFSIZ;
// Anything other than LZMA_OK is either the end of the stream or an error.
227 if (ret != LZMA_OK) {
228 if (ret == LZMA_STREAM_END) {
229 // FIXME !!! This doesn't work when decoding
230 // LZMA_Alone files, because LZMA_Alone decoder
231 // doesn't wait for LZMA_FINISH.
232 assert(t->pair->src_eof);
// Report the coder's return value together with the source file name.
235 errmsg(V_ERROR, "%s: %s", t->pair->src_name,
236 str_strm_error(ret));
// Close the file pair, telling io_close() whether coding succeeded, and
// give the thread_data back.
243 io_close(t->pair, success);
244 release_thread_data(t);
250 ///////////////////////////////
251 // Multiple threads per file //
252 ///////////////////////////////
256 // I'm not sure what would be the best way to implement this. Here's one
258 // - Reader thread would read the input data and control the coder threads.
259 // - Every coder thread is associated with input and output buffer pools.
260 // The input buffer pool is filled by reader thread, and the output buffer
261 // pool is emptied by the writer thread.
262 // - Writer thread writes the output data of the oldest living coder thread.
264 // The per-file thread started by the application's main thread is used as
265 // the reader thread. In the beginning, it starts the writer thread and the
266 // first coder thread. The coder thread would be left waiting for input from
267 // the reader thread, and the writer thread would be waiting for input from
270 // The reader thread reads the input data into a ring buffer, whose size
271 // depends on the value returned by lzma_chunk_size(). If the ring buffer
272 // gets full, the buffer is marked "to be finished", which indicates to
273 // the coder thread that no more input is coming. Then a new coder thread
281 uint8_t (*buffers)[BUFSIZ];
283 /// Number of buffers
286 /// buffers[read_pos] is the buffer currently being read. Once finish
287 /// is true and read_pos == write_pos, end of input has been reached.
290 /// buffers[write_pos] is the buffer into which data is currently
294 /// This variable matters only when read_pos == write_pos && finish.
295 /// In that case, this variable will contain the size of the
296 /// buffers[read_pos].
299 /// True once no more data is being written to the buffer. When this
300 /// is set, the last_size variable must have been set too.
303 /// Mutex to protect access to the variables in this structure
304 pthread_mutex_t mutex;
306 /// Condition to indicate when another thread can continue
/// Reader loop for the multiple-threads-per-file mode: reads the input file
/// in BUFSIZ-sized pieces into the ring of buffers addressed through m, and
/// signals m->cond after each piece. Loops until m->finish is set.
312 multi_reader(thread_data *t)
// Read up to BUFSIZ bytes into the buffer at write_pos.
// NOTE(review): if m->buffers is declared as uint8_t (*)[BUFSIZ], then
// m->buffers + m->write_pos is a pointer to a whole array, whereas
// m->buffers[m->write_pos] would decay to the uint8_t * that single()
// passes to io_read() -- confirm against io_read()'s prototype.
317 const size_t size = io_read(t->pair,
318 m->buffers + m->write_pos, BUFSIZ);
// SIZE_MAX and end of input are the two special outcomes of the read.
319 if (size == SIZE_MAX) {
321 } else if (t->pair->src_eof) {
// The position and finish fields are updated with the mutex held.
325 pthread_mutex_lock(&m->mutex);
// Advance write_pos; reaching buffer_count is handled specially
// (presumably by wrapping back to the first buffer -- TODO confirm).
327 if (++m->write_pos == m->buffer_count)
// Either write_pos has caught up with read_pos, or the input has ended.
330 if (m->write_pos == m->read_pos || t->pair->src_eof)
// Wake one thread waiting on the condition variable, then unlock.
333 pthread_cond_signal(&m->cond);
334 pthread_mutex_unlock(&m->mutex);
336 } while (!m->finish);
// 0 when `done` is true, -1 otherwise.
338 return done ? 0 : -1;
// NOTE(review): this declaration has a type and an initializer but no
// variable name. Later statements assign to and pass `action`, so it was
// probably meant to read "lzma_action action = LZMA_RUN;".
345 lzma_action = LZMA_RUN;
348 pthread_mutex_lock(&m->mutex);
// Sleep while there is nothing to read and more data may still arrive.
// pthread_cond_wait() releases the mutex while blocked and reacquires it
// before returning.
350 while (m->read_pos == m->write_pos && !m->finish)
351 pthread_cond_wait(&m->cond, &m->mutex);
353 pthread_mutex_unlock(&m->mutex);
// Case where only last_size bytes of the buffer are used as input; when
// compressing, the coder is also told to finish.
356 t->strm.avail_in = m->last_size;
357 if (opt_mode == MODE_COMPRESS)
358 action = LZMA_FINISH;
// Case where a whole buffer is used as input.
360 t->strm.avail_in = BUFSIZ;
// NOTE(review): if m->buffers is declared as uint8_t (*)[BUFSIZ], this
// assigns a pointer-to-array to next_in; m->buffers[m->read_pos] would
// give a plain byte pointer -- confirm against the type of next_in.
363 t->strm.next_in = m->buffers + m->read_pos;
365 const lzma_ret ret = lzma_code(&t->strm, action);
373 ///////////////////////
374 // Starting new file //
375 ///////////////////////
/// Starts the processing of one file: obtains a thread_data, opens the file
/// pair for filename, and hands both to single() through create_thread().
/// On each failure path, whatever was acquired up to that point is released.
///
/// \param      filename    Name of the file to process
378 process_file(const char *filename)
// Get a thread_data to work with.
380 thread_data *t = get_thread_data();
382 return; // User abort
384 // If this fails, it shows appropriate error messages too.
385 t->pair = io_open(filename);
386 if (t->pair == NULL) {
// Nothing was opened, so only the thread_data has to be given back.
387 release_thread_data(t);
391 // TODO Currently only one-thread-per-file mode is implemented.
// A non-zero return value from create_thread() is treated as failure.
// single() normally closes the pair and releases t itself, so that
// cleanup is done here instead.
393 if (create_thread(&single, t)) {
394 io_close(t->pair, false);
395 release_thread_data(t);