Commit 1fc7241e authored by Mark Adler's avatar Mark Adler

Do not abort on inflate data error -- continue to process files.

This introduces try/catch/throw for error handling throughout pigz.
Each thread has its own try stack for error handling.  If a throw
makes it to the top try in a thread, then the entire program is
aborted.  Data errors caught during decoding and decompression
permit the process to continue with subsequent files.
parent 7ff80327
......@@ -4,14 +4,16 @@ LDFLAGS=-lz
ZOPFLI=zopfli/src/zopfli/
# use gcc and gmake on Solaris
pigz: pigz.o yarn.o ${ZOPFLI}deflate.o ${ZOPFLI}blocksplitter.o ${ZOPFLI}tree.o ${ZOPFLI}lz77.o ${ZOPFLI}cache.o ${ZOPFLI}hash.o ${ZOPFLI}util.o ${ZOPFLI}squeeze.o ${ZOPFLI}katajainen.o
pigz: pigz.o yarn.o try.o ${ZOPFLI}deflate.o ${ZOPFLI}blocksplitter.o ${ZOPFLI}tree.o ${ZOPFLI}lz77.o ${ZOPFLI}cache.o ${ZOPFLI}hash.o ${ZOPFLI}util.o ${ZOPFLI}squeeze.o ${ZOPFLI}katajainen.o
$(CC) $(LDFLAGS) -o pigz $^ -lpthread -lm
ln -f pigz unpigz
pigz.o: pigz.c yarn.h ${ZOPFLI}deflate.h ${ZOPFLI}util.h
pigz.o: pigz.c yarn.h try.h ${ZOPFLI}deflate.h ${ZOPFLI}util.h
yarn.o: yarn.c yarn.h
try.o: try.c try.h
${ZOPFLI}deflate.o: ${ZOPFLI}deflate.c ${ZOPFLI}deflate.h ${ZOPFLI}blocksplitter.h ${ZOPFLI}lz77.h ${ZOPFLI}squeeze.h ${ZOPFLI}tree.h ${ZOPFLI}zopfli.h ${ZOPFLI}cache.h ${ZOPFLI}hash.h ${ZOPFLI}util.h
${ZOPFLI}blocksplitter.o: ${ZOPFLI}blocksplitter.c ${ZOPFLI}blocksplitter.h ${ZOPFLI}deflate.h ${ZOPFLI}lz77.h ${ZOPFLI}squeeze.h ${ZOPFLI}tree.h ${ZOPFLI}util.h ${ZOPFLI}zopfli.h ${ZOPFLI}cache.h ${ZOPFLI}hash.h
......@@ -32,21 +34,24 @@ ${ZOPFLI}katajainen.o: ${ZOPFLI}katajainen.c ${ZOPFLI}katajainen.h
dev: pigz pigzt pigzn
pigzt: pigzt.o yarnt.o ${ZOPFLI}deflate.o ${ZOPFLI}blocksplitter.o ${ZOPFLI}tree.o ${ZOPFLI}lz77.o ${ZOPFLI}cache.o ${ZOPFLI}hash.o ${ZOPFLI}util.o ${ZOPFLI}squeeze.o ${ZOPFLI}katajainen.o
pigzt: pigzt.o yarnt.o try.o ${ZOPFLI}deflate.o ${ZOPFLI}blocksplitter.o ${ZOPFLI}tree.o ${ZOPFLI}lz77.o ${ZOPFLI}cache.o ${ZOPFLI}hash.o ${ZOPFLI}util.o ${ZOPFLI}squeeze.o ${ZOPFLI}katajainen.o
$(CC) $(LDFLAGS) -o pigzt $^ -lpthread -lm
pigzt.o: pigz.c yarn.h
pigzt.o: pigz.c yarn.h try.h
$(CC) $(CFLAGS) -DDEBUG -g -c -o pigzt.o pigz.c
yarnt.o: yarn.c yarn.h
$(CC) $(CFLAGS) -DDEBUG -g -c -o yarnt.o yarn.c
pigzn: pigzn.o ${ZOPFLI}deflate.o ${ZOPFLI}blocksplitter.o ${ZOPFLI}tree.o ${ZOPFLI}lz77.o ${ZOPFLI}cache.o ${ZOPFLI}hash.o ${ZOPFLI}util.o ${ZOPFLI}squeeze.o ${ZOPFLI}katajainen.o
pigzn: pigzn.o tryn.o ${ZOPFLI}deflate.o ${ZOPFLI}blocksplitter.o ${ZOPFLI}tree.o ${ZOPFLI}lz77.o ${ZOPFLI}cache.o ${ZOPFLI}hash.o ${ZOPFLI}util.o ${ZOPFLI}squeeze.o ${ZOPFLI}katajainen.o
$(CC) $(LDFLAGS) -o pigzn $^ -lm
pigzn.o: pigz.c
pigzn.o: pigz.c try.h
$(CC) $(CFLAGS) -DDEBUG -DNOTHREAD -g -c -o pigzn.o pigz.c
tryn.o: try.c try.h
$(CC) $(CFLAGS) -DDEBUG -DNOTHREAD -g -c -o tryn.o try.c
test: pigz
./pigz -kf pigz.c ; ./pigz -t pigz.c.gz
./pigz -kfb 32 pigz.c ; ./pigz -t pigz.c.gz
......
......@@ -308,7 +308,8 @@
/* atoi(), getenv() */
#include <stdarg.h> /* va_start(), va_end(), va_list */
#include <string.h> /* memset(), memchr(), memcpy(), strcmp(), strcpy() */
/* strncpy(), strlen(), strcat(), strrchr() */
/* strncpy(), strlen(), strcat(), strrchr(),
strerror() */
#include <errno.h> /* errno, EEXIST */
#include <assert.h> /* assert() */
#include <time.h> /* ctime(), time(), time_t, mktime() */
......@@ -367,6 +368,8 @@
ZopfliInitOptions(),
ZopfliOptions */
#include "try.h" /* try, catch, always, throw, drop, punt, ball_t */
/* for local functions and globals */
#define local static
......@@ -524,16 +527,6 @@ local int complain(char *fmt, ...)
return 0;
}
/* exit with error, delete output file if in the middle of writing it */
local int bail(char *why, char *what)
{
if (g.outd != -1 && g.outf != NULL)
unlink(g.outf);
complain("abort: %s%s", why, what);
exit(1);
return 0;
}
#ifdef DEBUG
/* memory tracking */
......@@ -635,6 +628,30 @@ local void zlib_free(voidpf opaque, voidpf address)
#define ZALLOC zlib_alloc
#define ZFREE zlib_free
#else /* !DEBUG */
#define MALLOC malloc
#define REALLOC realloc
#define FREE free
#define OPAQUE Z_NULL
#define ZALLOC Z_NULL
#define ZFREE Z_NULL
#endif
/* assured memory allocation */
local void *alloc(void *ptr, size_t size)
{
ptr = REALLOC(ptr, size);
if (ptr == NULL)
throw(ENOMEM, "not enough memory");
return ptr;
}
#if DEBUG
/* logging */
/* starting time of day for tracing */
local struct timeval start;
......@@ -677,18 +694,12 @@ local void log_add(char *fmt, ...)
char msg[MAXMSG];
gettimeofday(&now, NULL);
me = MALLOC(sizeof(struct log));
if (me == NULL)
bail("not enough memory", "");
me = alloc(NULL, sizeof(struct log));
me->when = now;
va_start(ap, fmt);
vsnprintf(msg, MAXMSG, fmt, ap);
va_end(ap);
me->msg = MALLOC(strlen(msg) + 1);
if (me->msg == NULL) {
FREE(me);
bail("not enough memory", "");
}
me->msg = alloc(NULL, strlen(msg) + 1);
strcpy(me->msg, msg);
me->next = NULL;
#ifndef NOTHREAD
......@@ -790,18 +801,31 @@ local void log_dump(void)
#else /* !DEBUG */
#define MALLOC malloc
#define REALLOC realloc
#define FREE free
#define OPAQUE Z_NULL
#define ZALLOC Z_NULL
#define ZFREE Z_NULL
#define log_dump()
#define Trace(x)
#endif
/* abort or catch termination signal */
local void cut_short(int sig)
{
if (sig == SIGINT)
Trace(("termination by user"));
if (g.outd != -1 && g.outf != NULL)
unlink(g.outf);
RELEASE(g.outf);
log_dump();
_exit(sig < 0 ? -sig : ECANCELED);
}
/* common code for catch block of top routine in the thread */
#define THREADABORT(ball) \
do { \
complain("abort: %s", (ball).why); \
drop(ball); \
cut_short(-(ball).code); \
} while (0)
/* compute next size up by multiplying by about 2**(1/3) and rounding to the
next power of 2 if close (three applications results in doubling) -- if
small, go up to at least 16, if overflow, go to max size_t value */
......@@ -830,23 +854,17 @@ local inline size_t vmemcpy(char **mem, size_t *size, size_t off,
void *cpy, size_t len)
{
size_t need;
char *bigger;
need = off + len;
if (need < off)
bail("overflow", "");
throw(EOVERFLOW, "overflow");
if (need > *size) {
need = grow(need);
if (off)
bigger = REALLOC(*mem, need);
else {
if (off == 0) {
RELEASE(*mem);
*size = 0;
bigger = MALLOC(need);
}
if (bigger == NULL)
bail("not enough memory", "");
*mem = bigger;
*mem = alloc(*mem, need);
*size = need;
}
memcpy(*mem + off, cpy, len);
......@@ -871,7 +889,7 @@ local size_t readn(int desc, unsigned char *buf, size_t len)
while (len) {
ret = read(desc, buf, len);
if (ret < 0)
bail("read error on ", g.inf);
throw(errno, "read error on %s (%s)", g.inf, strerror(errno));
if (ret == 0)
break;
buf += ret;
......@@ -888,10 +906,8 @@ local void writen(int desc, unsigned char *buf, size_t len)
while (len) {
ret = write(desc, buf, len);
if (ret < 1) {
complain("write error code %d", errno);
bail("write error on ", g.outf);
}
if (ret < 1)
throw(errno, "write error on %s (%s)", g.outf, strerror(errno));
buf += ret;
len -= ret;
}
......@@ -1246,13 +1262,9 @@ local struct space *get_space(struct pool *pool)
pool->limit--;
pool->made++;
release(pool->have);
space = MALLOC(sizeof(struct space));
if (space == NULL)
bail("not enough memory", "");
space = alloc(NULL, sizeof(struct space));
space->use = new_lock(1); /* initially one user */
space->buf = MALLOC(pool->size);
if (space->buf == NULL)
bail("not enough memory", "");
space->buf = alloc(NULL, pool->size);
space->size = pool->size;
space->len = 0;
space->pool = pool; /* remember the pool this belongs to */
......@@ -1267,12 +1279,10 @@ local void grow_space(struct space *space)
/* compute next size up */
more = grow(space->size);
if (more == space->size)
bail("not enough memory", "");
throw(EOVERFLOW, "overflow");
/* reallocate the buffer */
space->buf = REALLOC(space->buf, more);
if (space->buf == NULL)
bail("not enough memory", "");
space->buf = alloc(space->buf, more);
space->size = more;
}
......@@ -1290,6 +1300,8 @@ local void drop_space(struct space *space)
int use;
struct pool *pool;
if (space == NULL)
return;
possess(space->use);
use = peek_lock(space->use);
assert(use != 0);
......@@ -1462,214 +1474,223 @@ local void compress_thread(void *dummy)
int bits; /* deflate pending bits */
#endif
struct space *temp = NULL; /* temporary space for zopfli input */
int ret; /* zlib return code */
z_stream strm; /* deflate stream */
ball_t err; /* error information from throw() */
(void)dummy;
/* initialize the deflate stream for this thread */
strm.zfree = ZFREE;
strm.zalloc = ZALLOC;
strm.opaque = OPAQUE;
if (deflateInit2(&strm, 6, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY) != Z_OK)
bail("not enough memory", "");
/* keep looking for work */
for (;;) {
/* get a job (like I tell my son) */
possess(compress_have);
wait_for(compress_have, NOT_TO_BE, 0);
job = compress_head;
assert(job != NULL);
if (job->seq == -1)
break;
compress_head = job->next;
if (job->next == NULL)
compress_tail = &compress_head;
twist(compress_have, BY, -1);
/* got a job -- initialize and set the compression level (note that if
deflateParams() is called immediately after deflateReset(), there is
no need to initialize the input/output for the stream) */
Trace(("-- compressing #%ld", job->seq));
if (g.level <= 9) {
(void)deflateReset(&strm);
(void)deflateParams(&strm, g.level, Z_DEFAULT_STRATEGY);
}
else {
temp = get_space(&out_pool);
temp->len = 0;
}
/* set dictionary if provided, release that input or dictionary buffer
(not NULL if g.setdict is true and if this is not the first work
unit) */
if (job->out != NULL) {
len = job->out->len;
left = len < DICT ? len : DICT;
if (g.level <= 9)
deflateSetDictionary(&strm, job->out->buf + (len - left),
left);
try {
/* initialize the deflate stream for this thread */
strm.zfree = ZFREE;
strm.zalloc = ZALLOC;
strm.opaque = OPAQUE;
ret = deflateInit2(&strm, 6, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY);
if (ret == Z_MEM_ERROR)
throw(ENOMEM, "not enough memory");
if (ret != Z_OK)
throw(EINVAL, "internal error");
/* keep looking for work */
for (;;) {
/* get a job (like I tell my son) */
possess(compress_have);
wait_for(compress_have, NOT_TO_BE, 0);
job = compress_head;
assert(job != NULL);
if (job->seq == -1)
break;
compress_head = job->next;
if (job->next == NULL)
compress_tail = &compress_head;
twist(compress_have, BY, -1);
/* got a job -- initialize and set the compression level (note that
if deflateParams() is called immediately after deflateReset(),
there is no need to initialize input/output for the stream) */
Trace(("-- compressing #%ld", job->seq));
if (g.level <= 9) {
(void)deflateReset(&strm);
(void)deflateParams(&strm, g.level, Z_DEFAULT_STRATEGY);
}
else {
memcpy(temp->buf, job->out->buf + (len - left), left);
temp->len = left;
if (temp == NULL)
temp = get_space(&out_pool);
temp->len = 0;
}
drop_space(job->out);
}
/* set up input and output */
job->out = get_space(&out_pool);
if (g.level <= 9) {
strm.next_in = job->in->buf;
strm.next_out = job->out->buf;
}
else
memcpy(temp->buf + temp->len, job->in->buf, job->in->len);
/* compress each block, either flushing or finishing */
next = job->lens == NULL ? NULL : job->lens->buf;
left = job->in->len;
job->out->len = 0;
do {
/* decode next block length from blocks list */
len = next == NULL ? 128 : *next++;
if (len < 128) /* 64..32831 */
len = (len << 8) + (*next++) + 64;
else if (len == 128) /* end of list */
len = left;
else if (len < 192) /* 1..63 */
len &= 0x3f;
else if (len < 224){ /* 32832..2129983 */
len = ((len & 0x1f) << 16) + (*next++ << 8);
len += *next++ + 32832U;
}
else { /* 2129984..539000895 */
len = ((len & 0x1f) << 24) + (*next++ << 16);
len += *next++ << 8;
len += *next++ + 2129984UL;
/* set dictionary if provided, release that input or dictionary
buffer (not NULL if g.setdict is true and if this is not the
first work unit) */
if (job->out != NULL) {
len = job->out->len;
left = len < DICT ? len : DICT;
if (g.level <= 9)
deflateSetDictionary(&strm, job->out->buf + (len - left),
left);
else {
memcpy(temp->buf, job->out->buf + (len - left), left);
temp->len = left;
}
drop_space(job->out);
}
left -= len;
/* set up input and output */
job->out = get_space(&out_pool);
if (g.level <= 9) {
/* run MAXP2-sized amounts of input through deflate -- this
loop is needed for those cases where the unsigned type is
smaller than the size_t type, or when len is close to the
limit of the size_t type */
while (len > MAXP2) {
strm.avail_in = MAXP2;
deflate_engine(&strm, job->out, Z_NO_FLUSH);
len -= MAXP2;
strm.next_in = job->in->buf;
strm.next_out = job->out->buf;
}
else
memcpy(temp->buf + temp->len, job->in->buf, job->in->len);
/* compress each block, either flushing or finishing */
next = job->lens == NULL ? NULL : job->lens->buf;
left = job->in->len;
job->out->len = 0;
do {
/* decode next block length from blocks list */
len = next == NULL ? 128 : *next++;
if (len < 128) /* 64..32831 */
len = (len << 8) + (*next++) + 64;
else if (len == 128) /* end of list */
len = left;
else if (len < 192) /* 1..63 */
len &= 0x3f;
else if (len < 224){ /* 32832..2129983 */
len = ((len & 0x1f) << 16) + (*next++ << 8);
len += *next++ + 32832U;
}
else { /* 2129984..539000895 */
len = ((len & 0x1f) << 24) + (*next++ << 16);
len += *next++ << 8;
len += *next++ + 2129984UL;
}
left -= len;
if (g.level <= 9) {
/* run MAXP2-sized amounts of input through deflate -- this
loop is needed for those cases where the unsigned type
is smaller than the size_t type, or when len is close to
the limit of the size_t type */
while (len > MAXP2) {
strm.avail_in = MAXP2;
deflate_engine(&strm, job->out, Z_NO_FLUSH);
len -= MAXP2;
}
/* run the last piece through deflate -- end on a byte
boundary, using a sync marker if necessary, or finish the
deflate stream if this is the last block */
strm.avail_in = (unsigned)len;
if (left || job->more) {
/* run the last piece through deflate -- end on a byte
boundary, using a sync marker if necessary, or finish
the deflate stream if this is the last block */
strm.avail_in = (unsigned)len;
if (left || job->more) {
#if ZLIB_VERNUM >= 0x1260
deflate_engine(&strm, job->out, Z_BLOCK);
/* add enough empty blocks to get to a byte boundary */
(void)deflatePending(&strm, Z_NULL, &bits);
if (bits & 1)
deflate_engine(&strm, job->out, Z_SYNC_FLUSH);
else if (bits & 7) {
do { /* add static empty blocks */
bits = deflatePrime(&strm, 10, 2);
assert(bits == Z_OK);
(void)deflatePending(&strm, Z_NULL, &bits);
} while (bits & 7);
deflate_engine(&strm, job->out, Z_BLOCK);
}
/* add enough empty blocks to get to a byte boundary */
(void)deflatePending(&strm, Z_NULL, &bits);
if (bits & 1)
deflate_engine(&strm, job->out, Z_SYNC_FLUSH);
else if (bits & 7) {
do { /* add static empty blocks */
bits = deflatePrime(&strm, 10, 2);
assert(bits == Z_OK);
(void)deflatePending(&strm, Z_NULL, &bits);
} while (bits & 7);
deflate_engine(&strm, job->out, Z_BLOCK);
}
#else
deflate_engine(&strm, job->out, Z_SYNC_FLUSH);
deflate_engine(&strm, job->out, Z_SYNC_FLUSH);
#endif
}
else
deflate_engine(&strm, job->out, Z_FINISH);
}
else
deflate_engine(&strm, job->out, Z_FINISH);
}
else {
/* compress len bytes using zopfli, bring to byte boundary */
unsigned char bits, *out;
size_t outsize;
out = NULL;
outsize = 0;
bits = 0;
ZopfliDeflatePart(&g.zopts, 2, !(left || job->more),
temp->buf, temp->len, temp->len + len,
&bits, &out, &outsize);
assert(job->out->len + outsize + 5 <= job->out->size);
memcpy(job->out->buf + job->out->len, out, outsize);
free(out);
job->out->len += outsize;
if (left || job->more) {
bits &= 7;
if (bits & 1) {
if (bits == 7)
else {
/* compress len bytes using zopfli, end at byte boundary */
unsigned char bits, *out;
size_t outsize;
out = NULL;
outsize = 0;
bits = 0;
ZopfliDeflatePart(&g.zopts, 2, !(left || job->more),
temp->buf, temp->len, temp->len + len,
&bits, &out, &outsize);
assert(job->out->len + outsize + 5 <= job->out->size);
memcpy(job->out->buf + job->out->len, out, outsize);
free(out);
job->out->len += outsize;
if (left || job->more) {
bits &= 7;
if (bits & 1) {
if (bits == 7)
job->out->buf[job->out->len++] = 0;
job->out->buf[job->out->len++] = 0;
job->out->buf[job->out->len++] = 0;
job->out->buf[job->out->len++] = 0;
job->out->buf[job->out->len++] = 0xff;
job->out->buf[job->out->len++] = 0xff;
}
else if (bits) {
do {
job->out->buf[job->out->len - 1] += 2 << bits;
job->out->buf[job->out->len++] = 0;
bits += 2;
} while (bits < 8);
job->out->buf[job->out->len++] = 0xff;
job->out->buf[job->out->len++] = 0xff;
}
else if (bits) {
do {
job->out->buf[job->out->len - 1] += 2 << bits;
job->out->buf[job->out->len++] = 0;
bits += 2;
} while (bits < 8);
}
}
temp->len += len;
}
temp->len += len;
}
} while (left);
if (g.level > 9)
drop_space(temp);
if (job->lens != NULL) {
} while (left);
drop_space(job->lens);
job->lens = NULL;
}
Trace(("-- compressed #%ld%s", job->seq, job->more ? "" : " (last)"));
Trace(("-- compressed #%ld%s", job->seq,
job->more ? "" : " (last)"));
/* reserve input buffer until check value has been calculated */
use_space(job->in);
/* reserve input buffer until check value has been calculated */
use_space(job->in);
/* insert write job in list in sorted order, alert write thread */
possess(write_first);
prior = &write_head;
while ((here = *prior) != NULL) {
if (here->seq > job->seq)
break;
prior = &(here->next);
}
job->next = here;
*prior = job;
twist(write_first, TO, write_head->seq);
/* calculate the check value in parallel with writing, alert the write
thread that the calculation is complete, and drop this usage of the
input buffer */
len = job->in->len;
next = job->in->buf;
check = CHECK(0L, Z_NULL, 0);
while (len > MAXP2) {
check = CHECK(check, next, MAXP2);
len -= MAXP2;
next += MAXP2;
/* insert write job in list in sorted order, alert write thread */
possess(write_first);
prior = &write_head;
while ((here = *prior) != NULL) {
if (here->seq > job->seq)
break;
prior = &(here->next);
}
job->next = here;
*prior = job;
twist(write_first, TO, write_head->seq);
/* calculate the check value in parallel with writing, alert the
write thread that the calculation is complete, and drop this
usage of the input buffer */
len = job->in->len;
next = job->in->buf;
check = CHECK(0L, Z_NULL, 0);
while (len > MAXP2) {
check = CHECK(check, next, MAXP2);
len -= MAXP2;
next += MAXP2;
}
check = CHECK(check, next, (unsigned)len);
drop_space(job->in);
job->check = check;
Trace(("-- checked #%ld%s", job->seq, job->more ? "" : " (last)"));
possess(job->calc);
twist(job->calc, TO, 1);
/* done with that one -- go find another job */
}
check = CHECK(check, next, (unsigned)len);
drop_space(job->in);
job->check = check;
Trace(("-- checked #%ld%s", job->seq, job->more ? "" : " (last)"));
possess(job->calc);
twist(job->calc, TO, 1);
/* done with that one -- go find another job */
/* found job with seq == -1 -- return to join */
drop_space(temp);
release(compress_have);
(void)deflateEnd(&strm);
}
catch (err) {
THREADABORT(err);
}
/* found job with seq == -1 -- free deflate memory and return to join */
release(compress_have);
(void)deflateEnd(&strm);
}
/* collect the write jobs off of the list in sequence order and write out the
......@@ -1685,63 +1706,69 @@ local void write_thread(void *dummy)
unsigned long ulen; /* total uncompressed size (overflow ok) */
unsigned long clen; /* total compressed size (overflow ok) */
unsigned long check; /* check value of uncompressed data */
ball_t err; /* error information from throw() */
(void)dummy;
/* build and write header */
Trace(("-- write thread running"));
head = put_header();
try {
/* build and write header */
Trace(("-- write thread running"));
head = put_header();
/* process output of compress threads until end of input */
ulen = clen = 0;
check = CHECK(0L, Z_NULL, 0);
seq = 0;
do {
/* get next write job in order */
possess(write_first);
wait_for(write_first, TO_BE, seq);
job = write_head;
write_head = job->next;
twist(write_first, TO, write_head == NULL ? -1 : write_head->seq);
/* update lengths, save uncompressed length for COMB */
more = job->more;
len = job->in->len;
drop_space(job->in);
ulen += (unsigned long)len;
clen += (unsigned long)(job->out->len);
/* write the compressed data and drop the output buffer */
Trace(("-- writing #%ld", seq));
writen(g.outd, job->out->buf, job->out->len);
drop_space(job->out);
Trace(("-- wrote #%ld%s", seq, more ? "" : " (last)"));
/* wait for check calculation to complete, then combine, once
the compress thread is done with the input, release it */
possess(job->calc);
wait_for(job->calc, TO_BE, 1);
release(job->calc);
check = COMB(check, job->check, len);
/* free the job */
free_lock(job->calc);
FREE(job);
/* get the next buffer in sequence */
seq++;
} while (more);
/* process output of compress threads until end of input */
ulen = clen = 0;
check = CHECK(0L, Z_NULL, 0);
seq = 0;
do {
/* get next write job in order */
possess(write_first);
wait_for(write_first, TO_BE, seq);
job = write_head;
write_head = job->next;
twist(write_first, TO, write_head == NULL ? -1 : write_head->seq);
/* update lengths, save uncompressed length for COMB */
more = job->more;
len = job->in->len;
drop_space(job->in);
ulen += (unsigned long)len;
clen += (unsigned long)(job->out->len);
/* write the compressed data and drop the output buffer */
Trace(("-- writing #%ld", seq));