Skip to content

Commit

Permalink
Only drop_replication_slots() should release the snapshot memory.
Browse files Browse the repository at this point in the history
  • Loading branch information
Antonin Houska committed Jun 21, 2024
1 parent e0359e7 commit 9072d9f
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 29 deletions.
8 changes: 5 additions & 3 deletions pg_squeeze.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,12 +329,14 @@ typedef struct ReplSlotStatus
/* A copy of the same field of ReplicationSlotPersistentData. */
XLogRecPtr confirmed_flush;

/* Shared memory to pass the initial snapshot to the worker. */
/*
* Shared memory to pass the initial snapshot to the worker. Only needed
* by the scheduler.
*/
dsm_handle snap_handle;
/* Only needed by the scheduler. */
dsm_segment *snap_seg;

/* The snapshot in the backend private memory. */
/* The snapshot in the squeeze worker private memory. */
char *snap_private;
} ReplSlotStatus;

Expand Down
45 changes: 19 additions & 26 deletions worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,6 @@ squeeze_worker_shmem_startup(void)
LWLockRelease(AddinShmemInitLock);
}

static List *task_idxs = NIL;

/* Mark this worker's slot unused. */
static void
worker_shmem_shutdown(int code, Datum arg)
Expand Down Expand Up @@ -603,17 +601,19 @@ get_unused_task(Oid dbid, Name relschema, Name relname, int *task_idx,

/*
* If the task became UNUSED recently, it might still contain obsolete
* information, and even own resources.
* information because the worker only sets the status when exiting.
* (This clean-up shouldn't be necessary because the caller will
* initialize it when we return it next time, but it seems a good
* practice.)
*/
if (worker_state == WTS_UNUSED && OidIsValid(task->dbid))
{
if (task->repl_slot.snap_seg)
{
dsm_detach(task->repl_slot.snap_seg);
task->repl_slot.snap_seg = NULL;
task->repl_slot.snap_handle = DSM_HANDLE_INVALID;
}

/*
* Note that the scheduler worker should have detached from the
* DSM segment pointed to by task->repl_slot.seg, by calling
* drop_replication_slots(). (The "standalone" worker should not
* have set it.)
*/
initialize_task(task);
}
}
Expand Down Expand Up @@ -1038,6 +1038,7 @@ scheduler_worker_loop(void)
long delay = 0L;
int i;
MemoryContext sched_cxt, old_cxt;
List *task_idxs = NIL;

/* Context for allocations which cannot be freed too early. */
sched_cxt = AllocSetContextCreate(TopMemoryContext,
Expand Down Expand Up @@ -1336,7 +1337,6 @@ cleanup_workers_and_tasks(bool interrupt)
{
SqueezeWorker *worker;
int i;
ListCell *lc;

if (interrupt)
{
Expand Down Expand Up @@ -1367,21 +1367,6 @@ cleanup_workers_and_tasks(bool interrupt)
/* The reset of sched_cxt will free the memory. */
squeezeWorkers = NULL;

/* Release the shared memory segments. */
foreach(lc, task_idxs)
{
int task_idx = lfirst_int(lc);
WorkerTask *task = &workerData->tasks[task_idx];

if (task->repl_slot.snap_seg)
{
dsm_detach(task->repl_slot.snap_seg);
task->repl_slot.snap_seg = NULL;
task->repl_slot.snap_handle = DSM_HANDLE_INVALID;
}
}
task_idxs = NIL;

/* Drop the replication slots. */
if (squeezeWorkerSlotCount > 0)
drop_replication_slots();
Expand Down Expand Up @@ -1661,6 +1646,14 @@ drop_replication_slots(void)
/* nowait=false, i.e. wait */
ReplicationSlotDrop(NameStr(slot->name), false);
}

/* Detach from the shared memory segment. */
if (slot->snap_seg)
{
dsm_detach(slot->snap_seg);
slot->snap_seg = NULL;
slot->snap_handle = DSM_HANDLE_INVALID;
}
}

squeezeWorkerSlotCount = 0;
Expand Down

0 comments on commit 9072d9f

Please sign in to comment.