]> icculus.org git repositories - icculus/xz.git/blob - src/lzma/process.c
Sort of garbage collection commit. :-| Many things are still
[icculus/xz.git] / src / lzma / process.c
1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 /// \file       process.c
4 /// \brief      Compresses or uncompresses a file
5 //
6 //  Copyright (C) 2007 Lasse Collin
7 //
8 //  This program is free software; you can redistribute it and/or
9 //  modify it under the terms of the GNU Lesser General Public
10 //  License as published by the Free Software Foundation; either
11 //  version 2.1 of the License, or (at your option) any later version.
12 //
13 //  This program is distributed in the hope that it will be useful,
14 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
15 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16 //  Lesser General Public License for more details.
17 //
18 ///////////////////////////////////////////////////////////////////////////////
19
20 #include "private.h"
21
22
23 typedef struct {
24         lzma_stream strm;
25         void *options;
26
27         file_pair *pair;
28
29         /// We don't need this for *anything* but seems that at least with
30         /// glibc pthread_create() doesn't allow NULL.
31         pthread_t thread;
32
33         bool in_use;
34
35 } thread_data;
36
37
38 /// Number of available threads
39 static size_t free_threads;
40
41 /// Thread-specific data
42 static thread_data *threads;
43
44 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
45 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
46
47 /// Attributes of new coder threads. They are created in detached state.
48 /// Coder threads signal to the service thread themselves when they are done.
49 static pthread_attr_t thread_attr;
50
51
52 //////////
53 // Init //
54 //////////
55
56 extern void
57 process_init(void)
58 {
59         threads = malloc(sizeof(thread_data) * opt_threads);
60         if (threads == NULL) {
61                 out_of_memory();
62                 my_exit(ERROR);
63         }
64
65         for (size_t i = 0; i < opt_threads; ++i)
66                 memzero(&threads[i], sizeof(threads[0]));
67
68         if (pthread_attr_init(&thread_attr)
69                         || pthread_attr_setdetachstate(
70                                 &thread_attr, PTHREAD_CREATE_DETACHED)) {
71                 out_of_memory();
72                 my_exit(ERROR);
73         }
74
75         free_threads = opt_threads;
76
77         return;
78 }
79
80
81 //////////////////////////
82 // Thread-specific data //
83 //////////////////////////
84
85 static thread_data *
86 get_thread_data(void)
87 {
88         pthread_mutex_lock(&mutex);
89
90         while (free_threads == 0) {
91                 pthread_cond_wait(&cond, &mutex);
92
93                 if (user_abort) {
94                         pthread_cond_signal(&cond);
95                         pthread_mutex_unlock(&mutex);
96                         return NULL;
97                 }
98         }
99
100         thread_data *t = threads;
101         while (t->in_use)
102                 ++t;
103
104         t->in_use = true;
105         --free_threads;
106
107         pthread_mutex_unlock(&mutex);
108
109         return t;
110 }
111
112
113 static void
114 release_thread_data(thread_data *t)
115 {
116         pthread_mutex_lock(&mutex);
117
118         t->in_use = false;
119         ++free_threads;
120
121         pthread_cond_signal(&cond);
122         pthread_mutex_unlock(&mutex);
123
124         return;
125 }
126
127
128 static int
129 create_thread(void *(*func)(thread_data *t), thread_data *t)
130 {
131         if (opt_threads == 1) {
132                 func(t);
133         } else {
134                 const int err = pthread_create(&t->thread, &thread_attr,
135                                 (void *(*)(void *))(func), t);
136                 if (err) {
137                         errmsg(V_ERROR, _("Cannot create a thread: %s"),
138                                         strerror(err));
139                         user_abort = 1;
140                         return -1;
141                 }
142         }
143
144         return 0;
145 }
146
147
148 /////////////////////////
149 // One thread per file //
150 /////////////////////////
151
152 static int
153 single_init(thread_data *t)
154 {
155         lzma_ret ret;
156
157         if (opt_mode == MODE_COMPRESS) {
158                 if (opt_header == HEADER_ALONE) {
159                         ret = lzma_alone_encoder(&t->strm,
160                                         opt_filters[0].options);
161                 } else {
162                         ret = lzma_stream_encoder(&t->strm,
163                                         opt_filters, opt_check);
164                 }
165         } else {
166                 // TODO Restrict file format if requested on the command line.
167                 ret = lzma_auto_decoder(&t->strm, opt_memory,
168                                 LZMA_WARN_UNSUPPORTED_CHECK
169                                         | LZMA_CONCATENATED);
170         }
171
172         if (ret != LZMA_OK) {
173                 if (ret == LZMA_MEM_ERROR)
174                         out_of_memory();
175                 else
176                         internal_error();
177
178                 return -1;
179         }
180
181         return 0;
182 }
183
184
185 static void *
186 single(thread_data *t)
187 {
188         if (single_init(t)) {
189                 io_close(t->pair, false);
190                 release_thread_data(t);
191                 return NULL;
192         }
193
194         uint8_t in_buf[BUFSIZ];
195         uint8_t out_buf[BUFSIZ];
196         lzma_action action = LZMA_RUN;
197         bool success = false;
198
199         t->strm.avail_in = 0;
200         t->strm.next_out = out_buf;
201         t->strm.avail_out = BUFSIZ;
202
203         while (!user_abort) {
204                 if (t->strm.avail_in == 0 && !t->pair->src_eof) {
205                         t->strm.next_in = in_buf;
206                         t->strm.avail_in = io_read(t->pair, in_buf, BUFSIZ);
207
208                         if (t->strm.avail_in == SIZE_MAX)
209                                 break;
210
211                         if (t->pair->src_eof)
212                                 action = LZMA_FINISH;
213                 }
214
215                 const lzma_ret ret = lzma_code(&t->strm, action);
216
217                 if ((t->strm.avail_out == 0 || ret != LZMA_OK)
218                                 && opt_mode != MODE_TEST) {
219                         if (io_write(t->pair, out_buf,
220                                         BUFSIZ - t->strm.avail_out))
221                                 break;
222
223                         t->strm.next_out = out_buf;
224                         t->strm.avail_out = BUFSIZ;
225                 }
226
227                 if (ret != LZMA_OK) {
228                         if (ret == LZMA_STREAM_END) {
229                                 // FIXME !!! This doesn't work when decoding
230                                 // LZMA_Alone files, because LZMA_Alone decoder
231                                 // doesn't wait for LZMA_FINISH.
232                                 assert(t->pair->src_eof);
233                                 success = true;
234                         } else {
235                                 errmsg(V_ERROR, "%s: %s", t->pair->src_name,
236                                                 str_strm_error(ret));
237                         }
238
239                         break;
240                 }
241         }
242
243         io_close(t->pair, success);
244         release_thread_data(t);
245
246         return NULL;
247 }
248
249
250 ///////////////////////////////
251 // Multiple threads per file //
252 ///////////////////////////////
253
254 // TODO
255
// I'm not sure what the best way to implement this would be. Here's one
// possible way:
//  - Reader thread would read the input data and control the coder threads.
259 //  - Every coder thread is associated with input and output buffer pools.
260 //    The input buffer pool is filled by reader thread, and the output buffer
261 //    pool is emptied by the writer thread.
262 //  - Writer thread writes the output data of the oldest living coder thread.
263 //
264 // The per-file thread started by the application's main thread is used as
265 // the reader thread. In the beginning, it starts the writer thread and the
266 // first coder thread. The coder thread would be left waiting for input from
267 // the reader thread, and the writer thread would be waiting for input from
268 // the coder thread.
269 //
270 // The reader thread reads the input data into a ring buffer, whose size
271 // depends on the value returned by lzma_chunk_size(). If the ring buffer
272 // gets full, the buffer is marked "to be finished", which indicates to
273 // the coder thread that no more input is coming. Then a new coder thread
274 // would be started.
275 //
276 // TODO
277
278 /*
279 typedef struct {
280         /// Buffers
281         uint8_t (*buffers)[BUFSIZ];
282
283         /// Number of buffers
284         size_t buffer_count;
285
286         /// buffers[read_pos] is the buffer currently being read. Once finish
287         /// is true and read_pos == write_pos, end of input has been reached.
288         size_t read_pos;
289
290         /// buffers[write_pos] is the buffer into which data is currently
291         /// being written.
292         size_t write_pos;
293
294         /// This variable matters only when read_pos == write_pos && finish.
295         /// In that case, this variable will contain the size of the
296         /// buffers[read_pos].
297         size_t last_size;
298
299         /// True once no more data is being written to the buffer. When this
300         /// is set, the last_size variable must have been set too.
301         bool finish;
302
303         /// Mutex to protect access to the variables in this structure
304         pthread_mutex_t mutex;
305
306         /// Condition to indicate when another thread can continue
307         pthread_cond_t cond;
308 } mem_pool;
309
310
311 static foo
312 multi_reader(thread_data *t)
313 {
314         bool done = false;
315
316         do {
317                 const size_t size = io_read(t->pair,
318                                 m->buffers + m->write_pos, BUFSIZ);
319                 if (size == SIZE_MAX) {
320                         // TODO
321                 } else if (t->pair->src_eof) {
322                         m->last_size = size;
323                 }
324
325                 pthread_mutex_lock(&m->mutex);
326
327                 if (++m->write_pos == m->buffer_count)
328                         m->write_pos = 0;
329
330                 if (m->write_pos == m->read_pos || t->pair->src_eof)
331                         m->finish = true;
332
333                 pthread_cond_signal(&m->cond);
334                 pthread_mutex_unlock(&m->mutex);
335
336         } while (!m->finish);
337
338         return done ? 0 : -1;
339 }
340
341
342 static foo
343 multi_code()
344 {
345         lzma_action = LZMA_RUN;
346
347         while (true) {
348                 pthread_mutex_lock(&m->mutex);
349
350                 while (m->read_pos == m->write_pos && !m->finish)
351                         pthread_cond_wait(&m->cond, &m->mutex);
352
353                 pthread_mutex_unlock(&m->mutex);
354
355                 if (m->finish) {
356                         t->strm.avail_in = m->last_size;
357                         if (opt_mode == MODE_COMPRESS)
358                                 action = LZMA_FINISH;
359                 } else {
360                         t->strm.avail_in = BUFSIZ;
361                 }
362
363                 t->strm.next_in = m->buffers + m->read_pos;
364
365                 const lzma_ret ret = lzma_code(&t->strm, action);
366
367         }
368 }
369
370 */
371
372
373 ///////////////////////
374 // Starting new file //
375 ///////////////////////
376
377 extern void
378 process_file(const char *filename)
379 {
380         thread_data *t = get_thread_data();
381         if (t == NULL)
382                 return; // User abort
383
384         // If this fails, it shows appropriate error messages too.
385         t->pair = io_open(filename);
386         if (t->pair == NULL) {
387                 release_thread_data(t);
388                 return;
389         }
390
391         // TODO Currently only one-thread-per-file mode is implemented.
392
393         if (create_thread(&single, t)) {
394                 io_close(t->pair, false);
395                 release_thread_data(t);
396         }
397
398         return;
399 }