Skip to content

Commit

Permalink
DBZ-7305 Add tests for Always, When Needed and Custom snapshot mode f…
Browse files Browse the repository at this point in the history
…or DB2 connector
  • Loading branch information
mfvitale committed Mar 5, 2024
1 parent ca1e526 commit 645033e
Show file tree
Hide file tree
Showing 6 changed files with 285 additions and 1 deletion.
81 changes: 81 additions & 0 deletions src/test/java/io/debezium/connector/db2/CustomTestSnapshot.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.db2;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import io.debezium.bean.StandardBeanNames;
import io.debezium.bean.spi.BeanRegistry;
import io.debezium.bean.spi.BeanRegistryAware;
import io.debezium.connector.db2.snapshot.query.SelectAllSnapshotQuery;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.spi.snapshot.Snapshotter;

/**
* This is a small class used in PostgresConnectorIT to test a custom snapshot
*
* It is tightly coupled to the test there, but needs to be placed here in order
* to allow for class loading to work
*/
public class CustomTestSnapshot extends SelectAllSnapshotQuery implements Snapshotter, BeanRegistryAware {

private boolean hasState;

@Override
public String name() {
return CustomTestSnapshot.class.getName();
}

@Override
public void injectBeanRegistry(BeanRegistry beanRegistry) {

Offsets<Db2Partition, Db2OffsetContext> db2OffsetContextOffsets = beanRegistry.lookupByName(StandardBeanNames.OFFSETS, Offsets.class);
for (Db2OffsetContext offset : db2OffsetContextOffsets.getOffsets().values()) {
hasState = offset != null;
}
}

@Override
public boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress) {
return true;
}

@Override
public boolean shouldStream() {
return true;
}

@Override
public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) {
return true;
}

@Override
public boolean shouldSnapshotOnSchemaError() {
return false;
}

@Override
public boolean shouldSnapshotOnDataError() {
return false;
}

@Override
public Optional<String> snapshotQuery(String tableId, List<String> snapshotSelectColumns) {

if (!hasState && tableId.contains("TABLEB")) {
return Optional.empty();
}
else {
String query = snapshotSelectColumns.stream()
.collect(Collectors.joining(", ", "SELECT ", " FROM " + tableId));

return Optional.of(query);
}
}
}
186 changes: 186 additions & 0 deletions src/test/java/io/debezium/connector/db2/Db2ConnectorIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.debezium.connector.db2.util.TestHelper.TYPE_LENGTH_PARAMETER_KEY;
import static io.debezium.connector.db2.util.TestHelper.TYPE_NAME_PARAMETER_KEY;
import static io.debezium.connector.db2.util.TestHelper.TYPE_SCALE_PARAMETER_KEY;
import static io.debezium.data.Envelope.FieldName.AFTER;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
import static org.junit.Assert.assertNull;
Expand All @@ -17,6 +18,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
Expand All @@ -27,6 +29,7 @@
import org.junit.Rule;
import org.junit.Test;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.db2.Db2ConnectorConfig.SnapshotMode;
import io.debezium.connector.db2.util.TestHelper;
Expand All @@ -41,9 +44,12 @@
import io.debezium.junit.Flaky;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.history.MemorySchemaHistory;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.Testing;

import junit.framework.TestCase;

