/
yarn.c
398 lines (348 loc) · 13.2 KB
1
/* yarn.c -- generic thread operations implemented using pthread functions
2
3
* Copyright (C) 2008, 2011, 2012, 2015, 2018, 2019, 2020 Mark Adler
* Version 1.7 12 Apr 2020 Mark Adler
4
5
6
7
8
9
10
11
* For conditions of distribution and use, see copyright notice in yarn.h
*/
/* Basic thread operations implemented using the POSIX pthread library. All
pthread references are isolated within this module to allow alternate
implementations with other thread libraries. See yarn.h for the description
of these operations. */
12
13
14
15
/* Version history:
1.0 19 Oct 2008 First version
1.1 26 Oct 2008 No need to set the stack size -- remove
Add yarn_abort() function for clean-up on error exit
16
17
1.2 19 Dec 2011 (changes reversed in 1.3)
1.3 13 Jan 2012 Add large file #define for consistency with pigz.c
18
Update thread portability #defines per IEEE 1003.1-2008
19
Fix documentation in yarn.h for yarn_prefix
20
21
1.4 19 Jan 2015 Allow yarn_abort() to avoid error message to stderr
Accept and do nothing for NULL argument to free_lock()
22
1.5 8 May 2018 Remove destruct() to avoid use of pthread_cancel()
23
Normalize the code style
24
25
1.6 3 Apr 2019 Add debugging information to fail() error messages
1.7 12 Apr 2020 Fix use after free bug in ignition()
26
27
*/
28
// For thread portability.
29
30
31
32
#define _XOPEN_SOURCE 700
#define _POSIX_C_SOURCE 200809L
#define _THREAD_SAFE
33
// Use large file functions if available.
34
#define _FILE_OFFSET_BITS 64
35
36
37
38
39
40
41
42
43
44
45
46
// External libraries and entities referenced.
#include <stdio.h> // fprintf(), stderr
#include <stdlib.h> // exit(), malloc(), free(), NULL
#include <pthread.h> // pthread_t, pthread_create(), pthread_join(),
// pthread_attr_t, pthread_attr_init(), pthread_attr_destroy(),
// PTHREAD_CREATE_JOINABLE, pthread_attr_setdetachstate(),
// pthread_self(), pthread_equal(),
// pthread_mutex_t, PTHREAD_MUTEX_INITIALIZER, pthread_mutex_init(),
// pthread_mutex_lock(), pthread_mutex_unlock(), pthread_mutex_destroy(),
// pthread_cond_t, PTHREAD_COND_INITIALIZER, pthread_cond_init(),
// pthread_cond_broadcast(), pthread_cond_wait(), pthread_cond_destroy()
47
#include <errno.h> // EPERM, ESRCH, EDEADLK, ENOMEM, EBUSY, EINVAL, EAGAIN
48
49
// Interface definition.
50
51
#include "yarn.h"
52
53
// Constants.
#define local static // for non-exported functions and globals
54
55
// Error handling external globals, resettable by application.
56
57
58
char *yarn_prefix = "yarn";
void (*yarn_abort)(int) = NULL;
59
60
// Immediately exit -- use for errors that shouldn't ever happen.
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
local void fail(int err, char const *file, long line, char const *func) {
fprintf(stderr, "%s: ", yarn_prefix);
switch (err) {
case EPERM:
fputs("already unlocked", stderr);
break;
case ESRCH:
fputs("no such thread", stderr);
break;
case EDEADLK:
fputs("resource deadlock", stderr);
break;
case ENOMEM:
fputs("out of memory", stderr);
break;
case EBUSY:
fputs("can't destroy locked resource", stderr);
break;
case EINVAL:
fputs("invalid request", stderr);
break;
case EAGAIN:
fputs("resource unavailable", stderr);
break;
default:
fprintf(stderr, "internal error %d", err);
}
88
fprintf(stderr, " (%s:%ld:%s)\n", file, line, func);
89
90
if (yarn_abort != NULL)
yarn_abort(err);
91
exit(err);
92
93
}
94
95
// Memory handling routines provided by user. If none are provided, malloc()
// and free() are used, which are therefore assumed to be thread-safe.
96
97
98
99
100
typedef void *(*malloc_t)(size_t);
typedef void (*free_t)(void *);
local malloc_t my_malloc_f = malloc;
local free_t my_free = free;
101
102
// Use user-supplied allocation routines instead of malloc() and free().
void yarn_mem(malloc_t lease, free_t vacate) {
103
104
105
106
my_malloc_f = lease;
my_free = vacate;
}
107
// Memory allocation that cannot fail (from the point of view of the caller).
108
local void *my_malloc(size_t size, char const *file, long line) {
109
110
111
void *block;
if ((block = my_malloc_f(size)) == NULL)
112
fail(ENOMEM, file, line, "malloc");
113
114
115
return block;
}
116
// -- Lock functions --
117
118
119
120
121
122
123
struct lock_s {
pthread_mutex_t mutex;
pthread_cond_t cond;
long value;
};
124
125
126
127
128
129
130
131
lock *new_lock_(long initial, char const *file, long line) {
lock *bolt = my_malloc(sizeof(struct lock_s), file, line);
int ret = pthread_mutex_init(&(bolt->mutex), NULL);
if (ret)
fail(ret, file, line, "mutex_init");
ret = pthread_cond_init(&(bolt->cond), NULL);
if (ret)
fail(ret, file, line, "cond_init");
132
133
134
135
bolt->value = initial;
return bolt;
}
136
137
138
139
void possess_(lock *bolt, char const *file, long line) {
int ret = pthread_mutex_lock(&(bolt->mutex));
if (ret)
fail(ret, file, line, "mutex_lock");
140
141
}
142
143
144
145
void release_(lock *bolt, char const *file, long line) {
int ret = pthread_mutex_unlock(&(bolt->mutex));
if (ret)
fail(ret, file, line, "mutex_unlock");
146
147
}
148
149
void twist_(lock *bolt, enum twist_op op, long val,
char const *file, long line) {
150
151
152
153
if (op == TO)
bolt->value = val;
else if (op == BY)
bolt->value += val;
154
155
156
157
158
159
int ret = pthread_cond_broadcast(&(bolt->cond));
if (ret)
fail(ret, file, line, "cond_broadcast");
ret = pthread_mutex_unlock(&(bolt->mutex));
if (ret)
fail(ret, file, line, "mutex_unlock");
160
161
162
163
}
#define until(a) while(!(a))
164
165
void wait_for_(lock *bolt, enum wait_op op, long val,
char const *file, long line) {
166
switch (op) {
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
case TO_BE:
until (bolt->value == val) {
int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex));
if (ret)
fail(ret, file, line, "cond_wait");
}
break;
case NOT_TO_BE:
until (bolt->value != val) {
int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex));
if (ret)
fail(ret, file, line, "cond_wait");
}
break;
case TO_BE_MORE_THAN:
until (bolt->value > val) {
int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex));
if (ret)
fail(ret, file, line, "cond_wait");
}
break;
case TO_BE_LESS_THAN:
until (bolt->value < val) {
int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex));
if (ret)
fail(ret, file, line, "cond_wait");
}
194
195
196
}
}
197
long peek_lock(lock *bolt) {
198
199
200
return bolt->value;
}
201
void free_lock_(lock *bolt, char const *file, long line) {
202
203
if (bolt == NULL)
return;
204
205
206
207
208
209
int ret = pthread_cond_destroy(&(bolt->cond));
if (ret)
fail(ret, file, line, "cond_destroy");
ret = pthread_mutex_destroy(&(bolt->mutex));
if (ret)
fail(ret, file, line, "mutex_destroy");
210
211
212
my_free(bolt);
}
213
// -- Thread functions (uses the lock functions above) --
214
215
216
struct thread_s {
pthread_t id;
217
218
int done; // true if this thread has exited
thread *next; // for list of all launched threads
219
220
};
221
222
// List of threads launched but not joined, count of threads exited but not
// joined (incremented by ignition() just before exiting).
223
224
225
local lock threads_lock = {
PTHREAD_MUTEX_INITIALIZER,
PTHREAD_COND_INITIALIZER,
226
0 // number of threads exited but not joined
227
};
228
local thread *threads = NULL; // list of extant threads
229
230
// Structure in which to pass the probe and its payload to ignition().
231
232
233
struct capsule {
void (*probe)(void *);
void *payload;
234
235
char const *file;
long line;
236
237
};
238
// Mark the calling thread as done and alert join_all().
239
240
local void reenter(void *arg) {
struct capsule *capsule = arg;
241
242
// find this thread in the threads list by matching the thread id
243
244
245
246
pthread_t me = pthread_self();
possess_(&(threads_lock), capsule->file, capsule->line);
thread **prior = &(threads);
thread *match;
247
248
249
250
251
252
while ((match = *prior) != NULL) {
if (pthread_equal(match->id, me))
break;
prior = &(match->next);
}
if (match == NULL)
253
fail(ESRCH, capsule->file, capsule->line, "reenter lost");
254
255
// mark this thread as done and move it to the head of the list
256
257
258
match->done = 1;
if (threads != match) {
*prior = match->next;
259
match->next = threads;
260
261
262
threads = match;
}
263
// update the count of threads to be joined and alert join_all()
264
twist_(&(threads_lock), BY, +1, capsule->file, capsule->line);
265
266
267
268
// free the capsule resource, even if the thread is cancelled (though yarn
// doesn't use pthread_cancel() -- you never know)
my_free(capsule);
269
270
}
271
272
273
274
275
// All threads go through this routine. Just before a thread exits, it marks
// itself as done in the threads list and alerts join_all() so that the thread
// resources can be released. Use a cleanup stack so that the marking occurs
// even if the thread is cancelled.
local void *ignition(void *arg) {
276
277
struct capsule *capsule = arg;
278
// run reenter() before leaving
279
pthread_cleanup_push(reenter, arg);
280
281
// execute the requested function with argument
282
283
capsule->probe(capsule->payload);
284
// mark this thread as done, letting join_all() know, and free capsule
285
286
pthread_cleanup_pop(1);
287
// exit thread
288
289
290
return NULL;
}
291
292
// Not all POSIX implementations create threads as joinable by default, so that
// is made explicit here.
293
294
thread *launch_(void (*probe)(void *), void *payload,
char const *file, long line) {
295
296
297
298
// construct the requested call and argument for the ignition() routine
// (allocated instead of automatic so that we're sure this will still be
// there when ignition() actually starts up -- ignition() will free this
// allocation)
299
struct capsule *capsule = my_malloc(sizeof(struct capsule), file, line);
300
301
capsule->probe = probe;
capsule->payload = payload;
302
303
capsule->file = file;
capsule->line = line;
304
305
306
// assure this thread is in the list before join_all() or ignition() looks
// for it
307
possess_(&(threads_lock), file, line);
308
309
// create the thread and call ignition() from that thread
310
311
312
313
314
315
316
317
318
319
320
321
322
323
thread *th = my_malloc(sizeof(struct thread_s), file, line);
pthread_attr_t attr;
int ret = pthread_attr_init(&attr);
if (ret)
fail(ret, file, line, "attr_init");
ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
if (ret)
fail(ret, file, line, "attr_setdetachstate");
ret = pthread_create(&(th->id), &attr, ignition, capsule);
if (ret)
fail(ret, file, line, "create");
ret = pthread_attr_destroy(&attr);
if (ret)
fail(ret, file, line, "attr_destroy");
324
325
// put the thread in the threads list for join_all()
326
th->done = 0;
327
th->next = threads;
328
threads = th;
329
release_(&(threads_lock), file, line);
330
331
332
return th;
}
333
void join_(thread *ally, char const *file, long line) {
334
// wait for thread to exit and return its resources
335
336
337
int ret = pthread_join(ally->id, NULL);
if (ret)
fail(ret, file, line, "join");
338
339
// find the thread in the threads list
340
341
342
possess_(&(threads_lock), file, line);
thread **prior = &(threads);
thread *match;
343
344
345
346
347
348
while ((match = *prior) != NULL) {
if (match == ally)
break;
prior = &(match->next);
}
if (match == NULL)
349
fail(ESRCH, file, line, "join lost");
350
351
// remove thread from list and update exited count, free thread
352
353
354
if (match->done)
threads_lock.value--;
*prior = match->next;
355
release_(&(threads_lock), file, line);
356
357
358
my_free(ally);
}
359
360
361
362
// This implementation of join_all() only attempts to join threads that have
// announced that they have exited (see ignition()). When there are many
// threads, this is faster than waiting for some random thread to exit while a
// bunch of other threads have already exited.
363
int join_all_(char const *file, long line) {
364
// grab the threads list and initialize the joined count
365
366
int count = 0;
possess_(&(threads_lock), file, line);
367
368
// do until threads list is empty
369
while (threads != NULL) {
370
// wait until at least one thread has reentered
371
wait_for_(&(threads_lock), NOT_TO_BE, 0, file, line);
372
373
// find the first thread marked done (should be at or near the top)
374
375
thread **prior = &(threads);
thread *match;
376
377
378
379
380
381
while ((match = *prior) != NULL) {
if (match->done)
break;
prior = &(match->next);
}
if (match == NULL)
382
fail(ESRCH, file, line, "join_all lost");
383
384
385
// join the thread (will be almost immediate), remove from the threads
// list, update the reenter count, and free the thread
386
387
388
int ret = pthread_join(match->id, NULL);
if (ret)
fail(ret, file, line, "join");
389
390
391
392
393
394
threads_lock.value--;
*prior = match->next;
my_free(match);
count++;
}
395
// let go of the threads list and return the number of threads joined
396
release_(&(threads_lock), file, line);
397
398
return count;
}