From 6e1f4f4edf4e5d61a465ee841b380d1e720d17c0 Mon Sep 17 00:00:00 2001 From: Antonin Houska Date: Thu, 13 Jun 2024 14:57:15 +0200 Subject: [PATCH] Fixed processing of the squeeze.max_xlock_time parameter. 1. So far, the value of the parameter was not passed to the worker at all. 2. The worker coding was such that the parameter was only checked during logical decoding, but not when applying the data changes. --- concurrent.c | 41 ++++++++++++++++++++++++++++++++++++----- pg_squeeze.c | 4 ++++ pg_squeeze.h | 2 ++ worker.c | 19 +++++++++++++++---- 4 files changed, 57 insertions(+), 9 deletions(-) diff --git a/concurrent.c b/concurrent.c index 60b1719..b833832 100644 --- a/concurrent.c +++ b/concurrent.c @@ -25,7 +25,8 @@ extern PGDLLIMPORT int wal_segment_size; static void apply_concurrent_changes(DecodingOutputState *dstate, Relation relation, ScanKey key, - int nkeys, IndexInsertState *iistate); + int nkeys, IndexInsertState *iistate, + struct timeval *must_complete); static bool processing_time_elapsed(struct timeval *utmost); static void plugin_startup(LogicalDecodingContext *ctx, @@ -64,6 +65,16 @@ process_concurrent_changes(LogicalDecodingContext *ctx, bool done; dstate = (DecodingOutputState *) ctx->output_writer_private; + + /* + * If some changes could not be applied due to time constraint, make sure + * the tuplestore is empty before we insert new tuples into it. + */ + if (dstate->nchanges > 0) + apply_concurrent_changes(dstate, rel_dst, ident_key, + ident_key_nentries, iistate, NULL); + Assert(dstate->nchanges == 0); + done = false; while (!done) { @@ -87,7 +98,11 @@ process_concurrent_changes(LogicalDecodingContext *ctx, * non-trivial. */ apply_concurrent_changes(dstate, rel_dst, ident_key, - ident_key_nentries, iistate); + ident_key_nentries, iistate, must_complete); + + if (processing_time_elapsed(must_complete)) + /* Like above. */ + return false; } return true; @@ -189,7 +204,8 @@ decode_concurrent_changes(LogicalDecodingContext *ctx, */ static void apply_concurrent_changes(DecodingOutputState *dstate, Relation relation, - ScanKey key, int nkeys, IndexInsertState *iistate) + ScanKey key, int nkeys, IndexInsertState *iistate, + struct timeval *must_complete) { TupleTableSlot *slot; #if PG_VERSION_NUM >= 120000 @@ -240,6 +256,9 @@ apply_concurrent_changes(DecodingOutputState *dstate, Relation relation, bool isnull[1]; Datum values[1]; + Assert(dstate->nchanges > 0); + dstate->nchanges--; + /* Get the change from the single-column tuple. */ #if PG_VERSION_NUM >= 120000 tup_change = ExecFetchSlotHeapTuple(dstate->tsslot, false, &shouldFree); @@ -483,10 +502,22 @@ apply_concurrent_changes(DecodingOutputState *dstate, Relation relation, Assert(shouldFree); pfree(tup_change); #endif + + /* + * If there is a limit on the time of completion, check it + * now. However, make sure the loop does not break if tup_old was set + * in the previous iteration. In such a case we could not resume the + * processing in the next call. + */ + if (must_complete && tup_old == NULL && + processing_time_elapsed(must_complete)) + /* The next call will process the remaining changes. */ + break; } - tuplestore_clear(dstate->tstore); - dstate->nchanges = 0; + /* If we could not apply all the changes, the next call will do. */ + if (dstate->nchanges == 0) + tuplestore_clear(dstate->tstore); PopActiveSnapshot(); diff --git a/pg_squeeze.c b/pg_squeeze.c index 9f32089..529c3e6 100644 --- a/pg_squeeze.c +++ b/pg_squeeze.c @@ -3193,6 +3193,10 @@ perform_final_merge(Oid relid_src, Oid *indexes_src, int nindexes, cat_state, rel_dst, ident_key, ident_key_nentries, iistate, AccessExclusiveLock, NULL); + + /* No time constraint, all changes must have been processed. */ + Assert(((DecodingOutputState *) + ctx->output_writer_private)->nchanges == 0); } return success; diff --git a/pg_squeeze.h b/pg_squeeze.h index 7caf1fb..aa59905 100644 --- a/pg_squeeze.h +++ b/pg_squeeze.h @@ -33,6 +33,8 @@ #include "utils/resowner.h" #include "utils/snapmgr.h" +extern int squeeze_max_xlock_time; + typedef enum { PG_SQUEEZE_CHANGE_INSERT, diff --git a/worker.c b/worker.c index dea2622..3f949c5 100644 --- a/worker.c +++ b/worker.c @@ -92,6 +92,7 @@ typedef struct WorkerTask NameData relname; NameData indname; /* clustering index */ NameData tbspname; /* destination tablespace */ + int max_xlock_time; /* * index destination tablespaces. @@ -166,12 +167,14 @@ typedef struct TaskDetails ArrayType *ind_tbsps; bool last_try; bool skip_analyze; + int max_xlock_time; } TaskDetails; static void init_task_details(TaskDetails *task, int32 task_id, Name relschema, Name relname, Name cl_index, Name rel_tbsp, ArrayType *ind_tbsps, - bool last_try, bool skip_analyze); + bool last_try, bool skip_analyze, + int max_xlock_time); static void squeeze_handle_error_app(ErrorData *edata, TaskDetails *td); static void release_task(void); @@ -502,6 +505,7 @@ squeeze_table_new(PG_FUNCTION_ARGS) memcpy(task->ind_tbsps, ind_tbsps, VARSIZE(ind_tbsps)); else SET_VARSIZE(task->ind_tbsps, 0); + task->max_xlock_time = squeeze_max_xlock_time; task_id = task->id; LWLockRelease(task->lock); @@ -1046,7 +1050,7 @@ process_tasks(MemoryContext task_cxt) } init_task_details(tasks, 0, relschema, relname, cl_index, rel_tbsp, - ind_tbsps, false, false); + ind_tbsps, false, false, task->max_xlock_time); MemoryContextSwitchTo(oldcxt); /* No other worker should pick this task. */ @@ -1176,7 +1180,10 @@ LIMIT %d", TASK_BATCH_SIZE); init_task_details(&tasks[i], task_id, relschema, relname, cl_index, rel_tbsp, ind_tbsps, last_try, - skip_analyze); + skip_analyze, + /* XXX Should max_xlock_time be added to + * squeeze.tables ? */ + 0); } MemoryContextSwitchTo(oldcxt); @@ -1293,6 +1300,8 @@ LIMIT %d", TASK_BATCH_SIZE); if (strlen(NameStr(td->rel_tbsp)) > 0) rel_tbsp = &td->rel_tbsp; + squeeze_max_xlock_time = td->max_xlock_time; + /* Perform the actual work. */ SetCurrentStatementStartTimestamp(); StartTransactionCommand(); @@ -1529,7 +1538,8 @@ run_command(char *command, int rc) static void init_task_details(TaskDetails *task, int32 task_id, Name relschema, Name relname, Name cl_index, Name rel_tbsp, - ArrayType *ind_tbsps, bool last_try, bool skip_analyze) + ArrayType *ind_tbsps, bool last_try, bool skip_analyze, + int max_xlock_time) { memset(task, 0, sizeof(TaskDetails)); task->id = task_id; @@ -1542,6 +1552,7 @@ init_task_details(TaskDetails *task, int32 task_id, Name relschema, task->ind_tbsps = ind_tbsps; task->last_try = last_try; task->skip_analyze = skip_analyze; + task->max_xlock_time = max_xlock_time; } #define ACTIVE_WORKERS_RES_ATTRS 7