From 7cabc724b9759a0655f3b9362a1bb0a6a7de4f70 Mon Sep 17 00:00:00 2001 From: Antonin Houska Date: Thu, 20 Jun 2024 17:28:27 +0200 Subject: [PATCH] Attempt to simplify the use of the WorkerTask structure. Once the task state is set to WTS_INIT, only the worker should change it. The only exception is that the worker could not be launched. --- pg_squeeze.h | 40 +++++--- worker.c | 264 ++++++++++++++++++++++++++++++--------------------- 2 files changed, 180 insertions(+), 124 deletions(-) diff --git a/pg_squeeze.h b/pg_squeeze.h index ac8e2df..e9b0be5 100644 --- a/pg_squeeze.h +++ b/pg_squeeze.h @@ -301,9 +301,10 @@ typedef struct WorkerSlot * Use this when setting / clearing the fields above. * * Note that, when setting, workerData->lock in exclusive mode must be - * held in addition. This is to ensure the maximum number of workers is - * not exceeded when multiple workers search for a slot concurrently. On - * the other hand, the spinlock is sufficient to clear the fields. + * held in addition. This is to ensure the maximum number of workers per + * database is not exceeded when multiple workers search for a slot + * concurrently. On the other hand, the spinlock is sufficient to clear + * the fields. * * Note that we use MemSet() to reset 'progress', which is hopefully * o.k. to do under spinlock. XXX Consider using atomics for the @@ -353,27 +354,38 @@ typedef enum typedef struct WorkerTask { /* - * State from the perspective of the requesting backend or scheduler - * worker. + * State of the task. + * + * The "requester" (i.e. regular backend or squeeze scheduler) sets the + * state to WTS_INIT, the scheduler worker then sets it to WTS_IN_PROGRESS + * and eventually to WTS_UNUSED. However, in order to avoid leak, the + * requester can set it to WTS_UNUSED too, if it's sure that the worker + * failed to start. */ - bool in_use; + WorkerTaskState worker_state; + /* See the comments of exit_if_requested(). */ bool exit_requested; - /* Worker that performs the task sets this field. */ - WorkerTaskState worker_state; /* * Use this when setting / clearing the fields above. * - * Note that, when setting "in_use" to true or "worker_state" to WTS_INIT, - * workerData->lock in exclusive mode must be held in addition. This is to - * ensure that at most one task exists per table. On the other hand, the - * spinlock is sufficient both to clear "in_use" and to adjust - * "worker_state" to states other than INIT. + * Note that, when setting "worker_state" to WTS_INIT, workerData->lock + * must be held in exclusive mode in addition. This is because, when + * "allocating" the task using this status, we need to check if no other + * task exists for the same database and relation. */ slock_t mutex; - /* Change of these requires workerData->lock in exclusive mode. */ + /* + * Details of the task. + * + * Only the requester should change these fields, after he has "allocated" + * the task by setting the state to WTS_INIT. Therefore, no locking is + * required, except for "dbid", "relschema" and "relname" - those require + * workerData->lock in exclusive mode because they are used to check task + * uniqueness (i.e. no more than one worker per table). + */ Oid dbid; NameData relschema; NameData relname; diff --git a/worker.c b/worker.c index b1726fc..9c4fc37 100644 --- a/worker.c +++ b/worker.c @@ -156,7 +156,8 @@ static int squeezeWorkerSlotCount = 0; #define REPL_PLUGIN_NAME "pg_squeeze" static void interrupt_worker(WorkerTask *task); -static void release_task(WorkerTask *task, bool worker); +static void initialize_task(WorkerTask *task); +static void release_task(WorkerTask *task); static void squeeze_handle_error_app(ErrorData *edata, WorkerTask *task); static WorkerTask *get_unused_task(Oid dbid, Name relschema, Name relname, @@ -174,10 +175,10 @@ static void worker_sigterm(SIGNAL_ARGS); static void scheduler_worker_loop(void); static void cleanup_workers_and_tasks(bool interrupt); static void wait_for_worker_shutdown(SqueezeWorker *worker); -static void process_task(int task_idx); +static void process_task(void); static void create_replication_slots(int nslots, MemoryContext mcxt); static void drop_replication_slots(void); -static void cleanup_after_server_start(int task_idx); +static void cleanup_after_server_start(void); static void cleanup_repl_origins(void); static void cleanup_repl_slots(void); static Snapshot build_historic_snapshot(SnapBuild *builder); @@ -253,14 +254,9 @@ squeeze_worker_shmem_startup(void) WorkerTask *task; task = &workerData->tasks[i]; - task->in_use = false; - task->exit_requested = false; - task->worker_state = WTS_UNUSED; - task->error_msg[0] = '\0'; - task->dbid = InvalidOid; - NameStr(task->relschema)[0] = '\0'; - NameStr(task->relname)[0] = '\0'; SpinLockInit(&task->mutex); + + initialize_task(task); } workerData->lock = &locks->lock; @@ -308,7 +304,7 @@ worker_shmem_shutdown(int code, Datum arg) } if (MyWorkerTask) - release_task(MyWorkerTask, true); + release_task(MyWorkerTask); if (am_i_scheduler) /* @@ -473,10 +469,7 @@ squeeze_table_new(PG_FUNCTION_ARGS) * The worker could not even get registered, so it won't set its * status to WTS_UNUSED. Make sure the task does not leak. */ - release_task(task, true); - - /* Do what the task requester is responsible for. */ - release_task(task, false); + release_task(task); ereport(ERROR, (errmsg("squeeze worker could not start")), @@ -496,7 +489,6 @@ squeeze_table_new(PG_FUNCTION_ARGS) */ interrupt_worker(task); - release_task(task, false); PG_RE_THROW(); } PG_END_TRY(); @@ -517,8 +509,6 @@ squeeze_table_new(PG_FUNCTION_ARGS) if (strlen(task->error_msg) > 0) error_msg = pstrdup(task->error_msg); - release_task(task, false); - if (error_msg) ereport(ERROR, (errmsg("%s", error_msg))); @@ -554,35 +544,52 @@ get_unused_task(Oid dbid, Name relschema, Name relname, int *task_idx, LWLockAcquire(workerData->lock, LW_EXCLUSIVE); for (i = 0; i < NUM_WORKER_TASKS; i++) { + WorkerTaskState worker_state; bool needs_check = false; task = &workerData->tasks[i]; SpinLockAcquire(&task->mutex); - if (task->in_use || task->worker_state != WTS_UNUSED) - { - /* - * Consider tasks which might be in progress possible duplicates - * of the task we're going to submit. - */ - needs_check = true; - } - else if (result == NULL) - { - /* Result candidate */ - result = task; - res_idx = i; - } + worker_state = task->worker_state; /* - * String comparisons shouldn't take place under spinlock. (Those - * fields aren't protected by the spinlock anyway.) + * String comparisons shouldn't take place under spinlock, but the + * spinlock is actually not necessary. Once we have released it, the + * squeeze worker can set the state to UNUSED, so we might report a + * duplicate task incorrectly. That's not perfect but should not + * happen too often. (If the task is already UNUSED, no one should + * change it while we are holding the LW lock.) */ SpinLockRelease(&task->mutex); + /* + * Stop looking for an unused task and checking duplicates if a + * duplicate was seen. + */ + if (!*duplicate) + { + if (worker_state != WTS_UNUSED) + { + /* + * Consider tasks which might be in progress for possible + * duplicates of the task we're going to submit. + */ + needs_check = true; + } + else if (result == NULL) + { + /* Result candidate */ + result = task; + res_idx = i; + } + } + if (needs_check) { /* * The strings are only set while workerData->lock is held in * exclusive mode (see below), so we can safely check them here. + * + * Spinlock not needed to access ->dbid because the worker should + * never change it (even when exiting). */ if (task->dbid == dbid && strcmp(NameStr(task->relschema), NameStr(*relschema)) == 0 && @@ -591,29 +598,43 @@ get_unused_task(Oid dbid, Name relschema, Name relname, int *task_idx, result = NULL; res_idx = -1; *duplicate = true; + } + } - goto done; + /* + * If the task became UNUSED recently, it might still contain obsolete + * information, and even own resources. + */ + if (worker_state == WTS_UNUSED && OidIsValid(task->dbid)) + { + if (task->repl_slot.snap_seg) + { + dsm_detach(task->repl_slot.snap_seg); + task->repl_slot.snap_seg = NULL; + task->repl_slot.snap_handle = DSM_HANDLE_INVALID; } + + initialize_task(task); } } - if (result == NULL) + if (result == NULL || *duplicate) goto done; /* - * No one else should be interested in this task, so don't use spinlock - * below. + * Make sure that no other backend / scheduler can use the task. * - * Make sure that no other backend can use the task. + * As long as we hold the LW lock, no one else should be currently trying + * to allocate this task, so no spinlock is needed. */ - result->in_use = true; - /* Set the initial state for the worker. */ result->worker_state = WTS_INIT; - /* Initialize the fields we use to check uniqueness of the task. */ + /* + * While holding the LW lock, initialize the fields we use to check + * uniqueness of the task. + */ result->dbid = dbid; namestrcpy(&result->relschema, NameStr(*relschema)); namestrcpy(&result->relname, NameStr(*relname)); - done: LWLockRelease(workerData->lock); *task_idx = res_idx; @@ -677,7 +698,7 @@ initialize_worker_task(WorkerTask *task, int task_id, Name indname, * The number of scheduler workers per database is limited by the * squeeze_workers_per_database configuration variable. * - * The return value tells whether we could least register the worker. + * The return value tells whether we could at least register the worker. */ static bool start_worker_internal(bool scheduler, int task_idx, @@ -719,8 +740,7 @@ start_worker_internal(bool scheduler, int task_idx, */ return false; - if (*handle == NULL) - return false; + Assert(*handle != NULL); return true; } @@ -861,6 +881,18 @@ squeeze_worker_main(Datum main_arg) task_idx = con.task_idx; } + /* + * Initialize MyWorkerTask as soon as possible so that + * worker_shmem_shutdown() can release it in case of ERROR. + */ + if (task_idx >= 0) + { + Assert(!am_i_scheduler); + Assert(task_idx < NUM_WORKER_TASKS); + + MyWorkerTask = &workerData->tasks[task_idx]; + } + found_scheduler = false; nworkers = 0; /* @@ -886,7 +918,7 @@ squeeze_worker_main(Datum main_arg) */ if (!am_i_scheduler && !workerData->cleanup_done) { - cleanup_after_server_start(task_idx); + cleanup_after_server_start(); workerData->cleanup_done = true; } @@ -972,16 +1004,9 @@ squeeze_worker_main(Datum main_arg) if (am_i_scheduler) scheduler_worker_loop(); else - process_task(task_idx); + process_task(); done: - if (task_idx >= 0) - { - /* Make sure that worker_shmem_shutdown() releases the task. */ - Assert(task_idx < NUM_WORKER_TASKS); - MyWorkerTask = &workerData->tasks[task_idx]; - } - proc_exit(0); } @@ -1231,11 +1256,14 @@ scheduler_worker_loop(void) WorkerTask *task = &workerData->tasks[task_idx]; /* - * worker_shmem_shutdown() will call - * release_task(worker=false) but we need to do it here on - * behalf of the workers which will never start. + * worker_shmem_shutdown() will call release_task() but we + * need to do it here on behalf of the workers which will + * never start. + * + * get_unused_task() will detach the shared segments where + * they exist. */ - release_task(task, true); + release_task(task); } PG_RE_THROW(); @@ -1278,7 +1306,7 @@ scheduler_worker_loop(void) * set its status to WTS_UNUSED. Make sure the task does * not leak. */ - release_task(worker->task, true); + release_task(worker->task); ereport(ERROR, (errmsg("squeeze worker could not start")), @@ -1339,13 +1367,18 @@ cleanup_workers_and_tasks(bool interrupt) /* The reset of sched_cxt will free the memory. */ squeezeWorkers = NULL; - /* Release the corresponding tasks. */ + /* Release the shared memory segments. */ foreach(lc, task_idxs) { int task_idx = lfirst_int(lc); WorkerTask *task = &workerData->tasks[task_idx]; - release_task(task, false); + if (task->repl_slot.snap_seg) + { + dsm_detach(task->repl_slot.snap_seg); + task->repl_slot.snap_seg = NULL; + task->repl_slot.snap_handle = DSM_HANDLE_INVALID; + } } task_idxs = NIL; @@ -1377,11 +1410,13 @@ wait_for_worker_shutdown(SqueezeWorker *worker) } static void -process_task(int task_idx) +process_task(void) { MemoryContext task_cxt; ErrorData *edata; + Assert(MyWorkerTask != NULL); + /* * Memory context for auxiliary per-task allocations. */ @@ -1389,9 +1424,6 @@ process_task(int task_idx) "pg_squeeze task context", ALLOCSET_DEFAULT_SIZES); - Assert(task_idx < NUM_WORKER_TASKS); - MyWorkerTask = &workerData->tasks[task_idx]; - squeeze_max_xlock_time = MyWorkerTask->max_xlock_time; /* Process the assigned task. */ @@ -1550,7 +1582,7 @@ create_replication_slots(int nslots, MemoryContext mcxt) * Unfortunately the API is such that CreateDecodingContext() assumes * need_full_snapshot=false, so the worker won't be able to create the * snapshot for the initial load. Therefore we serialize the snapshot - * here and pass the name to the worker via shared memory. + * here and pass it to the worker via shared memory. */ snapshot = build_historic_snapshot(ctx->snapshot_builder); snap_size = EstimateSnapshotSpace(snapshot); @@ -1647,7 +1679,7 @@ drop_replication_slots(void) * launched the worker. ERROR is supposed to terminate the worker. */ static void -cleanup_after_server_start(int task_idx) +cleanup_after_server_start(void) { PG_TRY(); { @@ -1658,8 +1690,7 @@ cleanup_after_server_start(int task_idx) { ErrorData *edata; - Assert(task_idx >= 0); - MyWorkerTask = &workerData->tasks[task_idx]; + Assert(MyWorkerTask != NULL); /* * The worker should exit pretty soon, it's o.k. to use @@ -1730,7 +1761,6 @@ cleanup_repl_slots(void) int slotno; List *slot_names = NIL; - ereport(ERROR, (errmsg("test"))); LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (slotno = 0; slotno < max_replication_slots; slotno++) { @@ -1872,10 +1902,10 @@ process_task_internal(MemoryContext task_cxt) } /* - * Once the backend sets "in_use" and the worker is launched, only the - * worker is expected to change the task, so access it w/o locking. + * Once the task is allocated and the worker is launched, only the worker + * is expected to change the task, so access it w/o locking. */ - Assert(task->in_use && task->worker_state == WTS_INIT); + Assert(task->worker_state == WTS_INIT); task->worker_state = WTS_IN_PROGRESS; relschema = &task->relschema; @@ -2103,48 +2133,62 @@ interrupt_worker(WorkerTask *task) } static void -release_task(WorkerTask *task, bool worker) +initialize_task(WorkerTask *task) +{ + task->worker_state = WTS_UNUSED; + task->exit_requested = false; + task->dbid = InvalidOid; + NameStr(task->relschema)[0] = '\0'; + NameStr(task->relname)[0] = '\0'; + NameStr(task->indname)[0] = '\0'; + NameStr(task->tbspname)[0] = '\0'; + task->max_xlock_time = 0; + task->task_id = -1; + task->last_try = false; + task->skip_analyze = false; + memset(task->ind_tbsps, 0, sizeof(task->ind_tbsps)); + + NameStr(task->repl_slot.name)[0] = '\0'; + task->repl_slot.confirmed_flush = InvalidXLogRecPtr; + task->repl_slot.snap_handle = DSM_HANDLE_INVALID; + task->repl_slot.snap_seg = NULL; + task->repl_slot.snap_private = NULL; + + task->error_msg[0] = '\0'; +} + +/* + * The squeeze worker should call this before exiting. + */ +static void +release_task(WorkerTask *task) { SpinLockAcquire(&task->mutex); - if (worker) + + task->worker_state = WTS_UNUSED; + Assert(task == MyWorkerTask || MyWorkerTask == NULL); + + /* + * The "standalone" worker might have used its private memory for the + * snapshot. + */ + if (task->repl_slot.snap_private) { - /* Called from squeeze worker (or on its behalf). */ - task->worker_state = WTS_UNUSED; - Assert(task == MyWorkerTask || MyWorkerTask == NULL); - task->exit_requested = false; + Assert(am_i_standalone); /* - * The "standalone" worker might have used its private memory for the - * snapshot. + * Do not call pfree() when holding spinlock. The worker should + * only process a single task anyway, so it's not really a leak. */ - if (task->repl_slot.snap_private) - { - Assert(am_i_standalone); - /* - * Do not call pfree() when holding spinlock. The worker should - * only process a single task anyway, so it's not a real leak. - */ - task->repl_slot.snap_private = NULL; - } - /* - * Do not care about detaching from the shared memory: - * setup_decoding() runs in a transaction, so the resource owner of - * that transaction will take care. - */ - - MyWorkerTask = NULL; - } - else - { - /* Called from backend or from the scheduler worker. */ - Assert(task->in_use); - task->in_use = false; - if (task->repl_slot.snap_seg) - { - dsm_detach(task->repl_slot.snap_seg); - task->repl_slot.snap_seg = NULL; - task->repl_slot.snap_handle = DSM_HANDLE_INVALID; - } + task->repl_slot.snap_private = NULL; } + /* + * Do not care about detaching from the shared memory: + * setup_decoding() runs in a transaction, so the resource owner of + * that transaction will take care. + */ + + MyWorkerTask = NULL; + /* Let others to see the WTS_UNUSED state. */ SpinLockRelease(&task->mutex); }