diff --git a/src/main/java/io/debezium/connector/db2/Db2Connection.java b/src/main/java/io/debezium/connector/db2/Db2Connection.java index d134a96..b94427f 100644 --- a/src/main/java/io/debezium/connector/db2/Db2Connection.java +++ b/src/main/java/io/debezium/connector/db2/Db2Connection.java @@ -416,6 +416,13 @@ public String connectionString() { return connectionString(URL_PATTERN); } + @Override + public Optional nullsSortLast() { + // "The null value is higher than all other values" + // https://www.ibm.com/docs/en/db2/11.5?topic=subselect-order-by-clause + return Optional.of(true); + } + @Override public String quotedTableIdString(TableId tableId) { StringBuilder quoted = new StringBuilder(); diff --git a/src/test/java/io/debezium/connector/db2/IncrementalSnapshotIT.java b/src/test/java/io/debezium/connector/db2/IncrementalSnapshotIT.java index a138e73..77c9fc4 100644 --- a/src/test/java/io/debezium/connector/db2/IncrementalSnapshotIT.java +++ b/src/test/java/io/debezium/connector/db2/IncrementalSnapshotIT.java @@ -21,6 +21,7 @@ import io.debezium.junit.Flaky; import io.debezium.junit.SkipTestRule; import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotTest; +import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.history.SchemaHistory; import io.debezium.util.Testing; @@ -39,15 +40,18 @@ public void before() throws SQLException { TestHelper.disableDbCdc(connection); TestHelper.disableTableCdc(connection, "A"); TestHelper.disableTableCdc(connection, "B"); + TestHelper.disableTableCdc(connection, "A42"); TestHelper.disableTableCdc(connection, "DEBEZIUM_SIGNAL"); connection.execute("DELETE FROM ASNCDC.IBMSNAP_REGISTER"); connection.execute( "DROP TABLE IF EXISTS a", "DROP TABLE IF EXISTS b", + "DROP TABLE IF EXISTS a42", "DROP TABLE IF EXISTS debezium_signal"); connection.execute( "CREATE TABLE a (pk int not null, aa int, primary key (pk))", "CREATE TABLE b (pk int not null, aa int, primary key (pk))", + "CREATE TABLE a42 (pk1 int, pk2 int, pk3 int, pk4 int, aa int)", "CREATE TABLE debezium_signal (id varchar(64), type varchar(32), data varchar(2048))"); TestHelper.enableDbCdc(connection); @@ -65,11 +69,13 @@ public void after() throws SQLException { TestHelper.disableDbCdc(connection); TestHelper.disableTableCdc(connection, "A"); TestHelper.disableTableCdc(connection, "B"); + TestHelper.disableTableCdc(connection, "A42"); TestHelper.disableTableCdc(connection, "DEBEZIUM_SIGNAL"); connection.rollback(); connection.execute( "DROP TABLE IF EXISTS a", "DROP TABLE IF EXISTS b", + "DROP TABLE IF EXISTS a42", "DROP TABLE IF EXISTS debezium_signal"); connection.execute("DELETE FROM ASNCDC.IBMSNAP_REGISTER"); connection.execute("DELETE FROM ASNCDC.IBMQREP_COLVERSION"); @@ -91,6 +97,12 @@ protected void populateTables() throws SQLException { TestHelper.enableTableCdc(connection, "B"); } + @Override + protected void populate4PkTable(JdbcConnection connection, String tableName) throws SQLException { + super.populate4PkTable(connection, tableName); + TestHelper.enableTableCdc((Db2Connection) connection, tableName.replaceAll(".*\\.", "")); + } + @Override protected Class connectorClass() { return Db2Connector.class; @@ -111,6 +123,11 @@ protected List topicNames() { return List.of(topicName(), "testdb.DB2INST1.B"); } + @Override + protected String noPKTopicName() { + return "testdb.DB2INST1.A42"; + } + @Override protected String tableName() { return "DB2INST1.A"; @@ -121,6 +138,11 @@ protected List tableNames() { return List.of(tableName(), "DB2INST1.B"); } + @Override + protected String noPKTableName() { + return "DB2INST1.A42"; + } + @Override protected String signalTableName() { return "DEBEZIUM_SIGNAL"; @@ -130,6 +152,11 @@ protected String getSignalTypeFieldName() { return "TYPE"; } + @Override + protected String returnedIdentifierName(String queriedID) { + return queriedID.toUpperCase(); + } + protected void sendAdHocSnapshotSignal() throws SQLException { connection.execute( String.format( @@ -143,7 +170,8 @@ protected Builder config() { return TestHelper.defaultConfig() .with(Db2ConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) .with(Db2ConnectorConfig.SIGNAL_DATA_COLLECTION, "DB2INST1.DEBEZIUM_SIGNAL") - .with(Db2ConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 250); + .with(Db2ConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 250) + .with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "DB2INST1.A42:pk1,pk2,pk3,pk4"); } @Override @@ -159,7 +187,8 @@ protected Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCaptur .with(Db2ConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) .with(Db2ConnectorConfig.SIGNAL_DATA_COLLECTION, "DB2INST1.DEBEZIUM_SIGNAL") .with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, tableIncludeList) - .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl); + .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl) + .with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "DB2INST1.A42:pk1,pk2,pk3,pk4"); } @Override