]> icculus.org git repositories - icculus/xz.git/blob - src/lzma/process.c
Made the preset numbering more logical in liblzma API.
[icculus/xz.git] / src / lzma / process.c
1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 /// \file       process.c
4 /// \brief      Compresses or uncompresses a file
5 //
6 //  Copyright (C) 2007 Lasse Collin
7 //
8 //  This program is free software; you can redistribute it and/or
9 //  modify it under the terms of the GNU Lesser General Public
10 //  License as published by the Free Software Foundation; either
11 //  version 2.1 of the License, or (at your option) any later version.
12 //
13 //  This program is distributed in the hope that it will be useful,
14 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
15 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16 //  Lesser General Public License for more details.
17 //
18 ///////////////////////////////////////////////////////////////////////////////
19
20 #include "private.h"
21
22
23 typedef struct {
24         lzma_stream strm;
25         void *options;
26
27         file_pair *pair;
28
29         /// We don't need this for *anything* but seems that at least with
30         /// glibc pthread_create() doesn't allow NULL.
31         pthread_t thread;
32
33         bool in_use;
34
35 } thread_data;
36
37
38 /// Number of available threads
39 static size_t free_threads;
40
41 /// Thread-specific data
42 static thread_data *threads;
43
44 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
45 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
46
47 /// Attributes of new coder threads. They are created in detached state.
48 /// Coder threads signal to the service thread themselves when they are done.
49 static pthread_attr_t thread_attr;
50
51
52 //////////
53 // Init //
54 //////////
55
56 extern void
57 process_init(void)
58 {
59         threads = malloc(sizeof(thread_data) * opt_threads);
60         if (threads == NULL) {
61                 out_of_memory();
62                 my_exit(ERROR);
63         }
64
65         for (size_t i = 0; i < opt_threads; ++i)
66                 memzero(&threads[i], sizeof(threads[0]));
67
68         if (pthread_attr_init(&thread_attr)
69                         || pthread_attr_setdetachstate(
70                                 &thread_attr, PTHREAD_CREATE_DETACHED)) {
71                 out_of_memory();
72                 my_exit(ERROR);
73         }
74
75         free_threads = opt_threads;
76
77         return;
78 }
79
80
81 //////////////////////////
82 // Thread-specific data //
83 //////////////////////////
84
85 static thread_data *
86 get_thread_data(void)
87 {
88         pthread_mutex_lock(&mutex);
89
90         while (free_threads == 0) {
91                 pthread_cond_wait(&cond, &mutex);
92
93                 if (user_abort) {
94                         pthread_cond_signal(&cond);
95                         pthread_mutex_unlock(&mutex);
96                         return NULL;
97                 }
98         }
99
100         thread_data *t = threads;
101         while (t->in_use)
102                 ++t;
103
104         t->in_use = true;
105         --free_threads;
106
107         pthread_mutex_unlock(&mutex);
108
109         return t;
110 }
111
112
113 static void
114 release_thread_data(thread_data *t)
115 {
116         pthread_mutex_lock(&mutex);
117
118         t->in_use = false;
119         ++free_threads;
120
121         pthread_cond_signal(&cond);
122         pthread_mutex_unlock(&mutex);
123
124         return;
125 }
126
127
128 static int
129 create_thread(void *(*func)(thread_data *t), thread_data *t)
130 {
131         if (opt_threads == 1) {
132                 func(t);
133         } else {
134                 const int err = pthread_create(&t->thread, &thread_attr,
135                                 (void *(*)(void *))(func), t);
136                 if (err) {
137                         errmsg(V_ERROR, _("Cannot create a thread: %s"),
138                                         strerror(err));
139                         user_abort = 1;
140                         return -1;
141                 }
142         }
143
144         return 0;
145 }
146
147
148 /////////////////////////
149 // One thread per file //
150 /////////////////////////
151
152 static int
153 single_init(thread_data *t)
154 {
155         lzma_ret ret = LZMA_PROG_ERROR;
156
157         if (opt_mode == MODE_COMPRESS) {
158                 switch (opt_format) {
159                 case FORMAT_AUTO:
160                         // args.c ensures this.
161                         assert(0);
162                         break;
163
164                 case FORMAT_XZ:
165                         ret = lzma_stream_encoder(&t->strm,
166                                         opt_filters, opt_check);
167                         break;
168
169                 case FORMAT_LZMA:
170                         ret = lzma_alone_encoder(&t->strm,
171                                         opt_filters[0].options);
172                         break;
173
174                 case FORMAT_RAW:
175                         ret = lzma_raw_encoder(&t->strm, opt_filters);
176                         break;
177                 }
178         } else {
179                 const uint32_t flags = LZMA_TELL_UNSUPPORTED_CHECK
180                                 | LZMA_CONCATENATED;
181
182                 switch (opt_format) {
183                 case FORMAT_AUTO:
184                         ret = lzma_auto_decoder(&t->strm, opt_memory, flags);
185                         break;
186
187                 case FORMAT_XZ:
188                         ret = lzma_stream_decoder(&t->strm, opt_memory, flags);
189                         break;
190
191                 case FORMAT_LZMA:
192                         ret = lzma_alone_decoder(&t->strm, opt_memory);
193                         break;
194
195                 case FORMAT_RAW:
196                         // Memory usage has already been checked in args.c.
197                         ret = lzma_raw_decoder(&t->strm, opt_filters);
198                         break;
199                 }
200         }
201
202         if (ret != LZMA_OK) {
203                 if (ret == LZMA_MEM_ERROR)
204                         out_of_memory();
205                 else
206                         internal_error();
207
208                 return -1;
209         }
210
211         return 0;
212 }
213
214
215 static void *
216 single(thread_data *t)
217 {
218         if (single_init(t)) {
219                 io_close(t->pair, false);
220                 release_thread_data(t);
221                 return NULL;
222         }
223
224         uint8_t in_buf[BUFSIZ];
225         uint8_t out_buf[BUFSIZ];
226         lzma_action action = LZMA_RUN;
227         lzma_ret ret;
228         bool success = false;
229
230         t->strm.avail_in = 0;
231         t->strm.next_out = out_buf;
232         t->strm.avail_out = BUFSIZ;
233
234         while (!user_abort) {
235                 if (t->strm.avail_in == 0 && !t->pair->src_eof) {
236                         t->strm.next_in = in_buf;
237                         t->strm.avail_in = io_read(t->pair, in_buf, BUFSIZ);
238
239                         if (t->strm.avail_in == SIZE_MAX)
240                                 break;
241
242                         if (t->pair->src_eof)
243                                 action = LZMA_FINISH;
244                 }
245
246                 ret = lzma_code(&t->strm, action);
247
248                 if ((t->strm.avail_out == 0 || ret != LZMA_OK)
249                                 && opt_mode != MODE_TEST) {
250                         if (io_write(t->pair, out_buf,
251                                         BUFSIZ - t->strm.avail_out))
252                                 break;
253
254                         t->strm.next_out = out_buf;
255                         t->strm.avail_out = BUFSIZ;
256                 }
257
258                 if (ret != LZMA_OK) {
259                         // Check that there is no trailing garbage. This is
260                         // needed for LZMA_Alone and raw streams.
261                         if (ret == LZMA_STREAM_END && (t->strm.avail_in != 0
262                                         || (!t->pair->src_eof && io_read(
263                                                 t->pair, in_buf, 1) != 0)))
264                                 ret = LZMA_DATA_ERROR;
265
266                         if (ret != LZMA_STREAM_END) {
267                                 errmsg(V_ERROR, "%s: %s", t->pair->src_name,
268                                                 str_strm_error(ret));
269                                 break;
270                         }
271
272                         assert(t->pair->src_eof);
273                         success = true;
274                         break;
275                 }
276         }
277
278         io_close(t->pair, success);
279         release_thread_data(t);
280
281         return NULL;
282 }
283
284
285 ///////////////////////////////
286 // Multiple threads per file //
287 ///////////////////////////////
288
289 // TODO
290
// I'm not sure what the best way to implement this would be. Here's one
// possible way:
//  - Reader thread would read the input data and control the coder threads.
294 //  - Every coder thread is associated with input and output buffer pools.
295 //    The input buffer pool is filled by reader thread, and the output buffer
296 //    pool is emptied by the writer thread.
297 //  - Writer thread writes the output data of the oldest living coder thread.
298 //
299 // The per-file thread started by the application's main thread is used as
300 // the reader thread. In the beginning, it starts the writer thread and the
301 // first coder thread. The coder thread would be left waiting for input from
302 // the reader thread, and the writer thread would be waiting for input from
303 // the coder thread.
304 //
305 // The reader thread reads the input data into a ring buffer, whose size
306 // depends on the value returned by lzma_chunk_size(). If the ring buffer
307 // gets full, the buffer is marked "to be finished", which indicates to
308 // the coder thread that no more input is coming. Then a new coder thread
309 // would be started.
310 //
311 // TODO
312
313 /*
314 typedef struct {
315         /// Buffers
316         uint8_t (*buffers)[BUFSIZ];
317
318         /// Number of buffers
319         size_t buffer_count;
320
321         /// buffers[read_pos] is the buffer currently being read. Once finish
322         /// is true and read_pos == write_pos, end of input has been reached.
323         size_t read_pos;
324
325         /// buffers[write_pos] is the buffer into which data is currently
326         /// being written.
327         size_t write_pos;
328
329         /// This variable matters only when read_pos == write_pos && finish.
330         /// In that case, this variable will contain the size of the
331         /// buffers[read_pos].
332         size_t last_size;
333
334         /// True once no more data is being written to the buffer. When this
335         /// is set, the last_size variable must have been set too.
336         bool finish;
337
338         /// Mutex to protect access to the variables in this structure
339         pthread_mutex_t mutex;
340
341         /// Condition to indicate when another thread can continue
342         pthread_cond_t cond;
343 } mem_pool;
344
345
346 static foo
347 multi_reader(thread_data *t)
348 {
349         bool done = false;
350
351         do {
352                 const size_t size = io_read(t->pair,
353                                 m->buffers + m->write_pos, BUFSIZ);
354                 if (size == SIZE_MAX) {
355                         // TODO
356                 } else if (t->pair->src_eof) {
357                         m->last_size = size;
358                 }
359
360                 pthread_mutex_lock(&m->mutex);
361
362                 if (++m->write_pos == m->buffer_count)
363                         m->write_pos = 0;
364
365                 if (m->write_pos == m->read_pos || t->pair->src_eof)
366                         m->finish = true;
367
368                 pthread_cond_signal(&m->cond);
369                 pthread_mutex_unlock(&m->mutex);
370
371         } while (!m->finish);
372
373         return done ? 0 : -1;
374 }
375
376
377 static foo
378 multi_code()
379 {
380         lzma_action = LZMA_RUN;
381
382         while (true) {
383                 pthread_mutex_lock(&m->mutex);
384
385                 while (m->read_pos == m->write_pos && !m->finish)
386                         pthread_cond_wait(&m->cond, &m->mutex);
387
388                 pthread_mutex_unlock(&m->mutex);
389
390                 if (m->finish) {
391                         t->strm.avail_in = m->last_size;
392                         if (opt_mode == MODE_COMPRESS)
393                                 action = LZMA_FINISH;
394                 } else {
395                         t->strm.avail_in = BUFSIZ;
396                 }
397
398                 t->strm.next_in = m->buffers + m->read_pos;
399
400                 const lzma_ret ret = lzma_code(&t->strm, action);
401
402         }
403 }
404
405 */
406
407
408 ///////////////////////
409 // Starting new file //
410 ///////////////////////
411
412 extern void
413 process_file(const char *filename)
414 {
415         thread_data *t = get_thread_data();
416         if (t == NULL)
417                 return; // User abort
418
419         // If this fails, it shows appropriate error messages too.
420         t->pair = io_open(filename);
421         if (t->pair == NULL) {
422                 release_thread_data(t);
423                 return;
424         }
425
426         // TODO Currently only one-thread-per-file mode is implemented.
427
428         if (create_thread(&single, t)) {
429                 io_close(t->pair, false);
430                 release_thread_data(t);
431         }
432
433         return;
434 }