]> icculus.org git repositories - icculus/xz.git/blob - src/lzma/process.c
Replaced the range decoder optimization that used arithmetic
[icculus/xz.git] / src / lzma / process.c
1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 /// \file       process.c
4 /// \brief      Compresses or uncompresses a file
5 //
6 //  Copyright (C) 2007 Lasse Collin
7 //
8 //  This program is free software; you can redistribute it and/or
9 //  modify it under the terms of the GNU Lesser General Public
10 //  License as published by the Free Software Foundation; either
11 //  version 2.1 of the License, or (at your option) any later version.
12 //
13 //  This program is distributed in the hope that it will be useful,
14 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
15 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16 //  Lesser General Public License for more details.
17 //
18 ///////////////////////////////////////////////////////////////////////////////
19
20 #include "private.h"
21
22
23 typedef struct {
24         lzma_stream strm;
25         void *options;
26
27         file_pair *pair;
28
29         /// We don't need this for *anything* but seems that at least with
30         /// glibc pthread_create() doesn't allow NULL.
31         pthread_t thread;
32
33         bool in_use;
34
35 } thread_data;
36
37
38 /// Number of available threads
39 static size_t free_threads;
40
41 /// Thread-specific data
42 static thread_data *threads;
43
44 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
45 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
46
47 /// Attributes of new coder threads. They are created in detached state.
48 /// Coder threads signal to the service thread themselves when they are done.
49 static pthread_attr_t thread_attr;
50
51
52 //////////
53 // Init //
54 //////////
55
56 extern void
57 process_init(void)
58 {
59         threads = malloc(sizeof(thread_data) * opt_threads);
60         if (threads == NULL) {
61                 out_of_memory();
62                 my_exit(ERROR);
63         }
64
65         for (size_t i = 0; i < opt_threads; ++i)
66                 threads[i] = (thread_data){
67                         .strm = LZMA_STREAM_INIT_VAR,
68                         .options = NULL,
69                         .pair = NULL,
70                         .in_use = false,
71                 };
72
73         if (pthread_attr_init(&thread_attr)
74                         || pthread_attr_setdetachstate(
75                                 &thread_attr, PTHREAD_CREATE_DETACHED)) {
76                 out_of_memory();
77                 my_exit(ERROR);
78         }
79
80         free_threads = opt_threads;
81
82         return;
83 }
84
85
86 //////////////////////////
87 // Thread-specific data //
88 //////////////////////////
89
90 static thread_data *
91 get_thread_data(void)
92 {
93         pthread_mutex_lock(&mutex);
94
95         while (free_threads == 0) {
96                 pthread_cond_wait(&cond, &mutex);
97
98                 if (user_abort) {
99                         pthread_cond_signal(&cond);
100                         pthread_mutex_unlock(&mutex);
101                         return NULL;
102                 }
103         }
104
105         thread_data *t = threads;
106         while (t->in_use)
107                 ++t;
108
109         t->in_use = true;
110         --free_threads;
111
112         pthread_mutex_unlock(&mutex);
113
114         return t;
115 }
116
117
118 static void
119 release_thread_data(thread_data *t)
120 {
121         pthread_mutex_lock(&mutex);
122
123         t->in_use = false;
124         ++free_threads;
125
126         pthread_cond_signal(&cond);
127         pthread_mutex_unlock(&mutex);
128
129         return;
130 }
131
132
133 static int
134 create_thread(void *(*func)(thread_data *t), thread_data *t)
135 {
136         if (opt_threads == 1) {
137                 func(t);
138         } else {
139                 const int err = pthread_create(&t->thread, &thread_attr,
140                                 (void *(*)(void *))(func), t);
141                 if (err) {
142                         errmsg(V_ERROR, _("Cannot create a thread: %s"),
143                                         strerror(err));
144                         user_abort = 1;
145                         return -1;
146                 }
147         }
148
149         return 0;
150 }
151
152
153 /////////////////////////
154 // One thread per file //
155 /////////////////////////
156
157 static int
158 single_init(thread_data *t)
159 {
160         lzma_ret ret;
161
162         if (opt_mode == MODE_COMPRESS) {
163                 const lzma_vli uncompressed_size
164                                 = t->pair->src_fd != STDIN_FILENO
165                                 ? (lzma_vli)(t->pair->src_st.st_size)
166                                 : LZMA_VLI_VALUE_UNKNOWN;
167
168                 // TODO Support Multi-Block Streams to store Extra.
169                 if (opt_header == HEADER_ALONE) {
170                         lzma_options_alone alone;
171                         alone.uncompressed_size = uncompressed_size;
172                         memcpy(&alone.lzma, opt_filters[0].options,
173                                         sizeof(alone.lzma));
174                         ret = lzma_alone_encoder(&t->strm, &alone);
175                 } else {
176                         lzma_options_stream stream = {
177                                 .check = opt_check,
178                                 .has_crc32 = opt_check != LZMA_CHECK_NONE,
179                                 .uncompressed_size = uncompressed_size,
180                                 .alignment = 0,
181                         };
182                         memcpy(stream.filters, opt_filters,
183                                         sizeof(stream.filters));
184                         ret = lzma_stream_encoder_single(&t->strm, &stream);
185                 }
186         } else {
187                 // TODO Restrict file format if requested on the command line.
188                 ret = lzma_auto_decoder(&t->strm, NULL, NULL);
189         }
190
191         if (ret != LZMA_OK) {
192                 if (ret == LZMA_MEM_ERROR)
193                         out_of_memory();
194                 else
195                         internal_error();
196
197                 return -1;
198         }
199
200         return 0;
201 }
202
203
204 static lzma_ret
205 single_skip_padding(thread_data *t, uint8_t *in_buf)
206 {
207         // Handle decoding of concatenated Streams. There can be arbitrary
208         // number of nul-byte padding between the Streams, which must be
209         // ignored.
210         //
211         // NOTE: Concatenating LZMA_Alone files works only if at least
212         // one of lc, lp, and pb is non-zero. Using the concatenation
213         // on LZMA_Alone files is strongly discouraged.
214         while (true) {
215                 while (t->strm.avail_in > 0) {
216                         if (*t->strm.next_in != '\0')
217                                 return LZMA_OK;
218
219                         ++t->strm.next_in;
220                         --t->strm.avail_in;
221                 }
222
223                 if (t->pair->src_eof)
224                         return LZMA_STREAM_END;
225
226                 t->strm.next_in = in_buf;
227                 t->strm.avail_in = io_read(t->pair, in_buf, BUFSIZ);
228                 if (t->strm.avail_in == SIZE_MAX)
229                         return LZMA_DATA_ERROR;
230         }
231 }
232
233
234 static void *
235 single(thread_data *t)
236 {
237         if (single_init(t)) {
238                 io_close(t->pair, false);
239                 release_thread_data(t);
240                 return NULL;
241         }
242
243         uint8_t in_buf[BUFSIZ];
244         uint8_t out_buf[BUFSIZ];
245         lzma_action action = LZMA_RUN;
246         lzma_ret ret;
247         bool success = false;
248
249         t->strm.avail_in = 0;
250
251         while (!user_abort) {
252                 if (t->strm.avail_in == 0 && !t->pair->src_eof) {
253                         t->strm.next_in = in_buf;
254                         t->strm.avail_in = io_read(t->pair, in_buf, BUFSIZ);
255
256                         if (t->strm.avail_in == SIZE_MAX)
257                                 break;
258                         else if (t->pair->src_eof
259                                         && opt_mode == MODE_COMPRESS)
260                                 action = LZMA_FINISH;
261                 }
262
263                 t->strm.next_out = out_buf;
264                 t->strm.avail_out = BUFSIZ;
265
266                 ret = lzma_code(&t->strm, action);
267
268                 if (opt_mode != MODE_TEST)
269                         if (io_write(t->pair, out_buf,
270                                         BUFSIZ - t->strm.avail_out))
271                                 break;
272
273                 if (ret != LZMA_OK) {
274                         if (ret == LZMA_STREAM_END) {
275                                 if (opt_mode == MODE_COMPRESS) {
276                                         assert(t->pair->src_eof);
277                                         success = true;
278                                         break;
279                                 }
280
281                                 // Support decoding concatenated .lzma files.
282                                 ret = single_skip_padding(t, in_buf);
283
284                                 if (ret == LZMA_STREAM_END) {
285                                         assert(t->pair->src_eof);
286                                         success = true;
287                                         break;
288                                 }
289
290                                 if (ret == LZMA_OK && !single_init(t))
291                                         continue;
292
293                                 break;
294
295                         } else {
296                                 errmsg(V_ERROR, "%s: %s", t->pair->src_name,
297                                                 str_strm_error(ret));
298                                 break;
299                         }
300                 }
301         }
302
303         io_close(t->pair, success);
304         release_thread_data(t);
305
306         return NULL;
307 }
308
309
310 ///////////////////////////////
311 // Multiple threads per file //
312 ///////////////////////////////
313
314 // TODO
315
// I'm not sure what would be the best way to implement this. Here's one
// possible way:
//  - Reader thread would read the input data and control the coder threads.
319 //  - Every coder thread is associated with input and output buffer pools.
320 //    The input buffer pool is filled by reader thread, and the output buffer
321 //    pool is emptied by the writer thread.
322 //  - Writer thread writes the output data of the oldest living coder thread.
323 //
324 // The per-file thread started by the application's main thread is used as
325 // the reader thread. In the beginning, it starts the writer thread and the
326 // first coder thread. The coder thread would be left waiting for input from
327 // the reader thread, and the writer thread would be waiting for input from
328 // the coder thread.
329 //
330 // The reader thread reads the input data into a ring buffer, whose size
331 // depends on the value returned by lzma_chunk_size(). If the ring buffer
332 // gets full, the buffer is marked "to be finished", which indicates to
333 // the coder thread that no more input is coming. Then a new coder thread
334 // would be started.
335 //
336 // TODO
337
338 /*
339 typedef struct {
340         /// Buffers
341         uint8_t (*buffers)[BUFSIZ];
342
343         /// Number of buffers
344         size_t buffer_count;
345
346         /// buffers[read_pos] is the buffer currently being read. Once finish
347         /// is true and read_pos == write_pos, end of input has been reached.
348         size_t read_pos;
349
350         /// buffers[write_pos] is the buffer into which data is currently
351         /// being written.
352         size_t write_pos;
353
354         /// This variable matters only when read_pos == write_pos && finish.
355         /// In that case, this variable will contain the size of the
356         /// buffers[read_pos].
357         size_t last_size;
358
359         /// True once no more data is being written to the buffer. When this
360         /// is set, the last_size variable must have been set too.
361         bool finish;
362
363         /// Mutex to protect access to the variables in this structure
364         pthread_mutex_t mutex;
365
366         /// Condition to indicate when another thread can continue
367         pthread_cond_t cond;
368 } mem_pool;
369
370
371 static foo
372 multi_reader(thread_data *t)
373 {
374         bool done = false;
375
376         do {
377                 const size_t size = io_read(t->pair,
378                                 m->buffers + m->write_pos, BUFSIZ);
379                 if (size == SIZE_MAX) {
380                         // TODO
381                 } else if (t->pair->src_eof) {
382                         m->last_size = size;
383                 }
384
385                 pthread_mutex_lock(&m->mutex);
386
387                 if (++m->write_pos == m->buffer_count)
388                         m->write_pos = 0;
389
390                 if (m->write_pos == m->read_pos || t->pair->src_eof)
391                         m->finish = true;
392
393                 pthread_cond_signal(&m->cond);
394                 pthread_mutex_unlock(&m->mutex);
395
396         } while (!m->finish);
397
398         return done ? 0 : -1;
399 }
400
401
402 static foo
403 multi_code()
404 {
405         lzma_action = LZMA_RUN;
406
407         while (true) {
408                 pthread_mutex_lock(&m->mutex);
409
410                 while (m->read_pos == m->write_pos && !m->finish)
411                         pthread_cond_wait(&m->cond, &m->mutex);
412
413                 pthread_mutex_unlock(&m->mutex);
414
415                 if (m->finish) {
416                         t->strm.avail_in = m->last_size;
417                         if (opt_mode == MODE_COMPRESS)
418                                 action = LZMA_FINISH;
419                 } else {
420                         t->strm.avail_in = BUFSIZ;
421                 }
422
423                 t->strm.next_in = m->buffers + m->read_pos;
424
425                 const lzma_ret ret = lzma_code(&t->strm, action);
426
427         }
428 }
429
430 */
431
432
433 ///////////////////////
434 // Starting new file //
435 ///////////////////////
436
437 extern void
438 process_file(const char *filename)
439 {
440         thread_data *t = get_thread_data();
441         if (t == NULL)
442                 return; // User abort
443
444         // If this fails, it shows appropriate error messages too.
445         t->pair = io_open(filename);
446         if (t->pair == NULL) {
447                 release_thread_data(t);
448                 return;
449         }
450
451         // TODO Currently only one-thread-per-file mode is implemented.
452
453         if (create_thread(&single, t)) {
454                 io_close(t->pair, false);
455                 release_thread_data(t);
456         }
457
458         return;
459 }