Simplify the cleanup after crash.
No special code path is needed for the scheduler worker. Furthermore, we do
not need a special task and a dedicated worker to process the cleanup.
Antonin Houska committed Sep 19, 2024
1 parent 6473543 commit 3ec74c0
Showing 1 changed file with 8 additions and 98 deletions.
worker.c: 106 changes (8 additions & 98 deletions)
@@ -173,7 +173,6 @@ static void drop_replication_slots(void);
 static void cleanup_after_server_start(void);
 static void cleanup_repl_origins(void);
 static void cleanup_repl_slots(void);
-static void start_cleanup_worker(MemoryContext task_cxt);
 static Snapshot build_historic_snapshot(SnapBuild *builder);
 static void process_task_internal(MemoryContext task_cxt);
 
@@ -941,20 +940,18 @@ squeeze_worker_main(Datum main_arg)
 	 * replication slots and/or origins that other workers could not remove
 	 * due to server crash. Do that while holding the exclusive lock - that
 	 * also ensures that the other workers wait for the cleanup to finish
-	 * instead of complaining about the existing slots / origins.
+	 * before they create new slots / origins, which we might then drop
+	 * accidentally.
+	 *
+	 * If no "standalone" squeeze worker performed the cleanup yet, the
+	 * scheduler must do it now because it'll also create replication slots /
+	 * origins. Those could be dropped by one of the new workers if that
+	 * worker was to perform the cleanup.
 	 */
-	if (!am_i_scheduler && !workerData->cleanup_done)
+	if (!workerData->cleanup_done)
 	{
 		cleanup_after_server_start();
 		workerData->cleanup_done = true;
-
-		/* Are we assigned a "cleanup-only" task? */
-		if (!OidIsValid(MyWorkerTask->dbid))
-		{
-			LWLockRelease(workerData->lock);
-			ereport(DEBUG1, (errmsg("cleanup-only task completed")));
-			goto done;
-		}
 	}
 
 	for (i = 0; i < workerData->nslots; i++)
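The block above is a standard one-shot-initialization idiom: check a flag in shared memory under an exclusive LWLock, do the work, set the flag. A minimal sketch of the idea, not pg_squeeze's actual code (the `WorkerData` layout is modeled on the fields visible in this diff; `perform_startup_cleanup_once` is a hypothetical name):

```c
#include "postgres.h"
#include "storage/lwlock.h"

/* Modeled on the shared state used in the diff; details are assumed. */
typedef struct WorkerData
{
	LWLock	   *lock;			/* protects cleanup_done */
	bool		cleanup_done;	/* true once some worker has cleaned up */
} WorkerData;

static WorkerData *workerData;	/* points into shared memory */

extern void cleanup_after_server_start(void);	/* drops leftover slots/origins */

static void
perform_startup_cleanup_once(void)
{
	LWLockAcquire(workerData->lock, LW_EXCLUSIVE);

	/*
	 * Only the first worker after a server (re)start does the cleanup.
	 * Because the lock is held in exclusive mode, concurrently starting
	 * workers block here until the cleanup has finished, so the cleanup
	 * cannot drop slots/origins they are about to create.
	 */
	if (!workerData->cleanup_done)
	{
		cleanup_after_server_start();
		workerData->cleanup_done = true;
	}

	LWLockRelease(workerData->lock);
}
```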
@@ -1073,27 +1070,12 @@ scheduler_worker_loop(void)
 	long		delay = 0L;
 	int			i;
 	MemoryContext sched_cxt, old_cxt;
-	bool		cleanup_done;
 
 	/* Context for allocations which cannot be freed too early. */
 	sched_cxt = AllocSetContextCreate(TopMemoryContext,
 									  "pg_squeeze scheduler context",
 									  ALLOCSET_DEFAULT_SIZES);
 
-	/*
-	 * This lock does not eliminate all the possible race conditions: e.g. if
-	 * multiple schedulers (one per database) are launched at the same time,
-	 * multiple clean-up workers can be launched. Nevertheless, it makes sense
-	 * as the worker also uses this lock to examine and set the field.
-	 */
-	LWLockAcquire(workerData->lock, LW_EXCLUSIVE);
-	cleanup_done = workerData->cleanup_done;
-	LWLockRelease(workerData->lock);
-
-	/* Do we need to do cleanup first? */
-	if (!cleanup_done)
-		start_cleanup_worker(sched_cxt);
-
 	while (!got_sigterm)
 	{
 		StringInfoData query;
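One detail worth noting in the surviving code: `sched_cxt` is created under `TopMemoryContext`, so allocations made in it are not freed at transaction end; they live until the context is deleted explicitly. A generic, hedged sketch of that memory-context pattern (example names only, not pg_squeeze code):

```c
#include "postgres.h"
#include "utils/memutils.h"

static void
long_lived_allocation_example(void)
{
	MemoryContext cxt;
	MemoryContext old_cxt;
	char	   *buf;

	/*
	 * A context parented to TopMemoryContext survives transaction
	 * boundaries; its memory is released only when the context is
	 * reset or deleted.
	 */
	cxt = AllocSetContextCreate(TopMemoryContext,
								"example long-lived context",
								ALLOCSET_DEFAULT_SIZES);

	old_cxt = MemoryContextSwitchTo(cxt);
	buf = palloc(1024);			/* allocated in cxt, not the caller's context */
	MemoryContextSwitchTo(old_cxt);

	/* ... buf can now be used across transactions ... */
	(void) buf;

	MemoryContextDelete(cxt);	/* frees everything allocated in cxt */
}
```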
@@ -1863,78 +1845,6 @@ cleanup_repl_slots(void)
 	}
 }
-
-static void
-start_cleanup_worker(MemoryContext task_cxt)
-{
-	WorkerTask *task;
-	bool		task_exists;
-	int			task_idx;
-	NameData	dummy_name;
-	SqueezeWorker *worker;
-	MemoryContext old_cxt;
-	bool		registered;
-
-	/*
-	 * Create a worker to perform the initial cleanup.
-	 *
-	 * We must be sure that the cleanup has finished before we start to create
-	 * replication slots for other workers, otherwise the "cleanup worker"
-	 * could drop them too.
-	 */
-	squeezeWorkerCount = 1;
-	squeezeWorkers = (SqueezeWorker *) MemoryContextAllocZero(task_cxt,
-															  squeezeWorkerCount *
-															  sizeof(SqueezeWorker));
-	task = get_unused_task(InvalidOid, NULL, NULL, &task_idx, &task_exists);
-	Assert(!task_exists);
-	if (task == NULL)
-		/*
-		 * This is unlikely to happen, but possible if too many "standalone"
-		 * workers have been started after our check of the 'cleanup_done'
-		 * flag.
-		 */
-		ereport(ERROR,
-				(errmsg("the task queue is currently full")));
-
-	/*
-	 * No specific information is needed here. Setting dummy values explicitly
-	 * seems a good practice though.
-	 */
-	NameStr(dummy_name)[0] = '\0';
-	initialize_worker_task(task, -1, &dummy_name, &dummy_name, NULL,
-						   false, false, 0);
-
-	worker = squeezeWorkers;
-	StartTransactionCommand();
-	/*
-	 * The handle (and possibly other allocations) must survive the current
-	 * transaction.
-	 */
-	old_cxt = MemoryContextSwitchTo(task_cxt);
-	registered = start_worker_internal(false, task_idx, &worker->handle);
-	MemoryContextSwitchTo(old_cxt);
-	if (!registered)
-	{
-		/*
-		 * The worker could not even get registered, so it won't set its
-		 * status to WTS_UNUSED. Make sure the task does not leak.
-		 */
-		release_task(worker->task);
-
-		ereport(ERROR,
-				(errmsg("squeeze worker could not start")),
-				(errhint("consider increasing \"max_worker_processes\" or decreasing \"squeeze.workers_per_database\"")));
-
-	}
-	CommitTransactionCommand();
-
-	/* Wait until the cleanup is done. */
-	cleanup_workers_and_tasks(false);
-
-	if (!workerData->cleanup_done)
-		ereport(ERROR, (errmsg("failed to perform the initial cleanup")));
-}
 
 /*
  * Wrapper for SnapBuildInitialSnapshot().
  *
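The start_cleanup_worker() removed above depended on pg_squeeze-internal helpers (`get_unused_task()`, `initialize_worker_task()`, `start_worker_internal()`, `release_task()`) whose definitions are outside this diff. For orientation, here is a hedged sketch of the PostgreSQL facility a helper like `start_worker_internal()` plausibly wraps: dynamic background-worker registration. Only `RegisterDynamicBackgroundWorker()` and the `BackgroundWorker` fields are PostgreSQL API; the function name and its use of the task index are illustrative assumptions:

```c
#include "postgres.h"
#include "miscadmin.h"
#include "postmaster/bgworker.h"

/*
 * Hypothetical sketch of registering one squeeze worker and obtaining a
 * handle to wait for it, as the removed code did via start_worker_internal().
 */
static bool
register_squeeze_worker(int task_idx, BackgroundWorkerHandle **handle)
{
	BackgroundWorker worker;

	memset(&worker, 0, sizeof(worker));
	worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
		BGWORKER_BACKEND_DATABASE_CONNECTION;
	worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
	worker.bgw_restart_time = BGW_NEVER_RESTART;
	snprintf(worker.bgw_library_name, BGW_MAXLEN, "pg_squeeze");
	snprintf(worker.bgw_function_name, BGW_MAXLEN, "squeeze_worker_main");
	snprintf(worker.bgw_name, BGW_MAXLEN, "squeeze worker");
	snprintf(worker.bgw_type, BGW_MAXLEN, "squeeze worker");
	worker.bgw_main_arg = Int32GetDatum(task_idx);	/* assumed argument */
	worker.bgw_notify_pid = MyProcPid;	/* be notified about start/stop */

	/*
	 * Returns false when no background-worker slot is free, which is why
	 * the removed code checked 'registered' and hinted at raising
	 * max_worker_processes.
	 */
	return RegisterDynamicBackgroundWorker(&worker, handle);
}
```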
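The diff is truncated inside the comment above, so the wrapper's body is not shown. For context: `SnapBuildInitialSnapshot()` (declared in PostgreSQL's `replication/snapbuild.h`) builds an exportable snapshot once the snapshot builder has reached a consistent state, and it must be called from inside a transaction. A hedged sketch of a wrapper with the declared shape; only the signature comes from the diff, the body is an assumption:

```c
#include "postgres.h"
#include "replication/snapbuild.h"
#include "utils/snapshot.h"

/*
 * Sketch only: the real build_historic_snapshot() body is not part of
 * this diff. pg_squeeze presumably adds its own checks or error handling
 * around the core call.
 */
static Snapshot
build_historic_snapshot(SnapBuild *builder)
{
	return SnapBuildInitialSnapshot(builder);
}
```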
