#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <threads.h>
#include <unistd.h>

#include <thread_pool.h>
/*
 * These macros enable or disable debug print statements (and internal
 * linkage for helpers) depending on whether the application was compiled
 * in debug or release mode
 */
#ifdef NDEBUG
#define DEBUG_PRINT 0
#define DEBUG_STATIC static
#else
#define DEBUG_PRINT 1
#define DEBUG_STATIC
#endif // NDEBUG
/*
 * Print debug messages when in debug mode. To print a string with no
 * format substitutions you have to use debug_print("%s\n", "My text");
 * otherwise it is used just like any other printf variant:
 * debug_print("My text %s\n", "more text");
 *
 * debug_print_err adds the file, line number, and function name to the
 * output for more context when debugging.
 */
#define debug_print_err(fmt, ...) \
    do { if (DEBUG_PRINT) fprintf(stderr, "%s:%d:%s(): " fmt, __FILE__, \
                                  __LINE__, __func__, __VA_ARGS__); } while (0)
#define debug_print(fmt, ...) \
    do { if (DEBUG_PRINT) fprintf(stderr, fmt, __VA_ARGS__); } while (0)
// A worker object contains the worker's ID, its thread handle, and a
// pointer back to the thread pool it belongs to
typedef struct worker_t
{
    int id;
    thrd_t thread;
    thpool_t * thpool;
} worker_t;
// A job is a queued object containing the function that a woken thread
// should execute, along with its argument.
typedef struct job_t job_t;
struct job_t
{
    job_t * next_job;
    void (* job_function)(void * job_arg);
    void * job_arg;
};
// The work queue contains a list of first come, first served jobs that are
// consumed by the thread pool. When the queue is empty, the threads will
// block until a new job is enqueued.
typedef struct work_queue_t
{
    job_t * job_head;
    job_t * job_tail;
    atomic_uint_fast64_t job_count;
    mtx_t queue_access_mutex;
} work_queue_t;
// The main structure contains the mutexes, condition variables, and atomic
// counters that maintain synchronization between all the threads
struct thpool_t
{
    uint8_t thread_count;
    atomic_uint_fast8_t workers_alive;
    atomic_uint_fast8_t workers_working;
    atomic_uint_fast8_t thpool_active;
    worker_t ** workers;
    work_queue_t * work_queue;
    mtx_t run_mutex;
    cnd_t run_cond;
    mtx_t wait_mutex;
    cnd_t wait_cond;
};
typedef enum util_verify_t
{
    UV_VALID_ALLOC,
    UV_INVALID_ALLOC
} util_verify_t;
static int thread_pool(void * arg);
static job_t * thpool_dequeue_job(thpool_t * thpool);
static util_verify_t verify_alloc(void * ptr);
static void thpool_cleanup(thpool_t ** thpool_ptr);
static int8_t init_workers(thpool_t * thpool);
/*!
 * @brief Create a threadpool with the `thread_count` number of threads in the
 * pool. This function will block until all threads have been initialized. The
 * initialized threads will execute their work function indefinitely until
 * the destroy function is called. The threads will remain blocked in their
 * work function until a job is enqueued. As soon as a job is enqueued, a thread
 * will be unblocked to execute the task in the queue.
 *
 * @param thread_count Number of threads to spawn. Amount must be greater than
 * 0.
 * @return Pointer to the threadpool object, or NULL if a failure occurred.
 */
thpool_t * thpool_init(uint8_t thread_count)
{
    // Return NULL if 0 was passed in
    if (0 == thread_count)
    {
        debug_print("%s", "Attempted to allocate thpool with 0 threads\n");
        goto null_ret;
    }

    // Initialize the thread pool
    thpool_t * thpool = (thpool_t *)calloc(1, sizeof(thpool_t));
    if (UV_INVALID_ALLOC == verify_alloc(thpool))
    {
        goto null_ret;
    }
    thpool->thread_count = thread_count;
    thpool->thpool_active = 1;

    // Init the work queue
    work_queue_t * work_queue = (work_queue_t *)calloc(1, sizeof(work_queue_t));
    if (UV_INVALID_ALLOC == verify_alloc(work_queue))
    {
        goto cleanup;
    }
    thpool->work_queue = work_queue;

    // Initialize the mutexes and condition variables
    int result = mtx_init(&thpool->run_mutex, mtx_plain);
    if (thrd_success != result)
    {
        debug_print_err("%s", "Unable to init run_mutex\n");
        goto cleanup;
    }
    result = cnd_init(&thpool->run_cond);
    if (thrd_success != result)
    {
        debug_print_err("%s", "Unable to init run_cond\n");
        goto cleanup;
    }
    result = mtx_init(&work_queue->queue_access_mutex, mtx_plain);
    if (thrd_success != result)
    {
        debug_print_err("%s", "Unable to init queue_access_mutex\n");
        goto cleanup;
    }
    result = mtx_init(&thpool->wait_mutex, mtx_plain);
    if (thrd_success != result)
    {
        debug_print_err("%s", "Unable to init wait_mutex\n");
        goto cleanup;
    }
    result = cnd_init(&thpool->wait_cond);
    if (thrd_success != result)
    {
        debug_print_err("%s", "Unable to init wait_cond\n");
        goto cleanup;
    }

    // Init the workers; this will make them start executing the work function
    result = init_workers(thpool);
    if (-1 == result)
    {
        goto cleanup;
    }

    // Block until all threads have been initialized
    while (thread_count != atomic_load(&thpool->workers_alive))
    {
        debug_print("[THPOOL] Waiting for threads to init [%d/%d]\n",
                    atomic_load(&thpool->workers_alive),
                    thread_count);
        usleep(200000);
    }

    debug_print("%s", "[THPOOL] Thread pool ready\n");
    return thpool;

cleanup:
    thpool_destroy(&thpool);
null_ret:
    return NULL;
}
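/*
 * Usage sketch (not in the original file): creating and tearing down a pool.
 * `pool` is a hypothetical caller-side variable. thpool_destroy() takes the
 * address of the pointer so it can NULL it out after freeing.
 *
 *     thpool_t * pool = thpool_init(4);
 *     if (NULL == pool)
 *     {
 *         // handle allocation or thread-creation failure
 *     }
 *     // ... enqueue jobs ...
 *     thpool_destroy(&pool);  // pool is NULL afterwards
 */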
static int8_t init_workers(thpool_t * thpool)
{
    // Init the array of workers
    thpool->workers =
        (worker_t **)calloc(thpool->thread_count, sizeof(worker_t *));
    if (UV_INVALID_ALLOC == verify_alloc(thpool->workers))
    {
        return -1;
    }

    int result;
    for (uint8_t i = 0; i < thpool->thread_count; i++)
    {
        thpool->workers[i] = (worker_t *)malloc(sizeof(worker_t));
        worker_t * worker = thpool->workers[i];
        if (UV_INVALID_ALLOC == verify_alloc(worker))
        {
            return -1;
        }

        worker->thpool = thpool;
        worker->id = i;

        result = thrd_create(&(worker->thread), thread_pool, worker);
        if (thrd_success != result)
        {
            debug_print_err("%s", "Unable to create a thread for the thread pool\n");
            return -1;
        }

        // Detached threads release their own resources when they exit
        result = thrd_detach(worker->thread);
        if (thrd_success != result)
        {
            debug_print_err("%s", "Unable to detach thread\n");
            return -1;
        }
    }
    return 0;
}
/*!
 * @brief The function will block until all jobs have been consumed and all
 * threads have returned to their blocked position, waiting for jobs to be
 * enqueued.
 *
 * @param thpool Pointer to the thread pool object
 */
void thpool_wait(thpool_t * thpool)
{
    if (NULL == thpool)
    {
        return;
    }

    mtx_lock(&thpool->wait_mutex);
    while ((0 != atomic_load(&thpool->workers_working))
           || (0 != atomic_load(&thpool->work_queue->job_count)))
    {
        debug_print("\n[THPOOL] Waiting for threadpool to finish "
                    "[Workers working: %d] || [Jobs in queue: %lu]\n",
                    atomic_load(&thpool->workers_working),
                    (unsigned long)atomic_load(&thpool->work_queue->job_count));
        cnd_wait(&thpool->wait_cond, &thpool->wait_mutex);
    }
    debug_print("%s\n", "[THPOOL] Finished waiting...");
    mtx_unlock(&thpool->wait_mutex);
}
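/*
 * Usage sketch (not in the original file): the typical batch pattern. The
 * names num_jobs, job_fn, args, and pool are hypothetical caller-side names.
 *
 *     for (size_t i = 0; i < num_jobs; i++)
 *     {
 *         thpool_enqueue_job(pool, job_fn, &args[i]);
 *     }
 *     thpool_wait(pool);  // returns once the queue drains and workers idle
 */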
/*!
 * @brief Function will signal the threads to exit the work function. The
 * threads already executing their task will be left to finish their task.
 * This will cause the function to block until all threads have exited.
 *
 * @param thpool_ptr Reference to the thread pool object **Note the double pointer**
 */
void thpool_destroy(thpool_t ** thpool_ptr)
{
    if (NULL == thpool_ptr)
    {
        return;
    }
    thpool_t * thpool = *thpool_ptr;
    if (NULL == thpool)
    {
        return;
    }

    // Mark the pool inactive so that woken threads exit their work loop
    atomic_fetch_sub(&thpool->thpool_active, 1);

    // Keep broadcasting until every thread has finished its task and exited
    while (0 != atomic_load(&thpool->workers_alive))
    {
        debug_print("\n[THPOOL] Broadcasting threads to exit...\n"
                    "Workers still alive: %d\n",
                    atomic_load(&thpool->workers_alive));
        cnd_broadcast(&thpool->run_cond);
        usleep(200000);
    }

    thpool_cleanup(thpool_ptr);
}
/*!
 * @brief Enqueue a job into the job queue. As soon as a job is enqueued, the
 * thread pool will signal the threads to start consuming tasks.
 *
 * @param thpool Pointer to the thread pool object
 * @param job_function Function the thread will execute
 * @param job_arg Void pointer passed to the function that the thread will
 * execute.
 * @return THP_SUCCESS for successful enqueue, otherwise THP_FAILURE
 */
thpool_status thpool_enqueue_job(thpool_t * thpool,
                                 void (* job_function)(void *),
                                 void * job_arg)
{
    if ((NULL == thpool) || (NULL == job_function))
    {
        return THP_FAILURE;
    }

    job_t * job = (job_t *)malloc(sizeof(job_t));
    if (UV_INVALID_ALLOC == verify_alloc(job))
    {
        return THP_FAILURE;
    }
    job->job_arg = job_arg;
    job->job_function = job_function;
    job->next_job = NULL;

    work_queue_t * work_queue = thpool->work_queue;
    mtx_lock(&work_queue->queue_access_mutex);

    // If the job queue is empty, assign the new job as both head and tail
    if (0 == atomic_load(&work_queue->job_count))
    {
        work_queue->job_head = job;
        work_queue->job_tail = job;
    }
    else // The work queue already has jobs; append to the tail
    {
        work_queue->job_tail->next_job = job;
        work_queue->job_tail = job;
    }
    atomic_fetch_add(&work_queue->job_count, 1);

    debug_print("[THPOOL] New job enqueued. Total jobs in queue: %lu\n",
                (unsigned long)atomic_load(&work_queue->job_count));
    mtx_unlock(&work_queue->queue_access_mutex);

    // Signal at least one thread that the run condition has changed,
    // indicating that a new job has been added to the queue
    cnd_signal(&thpool->run_cond);
    return THP_SUCCESS;
}
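/*
 * Usage sketch (not in the original file): the pool frees only its internal
 * job_t wrapper, never job_arg, so the caller must keep the argument alive
 * until the job function has run. count_words, document_t, count_tokens,
 * and doc are hypothetical names.
 *
 *     static void count_words(void * arg)
 *     {
 *         document_t * document = (document_t *)arg;
 *         document->word_count = count_tokens(document->text);
 *     }
 *
 *     if (THP_FAILURE == thpool_enqueue_job(pool, count_words, doc))
 *     {
 *         // NULL input or the internal job allocation failed
 *     }
 */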
/*!
 * @brief Remove a job from the job queue
 * @param thpool Pointer to the thpool object
 * @return Pointer to the next job object, or NULL if the queue is empty
 */
static job_t * thpool_dequeue_job(thpool_t * thpool)
{
    mtx_lock(&thpool->work_queue->queue_access_mutex);
    work_queue_t * work_queue = thpool->work_queue;
    job_t * work = work_queue->job_head;

    // If there is only one job queued, then prep the queue to point to "none"
    if (1 == atomic_load(&work_queue->job_count))
    {
        work_queue->job_head = NULL;
        work_queue->job_tail = NULL;
        atomic_fetch_sub(&work_queue->job_count, 1);
    }
    // Else if the queue has more than one task left, then update head to
    // point to the next item
    else if (atomic_load(&work_queue->job_count) > 1)
    {
        work_queue->job_head = work->next_job;
        atomic_fetch_sub(&work_queue->job_count, 1);
    }
    mtx_unlock(&work_queue->queue_access_mutex);

    // Wake another thread in case more tasks remain in the queue
    cnd_signal(&thpool->run_cond);
    return work;
}
/*!
 * @brief Function where all threads in the thread pool live. All threads will
 * block while the work queue is empty and the thread pool is active. Whenever
 * one of these conditions is no longer true, the thread will wake up.
 * @param arg Pointer to the worker object
 * @return 0 on thread exit
 */
static int thread_pool(void * arg)
{
    worker_t * worker = (worker_t *)arg;
    thpool_t * thpool = worker->thpool;

    // Increment the number of threads alive. This is used to indicate that
    // the thread has successfully initialized
    atomic_fetch_add(&thpool->workers_alive, 1);

    while (1 == atomic_load(&thpool->thpool_active))
    {
        mtx_lock(&thpool->run_mutex);

        // Block while the work queue is empty and the pool is still active
        while ((0 == atomic_load(&thpool->work_queue->job_count))
               && (1 == atomic_load(&thpool->thpool_active)))
        {
            debug_print("[THPOOL] Thread %d waiting for a job...[threads: %d || working: %d]\n",
                        worker->id,
                        atomic_load(&thpool->workers_alive),
                        atomic_load(&thpool->workers_working));
            cnd_wait(&thpool->run_cond, &thpool->run_mutex);
        }

        // As soon as the thread wakes up, unlock the run lock.
        // We do not need it locked for operation
        mtx_unlock(&thpool->run_mutex);

        // Second check to make sure that the woken up thread should
        // execute work logic
        if (0 == atomic_load(&thpool->thpool_active))
        {
            // If there is no more work, signal the wait_cond about no work
            // being available in case it is waiting for the queue to be empty
            if (0 == atomic_load(&thpool->work_queue->job_count))
            {
                cnd_signal(&thpool->wait_cond);
            }
            break;
        }

        // Before beginning work, increment the working thread count
        atomic_fetch_add(&thpool->workers_working, 1);
        debug_print("[THPOOL] Thread %d activated, starting work..."
                    "[threads: %d || working: %d]\n",
                    worker->id,
                    atomic_load(&thpool->workers_alive),
                    atomic_load(&thpool->workers_working));

        // Fetch a job and execute it
        job_t * job = thpool_dequeue_job(thpool);
        if (NULL != job)
        {
            job->job_function(job->job_arg);
            free(job);
        }

        // Decrement threads working before going back to blocking
        atomic_fetch_sub(&thpool->workers_working, 1);
        debug_print("[THPOOL] Thread %d finished work... [threads: %d || working: %d]\n",
                    worker->id,
                    atomic_load(&thpool->workers_alive),
                    atomic_load(&thpool->workers_working));

        // If there is no more work, signal the wait_cond about no work
        // being available in case it is waiting for the queue to be empty
        if (0 == atomic_load(&thpool->work_queue->job_count))
        {
            cnd_signal(&thpool->wait_cond);
        }
    }

    debug_print("[THPOOL] Thread %d is exiting...[threads: %d || working: %d]\n",
                worker->id,
                atomic_load(&thpool->workers_alive),
                atomic_load(&thpool->workers_working));
    atomic_fetch_sub(&thpool->workers_alive, 1);
    return 0;
}
/*!
 * @brief Perform the destruction of the thread pool object by freeing all
 * allocations and NULL-ing all references
 *
 * @param thpool_ptr Reference to the thread pool object
 */
static void thpool_cleanup(thpool_t ** thpool_ptr)
{
    thpool_t * thpool = *thpool_ptr;
    if (NULL == thpool)
    {
        return;
    }

    // Free any jobs left in the queue that have not been consumed
    if (NULL != thpool->work_queue)
    {
        job_t * job = thpool->work_queue->job_head;
        job_t * temp;
        while (NULL != job)
        {
            temp = job->next_job;
            job->next_job = NULL;
            free(job);
            job = temp;
        }
        mtx_destroy(&thpool->work_queue->queue_access_mutex);
        free(thpool->work_queue);
        thpool->work_queue = NULL;
    }

    // Free the workers
    if (NULL != thpool->workers)
    {
        for (uint8_t i = 0; i < thpool->thread_count; i++)
        {
            worker_t * worker = thpool->workers[i];
            if (NULL != worker)
            {
                worker->thpool = NULL;
                free(worker);
            }
        }
        free(thpool->workers);
        thpool->workers = NULL;
    }

    // Destroy the mutexes and condition variables
    mtx_destroy(&thpool->run_mutex);
    cnd_destroy(&thpool->run_cond);
    mtx_destroy(&thpool->wait_mutex);
    cnd_destroy(&thpool->wait_cond);

    // Wipe the reference to the thpool itself to avoid use after free
    free(thpool);
    *thpool_ptr = NULL;
}
static util_verify_t verify_alloc(void * ptr)
{
    if (NULL == ptr)
    {
        debug_print("%s", "[!] Unable to allocate memory\n");
        return UV_INVALID_ALLOC;
    }
    return UV_VALID_ALLOC;
}
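
/*
 * Example (sketch, not part of the original file): a minimal end-to-end demo
 * of the public API. THPOOL_EXAMPLE_MAIN is a hypothetical guard so the demo
 * does not collide with a real main(); build with -DTHPOOL_EXAMPLE_MAIN to
 * try it. It assumes thread_pool.h exposes thpool_init, thpool_enqueue_job,
 * thpool_wait, thpool_destroy, and the THP_SUCCESS status used below.
 */
#ifdef THPOOL_EXAMPLE_MAIN
static void example_task(void * arg)
{
    int * value = (int *)arg;
    printf("Processing value %d\n", *value);
}

int main(void)
{
    // Spawn a pool with four worker threads
    thpool_t * pool = thpool_init(4);
    if (NULL == pool)
    {
        return 1;
    }

    // Enqueue eight jobs; each argument must outlive its job, so the
    // array lives until after thpool_wait() returns
    int args[8];
    for (int i = 0; i < 8; i++)
    {
        args[i] = i;
        if (THP_SUCCESS != thpool_enqueue_job(pool, example_task, &args[i]))
        {
            fprintf(stderr, "Failed to enqueue job %d\n", i);
        }
    }

    // Block until the queue drains and every worker is idle again
    thpool_wait(pool);

    // Signal the workers to exit, free the pool, and NULL the pointer
    thpool_destroy(&pool);
    return 0;
}
#endif // THPOOL_EXAMPLE_MAIN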