/**
* Integration test for the Debezium DB2 connector.
*
Expand Down Expand Up @@ -904,6 +910,186 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
}
}

@Test
public void shouldNotUseOffsetWhenSnapshotIsAlways() throws Exception {

try {
Configuration config = TestHelper.defaultConfig()
.with(Db2ConnectorConfig.SNAPSHOT_MODE, SnapshotMode.ALWAYS)
.with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, "DB2INST1.ALWAYS_SNAPSHOT")
.with(Db2ConnectorConfig.SNAPSHOT_MODE_TABLES, "DB2INST1.ALWAYS_SNAPSHOT")
.with(Db2ConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
.with(Db2ConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.build();

connection.execute("CREATE TABLE always_snapshot ("
+ " id INT PRIMARY KEY NOT NULL,"
+ " data VARCHAR(50) NOT NULL);");
connection.execute("INSERT INTO always_snapshot VALUES (1,'Test1');");
connection.execute("INSERT INTO always_snapshot VALUES (2,'Test2');");

TestHelper.enableTableCdc(connection, "ALWAYS_SNAPSHOT");

start(Db2Connector.class, config);

TestHelper.waitForCDC();

int expectedRecordCount = 2;
SourceRecords sourceRecords = consumeRecordsByTopic(expectedRecordCount);
assertThat(sourceRecords.recordsForTopic("testdb.DB2INST1.ALWAYS_SNAPSHOT")).hasSize(expectedRecordCount);
Struct struct = (Struct) ((Struct) sourceRecords.allRecordsInOrder().get(0).value()).get(AFTER);
TestCase.assertEquals(1, struct.get("id"));
TestCase.assertEquals("Test1", struct.get("data"));
struct = (Struct) ((Struct) sourceRecords.allRecordsInOrder().get(1).value()).get(AFTER);
TestCase.assertEquals(2, struct.get("id"));
TestCase.assertEquals("Test2", struct.get("data"));

stopConnector();

connection.execute("DELETE FROM ALWAYS_SNAPSHOT WHERE id=1;");
connection.execute("INSERT INTO ALWAYS_SNAPSHOT VALUES (3,'Test3');");

start(Db2Connector.class, config);
TestHelper.waitForCDC();
sourceRecords = consumeRecordsByTopic(expectedRecordCount);

// Check we get up-to-date data in the snapshot.
assertThat(sourceRecords.recordsForTopic("testdb.DB2INST1.ALWAYS_SNAPSHOT")).hasSize(expectedRecordCount);
struct = (Struct) ((Struct) sourceRecords.allRecordsInOrder().get(0).value()).get(AFTER);
TestCase.assertEquals(2, struct.get("id"));
TestCase.assertEquals("Test2", struct.get("data"));
struct = (Struct) ((Struct) sourceRecords.allRecordsInOrder().get(1).value()).get(AFTER);
TestCase.assertEquals(3, struct.get("id"));
TestCase.assertEquals("Test3", struct.get("data"));
}
catch (Exception e) {
e.printStackTrace();
}
finally {
connection.execute("DROP TABLE ALWAYS_SNAPSHOT");
}
}

@Test
public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception {

Configuration.Builder builder = TestHelper.defaultConfig()
.with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.INITIAL)
.with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, "DB2INST1.TABLEA")
.with(Db2ConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class.getName());

Configuration config = builder.build();
// Start the connector ...
start(Db2Connector.class, config);

TestHelper.waitForSnapshotToBeCompleted();
// Poll for records ...
// Testing.Print.enable();
int recordCount = 1;
SourceRecords sourceRecords = consumeRecordsByTopic(recordCount);
assertThat(sourceRecords.allRecordsInOrder()).hasSize(recordCount);
stopConnector();

builder.with(Db2ConnectorConfig.SNAPSHOT_MODE, SnapshotMode.RECOVERY);
config = builder.build();
start(Db2Connector.class, config);

TestHelper.waitForSnapshotToBeCompleted();

TestHelper.enableDbCdc(connection);
connection.execute("UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'");
TestHelper.refreshAndWait(connection);

connection.execute("INSERT INTO tablea VALUES (100,'100')");
connection.execute("INSERT INTO tablea VALUES (200,'200')");

TestHelper.refreshAndWait(connection);

recordCount = 2;
sourceRecords = consumeRecordsByTopic(recordCount);
assertThat(sourceRecords.allRecordsInOrder()).hasSize(recordCount);
}

@Test
public void shouldAllowForCustomSnapshot() throws InterruptedException, SQLException {

final String pkField = "ID";

Configuration config = TestHelper.defaultConfig()
.with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.CUSTOM.getValue())
.with(Db2ConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName())
.with(CommonConnectorConfig.SNAPSHOT_MODE_TABLES, "DB2INST1.TABLEA,DB2INST1.TABLEB")
.with(CommonConnectorConfig.SNAPSHOT_QUERY_MODE, CommonConnectorConfig.SnapshotQueryMode.CUSTOM)
.with(CommonConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName())
.build();

connection.execute("INSERT INTO tableb VALUES (1, '1');");

start(Db2Connector.class, config);
assertConnectorIsRunning();

SourceRecords actualRecords = consumeRecordsByTopic(2);

List<SourceRecord> s1recs = actualRecords.recordsForTopic("testdb.DB2INST1.TABLEA");
List<SourceRecord> s2recs = actualRecords.recordsForTopic("testdb.DB2INST1.TABLEB");

if (s2recs != null) { // Sometimes the record is processed by the stream so filtering it out
s2recs = s2recs.stream().filter(r -> "r".equals(((Struct) r.value()).get("op")))
.collect(Collectors.toList());
}
assertThat(s1recs.size()).isEqualTo(1);
assertThat(s2recs).isNull();

SourceRecord record = s1recs.get(0);
VerifyRecord.isValidRead(record, pkField, 1);

TestHelper.enableDbCdc(connection);
connection.execute("UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'");
TestHelper.refreshAndWait(connection);

connection.execute("INSERT INTO tablea VALUES (2, '1');");
connection.execute("INSERT INTO tableb VALUES (2, '1');");

TestHelper.refreshAndWait(connection);

actualRecords = consumeRecordsByTopic(2);

s1recs = actualRecords.recordsForTopic("testdb.DB2INST1.TABLEA");
s2recs = actualRecords.recordsForTopic("testdb.DB2INST1.TABLEB");
assertThat(s1recs.size()).isEqualTo(1);
assertThat(s2recs.size()).isEqualTo(1);
record = s1recs.get(0);
VerifyRecord.isValidInsert(record, pkField, 2);
record = s2recs.get(0);
VerifyRecord.isValidInsert(record, pkField, 2);
stopConnector();

config = TestHelper.defaultConfig()
.with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.CUSTOM.getValue())
.with(Db2ConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName())
.with(CommonConnectorConfig.SNAPSHOT_QUERY_MODE, CommonConnectorConfig.SnapshotQueryMode.CUSTOM)
.with(CommonConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName())
.build();

start(Db2Connector.class, config);
assertConnectorIsRunning();
actualRecords = consumeRecordsByTopic(4);

s1recs = actualRecords.recordsForTopic("testdb.DB2INST1.TABLEA");
s2recs = actualRecords.recordsForTopic("testdb.DB2INST1.TABLEB");
assertThat(s1recs.size()).isEqualTo(2);
assertThat(s2recs.size()).isEqualTo(2);
VerifyRecord.isValidRead(s1recs.get(0), pkField, 1);
VerifyRecord.isValidRead(s1recs.get(1), pkField, 2);
VerifyRecord.isValidRead(s2recs.get(0), pkField, 1);
VerifyRecord.isValidRead(s2recs.get(1), pkField, 2);
}

private void purgeDatabaseLogs() throws SQLException {
connection.execute("ALTER DATABASE testDB1 SET RECOVERY SIMPLE");
connection.execute("DBCC SHRINKFILE (testDB1, 1)");
}

private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record));
}
Expand Down
15 changes: 15 additions & 0 deletions src/test/java/io/debezium/connector/db2/IncrementalSnapshotIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ protected List<String> topicNames() {
return List.of(topicName(), "testdb.DB2INST1.B");
}

@Override
protected String noPKTopicName() {
return "testdb.DB2INST1.A42";
}

@Override
protected String tableName() {
return "DB2INST1.A";
Expand All @@ -133,6 +138,11 @@ protected List<String> tableNames() {
return List.of(tableName(), "DB2INST1.B");
}

@Override
protected String noPKTableName() {
return "DB2INST1.A42";
}

@Override
protected String signalTableName() {
return "DEBEZIUM_SIGNAL";
Expand All @@ -142,6 +152,11 @@ protected String getSignalTypeFieldName() {
return "TYPE";
}

@Override
protected String returnedIdentifierName(String queriedID) {
return queriedID.toUpperCase();
}

protected void sendAdHocSnapshotSignal() throws SQLException {
connection.execute(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class TestHelper {

public static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-connect.txt").toAbsolutePath();
public static final String TEST_DATABASE = "testdb";
public static final int WAIT_FOR_CDC = 3 * 1000;
public static final int WAIT_FOR_CDC = 3 * 5000;

/**
* Key for schema parameter used to store a source column's type name.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.debezium.connector.db2.CustomTestSnapshot
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.debezium.connector.db2.CustomTestSnapshot

0 comments on commit 645033e

Please sign in to comment.