diff --git a/internal/sql/pg/repl.go b/internal/sql/pg/repl.go index 456989a2b..23df4bd2e 100644 --- a/internal/sql/pg/repl.go +++ b/internal/sql/pg/repl.go @@ -501,7 +501,7 @@ func decodeWALData(hasher hash.Hash, walData []byte, relations map[uint32]*pglog hasher.Reset() changesetWriter.fail() // discard changeset - // v2 Stream control messages. Not expected for kwil + // v2 Stream control messages. Only expected with large transactions. case *pglogrepl.StreamStartMessageV2: *inStream = true logger.Warnf(" [msg] StreamStartMessageV2: xid %d, first segment? %d", logicalMsg.Xid, logicalMsg.FirstSegment) diff --git a/internal/sql/pg/repl_msgs.go b/internal/sql/pg/repl_msgs.go index 7f0ff7661..d412e8254 100644 --- a/internal/sql/pg/repl_msgs.go +++ b/internal/sql/pg/repl_msgs.go @@ -14,6 +14,7 @@ const ( MessageTypeBeginPrepare pglogrepl.MessageType = 'b' MessageTypeCommitPrepared pglogrepl.MessageType = 'K' MessageTypeRollbackPrepared pglogrepl.MessageType = 'r' + MessageTypeStreamPrepare pglogrepl.MessageType = 'p' ) // msgTypeToString is helpful for debugging, but normally unused. @@ -21,6 +22,8 @@ func msgTypeToString(t pglogrepl.MessageType) string { //nolint:unused switch t { case MessageTypePrepare: return "Prepare" + case MessageTypeStreamPrepare: + return "Stream Prepared" case MessageTypeBeginPrepare: return "Begin Prepared" case MessageTypeCommitPrepared: @@ -39,7 +42,7 @@ func parseV3(data []byte, inStream bool) (m pglogrepl.Message, err error) { var decoder pglogrepl.MessageDecoder // v1 and v3 have same Decode signature (stream not relevant) switch msgType { - case MessageTypePrepare: + case MessageTypePrepare, MessageTypeStreamPrepare: // same encoding decoder = new(PrepareMessageV3) case MessageTypeBeginPrepare: decoder = new(BeginPrepareMessageV3)