diff --git a/pg_squeeze.h b/pg_squeeze.h index e9b0be5..71b9b1e 100644 --- a/pg_squeeze.h +++ b/pg_squeeze.h @@ -329,12 +329,14 @@ typedef struct ReplSlotStatus /* A copy of the same field of ReplicationSlotPersistentData. */ XLogRecPtr confirmed_flush; - /* Shared memory to pass the initial snapshot to the worker. */ + /* + * Shared memory to pass the initial snapshot to the worker. Only needed + * by the scheduler. + */ dsm_handle snap_handle; - /* Only needed by the scheduler. */ dsm_segment *snap_seg; - /* The snapshot in the backend private memory. */ + /* The snapshot in the squeeze worker private memory. */ char *snap_private; } ReplSlotStatus; diff --git a/worker.c b/worker.c index 103b322..8fe5415 100644 --- a/worker.c +++ b/worker.c @@ -278,8 +278,6 @@ squeeze_worker_shmem_startup(void) LWLockRelease(AddinShmemInitLock); } -static List *task_idxs = NIL; - /* Mark this worker's slot unused. */ static void worker_shmem_shutdown(int code, Datum arg) @@ -603,17 +601,19 @@ get_unused_task(Oid dbid, Name relschema, Name relname, int *task_idx, /* * If the task became UNUSED recently, it might still contain obsolete - * information, and even own resources. + * information because the worker only sets the status when exiting. + * (This clean-up shouldn't be necessary because the caller will + * initialize it when we return it next time, but it seems a good + * practice.) */ if (worker_state == WTS_UNUSED && OidIsValid(task->dbid)) { - if (task->repl_slot.snap_seg) - { - dsm_detach(task->repl_slot.snap_seg); - task->repl_slot.snap_seg = NULL; - task->repl_slot.snap_handle = DSM_HANDLE_INVALID; - } - + /* + * Note that the scheduler worker should have detached from the + * DSM segment pointed to by task->repl_slot.seg, by calling + * drop_replication_slots(). (The "standalone" worker should not + * have set it.) + */ initialize_task(task); } } @@ -1038,6 +1038,7 @@ scheduler_worker_loop(void) long delay = 0L; int i; MemoryContext sched_cxt, old_cxt; + List *task_idxs = NIL; /* Context for allocations which cannot be freed too early. */ sched_cxt = AllocSetContextCreate(TopMemoryContext, @@ -1336,7 +1337,6 @@ cleanup_workers_and_tasks(bool interrupt) { SqueezeWorker *worker; int i; - ListCell *lc; if (interrupt) { @@ -1367,21 +1367,6 @@ cleanup_workers_and_tasks(bool interrupt) /* The reset of sched_cxt will free the memory. */ squeezeWorkers = NULL; - /* Release the shared memory segments. */ - foreach(lc, task_idxs) - { - int task_idx = lfirst_int(lc); - WorkerTask *task = &workerData->tasks[task_idx]; - - if (task->repl_slot.snap_seg) - { - dsm_detach(task->repl_slot.snap_seg); - task->repl_slot.snap_seg = NULL; - task->repl_slot.snap_handle = DSM_HANDLE_INVALID; - } - } - task_idxs = NIL; - /* Drop the replication slots. */ if (squeezeWorkerSlotCount > 0) drop_replication_slots(); @@ -1661,6 +1646,14 @@ drop_replication_slots(void) /* nowait=false, i.e. wait */ ReplicationSlotDrop(NameStr(slot->name), false); } + + /* Detach from the shared memory segment. */ + if (slot->snap_seg) + { + dsm_detach(slot->snap_seg); + slot->snap_seg = NULL; + slot->snap_handle = DSM_HANDLE_INVALID; + } } squeezeWorkerSlotCount = 0;