Skip to content

Commit

Permalink
Double-check replication slot's effective_xmin before use.
Browse files Browse the repository at this point in the history
Paranoia.

Also move related code in create_replication_slots() to more suitable place.
  • Loading branch information
Antonin Houska committed Jun 24, 2024
1 parent 8791c89 commit c1a7abd
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 7 deletions.
9 changes: 9 additions & 0 deletions pg_squeeze.c
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,15 @@ setup_decoding(Oid relid, TupleDesc tup_desc, Snapshot *snap_hist)
*/
ReplicationSlotAcquire(NameStr(repl_slot->name), true);

/*
* This should not really happen, but if it did, the initial load could
* miss some data.
*/
if (!TransactionIdIsValid(MyReplicationSlot->effective_xmin))
ereport(ERROR,
(errmsg("replication slot \"%s\" has invalid effective_xmin",
NameStr(repl_slot->name))));

/*
* It's pretty unlikely for some client to have consumed data changes
* (accidentally?) before this worker could acquire the slot, but it's
Expand Down
15 changes: 8 additions & 7 deletions worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -1598,13 +1598,6 @@ create_replication_slots(int nslots, MemoryContext mcxt)
*/
Assert(!ctx->fast_forward);

SpinLockAcquire(&slot->mutex);
Assert(TransactionIdIsValid(slot->effective_xmin) &&
!TransactionIdIsValid(slot->data.xmin));
/* Prevent ReplicationSlotRelease() from clearing effective_xmin. */
slot->data.xmin = slot->effective_xmin;
SpinLockRelease(&slot->mutex);

/*
* Bring the snapshot builder into the SNAPBUILD_CONSISTENT state so
* that the worker can get its snapshot and start decoding
Expand Down Expand Up @@ -1654,6 +1647,14 @@ create_replication_slots(int nslots, MemoryContext mcxt)
* Done for now, the worker will have to setup the context on its own.
*/
FreeDecodingContext(ctx);

/* Prevent ReplicationSlotRelease() from clearing effective_xmin. */
SpinLockAcquire(&slot->mutex);
Assert(TransactionIdIsValid(slot->effective_xmin) &&
!TransactionIdIsValid(slot->data.xmin));
slot->data.xmin = slot->effective_xmin;
SpinLockRelease(&slot->mutex);

ReplicationSlotRelease();
}

Expand Down

0 comments on commit c1a7abd

Please sign in to comment.