Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Fix bug in --list operation due to concurrent i/o on input.
This also cleans up the termination of the read thread for those
cases where the entire input was not read, but the read thread is
no longer needed.
  • Loading branch information
madler committed Dec 25, 2017
1 parent 06e899b commit b88a0e9
Showing 1 changed file with 60 additions and 23 deletions.
83 changes: 60 additions & 23 deletions pigz.c
Expand Up @@ -2472,7 +2472,8 @@ local void single_compress(int reset)
/* --- decompression --- */

#ifndef NOTHREAD
/* parallel read thread */
/* parallel read thread -- if the state is 1, then read a buffer and set the
state to 0 when done, if the state is > 1, then end this thread */
local void load_read(void *dummy)
{
size_t len;
Expand All @@ -2484,7 +2485,11 @@ local void load_read(void *dummy)
try {
do {
possess(g.load_state);
wait_for(g.load_state, TO_BE, 1);
wait_for(g.load_state, NOT_TO_BE, 0);
if (peek_lock(g.load_state) > 1) {
release(g.load_state);
break;
}
g.in_len = len = readn(g.ind, g.in_which ? g.in_buf : g.in_buf2,
BUF);
Trace(("-- decompress read thread read %lu bytes", len));
Expand All @@ -2496,6 +2501,17 @@ local void load_read(void *dummy)
}
Trace(("-- exited decompress read thread"));
}

/* wait for load_read() to complete the current read operation -- if the
load_read() thread is not active, then return immediately */
local void load_wait(void)
{
if (g.in_which == -1)
return;
possess(g.load_state);
wait_for(g.load_state, TO_BE, 0);
release(g.load_state);
}
#endif

/* load() is called when the input has been consumed in order to provide more
Expand Down Expand Up @@ -2524,9 +2540,7 @@ local size_t load(void)
}

/* wait for the previously requested read to complete */
possess(g.load_state);
wait_for(g.load_state, TO_BE, 0);
release(g.load_state);
load_wait();

/* set up input buffer with the data just read */
g.in_next = g.in_which ? g.in_buf : g.in_buf2;
Expand Down Expand Up @@ -2568,6 +2582,33 @@ local size_t load(void)
return g.in_left;
}

/* terminate the load() operation -- empty buffer, mark end, close file (if not
stdin), and free the name obtained from the header, if any */
local void load_end(void)
{
#ifndef NOTHREAD
/* if the read thread is running, then end it */
if (g.in_which != -1) {
/* wait for the previously requested read to complete and send the thread a
message to exit */
possess(g.load_state);
wait_for(g.load_state, TO_BE, 0);
twist(g.load_state, TO, 2);

/* join the thread (which has exited or will very shortly) and clean up */
join(g.load_thread);
free_lock(g.load_state);
g.in_which = -1;
}
#endif
g.in_left = 0;
g.in_short = 1;
g.in_eof = 1;
if (g.ind != 0)
close(g.ind);
RELEASE(g.hname);
}

/* initialize for reading new input */
local void in_init(void)
{
Expand Down Expand Up @@ -2983,13 +3024,18 @@ local void list_info(void)
/* read header information and position input after header */
method = get_header(1);
if (method < 0) {
RELEASE(g.hname);
if (method != -1 && g.verbosity > 1)
complain(method != -6 ? "skipping: %s not compressed" :
"skipping: %s corrupted: invalid header crc", g.inf);
return;
}

#ifndef NOTHREAD
/* wait for read thread to complete current read() operation, to permit
seeking and reading on g.ind here in the main thread */
load_wait();
#endif

/* list zip file */
if (g.form > 1) {
g.in_tot = g.zip_clen;
Expand Down Expand Up @@ -3080,7 +3126,6 @@ local void list_info(void)

/* list information about contents */
show_info(method, check, len, 0);
RELEASE(g.hname);
}

/* --- copy input to output (when acting like cat) --- */
Expand Down Expand Up @@ -3771,17 +3816,14 @@ local void process(char *path)
SET_BINARY_MODE(g.ind);

/* if decoding or testing, try to read gzip header */
RELEASE(g.hname);
if (g.decode) {
in_init();
method = get_header(1);
if (method != 8 && method != 257 &&
/* gzip -cdf acts like cat on uncompressed input */
!(method == -2 && g.force && g.pipeout && g.decode != 2 &&
!g.list)) {
RELEASE(g.hname);
if (g.ind != 0)
close(g.ind);
load_end();
if (method != -1)
complain(method < 0 ?
method != -6 ? "skipping: %s is not compressed" :
Expand Down Expand Up @@ -3810,19 +3852,15 @@ local void process(char *path)
drop(err);
outb(&g, NULL, 0);
}
RELEASE(g.hname);
if (g.ind != 0)
close(g.ind);
load_end();
return;
}
}

/* if requested, just list information about input file */
if (g.list) {
list_info();
RELEASE(g.hname);
if (g.ind != 0)
close(g.ind);
load_end();
return;
}

Expand Down Expand Up @@ -3886,9 +3924,7 @@ local void process(char *path)
if (g.outd < 0 && errno == EEXIST) {
complain("skipping: %s exists", g.outf);
RELEASE(g.outf);
RELEASE(g.hname);
if (g.ind != 0)
close(g.ind);
load_end();
return;
}

Expand All @@ -3897,7 +3933,6 @@ local void process(char *path)
throw(errno, "write error on %s (%s)", g.outf, strerror(errno));
}
SET_BINARY_MODE(g.outd);
RELEASE(g.hname);

/* process ind to outd */
if (g.verbosity > 1)
Expand Down Expand Up @@ -3937,8 +3972,7 @@ local void process(char *path)
}

/* finish up, copy attributes, set times, delete original */
if (g.ind != 0)
close(g.ind);
load_end();
if (g.outd != -1 && g.outd != 1) {
if (close(g.outd))
throw(errno, "write error on %s (%s)", g.outf, strerror(errno));
Expand Down Expand Up @@ -4314,6 +4348,9 @@ int main(int argc, char **argv)
/* initialize globals */
g.inf = NULL;
g.inz = 0;
#ifndef NOTHREAD
g.in_which = -1;
#endif
g.outf = NULL;
g.first = 1;
g.hname = NULL;
Expand Down

0 comments on commit b88a0e9

Please sign in to comment.