From b88a0e9b60addda2a147ef4954499b7d95811f06 Mon Sep 17 00:00:00 2001 From: Mark Adler Date: Sun, 18 Dec 2016 10:21:14 -0800 Subject: [PATCH] Fix bug in --list operation due to concurrent i/o on input. This also cleans up the termination of the read thread for those cases where the entire input was not read, but the read thread is no longer needed. --- pigz.c | 83 ++++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 60 insertions(+), 23 deletions(-) diff --git a/pigz.c b/pigz.c index 1aa61d3..a0286c5 100644 --- a/pigz.c +++ b/pigz.c @@ -2472,7 +2472,8 @@ local void single_compress(int reset) /* --- decompression --- */ #ifndef NOTHREAD -/* parallel read thread */ +/* parallel read thread -- if the state is 1, then read a buffer and set the + state to 0 when done, if the state is > 1, then end this thread */ local void load_read(void *dummy) { size_t len; @@ -2484,7 +2485,11 @@ local void load_read(void *dummy) try { do { possess(g.load_state); - wait_for(g.load_state, TO_BE, 1); + wait_for(g.load_state, NOT_TO_BE, 0); + if (peek_lock(g.load_state) > 1) { + release(g.load_state); + break; + } g.in_len = len = readn(g.ind, g.in_which ? g.in_buf : g.in_buf2, BUF); Trace(("-- decompress read thread read %lu bytes", len)); @@ -2496,6 +2501,17 @@ local void load_read(void *dummy) } Trace(("-- exited decompress read thread")); } + +/* wait for load_read() to complete the current read operation -- if the + load_read() thread is not active, then return immediately */ +local void load_wait(void) +{ + if (g.in_which == -1) + return; + possess(g.load_state); + wait_for(g.load_state, TO_BE, 0); + release(g.load_state); +} #endif /* load() is called when the input has been consumed in order to provide more @@ -2524,9 +2540,7 @@ local size_t load(void) } /* wait for the previously requested read to complete */ - possess(g.load_state); - wait_for(g.load_state, TO_BE, 0); - release(g.load_state); + load_wait(); /* set up input buffer with the data just read */ g.in_next = g.in_which ? g.in_buf : g.in_buf2; @@ -2568,6 +2582,33 @@ local size_t load(void) return g.in_left; } +/* terminate the load() operation -- empty buffer, mark end, close file (if not + stdin), and free the name obtained from the header, if any */ +local void load_end(void) +{ +#ifndef NOTHREAD + /* if the read thread is running, then end it */ + if (g.in_which != -1) { + /* wait for the previously requested read to complete and send the thread a + message to exit */ + possess(g.load_state); + wait_for(g.load_state, TO_BE, 0); + twist(g.load_state, TO, 2); + + /* join the thread (which has exited or will very shortly) and clean up */ + join(g.load_thread); + free_lock(g.load_state); + g.in_which = -1; + } +#endif + g.in_left = 0; + g.in_short = 1; + g.in_eof = 1; + if (g.ind != 0) + close(g.ind); + RELEASE(g.hname); +} + /* initialize for reading new input */ local void in_init(void) { @@ -2983,13 +3024,18 @@ local void list_info(void) /* read header information and position input after header */ method = get_header(1); if (method < 0) { - RELEASE(g.hname); if (method != -1 && g.verbosity > 1) complain(method != -6 ? "skipping: %s not compressed" : "skipping: %s corrupted: invalid header crc", g.inf); return; } +#ifndef NOTHREAD + /* wait for read thread to complete current read() operation, to permit + seeking and reading on g.ind here in the main thread */ + load_wait(); +#endif + /* list zip file */ if (g.form > 1) { g.in_tot = g.zip_clen; @@ -3080,7 +3126,6 @@ local void list_info(void) /* list information about contents */ show_info(method, check, len, 0); - RELEASE(g.hname); } /* --- copy input to output (when acting like cat) --- */ @@ -3771,7 +3816,6 @@ local void process(char *path) SET_BINARY_MODE(g.ind); /* if decoding or testing, try to read gzip header */ - RELEASE(g.hname); if (g.decode) { in_init(); method = get_header(1); @@ -3779,9 +3823,7 @@ local void process(char *path) /* gzip -cdf acts like cat on uncompressed input */ !(method == -2 && g.force && g.pipeout && g.decode != 2 && !g.list)) { - RELEASE(g.hname); - if (g.ind != 0) - close(g.ind); + load_end(); if (method != -1) complain(method < 0 ? method != -6 ? "skipping: %s is not compressed" : @@ -3810,9 +3852,7 @@ local void process(char *path) drop(err); outb(&g, NULL, 0); } - RELEASE(g.hname); - if (g.ind != 0) - close(g.ind); + load_end(); return; } } @@ -3820,9 +3860,7 @@ local void process(char *path) /* if requested, just list information about input file */ if (g.list) { list_info(); - RELEASE(g.hname); - if (g.ind != 0) - close(g.ind); + load_end(); return; } @@ -3886,9 +3924,7 @@ local void process(char *path) if (g.outd < 0 && errno == EEXIST) { complain("skipping: %s exists", g.outf); RELEASE(g.outf); - RELEASE(g.hname); - if (g.ind != 0) - close(g.ind); + load_end(); return; } @@ -3897,7 +3933,6 @@ local void process(char *path) throw(errno, "write error on %s (%s)", g.outf, strerror(errno)); } SET_BINARY_MODE(g.outd); - RELEASE(g.hname); /* process ind to outd */ if (g.verbosity > 1) @@ -3937,8 +3972,7 @@ local void process(char *path) } /* finish up, copy attributes, set times, delete original */ - if (g.ind != 0) - close(g.ind); + load_end(); if (g.outd != -1 && g.outd != 1) { if (close(g.outd)) throw(errno, "write error on %s (%s)", g.outf, strerror(errno)); @@ -4314,6 +4348,9 @@ int main(int argc, char **argv) /* initialize globals */ g.inf = NULL; g.inz = 0; +#ifndef NOTHREAD + g.in_which = -1; +#endif g.outf = NULL; g.first = 1; g.hname = NULL;