From 74ca49f8db88b4e808a5b08c936f7df45051128b Mon Sep 17 00:00:00 2001 From: James Johnston Date: Sat, 27 Jan 2024 03:56:15 +0000 Subject: [PATCH] DBZ-5071 Correctly handle NULL values in incremental snapshots This makes changes in the Db2 connector for the corresponding commit in the main Debezium project. The main thing we have to do is override the nullsSortLast function in Db2Connection to define how NULLs sort in Db2. We also have to update the test suite, since the base test suite now includes additional tests for working with keys that are nullable. --- .../debezium/connector/db2/Db2Connection.java | 7 ++++ .../connector/db2/IncrementalSnapshotIT.java | 33 +++++++++++++++++-- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/debezium/connector/db2/Db2Connection.java b/src/main/java/io/debezium/connector/db2/Db2Connection.java index d134a96..b94427f 100644 --- a/src/main/java/io/debezium/connector/db2/Db2Connection.java +++ b/src/main/java/io/debezium/connector/db2/Db2Connection.java @@ -416,6 +416,13 @@ public String connectionString() { return connectionString(URL_PATTERN); } + @Override + public Optional nullsSortLast() { + // "The null value is higher than all other values" + // https://www.ibm.com/docs/en/db2/11.5?topic=subselect-order-by-clause + return Optional.of(true); + } + @Override public String quotedTableIdString(TableId tableId) { StringBuilder quoted = new StringBuilder(); diff --git a/src/test/java/io/debezium/connector/db2/IncrementalSnapshotIT.java b/src/test/java/io/debezium/connector/db2/IncrementalSnapshotIT.java index a138e73..77c9fc4 100644 --- a/src/test/java/io/debezium/connector/db2/IncrementalSnapshotIT.java +++ b/src/test/java/io/debezium/connector/db2/IncrementalSnapshotIT.java @@ -21,6 +21,7 @@ import io.debezium.junit.Flaky; import io.debezium.junit.SkipTestRule; import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotTest; +import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.history.SchemaHistory; import io.debezium.util.Testing; @@ -39,15 +40,18 @@ public void before() throws SQLException { TestHelper.disableDbCdc(connection); TestHelper.disableTableCdc(connection, "A"); TestHelper.disableTableCdc(connection, "B"); + TestHelper.disableTableCdc(connection, "A42"); TestHelper.disableTableCdc(connection, "DEBEZIUM_SIGNAL"); connection.execute("DELETE FROM ASNCDC.IBMSNAP_REGISTER"); connection.execute( "DROP TABLE IF EXISTS a", "DROP TABLE IF EXISTS b", + "DROP TABLE IF EXISTS a42", "DROP TABLE IF EXISTS debezium_signal"); connection.execute( "CREATE TABLE a (pk int not null, aa int, primary key (pk))", "CREATE TABLE b (pk int not null, aa int, primary key (pk))", + "CREATE TABLE a42 (pk1 int, pk2 int, pk3 int, pk4 int, aa int)", "CREATE TABLE debezium_signal (id varchar(64), type varchar(32), data varchar(2048))"); TestHelper.enableDbCdc(connection); @@ -65,11 +69,13 @@ public void after() throws SQLException { TestHelper.disableDbCdc(connection); TestHelper.disableTableCdc(connection, "A"); TestHelper.disableTableCdc(connection, "B"); + TestHelper.disableTableCdc(connection, "A42"); TestHelper.disableTableCdc(connection, "DEBEZIUM_SIGNAL"); connection.rollback(); connection.execute( "DROP TABLE IF EXISTS a", "DROP TABLE IF EXISTS b", + "DROP TABLE IF EXISTS a42", "DROP TABLE IF EXISTS debezium_signal"); connection.execute("DELETE FROM ASNCDC.IBMSNAP_REGISTER"); connection.execute("DELETE FROM ASNCDC.IBMQREP_COLVERSION"); @@ -91,6 +97,12 @@ protected void populateTables() throws SQLException { TestHelper.enableTableCdc(connection, "B"); } + @Override + protected void populate4PkTable(JdbcConnection connection, String tableName) throws SQLException { + super.populate4PkTable(connection, tableName); + TestHelper.enableTableCdc((Db2Connection) connection, tableName.replaceAll(".*\\.", "")); + } + @Override protected Class connectorClass() { return Db2Connector.class; @@ -111,6 +123,11 @@ protected List topicNames() { return List.of(topicName(), "testdb.DB2INST1.B"); } + @Override + protected String noPKTopicName() { + return "testdb.DB2INST1.A42"; + } + @Override protected String tableName() { return "DB2INST1.A"; @@ -121,6 +138,11 @@ protected List tableNames() { return List.of(tableName(), "DB2INST1.B"); } + @Override + protected String noPKTableName() { + return "DB2INST1.A42"; + } + @Override protected String signalTableName() { return "DEBEZIUM_SIGNAL"; @@ -130,6 +152,11 @@ protected String getSignalTypeFieldName() { return "TYPE"; } + @Override + protected String returnedIdentifierName(String queriedID) { + return queriedID.toUpperCase(); + } + protected void sendAdHocSnapshotSignal() throws SQLException { connection.execute( String.format( @@ -143,7 +170,8 @@ protected Builder config() { return TestHelper.defaultConfig() .with(Db2ConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) .with(Db2ConnectorConfig.SIGNAL_DATA_COLLECTION, "DB2INST1.DEBEZIUM_SIGNAL") - .with(Db2ConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 250); + .with(Db2ConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 250) + .with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "DB2INST1.A42:pk1,pk2,pk3,pk4"); } @Override @@ -159,7 +187,8 @@ protected Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCaptur .with(Db2ConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) .with(Db2ConnectorConfig.SIGNAL_DATA_COLLECTION, "DB2INST1.DEBEZIUM_SIGNAL") .with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, tableIncludeList) - .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl); + .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl) + .with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "DB2INST1.A42:pk1,pk2,pk3,pk4"); } @Override