Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-7305: Align snapshot modes for DB2 #142

Merged
merged 5 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public StreamingChangeEventSource<Db2Partition, Db2OffsetContext> getStreamingCh
dispatcher,
errorHandler,
clock,
schema);
schema,
snapshotterService);
}

@Override
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/io/debezium/connector/db2/Db2Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@

import com.ibm.db2.jcc.DB2Driver;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.Table;
Expand Down Expand Up @@ -113,6 +116,7 @@ public class Db2Connection extends JdbcConnection {
*/
public Db2Connection(JdbcConfiguration config) {
super(config, FACTORY, QUOTED_CHARACTER, QUOTED_CHARACTER);
this.logPositionValidator = this::validateLogPosition;
lsnToInstantCache = new BoundedConcurrentHashMap<>(100);
realDatabaseName = retrieveRealDatabaseName();
}
Expand Down Expand Up @@ -562,6 +566,31 @@ public TableId createTableId(String databaseName, String schemaName, String tabl
return new TableId(null, schemaName, tableName);
}

public boolean validateLogPosition(OffsetContext offset, CommonConnectorConfig config) {

final Lsn storedLsn = ((Db2OffsetContext) offset).getChangePosition().getCommitLsn();

String oldestFirstChangeQuery = String.format("SELECT min(RESTART_SEQ) FROM %s.IBMSNAP_CAPMON;", CDC_SCHEMA);

try {
final String oldestScn = singleOptionalValue(oldestFirstChangeQuery, rs -> rs.getString(1));

if (oldestScn == null) {
return false;
}

LOGGER.trace("Oldest SCN in logs is '{}'", oldestScn);
return storedLsn == null || Lsn.valueOf(oldestScn).compareTo(storedLsn) < 0;
}
catch (SQLException e) {
throw new DebeziumException("Unable to get last available log position", e);
}
}

public <T> T singleOptionalValue(String query, ResultSetExtractor<T> extractor) throws SQLException {
return queryAndMap(query, rs -> rs.next() ? extractor.apply(rs) : null);
}

private PreparedStatement createPreparedStatement(String query) {
try {
LOGGER.trace("Creating prepared statement '{}'", query);
Expand Down
143 changes: 119 additions & 24 deletions src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,51 +46,61 @@ public class Db2ConnectorConfig extends HistorizedRelationalDatabaseConnectorCon
*/
public enum SnapshotMode implements EnumeratedValue {

/**
* Performs a snapshot of data and schema upon each connector start.
*/
ALWAYS("always"),

/**
* Perform a snapshot of data and schema upon initial startup of a connector.
*/
INITIAL("initial", true, true),
INITIAL("initial"),

/**
* Perform a snapshot of data and schema upon initial startup of a connector and stop after initial consistent snapshot.
* Perform a snapshot of data and schema upon initial startup of a connector but does not transition to streaming.
*/
INITIAL_ONLY("initial_only", true, false),
INITIAL_ONLY("initial_only"),

/**
* Perform a snapshot of the schema but no data upon initial startup of a connector.
* @deprecated to be removed in Debezium 3.0, replaced by {{@link #NO_DATA}}
*/
SCHEMA_ONLY("schema_only", false, true);
SCHEMA_ONLY("schema_only"),

/**
* Perform a snapshot of the schema but no data upon initial startup of a connector.
*/
NO_DATA("no_data"),

/**
* Perform a snapshot of only the database schemas (without data) and then begin reading the redo log at the current redo log position.
* This can be used for recovery only if the connector has existing offsets and the schema.history.internal.kafka.topic does not exist (deleted).
* This recovery option should be used with care as it assumes there have been no schema changes since the connector last stopped,
* otherwise some events during the gap may be processed with an incorrect schema and corrupted.
*/
RECOVERY("recovery"),

/**
* Perform a snapshot when it is needed.
*/
WHEN_NEEDED("when_needed"),

/**
* Inject a custom snapshotter, which allows for more control over snapshots.
*/
CUSTOM("custom");

private final String value;
private final boolean includeData;
private final boolean shouldStream;

SnapshotMode(String value, boolean includeData, boolean shouldStream) {
SnapshotMode(String value) {
this.value = value;
this.includeData = includeData;
this.shouldStream = shouldStream;
}

@Override
public String getValue() {
return value;
}

/**
* Whether this snapshotting mode should include the actual data or just the
* schema of captured tables.
*/
public boolean includeData() {
return includeData;
}

/**
* Whether the snapshot mode is followed by streaming.
*/
public boolean shouldStream() {
return shouldStream;
}

/**
* Determine if the supplied value is one of the predefined options.
*
Expand Down Expand Up @@ -130,6 +140,70 @@ public static SnapshotMode parse(String value, String defaultValue) {
}
}

/**
* The set of predefined snapshot locking mode options.
*/
public enum SnapshotLockingMode implements EnumeratedValue {

/**
* This mode will use exclusive lock TABLOCKX
*/
EXCLUSIVE("exclusive"),

/**
* This mode will avoid using ANY table locks during the snapshot process.
* This mode should be used carefully only when no schema changes are to occur.
*/
NONE("none"),

CUSTOM("custom");

private final String value;

SnapshotLockingMode(String value) {
this.value = value;
}

@Override
public String getValue() {
return value;
}

/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @return the matching option, or null if no match is found
*/
public static SnapshotLockingMode parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (SnapshotLockingMode option : SnapshotLockingMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) {
return option;
}
}
return null;
}

/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @param defaultValue the default value; may be null
* @return the matching option, or null if no match is found and the non-null default is invalid
*/
public static SnapshotLockingMode parse(String value, String defaultValue) {
SnapshotLockingMode mode = parse(value);
if (mode == null && defaultValue != null) {
mode = parse(defaultValue);
}
return mode;
}
}

/**
* The set of predefined snapshot isolation mode options.
*
Expand Down Expand Up @@ -242,6 +316,20 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) {
+ "In '" + SnapshotIsolationMode.READ_UNCOMMITTED.getValue()
+ "' mode neither table nor row-level locks are acquired, but connector does not guarantee snapshot consistency.");

public static final Field SNAPSHOT_LOCKING_MODE = Field.create("snapshot.locking.mode")
.withDisplayName("Snapshot locking mode")
.withEnum(SnapshotLockingMode.class, SnapshotLockingMode.EXCLUSIVE)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 2))
.withDescription(
"Controls how the connector holds locks on tables while performing the schema snapshot when `snapshot.isolation.mode` is `REPEATABLE_READ` or `EXCLUSIVE`. The 'exclusive' "
+ "which means the connector will hold a table lock for exclusive table access for just the initial portion of the snapshot "
+ "while the database schemas and other metadata are being read. The remaining work in a snapshot involves selecting all rows from "
+ "each table, and this is done using a flashback query that requires no locks. However, in some cases it may be desirable to avoid "
+ "locks entirely which can be done by specifying 'none'. This mode is only safe to use if no schema changes are happening while the "
+ "snapshot is taken.");

public static final Field QUERY_FETCH_SIZE = CommonConnectorConfig.QUERY_FETCH_SIZE
.withDescription(
"The maximum number of records that should be loaded into memory while streaming. A value of '0' uses the default JDBC fetch size. The default value is '10000'.")
Expand Down Expand Up @@ -290,6 +378,8 @@ protected static ConfigDef configDef() {
private final SnapshotMode snapshotMode;
private final SnapshotIsolationMode snapshotIsolationMode;

private final SnapshotLockingMode snapshotLockingMode;

public Db2ConnectorConfig(Configuration config) {
super(
Db2Connector.class,
Expand All @@ -303,6 +393,7 @@ public Db2ConnectorConfig(Configuration config) {
this.databaseName = config.getString(DATABASE_NAME);
this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE), SNAPSHOT_MODE.defaultValueAsString());
this.snapshotIsolationMode = SnapshotIsolationMode.parse(config.getString(SNAPSHOT_ISOLATION_MODE), SNAPSHOT_ISOLATION_MODE.defaultValueAsString());
this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString());
}

public String getDatabaseName() {
Expand All @@ -313,6 +404,10 @@ public SnapshotIsolationMode getSnapshotIsolationMode() {
return this.snapshotIsolationMode;
}

public SnapshotLockingMode getSnapshotLockingMode() {
return this.snapshotLockingMode;
}

public SnapshotMode getSnapshotMode() {
return snapshotMode;
}
Expand Down
19 changes: 14 additions & 5 deletions src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.debezium.config.Field;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.db2.snapshot.Db2SnapshotLockProvider;
import io.debezium.connector.db2.snapshot.Db2SnapshotterServiceProvider;
import io.debezium.document.DocumentReader;
import io.debezium.jdbc.DefaultMainConnectionProvidingConnectionFactory;
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
Expand All @@ -35,6 +37,7 @@
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
Expand Down Expand Up @@ -86,24 +89,22 @@ public ChangeEventSourceCoordinator<Db2Partition, Db2OffsetContext> start(Config

Offsets<Db2Partition, Db2OffsetContext> previousOffsets = getPreviousOffsets(new Db2Partition.Provider(connectorConfig),
new Db2OffsetContext.Loader(connectorConfig));
final Db2Partition partition = previousOffsets.getTheOnlyPartition();
final Db2OffsetContext previousOffset = previousOffsets.getTheOnlyOffset();

// Manual Bean Registration
connectorConfig.getBeanRegistry().add(StandardBeanNames.CONFIGURATION, config);
connectorConfig.getBeanRegistry().add(StandardBeanNames.CONNECTOR_CONFIG, connectorConfig);
connectorConfig.getBeanRegistry().add(StandardBeanNames.DATABASE_SCHEMA, schema);
connectorConfig.getBeanRegistry().add(StandardBeanNames.JDBC_CONNECTION, connectionFactory.newConnection());
connectorConfig.getBeanRegistry().add(StandardBeanNames.VALUE_CONVERTER, valueConverters);
connectorConfig.getBeanRegistry().add(StandardBeanNames.OFFSETS, previousOffsets);

// Service providers
registerServiceProviders(connectorConfig.getServiceRegistry());

final SnapshotterService snapshotterService = connectorConfig.getServiceRegistry().tryGetService(SnapshotterService.class);

if (previousOffset != null) {
schema.recover(partition, previousOffset);
}
validateAndLoadSchemaHistory(connectorConfig, metadataConnection, previousOffsets, schema,
snapshotterService.getSnapshotter());

taskContext = new Db2TaskContext(connectorConfig, schema);

Expand Down Expand Up @@ -228,4 +229,12 @@ private static Configuration applyFetchSizeToJdbcConfig(Configuration config) {
}
return config;
}

@Override
protected void registerServiceProviders(ServiceRegistry serviceRegistry) {

super.registerServiceProviders(serviceRegistry);
serviceRegistry.registerServiceProvider(new Db2SnapshotLockProvider());
serviceRegistry.registerServiceProvider(new Db2SnapshotterServiceProvider());
}
}
Loading
Loading