]> icculus.org git repositories - icculus/xz.git/blob - src/lzma/process.c
Update the code to mostly match the new simpler file format
[icculus/xz.git] / src / lzma / process.c
1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 /// \file       process.c
4 /// \brief      Compresses or uncompresses a file
5 //
6 //  Copyright (C) 2007 Lasse Collin
7 //
8 //  This program is free software; you can redistribute it and/or
9 //  modify it under the terms of the GNU Lesser General Public
10 //  License as published by the Free Software Foundation; either
11 //  version 2.1 of the License, or (at your option) any later version.
12 //
13 //  This program is distributed in the hope that it will be useful,
14 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
15 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16 //  Lesser General Public License for more details.
17 //
18 ///////////////////////////////////////////////////////////////////////////////
19
#include "private.h"

#include <errno.h>
21
22
23 typedef struct {
24         lzma_stream strm;
25         void *options;
26
27         file_pair *pair;
28
29         /// We don't need this for *anything* but seems that at least with
30         /// glibc pthread_create() doesn't allow NULL.
31         pthread_t thread;
32
33         bool in_use;
34
35 } thread_data;
36
37
38 /// Number of available threads
39 static size_t free_threads;
40
41 /// Thread-specific data
42 static thread_data *threads;
43
44 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
45 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
46
47 /// Attributes of new coder threads. They are created in detached state.
48 /// Coder threads signal to the service thread themselves when they are done.
49 static pthread_attr_t thread_attr;
50
51
52 //////////
53 // Init //
54 //////////
55
56 extern void
57 process_init(void)
58 {
59         threads = malloc(sizeof(thread_data) * opt_threads);
60         if (threads == NULL) {
61                 out_of_memory();
62                 my_exit(ERROR);
63         }
64
65         for (size_t i = 0; i < opt_threads; ++i)
66                 threads[i] = (thread_data){
67                         .strm = LZMA_STREAM_INIT_VAR,
68                         .options = NULL,
69                         .pair = NULL,
70                         .in_use = false,
71                 };
72
73         if (pthread_attr_init(&thread_attr)
74                         || pthread_attr_setdetachstate(
75                                 &thread_attr, PTHREAD_CREATE_DETACHED)) {
76                 out_of_memory();
77                 my_exit(ERROR);
78         }
79
80         free_threads = opt_threads;
81
82         return;
83 }
84
85
86 //////////////////////////
87 // Thread-specific data //
88 //////////////////////////
89
90 static thread_data *
91 get_thread_data(void)
92 {
93         pthread_mutex_lock(&mutex);
94
95         while (free_threads == 0) {
96                 pthread_cond_wait(&cond, &mutex);
97
98                 if (user_abort) {
99                         pthread_cond_signal(&cond);
100                         pthread_mutex_unlock(&mutex);
101                         return NULL;
102                 }
103         }
104
105         thread_data *t = threads;
106         while (t->in_use)
107                 ++t;
108
109         t->in_use = true;
110         --free_threads;
111
112         pthread_mutex_unlock(&mutex);
113
114         return t;
115 }
116
117
118 static void
119 release_thread_data(thread_data *t)
120 {
121         pthread_mutex_lock(&mutex);
122
123         t->in_use = false;
124         ++free_threads;
125
126         pthread_cond_signal(&cond);
127         pthread_mutex_unlock(&mutex);
128
129         return;
130 }
131
132
133 static int
134 create_thread(void *(*func)(thread_data *t), thread_data *t)
135 {
136         if (opt_threads == 1) {
137                 func(t);
138         } else {
139                 const int err = pthread_create(&t->thread, &thread_attr,
140                                 (void *(*)(void *))(func), t);
141                 if (err) {
142                         errmsg(V_ERROR, _("Cannot create a thread: %s"),
143                                         strerror(err));
144                         user_abort = 1;
145                         return -1;
146                 }
147         }
148
149         return 0;
150 }
151
152
153 /////////////////////////
154 // One thread per file //
155 /////////////////////////
156
157 static int
158 single_init(thread_data *t)
159 {
160         lzma_ret ret;
161
162         if (opt_mode == MODE_COMPRESS) {
163                 if (opt_header == HEADER_ALONE) {
164                         ret = lzma_alone_encoder(&t->strm,
165                                         opt_filters[0].options);
166                 } else {
167                         ret = lzma_stream_encoder(&t->strm,
168                                         opt_filters, opt_check);
169                 }
170         } else {
171                 // TODO Restrict file format if requested on the command line.
172                 ret = lzma_auto_decoder(&t->strm);
173         }
174
175         if (ret != LZMA_OK) {
176                 if (ret == LZMA_MEM_ERROR)
177                         out_of_memory();
178                 else
179                         internal_error();
180
181                 return -1;
182         }
183
184         return 0;
185 }
186
187
188 static lzma_ret
189 single_skip_padding(thread_data *t, uint8_t *in_buf)
190 {
191         // Handle decoding of concatenated Streams. There can be arbitrary
192         // number of nul-byte padding between the Streams, which must be
193         // ignored.
194         //
195         // NOTE: Concatenating LZMA_Alone files works only if at least
196         // one of lc, lp, and pb is non-zero. Using the concatenation
197         // on LZMA_Alone files is strongly discouraged.
198         while (true) {
199                 while (t->strm.avail_in > 0) {
200                         if (*t->strm.next_in != '\0')
201                                 return LZMA_OK;
202
203                         ++t->strm.next_in;
204                         --t->strm.avail_in;
205                 }
206
207                 if (t->pair->src_eof)
208                         return LZMA_STREAM_END;
209
210                 t->strm.next_in = in_buf;
211                 t->strm.avail_in = io_read(t->pair, in_buf, BUFSIZ);
212                 if (t->strm.avail_in == SIZE_MAX)
213                         return LZMA_DATA_ERROR;
214         }
215 }
216
217
218 static void *
219 single(thread_data *t)
220 {
221         if (single_init(t)) {
222                 io_close(t->pair, false);
223                 release_thread_data(t);
224                 return NULL;
225         }
226
227         uint8_t in_buf[BUFSIZ];
228         uint8_t out_buf[BUFSIZ];
229         lzma_action action = LZMA_RUN;
230         lzma_ret ret;
231         bool success = false;
232
233         t->strm.avail_in = 0;
234
235         while (!user_abort) {
236                 if (t->strm.avail_in == 0 && !t->pair->src_eof) {
237                         t->strm.next_in = in_buf;
238                         t->strm.avail_in = io_read(t->pair, in_buf, BUFSIZ);
239
240                         if (t->strm.avail_in == SIZE_MAX)
241                                 break;
242                         else if (t->pair->src_eof
243                                         && opt_mode == MODE_COMPRESS)
244                                 action = LZMA_FINISH;
245                 }
246
247                 t->strm.next_out = out_buf;
248                 t->strm.avail_out = BUFSIZ;
249
250                 ret = lzma_code(&t->strm, action);
251
252                 if (opt_mode != MODE_TEST)
253                         if (io_write(t->pair, out_buf,
254                                         BUFSIZ - t->strm.avail_out))
255                                 break;
256
257                 if (ret != LZMA_OK) {
258                         if (ret == LZMA_STREAM_END) {
259                                 if (opt_mode == MODE_COMPRESS) {
260                                         assert(t->pair->src_eof);
261                                         success = true;
262                                         break;
263                                 }
264
265                                 // Support decoding concatenated .lzma files.
266                                 ret = single_skip_padding(t, in_buf);
267
268                                 if (ret == LZMA_STREAM_END) {
269                                         assert(t->pair->src_eof);
270                                         success = true;
271                                         break;
272                                 }
273
274                                 if (ret == LZMA_OK && !single_init(t))
275                                         continue;
276
277                                 break;
278
279                         } else {
280                                 errmsg(V_ERROR, "%s: %s", t->pair->src_name,
281                                                 str_strm_error(ret));
282                                 break;
283                         }
284                 }
285         }
286
287         io_close(t->pair, success);
288         release_thread_data(t);
289
290         return NULL;
291 }
292
293
294 ///////////////////////////////
295 // Multiple threads per file //
296 ///////////////////////////////
297
298 // TODO
299
// I'm not sure what the best way to implement this would be. Here's one
// possible way:
//  - Reader thread would read the input data and control the coder threads.
303 //  - Every coder thread is associated with input and output buffer pools.
304 //    The input buffer pool is filled by reader thread, and the output buffer
305 //    pool is emptied by the writer thread.
306 //  - Writer thread writes the output data of the oldest living coder thread.
307 //
308 // The per-file thread started by the application's main thread is used as
309 // the reader thread. In the beginning, it starts the writer thread and the
310 // first coder thread. The coder thread would be left waiting for input from
311 // the reader thread, and the writer thread would be waiting for input from
312 // the coder thread.
313 //
314 // The reader thread reads the input data into a ring buffer, whose size
315 // depends on the value returned by lzma_chunk_size(). If the ring buffer
316 // gets full, the buffer is marked "to be finished", which indicates to
317 // the coder thread that no more input is coming. Then a new coder thread
318 // would be started.
319 //
320 // TODO
321
322 /*
323 typedef struct {
324         /// Buffers
325         uint8_t (*buffers)[BUFSIZ];
326
327         /// Number of buffers
328         size_t buffer_count;
329
330         /// buffers[read_pos] is the buffer currently being read. Once finish
331         /// is true and read_pos == write_pos, end of input has been reached.
332         size_t read_pos;
333
334         /// buffers[write_pos] is the buffer into which data is currently
335         /// being written.
336         size_t write_pos;
337
338         /// This variable matters only when read_pos == write_pos && finish.
339         /// In that case, this variable will contain the size of the
340         /// buffers[read_pos].
341         size_t last_size;
342
343         /// True once no more data is being written to the buffer. When this
344         /// is set, the last_size variable must have been set too.
345         bool finish;
346
347         /// Mutex to protect access to the variables in this structure
348         pthread_mutex_t mutex;
349
350         /// Condition to indicate when another thread can continue
351         pthread_cond_t cond;
352 } mem_pool;
353
354
355 static foo
356 multi_reader(thread_data *t)
357 {
358         bool done = false;
359
360         do {
361                 const size_t size = io_read(t->pair,
362                                 m->buffers + m->write_pos, BUFSIZ);
363                 if (size == SIZE_MAX) {
364                         // TODO
365                 } else if (t->pair->src_eof) {
366                         m->last_size = size;
367                 }
368
369                 pthread_mutex_lock(&m->mutex);
370
371                 if (++m->write_pos == m->buffer_count)
372                         m->write_pos = 0;
373
374                 if (m->write_pos == m->read_pos || t->pair->src_eof)
375                         m->finish = true;
376
377                 pthread_cond_signal(&m->cond);
378                 pthread_mutex_unlock(&m->mutex);
379
380         } while (!m->finish);
381
382         return done ? 0 : -1;
383 }
384
385
386 static foo
387 multi_code()
388 {
389         lzma_action = LZMA_RUN;
390
391         while (true) {
392                 pthread_mutex_lock(&m->mutex);
393
394                 while (m->read_pos == m->write_pos && !m->finish)
395                         pthread_cond_wait(&m->cond, &m->mutex);
396
397                 pthread_mutex_unlock(&m->mutex);
398
399                 if (m->finish) {
400                         t->strm.avail_in = m->last_size;
401                         if (opt_mode == MODE_COMPRESS)
402                                 action = LZMA_FINISH;
403                 } else {
404                         t->strm.avail_in = BUFSIZ;
405                 }
406
407                 t->strm.next_in = m->buffers + m->read_pos;
408
409                 const lzma_ret ret = lzma_code(&t->strm, action);
410
411         }
412 }
413
414 */
415
416
417 ///////////////////////
418 // Starting new file //
419 ///////////////////////
420
421 extern void
422 process_file(const char *filename)
423 {
424         thread_data *t = get_thread_data();
425         if (t == NULL)
426                 return; // User abort
427
428         // If this fails, it shows appropriate error messages too.
429         t->pair = io_open(filename);
430         if (t->pair == NULL) {
431                 release_thread_data(t);
432                 return;
433         }
434
435         // TODO Currently only one-thread-per-file mode is implemented.
436
437         if (create_thread(&single, t)) {
438                 io_close(t->pair, false);
439                 release_thread_data(t);
440         }
441
442         return;
443 }