Skip to content

Commit

Permalink
Fixed processing of the squeeze.max_xlock_time parameter.
Browse files Browse the repository at this point in the history
1. So far, the value of the parameter was not passed to the worker at all.

2. The worker coding was such that the parameter was only checked during
logical decoding, but not when applying the data changes.
  • Loading branch information
Antonin Houska committed Jun 13, 2024
1 parent 55223a2 commit a3a07e1
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 9 deletions.
41 changes: 36 additions & 5 deletions concurrent.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ extern PGDLLIMPORT int wal_segment_size;

static void apply_concurrent_changes(DecodingOutputState *dstate,
Relation relation, ScanKey key,
int nkeys, IndexInsertState *iistate);
int nkeys, IndexInsertState *iistate,
struct timeval *must_complete);
static bool processing_time_elapsed(struct timeval *utmost);

static void plugin_startup(LogicalDecodingContext *ctx,
Expand Down Expand Up @@ -64,6 +65,16 @@ process_concurrent_changes(LogicalDecodingContext *ctx,
bool done;

dstate = (DecodingOutputState *) ctx->output_writer_private;

/*
* If some changes could not be applied due to time constraint, make sure
* the tuplestore is empty before we insert new tuples into it.
*/
if (dstate->nchanges > 0)
apply_concurrent_changes(dstate, rel_dst, ident_key,
ident_key_nentries, iistate, NULL);
Assert(dstate->nchanges == 0);

done = false;
while (!done)
{
Expand All @@ -87,7 +98,11 @@ process_concurrent_changes(LogicalDecodingContext *ctx,
* non-trivial.
*/
apply_concurrent_changes(dstate, rel_dst, ident_key,
ident_key_nentries, iistate);
ident_key_nentries, iistate, must_complete);

if (processing_time_elapsed(must_complete))
/* Like above. */
return false;
}

return true;
Expand Down Expand Up @@ -204,7 +219,8 @@ decode_concurrent_changes(LogicalDecodingContext *ctx,
*/
static void
apply_concurrent_changes(DecodingOutputState *dstate, Relation relation,
ScanKey key, int nkeys, IndexInsertState *iistate)
ScanKey key, int nkeys, IndexInsertState *iistate,
struct timeval *must_complete)
{
TupleTableSlot *slot;
TupleTableSlot *ind_slot;
Expand Down Expand Up @@ -245,6 +261,9 @@ apply_concurrent_changes(DecodingOutputState *dstate, Relation relation,
bool isnull[1];
Datum values[1];

Assert(dstate->nchanges > 0);
dstate->nchanges--;

/* Get the change from the single-column tuple. */
tup_change = ExecFetchSlotHeapTuple(dstate->tsslot, false, &shouldFree);
heap_deform_tuple(tup_change, dstate->tupdesc_change, values, isnull);
Expand Down Expand Up @@ -465,10 +484,22 @@ apply_concurrent_changes(DecodingOutputState *dstate, Relation relation,
/* TTSOpsMinimalTuple has .get_heap_tuple==NULL. */
Assert(shouldFree);
pfree(tup_change);

/*
* If there is a limit on the time of completion, check it
* now. However, make sure the loop does not break if tup_old was set
* in the previous iteration. In such a case we could not resume the
* processing in the next call.
*/
if (must_complete && tup_old == NULL &&
processing_time_elapsed(must_complete))
/* The next call will process the remaining changes. */
break;
}

tuplestore_clear(dstate->tstore);
dstate->nchanges = 0;
/* If we could not apply all the changes, the next call will do. */
if (dstate->nchanges == 0)
tuplestore_clear(dstate->tstore);

PopActiveSnapshot();

Expand Down
4 changes: 4 additions & 0 deletions pg_squeeze.c
Original file line number Diff line number Diff line change
Expand Up @@ -3000,6 +3000,10 @@ perform_final_merge(Oid relid_src, Oid *indexes_src, int nindexes,
cat_state, rel_dst, ident_key,
ident_key_nentries, iistate,
AccessExclusiveLock, NULL);

/* No time constraint, all changes must have been processed. */
Assert(((DecodingOutputState *)
ctx->output_writer_private)->nchanges == 0);
}

return success;
Expand Down
3 changes: 3 additions & 0 deletions pg_squeeze.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
#include "utils/resowner.h"
#include "utils/snapmgr.h"

extern int squeeze_max_xlock_time;

typedef enum
{
PG_SQUEEZE_CHANGE_INSERT,
Expand Down Expand Up @@ -371,6 +373,7 @@ typedef struct WorkerTask

NameData indname; /* clustering index */
NameData tbspname; /* destination tablespace */
int max_xlock_time;

/*
* Fields of the squeeze.tasks table.
Expand Down
15 changes: 11 additions & 4 deletions worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ static WorkerTask *get_unused_task(Oid dbid, Name relschema, Name relname,
int *task_idx, bool *duplicate);
static void initialize_worker_task(WorkerTask *task, int task_id, Name indname,
Name tbspname, ArrayType *ind_tbsps,
bool last_try, bool skip_analyze);
bool last_try, bool skip_analyze,
int max_xlock_time);
static bool start_worker_internal(bool scheduler, int task_idx,
BackgroundWorkerHandle **handle);

Expand Down Expand Up @@ -440,7 +441,7 @@ squeeze_table_new(PG_FUNCTION_ARGS)

/* Fill-in the remaining task information. */
initialize_worker_task(task, -1, indname, tbspname, ind_tbsps, false,
true);
true, squeeze_max_xlock_time);
/*
* Unlike scheduler_worker_loop() we cannot build the snapshot here, the
* worker will do. (It will also create the replication slot.) This is
Expand Down Expand Up @@ -608,7 +609,7 @@ get_unused_task(Oid dbid, Name relschema, Name relname, int *task_idx,
static void
initialize_worker_task(WorkerTask *task, int task_id, Name indname,
Name tbspname, ArrayType *ind_tbsps, bool last_try,
bool skip_analyze)
bool skip_analyze, int max_xlock_time)
{
StringInfoData buf;

Expand Down Expand Up @@ -649,6 +650,7 @@ initialize_worker_task(WorkerTask *task, int task_id, Name indname,
task->error_msg[0] = '\0';
task->last_try = last_try;
task->skip_analyze = skip_analyze;
task->max_xlock_time = max_xlock_time;
}

/*
Expand Down Expand Up @@ -1146,7 +1148,10 @@ scheduler_worker_loop(void)

/* Fill the task. */
initialize_worker_task(task, task_id, cl_index, rel_tbsp,
ind_tbsps, last_try, skip_analyze);
ind_tbsps, last_try, skip_analyze,
/* XXX Should max_xlock_time be added to
* squeeze.tables ? */
0);

/* The list must survive SPI_finish(). */
old_cxt = MemoryContextSwitchTo(sched_cxt);
Expand Down Expand Up @@ -1353,6 +1358,8 @@ process_task(int task_id)
Assert(task_id < NUM_WORKER_TASKS);
MyWorkerTask = &workerData->tasks[task_id];

squeeze_max_xlock_time = MyWorkerTask->max_xlock_time;

/* Process the assigned task. */
PG_TRY();
{
Expand Down

0 comments on commit a3a07e1

Please sign in to comment.