Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Robj/trunk race hack #635

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 104 additions & 9 deletions src/trunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -3152,6 +3152,87 @@ trunk_inc_branch_range(trunk_handle *spl,
}
}

static inline void
trunk_perform_gc_tasks(trunk_handle *spl, bool immediate)
{
uint64 my_idx = spl->gc_task_queue_head;
my_idx = my_idx % TRUNK_GC_TASK_QUEUE_SIZE;
trunk_gc_task *task = &spl->gc_task_queue[my_idx];
uint64 enqueue_time = task->enqueue_time;
int i = 0;
while (i < 2 && enqueue_time != 0
&& (immediate
|| TRUNK_GC_DELAY < platform_timestamp_elapsed(enqueue_time)))
{
if (__sync_bool_compare_and_swap(&task->enqueue_time, enqueue_time, 0)) {
__sync_fetch_and_add(&spl->gc_task_queue_head, 1);
switch (task->type) {
case TRUNK_GC_TYPE_ROUTING_FILTER_ZAP:
routing_filter_zap(spl->cc, &task->args.filter);
break;
case TRUNK_GC_TYPE_BTREE_DEC_REF_RANGE:
btree_dec_ref_range(
spl->cc,
&spl->cfg.btree_cfg,
task->args.btree_dec_ref_range.root_addr,
key_buffer_key(&task->args.btree_dec_ref_range.min_key),
key_buffer_key(&task->args.btree_dec_ref_range.max_key));
key_buffer_deinit(&task->args.btree_dec_ref_range.min_key);
key_buffer_deinit(&task->args.btree_dec_ref_range.max_key);
break;
default:
platform_default_log("Unknown GC task type %d\n", task->type);
break;
}
}

my_idx = spl->gc_task_queue_head;
my_idx = my_idx % TRUNK_GC_TASK_QUEUE_SIZE;
task = &spl->gc_task_queue[my_idx];
enqueue_time = task->enqueue_time;
i++;
}
}

static inline void
trunk_enqueue_routing_filter_zap(trunk_handle *spl, routing_filter *filter)
{
trunk_perform_gc_tasks(spl, FALSE);

uint64 my_idx = __sync_fetch_and_add(&spl->gc_task_queue_tail, 1);
platform_assert(my_idx - spl->gc_task_queue_head < TRUNK_GC_TASK_QUEUE_SIZE);
my_idx = my_idx % TRUNK_GC_TASK_QUEUE_SIZE;
trunk_gc_task *task = &spl->gc_task_queue[my_idx];
platform_assert(task->enqueue_time == 0);

task->type = TRUNK_GC_TYPE_ROUTING_FILTER_ZAP;
task->args.filter = *filter;
task->enqueue_time = platform_get_timestamp();
}

static inline void
trunk_enqueue_btree_dec_ref_range(trunk_handle *spl,
uint64 btree_root_addr,
key start_key,
key end_key)
{
trunk_perform_gc_tasks(spl, FALSE);

uint64 my_idx = __sync_fetch_and_add(&spl->gc_task_queue_tail, 1);
platform_assert(my_idx - spl->gc_task_queue_head < TRUNK_GC_TASK_QUEUE_SIZE);
my_idx = my_idx % TRUNK_GC_TASK_QUEUE_SIZE;
trunk_gc_task *task = &spl->gc_task_queue[my_idx];
platform_assert(task->enqueue_time == 0);

task->type = TRUNK_GC_TYPE_BTREE_DEC_REF_RANGE;
task->args.btree_dec_ref_range.root_addr = btree_root_addr;
key_buffer_init_from_key(
&task->args.btree_dec_ref_range.min_key, spl->heap_id, start_key);
key_buffer_init_from_key(
&task->args.btree_dec_ref_range.max_key, spl->heap_id, end_key);
task->enqueue_time = platform_get_timestamp();
}

