Merge pull request google-research#1339 from axch/work-stealing-2
Complete draft of work-stealing runtime
axch authored Nov 17, 2023
2 parents be6befd + 77ed709 commit 941d7f8
Showing 1 changed file with 220 additions and 50 deletions.
270 changes: 220 additions & 50 deletions src/lib/work-stealing.c
@@ -18,23 +18,29 @@

struct Work;

// A Task is a function pointer that consumes a Work* and returns a Work*
// The input is the `Work` Always passed a pointer to the containing Work struct
// A Task is a function pointer that consumes a Work* and returns a Work*.
// The input `Work` is always a pointer to the Work struct containing that
// Task, which it accepts in order to be able to deallocate it.
// Question: Do we also want to tell the task the thread id of the worker
// that's running it? Maybe to support thread-local accumulators for
// commutative reductions?
// Oh yeah, also to know which worker's queue to put more stuff onto.
// Trampoline: returns the next work to do, if ready, or NULL if not.
typedef struct Work* (*Task)(struct Work*);
//
// The return value is a trampoline: a `Task` returns the next work to do, if
// it's runnable, or NULL if there isn't one.
//
// `Task`s are internal to the work-stealing system; the client does not
// provide or consume `Task`s.
typedef struct Work* (*Task)(int thread_id, struct Work*);

typedef struct Work {
Task code;
atomic_int join_count;
void* args[];
} Work;
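// Editor's illustration (not part of this change): a minimal Task written to
// the trampoline convention above frees its own Work and returns NULL to
// signal that there is no follow-on work, e.g.
//
//   Work* noop_task(int thread_id, Work* self) {
//     free(self);
//     return NULL;
//   }
//
// Real tasks in this file, such as run_gen_block below, follow the same shape.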

Work* EMPTY = -1;
Work* ABORT = -2;
Work* EMPTY = (Work*)-1;
Work* ABORT = (Work*)-2;

/////////////////////////
// Work-stealing deque //
@@ -56,8 +62,13 @@ typedef struct {
void init(Deque* q, int size_hint) {
// This does not appear in https://fzn.fr/readings/ppopp13.pdf; I am imputing
// it.
atomic_init(&q->top, 0);
atomic_init(&q->bottom, 0);
// Initialize the buffer indices at 1 to prevent underflow. The buffer
// indices are of type `size_t`; the top index never decreases, and the bottom
// index is never less than the top index at rest. The smallest intermediate
// value ever used is `bottom-1`, inside `take`. Initializing `top` and
// `bottom` at 1 suffices to prevent this computation from underflowing.
atomic_init(&q->top, 1);
atomic_init(&q->bottom, 1);
Array* a = (Array*) malloc(sizeof(Array) + sizeof(Work*) * size_hint);
atomic_init(&a->size, size_hint);
atomic_init(&q->array, a);
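// Editor's note (illustrative, based on the deque in the paper cited above):
// with both indices of unsigned type size_t, starting them at 0 would make
// the `bottom - 1` computed at the top of `take` wrap around on an empty
// deque, e.g.
//
//   size_t b = atomic_load(&q->bottom) - 1;   // 0 - 1 == SIZE_MAX
//
// which is why both indices start at 1 instead.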
@@ -147,7 +158,7 @@ Work* steal(Deque *q) {
// Worker loop //
/////////////////

#define nthreads 24
int thread_count;

Deque* thread_queues;

@@ -156,7 +167,7 @@ atomic_bool done;
// Trampoline: Returns the next item to work on, or NULL if there aren't any.
Work* do_one_work(int id, Work* work) {
printf("Worker %d running item %p\n", id, work);
return (*(work->code))(work);
return (*(work->code))(id, work);
}

void do_work(int id, Work* work) {
Expand Down Expand Up @@ -185,7 +196,7 @@ void* thread(void* payload) {
} else {
// No work in my own queue
Work* stolen = EMPTY;
for (int i = 0; i < nthreads; ++i) {
for (int i = 0; i < thread_count; ++i) {
if (i == id) continue;
stolen = steal(&thread_queues[i]);
if (stolen == ABORT) {
@@ -198,7 +209,7 @@
}
}
if (stolen == EMPTY) {
// Even though the queues we all empty when I tried them, somebody
// Even though the queues were all empty when I tried them, somebody
// might have added some more work since. Busy-wait until the global
// "done" flag is set.
if (atomic_load(&done)) {
@@ -215,63 +226,222 @@
return NULL;
}

////////////////////
// Client program //
////////////////////
///////////////////////////
// Dex codegen interface //
///////////////////////////

Work* print_task(Work* w) {
int* payload = (int*)w->args[0];
int item = *payload;
printf("Did item %p with payload %d\n", w, item);
Work* cont = (Work*)w->args[1];
free(payload);
free(w);
return join_work(cont);
// A (pointer to a) code-generated function.
// This should either return the result of calling `begin_pure_loop` or return `NULL`.
typedef Work* (*GenBlock)(int thread_id, void** env);

// A (pointer to a) code-generated function that is a loop body.
// This should either return the result of calling `begin_pure_loop` or return `NULL`.
typedef Work* (*GenLoopBody)(int thread_id, int iteration, void** env);

// Call this from Haskell once at the start of the process.
// The integer is the number of OS threads to spawn to run work-stealing.
void initialize_work_stealing(int nthreads);

// Call this from Haskell to run a top block with work-stealing. When this
// exits, the work-stealing system is stopped, and results are written to their
// proper `Dest`s.
void execute_top_block(GenBlock body, void** env);

// Call this from code-gen at the end of each top-level block.
void finish_work_stealing();

// Call this from code-gen to start a loop that you want work-stealing to
// parallelize.
// This assumes that the environment frame for the loop body and for the
// continuation is the same. That assumption isn't hard to change.
Work* begin_pure_loop(int thread_id, GenLoopBody body, GenBlock cont, void** env, int trip_count);
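// Illustrative usage sketch (editor's example; the real demonstration is the
// client program at the bottom of this file). Code generation is expected to
// drive this interface roughly as follows: once per process,
//
//   initialize_work_stealing(8);
//
// then, for each top-level block,
//
//   execute_top_block(&my_block, env);
//
// where `my_block` is a GenBlock such as
//
//   Work* my_block(int thread_id, void** env) {
//     return begin_pure_loop(thread_id, &my_body, &my_cont, env, trip_count);
//   }
//
// with `my_body` a GenLoopBody and `my_cont` a GenBlock that ends the block
// by calling finish_work_stealing() and returning NULL.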

/////////////////////////
// Dex codegen support //
/////////////////////////

Work* run_gen_block(int thread_id, Work* self) {
GenBlock body = (GenBlock)self->args[0];
void** env = (void**)self->args[1];
free(self);
return body(thread_id, env);
}

Work* done_task(Work* w) {
free(w);
atomic_store(&done, true);
return NULL;
// Return a `Work*` such that joining it `joiners` times is equivalent to joining
// the argument `cont` once.
// - `joiners` >= 1.
// - Do not use `cont` directly afterward, as this is allowed to mutate it.
Work* increase_cont_capacity(Work* cont, int joiners) {
// One way to achieve the goal is to just atomically increase the `join_count`
// of `cont` by `joiners - 1` and reuse it:
atomic_fetch_add(&cont->join_count, joiners - 1);
return cont;
// An alternative would be to allocate a new `Work` with `join_count` equal to
// `joiners` and whose task just `join`s the current `cont`. The advantage of this
// alternative is avoiding the atomic increment (on a potentially contended
// variable if `cont` has many joiners already); the disadvantage is the
// allocation (which presumably entails some synchronization of its own), and
// an extra indirection at the end due to executing that mini-task.
}
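// Editor's sketch of that alternative (illustrative, not part of this change),
// assuming a small forwarding task:
//
//   Work* forward_join_task(int thread_id, Work* self) {
//     Work* inner = (Work*)self->args[0];
//     free(self);
//     return join_work(inner);
//   }
//
//   Work* increase_cont_capacity_alt(Work* cont, int joiners) {
//     Work* w = (Work*) malloc(sizeof(Work) + 1 * sizeof(void*));
//     w->code = &forward_join_task;
//     w->join_count = joiners;  // absorbs `joiners` joins, then joins `cont` once
//     w->args[0] = cont;
//     return w;
//   }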

int main(int argc, char **argv) {
Work* execute_pure_loop_task(int id, Work* self);

Work* execute_pure_loop(int thread_id, Work* cont, GenLoopBody body, void** env, int start_iter, int end_iter) {
if (end_iter - start_iter <= 1) {
// Few enough iterations; just do them.
for (int i = start_iter; i < end_iter; i++) {
do_work(thread_id, body(thread_id, i, env));
}
return join_work(cont);
} else {
// Create Works that represent schedulable pieces of the loop.
int branching_factor = 2;
div_t iters_per_branch = div(end_iter - start_iter, branching_factor);
int this_iter = start_iter;
Work* subcont = increase_cont_capacity(cont, branching_factor);
// Queue up all but one chunk of the loop
for (int i = 0; i < branching_factor - 1; i++) {
int next_iter = this_iter + iters_per_branch.quot;
if (i < iters_per_branch.rem) {
next_iter++;
}
Work* section = (Work*) malloc(sizeof(Work) + 5 * sizeof(int*));
section->code = &execute_pure_loop_task;
section->join_count = 0;
section->args[0] = subcont;
section->args[1] = body;
section->args[2] = env;
// TODO Is just casting ok here, or do I have to heap-allocate these ints?
// gcc complains about the integer and the pointer having different sizes.
section->args[3] = (void*)this_iter;
section->args[4] = (void*)next_iter;
push(&thread_queues[thread_id], section);
this_iter = next_iter;
}
// Do the last chunk directly yourself
return execute_pure_loop(thread_id, subcont, body, env, this_iter, end_iter);
}
}

Work* execute_pure_loop_task(int id, Work* self) {
Work* cont = self->args[0];
GenLoopBody body = self->args[1];
void** env = self->args[2];
int start_iter = (int)self->args[3];
int end_iter = (int)self->args[4];
return execute_pure_loop(id, cont, body, env, start_iter, end_iter);
}
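// Editor's note on the casting TODO above (illustrative): heap-allocating the
// bounds should not be necessary; round-tripping through intptr_t from
// <stdint.h> is one common way to silence the pointer/integer size warning:
//
//   section->args[3] = (void*)(intptr_t)this_iter;
//   ...
//   int start_iter = (int)(intptr_t)self->args[3];
//
// The values fit in an int by construction, since trip counts are ints.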

Work* begin_pure_loop(int thread_id, GenLoopBody body, GenBlock cont, void** env, int trip_count) {
// TODO: If the whole loop is smaller than the grain size for
// execute_pure_loop, I can avoid allocating the `Work` for the continuation
// too by just executing the iterations inline here.
Work* k = (Work*) malloc(sizeof(Work) + 5 * sizeof(int*));
k->code = &run_gen_block;
k->join_count = 1;
k->args[0] = cont;
k->args[1] = env;
return execute_pure_loop(thread_id, k, body, env, 0, trip_count);
}
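// Editor's sketch of the TODO above (illustrative, not part of this change):
// a fast path for loops at or below the grain size used in execute_pure_loop
// could run the iterations inline and tail-call the continuation directly,
// skipping the allocation of `k`:
//
//   if (trip_count <= 1) {
//     for (int i = 0; i < trip_count; i++) {
//       do_work(thread_id, body(thread_id, i, env));
//     }
//     return cont(thread_id, env);
//   }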

pthread_t* the_threads;
int* tids;

void initialize_work_stealing(int nthreads) {
// Check that top and bottom are 64-bit so they never overflow
assert(sizeof(atomic_size_t) == 8);
pthread_t threads[nthreads];
int tids[nthreads];
the_threads = (pthread_t*) malloc(nthreads * sizeof(pthread_t));
tids = (int*) malloc(nthreads * sizeof(int));
thread_queues = (Deque*) malloc(nthreads * sizeof(Deque));
int nprints = 10;
atomic_store(&done, false);
Work* done_work = (Work*) malloc(sizeof(Work));
done_work->code = &done_task;
done_work->join_count = nthreads * nprints;
for (int i = 0; i < nthreads; ++i) {
tids[i] = i;
init(&thread_queues[i], 8);
for (int j = 0; j < nprints; ++j) {
Work* work = (Work*) malloc(sizeof(Work) + 2 * sizeof(int*));
work->code = &print_task;
work->join_count = 0;
int* payload = malloc(sizeof(int));
*payload = 1000 * i + j;
work->args[0] = payload;
work->args[1] = done_work;
push(&thread_queues[i], work);
}
}
for (int i = 0; i < nthreads; ++i) {
if (pthread_create(&threads[i], NULL, thread, &tids[i]) != 0) {
thread_count = nthreads;
}

void execute_top_block(GenBlock body, void** env) {
Work* job = (Work*) malloc(sizeof(Work) + 2 * sizeof(int*));
job->code = &run_gen_block;
job->join_count = 0;
job->args[0] = body;
job->args[1] = env;
atomic_store(&done, false);
push(&thread_queues[0], job);
// TODO: Do we really want to start and kill all the threads for every top
// level block, or is there a way to suspend and reuse them?
for (int i = 0; i < thread_count; ++i) {
if (pthread_create(&the_threads[i], NULL, thread, &tids[i]) != 0) {
perror("failed to start the thread");
exit(EXIT_FAILURE);
}
}
for (int i = 0; i < nthreads; ++i) {
if (pthread_join(threads[i], NULL) != 0) {
for (int i = 0; i < thread_count; ++i) {
if (pthread_join(the_threads[i], NULL) != 0) {
perror("failed to join the thread");
exit(EXIT_FAILURE);
}
}
printf("Expect %d lines of output (including this one)\n", 2 * nthreads * nprints + nthreads + 2);
// We expect all the queues to be empty at this point. TODO: Check?
}
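// Editor's sketch of the thread-reuse TODO above (illustrative, not part of
// this change): the workers could be kept alive across top-level blocks by
// parking them on a condition variable instead of exiting, with
// execute_top_block bumping a per-block generation counter to wake them:
//
//   pthread_mutex_t pool_lock = PTHREAD_MUTEX_INITIALIZER;
//   pthread_cond_t pool_wake = PTHREAD_COND_INITIALIZER;
//   int block_generation = 0;
//
//   // Worker side, instead of returning when `done` is set:
//   pthread_mutex_lock(&pool_lock);
//   int seen = block_generation;
//   while (block_generation == seen)
//     pthread_cond_wait(&pool_wake, &pool_lock);
//   pthread_mutex_unlock(&pool_lock);
//
//   // execute_top_block side, instead of pthread_create:
//   pthread_mutex_lock(&pool_lock);
//   atomic_store(&done, false);
//   block_generation++;
//   pthread_cond_broadcast(&pool_wake);
//   pthread_mutex_unlock(&pool_lock);
//
// The caller would still need a way to wait for the block to complete, e.g. a
// second condition variable signaled from finish_work_stealing().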

void finish_work_stealing() {
atomic_store(&done, true);
}

////////////////////
// Client program //
////////////////////

// A slightly silly program that iterates a single loop a dynamic number of
// times, and has each loop iteration (and the coda) echo the trip count + 1,
// just to show that data can be mutated.

Work* gen_loop_body(int thread_id, int iteration, void** env) {
int* payload = (int*)env[0];
int item = *payload;
printf("Loop iteration %d on worker %d, payload %d\n",
iteration, thread_id, item);
return NULL;
}

Work* end_gen_block(int thread_id, void** env) {
int* payload = (int*)env[0];
int item = *payload;
printf("Finishing on worker %d, payload %d\n", thread_id, item);
free(payload);
free(env);
finish_work_stealing();
return NULL;
}

Work* start_gen_block(int thread_id, void** env) {
int* payload = (int*)env[0];
int item = *payload;
printf("Starting on worker %d, payload %d\n", thread_id, item);
*payload = item + 1;
return begin_pure_loop(thread_id, gen_loop_body, end_gen_block, env, item);
}

int main(int argc, char **argv) {
initialize_work_stealing(24);
void** env = malloc(sizeof(int*));
int* payload = malloc(sizeof(int));
int num_iters = 200;
*payload = num_iters;
env[0] = payload;
execute_top_block(&start_gen_block, env);
int expected_output_lines =
1 // "Starting"
+ 1 // "Finishing"
+ thread_count // "Worker n finished"
+ 1 // Expected line report
+ num_iters // Each loop iteration
+ 1 // "Worker running item" for the entry point
+ 1 // "Worker running item" for the end
+ (num_iters - 1) // "Worker running item" for intermediates in the loop tree
;
printf("Expect %d lines of output (including this one)\n",
expected_output_lines);
return 0;
}
