diff --git a/src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java b/src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java index 8e59cd6..d31604a 100644 --- a/src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java +++ b/src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java @@ -8,6 +8,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; @@ -404,8 +405,8 @@ public SnapshotIsolationMode getSnapshotIsolationMode() { return this.snapshotIsolationMode; } - public SnapshotLockingMode getSnapshotLockingMode() { - return this.snapshotLockingMode; + public Optional getSnapshotLockingMode() { + return Optional.of(this.snapshotLockingMode); } public SnapshotMode getSnapshotMode() { diff --git a/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java b/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java index 1466803..0c2f165 100644 --- a/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java +++ b/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java @@ -21,8 +21,6 @@ import io.debezium.config.Field; import io.debezium.connector.base.ChangeEventQueue; import io.debezium.connector.common.BaseSourceTask; -import io.debezium.connector.db2.snapshot.Db2SnapshotLockProvider; -import io.debezium.connector.db2.snapshot.Db2SnapshotterServiceProvider; import io.debezium.document.DocumentReader; import io.debezium.jdbc.DefaultMainConnectionProvidingConnectionFactory; import io.debezium.jdbc.MainConnectionProvidingConnectionFactory; @@ -37,7 +35,6 @@ import io.debezium.relational.TableId; import io.debezium.schema.SchemaFactory; import io.debezium.schema.SchemaNameAdjuster; -import io.debezium.service.spi.ServiceRegistry; import io.debezium.snapshot.SnapshotterService; import io.debezium.spi.topic.TopicNamingStrategy; import io.debezium.util.Clock; @@ -230,11 +227,4 @@ private static Configuration applyFetchSizeToJdbcConfig(Configuration config) { return config; } - @Override - protected void registerServiceProviders(ServiceRegistry serviceRegistry) { - - super.registerServiceProviders(serviceRegistry); - serviceRegistry.registerServiceProvider(new Db2SnapshotLockProvider()); - serviceRegistry.registerServiceProvider(new Db2SnapshotterServiceProvider()); - } } diff --git a/src/main/java/io/debezium/connector/db2/Db2SnapshotChangeEventSource.java b/src/main/java/io/debezium/connector/db2/Db2SnapshotChangeEventSource.java index 8b23a1e..7ca4ff4 100644 --- a/src/main/java/io/debezium/connector/db2/Db2SnapshotChangeEventSource.java +++ b/src/main/java/io/debezium/connector/db2/Db2SnapshotChangeEventSource.java @@ -11,7 +11,6 @@ import java.sql.Savepoint; import java.sql.Statement; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -32,7 +31,6 @@ import io.debezium.relational.Tables; import io.debezium.schema.SchemaChangeEvent; import io.debezium.snapshot.SnapshotterService; -import io.debezium.spi.snapshot.Snapshotter; import io.debezium.util.Clock; public class Db2SnapshotChangeEventSource extends RelationalSnapshotChangeEventSource { @@ -51,41 +49,6 @@ public Db2SnapshotChangeEventSource(Db2ConnectorConfig connectorConfig, MainConn this.jdbcConnection = connectionFactory.mainConnection(); } - @Override - public SnapshottingTask getSnapshottingTask(Db2Partition partition, Db2OffsetContext previousOffset) { - - final Snapshotter snapshotter = snapshotterService.getSnapshotter(); - - List dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted(); - Map snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream() - .collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue)); - - boolean offsetExists = previousOffset != null; - boolean snapshotInProgress = false; - - if (offsetExists) { - snapshotInProgress = previousOffset.isSnapshotRunning(); - } - - if (offsetExists && !previousOffset.isSnapshotRunning()) { - LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted."); - } - - boolean shouldSnapshotSchema = snapshotter.shouldSnapshotSchema(offsetExists, snapshotInProgress); - boolean shouldSnapshotData = snapshotter.shouldSnapshotData(offsetExists, snapshotInProgress); - - if (shouldSnapshotData && shouldSnapshotSchema) { - LOGGER.info("According to the connector configuration both schema and data will be snapshot."); - } - else if (shouldSnapshotSchema) { - LOGGER.info("According to the connector configuration only schema will be snapshot."); - } - - return new SnapshottingTask(shouldSnapshotSchema, shouldSnapshotData, - dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable, - false); - } - @Override protected SnapshotContext prepare(Db2Partition partition, boolean onDemand) { return new Db2SnapshotContext(partition, jdbcConnection.getRealDatabaseName(), onDemand); @@ -125,7 +88,7 @@ else if (connectorConfig.getSnapshotIsolationMode() == SnapshotIsolationMode.EXC } Optional lockingStatement = snapshotterService.getSnapshotLock().tableLockingStatement(connectorConfig.snapshotLockTimeout(), - Set.of(quoteTableName(tableId))); + quoteTableName(tableId)); if (lockingStatement.isPresent()) { LOGGER.info("Locking table {}", tableId); diff --git a/src/main/java/io/debezium/connector/db2/Db2StreamingChangeEventSource.java b/src/main/java/io/debezium/connector/db2/Db2StreamingChangeEventSource.java index 3a4610a..25465ac 100644 --- a/src/main/java/io/debezium/connector/db2/Db2StreamingChangeEventSource.java +++ b/src/main/java/io/debezium/connector/db2/Db2StreamingChangeEventSource.java @@ -115,11 +115,6 @@ public void init(Db2OffsetContext offsetContext) { public void execute(ChangeEventSourceContext context, Db2Partition partition, Db2OffsetContext offsetContext) throws InterruptedException { - if (!snapshotterService.getSnapshotter().shouldStream()) { - LOGGER.info("Streaming is not enabled in current configuration"); - return; - } - final Metronome metronome = Metronome.sleeper(pollInterval, clock); final Queue schemaChangeCheckpoints = new PriorityQueue<>((x, y) -> x.getStopLsn().compareTo(y.getStopLsn())); try { diff --git a/src/main/java/io/debezium/connector/db2/snapshot/Db2SnapshotLockProvider.java b/src/main/java/io/debezium/connector/db2/snapshot/Db2SnapshotLockProvider.java deleted file mode 100644 index 5fac906..0000000 --- a/src/main/java/io/debezium/connector/db2/snapshot/Db2SnapshotLockProvider.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.db2.snapshot; - -import io.debezium.bean.StandardBeanNames; -import io.debezium.bean.spi.BeanRegistry; -import io.debezium.connector.db2.Db2ConnectorConfig; -import io.debezium.service.spi.ServiceProvider; -import io.debezium.snapshot.SnapshotLockProvider; -import io.debezium.snapshot.spi.SnapshotLock; - -/** - * An implementation of the {@link ServiceProvider} contract for the {@link SnapshotLock}. - * - * @author Mario Fiore Vitale - */ -public class Db2SnapshotLockProvider extends SnapshotLockProvider { - - @Override - public String snapshotLockingMode(BeanRegistry beanRegistry) { - - Db2ConnectorConfig sqlServerConnectorConfig = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, Db2ConnectorConfig.class); - - return sqlServerConnectorConfig.getSnapshotLockingMode().getValue(); - } - -} diff --git a/src/main/java/io/debezium/connector/db2/snapshot/Db2SnapshotterServiceProvider.java b/src/main/java/io/debezium/connector/db2/snapshot/Db2SnapshotterServiceProvider.java deleted file mode 100644 index c597673..0000000 --- a/src/main/java/io/debezium/connector/db2/snapshot/Db2SnapshotterServiceProvider.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.db2.snapshot; - -import io.debezium.bean.StandardBeanNames; -import io.debezium.bean.spi.BeanRegistry; -import io.debezium.connector.db2.Db2ConnectorConfig; -import io.debezium.snapshot.SnapshotterServiceProvider; - -public class Db2SnapshotterServiceProvider extends SnapshotterServiceProvider { - - @Override - public String snapshotMode(BeanRegistry beanRegistry) { - - Db2ConnectorConfig mySqlConnectorConfig = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, Db2ConnectorConfig.class); - - return mySqlConnectorConfig.getSnapshotMode().getValue(); - } -} diff --git a/src/main/java/io/debezium/connector/db2/snapshot/lock/ExclusiveSnapshotLock.java b/src/main/java/io/debezium/connector/db2/snapshot/lock/ExclusiveSnapshotLock.java index dbd08a0..2f58272 100644 --- a/src/main/java/io/debezium/connector/db2/snapshot/lock/ExclusiveSnapshotLock.java +++ b/src/main/java/io/debezium/connector/db2/snapshot/lock/ExclusiveSnapshotLock.java @@ -8,7 +8,6 @@ import java.time.Duration; import java.util.Map; import java.util.Optional; -import java.util.Set; import io.debezium.connector.db2.Db2ConnectorConfig; import io.debezium.snapshot.spi.SnapshotLock; @@ -26,9 +25,7 @@ public void configure(Map properties) { } @Override - public Optional tableLockingStatement(Duration lockTimeout, Set tableIds) { - - String tableId = tableIds.iterator().next(); // For Db2 we expect just one table at time. + public Optional tableLockingStatement(Duration lockTimeout, String tableId) { return Optional.of("SELECT * FROM " + tableId + " WHERE 0=1 WITH CS"); } diff --git a/src/main/java/io/debezium/connector/db2/snapshot/lock/NoSnapshotLock.java b/src/main/java/io/debezium/connector/db2/snapshot/lock/NoSnapshotLock.java index 2a1d1a1..ab4294d 100644 --- a/src/main/java/io/debezium/connector/db2/snapshot/lock/NoSnapshotLock.java +++ b/src/main/java/io/debezium/connector/db2/snapshot/lock/NoSnapshotLock.java @@ -8,7 +8,6 @@ import java.time.Duration; import java.util.Map; import java.util.Optional; -import java.util.Set; import io.debezium.connector.db2.Db2ConnectorConfig; import io.debezium.snapshot.spi.SnapshotLock; @@ -26,7 +25,7 @@ public void configure(Map properties) { } @Override - public Optional tableLockingStatement(Duration lockTimeout, Set tableIds) { + public Optional tableLockingStatement(Duration lockTimeout, String tableId) { return Optional.empty(); }