static inline void
trunk_zap_branch_range(trunk_handle *spl,
trunk_branch *branch,
Expand All @@ -3163,8 +3244,10 @@ trunk_zap_branch_range(trunk_handle *spl,
platform_assert((key_is_null(start_key) && key_is_null(end_key))
|| (type != PAGE_TYPE_MEMTABLE && !key_is_null(start_key)));
platform_assert(branch->root_addr != 0, "root_addr=%lu", branch->root_addr);
btree_dec_ref_range(
spl->cc, &spl->cfg.btree_cfg, branch->root_addr, start_key, end_key);
trunk_enqueue_btree_dec_ref_range(
spl, branch->root_addr, start_key, end_key);
// btree_dec_ref_range(
// spl->cc, &spl->cfg.btree_cfg, branch->root_addr, start_key, end_key);
}

/*
Expand Down Expand Up @@ -3914,8 +3997,9 @@ trunk_dec_filter(trunk_handle *spl, routing_filter *filter)
if (filter->addr == 0) {
return;
}
cache *cc = spl->cc;
routing_filter_zap(cc, filter);
trunk_enqueue_routing_filter_zap(spl, filter);
// cache *cc = spl->cc;
// routing_filter_zap(cc, filter);
}

/*
Expand Down Expand Up @@ -4885,13 +4969,15 @@ trunk_branch_iterator_deinit(trunk_handle *spl,
if (itor->root_addr == 0) {
return;
}
cache *cc = spl->cc;
btree_config *btree_cfg = &spl->cfg.btree_cfg;
key min_key = itor->min_key;
key max_key = itor->max_key;
key min_key = itor->min_key;
key max_key = itor->max_key;
btree_iterator_deinit(itor);
if (should_dec_ref) {
btree_dec_ref_range(cc, btree_cfg, itor->root_addr, min_key, max_key);
trunk_enqueue_btree_dec_ref_range(spl, itor->root_addr, min_key, max_key);
// cache *cc = spl->cc;
// btree_config *btree_cfg = &spl->cfg.btree_cfg;
// btree_dec_ref_range(cc, btree_cfg, itor->root_addr, min_key,
// max_key);
}
}

Expand Down Expand Up @@ -7872,6 +7958,10 @@ trunk_prepare_for_shutdown(trunk_handle *spl)
platform_status rc = task_perform_until_quiescent(spl->ts);
platform_assert_status_ok(rc);

while (spl->gc_task_queue_head < spl->gc_task_queue_tail) {
trunk_perform_gc_tasks(spl, TRUE);
}

// destroy memtable context (and its memtables)
memtable_context_destroy(spl->heap_id, spl->mt_ctxt);

Expand Down Expand Up @@ -7934,6 +8024,11 @@ trunk_destroy(trunk_handle *spl)
srq_deinit(&spl->srq);
trunk_prepare_for_shutdown(spl);
trunk_for_each_node(spl, trunk_node_destroy, NULL);

while (spl->gc_task_queue_head < spl->gc_task_queue_tail) {
trunk_perform_gc_tasks(spl, TRUE);
}

mini_unkeyed_dec_ref(spl->cc, spl->mini.meta_head, PAGE_TYPE_TRUNK, FALSE);
// clear out this splinter table from the meta page.
allocator_remove_super_addr(spl->al, spl->id);
Expand Down
34 changes: 34 additions & 0 deletions src/trunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,35 @@ typedef struct trunk_compacted_memtable {
trunk_compact_bundle_req *req;
} trunk_compacted_memtable;

/* The trunk_gc infrastructure is to compensate for a race condition in this
* version of the trunk. The problem is that the trunk can under some
* circumstances garbage-collect a branch or filter while there still exist
* readers that could access it. A new version of the trunk without the race
* will be merged soon. In the meantime, the gc system just delays garbage
* collection of branches and filters by 10 seconds, which should be pleanty
* enough time to ensure that any old readers will have finished. */

typedef enum trunk_gc_type {
TRUNK_GC_TYPE_ROUTING_FILTER_ZAP,
TRUNK_GC_TYPE_BTREE_DEC_REF_RANGE,
} trunk_gc_type;

typedef struct trunk_gc_task {
uint64_t enqueue_time;
trunk_gc_type type;
union {
routing_filter filter;
struct {
uint64 root_addr;
key_buffer min_key;
key_buffer max_key;
} btree_dec_ref_range;
} args;
} trunk_gc_task;

#define TRUNK_GC_TASK_QUEUE_SIZE (10 * 1024)
#define TRUNK_GC_DELAY (10ULL * 1000 * 1000 * 1000) // 10s

struct trunk_handle {
volatile uint64 root_addr;
uint64 super_block_idx;
Expand Down Expand Up @@ -224,6 +253,11 @@ struct trunk_handle {
// space rec queue
srq srq;

// gc
uint64 gc_task_queue_head;
uint64 gc_task_queue_tail;
trunk_gc_task gc_task_queue[TRUNK_GC_TASK_QUEUE_SIZE];

trunk_compacted_memtable compacted_memtable[/*cfg.mt_cfg.max_memtables*/];
};

Expand Down