diff --git a/README.md b/README.md index 8b41bbe..dcbc5f3 100644 --- a/README.md +++ b/README.md @@ -3,14 +3,18 @@ Debezium operator provides an easy way to run the Debezium Server on Kubernetes or Openshift. ## Installation steps -The debezium operator currently support per namespace installation. To install the operator to your kubernetes cluster, simply create the descriptors available in the `k8` directory. + +The debezium operator currently support per namespace installation. To install the operator to your kubernetes cluster, +simply create the descriptors available in the `k8` directory. ```bash kubectl create -f k8/ -n $NAMESPACE ``` ### Quickstart Example -The `exmaples/postgres` directory contains an example deployment of debezium server with PostgreSQL source and kafka sink. + +The `exmaples/postgres` directory contains an example deployment of debezium server with PostgreSQL source and kafka +sink. ```bash # Install Strimzi Kafka operator @@ -21,6 +25,7 @@ kubectl create -f examples/postgres/ -n $NAMESPACE ``` ## DebeziumServerSpec Reference + ```yaml spec: version: String @@ -42,21 +47,23 @@ spec: header: type: String transforms: - - name: String - type: String + - type: String predicate: String negate: Boolean + config: # other transformation properties predicates: - - name: String + name: type: String + config: # other preticate properties sink: type: String config: - # other sink properties + # other sink properties source: class: String config: - # other source connector properties -``` \ No newline at end of file + # other source connector properties +``` + diff --git a/src/main/java/io/debezium/operator/config/ConfigMapping.java b/src/main/java/io/debezium/operator/config/ConfigMapping.java index dd7c4f5..155bd35 100644 --- a/src/main/java/io/debezium/operator/config/ConfigMapping.java +++ b/src/main/java/io/debezium/operator/config/ConfigMapping.java @@ -9,12 +9,12 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; -import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; /** @@ -27,7 +27,7 @@ public final class ConfigMapping { public static ConfigMapping from(Map properties) { var config = ConfigMapping.empty(); - config.put(properties); + config.putAll(properties); return config; } @@ -56,38 +56,62 @@ public String getAsString() { } public void rootValue(Object value) { - put(null, value); - } - - public void put(String key, ConfigMappable resource) { - var config = resource.asConfiguration(); - put(key, config.getAsMap()); + putInternal(value); } public void put(String key, Object value) { putInternal(value, key); } - public void put(ConfigMappable resource) { - var resourceConfig = resource.asConfiguration(); - put(resourceConfig.getAsMap()); + public void putAll(ConfigMappable resource) { + putAll(resource.asConfiguration()); } - public void put(Map props) { - props.forEach(this::put); + public void putAll(String key, ConfigMappable resource) { + putAll(key, resource.asConfiguration()); } - public void put(String key, Map props) { - props.forEach((subKey, value) -> putInternal(value, key, subKey)); + public void putAll(ConfigMapping config) { + config.getAsMap().forEach((key, value) -> putInternal(value, key)); + } + + public void putAll(String key, ConfigMapping config) { + config.getAsMap().forEach((subKey, value) -> putInternal(value, key, subKey)); + } + + public void putAll(Map props) { + props.forEach((key, value) -> putInternal(value, key)); + } + + + public void putList(String key, List items, String name) { + if (items.isEmpty()) { + return; + } + + record NamedItem(String name, ConfigMappable item) {} + + var named = IntStream. + range(0, items.size()) + .mapToObj(i -> new NamedItem(name + i, items.get(i))) + .toList(); + + named.stream() + .map(NamedItem::name) + .reduce((x, y) -> String.join(",")) + .ifPresent(names ->put(key, names )); + + + named.forEach(item -> putAll(key + "." + item.name, item.item)); + } - public void put(String key, Collection items, Function nameExtractor) { - items.stream() - .map(nameExtractor) + public void putMap(String key, Map items) { + items.keySet().stream() .reduce((x, y) -> String.join(",")) .ifPresent(names -> put(key, names)); - items.forEach(item -> put(key, item)); + items.forEach( (name, item) -> putAll(key + "." + name, item)); } diff --git a/src/main/java/io/debezium/operator/model/DebeziumServerSpec.java b/src/main/java/io/debezium/operator/model/DebeziumServerSpec.java index 7d3679f..3062b1f 100644 --- a/src/main/java/io/debezium/operator/model/DebeziumServerSpec.java +++ b/src/main/java/io/debezium/operator/model/DebeziumServerSpec.java @@ -6,6 +6,7 @@ package io.debezium.operator.model; import java.util.List; +import java.util.Map; import io.debezium.operator.config.ConfigMappable; import io.debezium.operator.config.ConfigMapping; @@ -20,7 +21,7 @@ public class DebeziumServerSpec implements ConfigMappable { private Quarkus quarkus; private Runtime runtime; private List transforms; - private List predicates; + private Map predicates; public DebeziumServerSpec() { this.storage = new Storage(); @@ -30,7 +31,7 @@ public DebeziumServerSpec() { this.quarkus = new Quarkus(); this.runtime = new Runtime(); this.transforms = List.of(); - this.predicates = List.of(); + this.predicates = Map.of(); } public String getImage() { @@ -105,26 +106,26 @@ public void setTransforms(List transforms) { this.transforms = transforms; } - public List getPredicates() { + public Map getPredicates() { return predicates; } - public void setPredicates(List predicates) { + public void setPredicates(Map predicates) { this.predicates = predicates; } @Override public ConfigMapping asConfiguration() { var dbzConfig = ConfigMapping.prefixed("debezium"); - dbzConfig.put("source", source); - dbzConfig.put("sink", sink); - dbzConfig.put("format", format); - dbzConfig.put("transforms", transforms, Transformation::getName); - dbzConfig.put("predicates", predicates, Predicate::getName); + dbzConfig.putAll("source", source); + dbzConfig.putAll("sink", sink); + dbzConfig.putAll("format", format); + dbzConfig.putList("transforms", transforms, "t"); + dbzConfig.putMap("predicates", predicates); var config = ConfigMapping.empty(); - config.put("quarkus", quarkus); - config.put(dbzConfig.getAsMap()); + config.putAll("quarkus", quarkus); + config.putAll(dbzConfig); return config; } diff --git a/src/main/java/io/debezium/operator/model/Format.java b/src/main/java/io/debezium/operator/model/Format.java index 09c49cb..281c746 100644 --- a/src/main/java/io/debezium/operator/model/Format.java +++ b/src/main/java/io/debezium/operator/model/Format.java @@ -47,9 +47,9 @@ public void setHeader(FormatType header) { @Override public ConfigMapping asConfiguration() { var config = ConfigMapping.empty(); - config.put("key", key); - config.put("value", value); - config.put("header", header); + config.putAll("key", key); + config.putAll("value", value); + config.putAll("header", header); return config; } } diff --git a/src/main/java/io/debezium/operator/model/FormatType.java b/src/main/java/io/debezium/operator/model/FormatType.java index b15cd6a..c4ee3f1 100644 --- a/src/main/java/io/debezium/operator/model/FormatType.java +++ b/src/main/java/io/debezium/operator/model/FormatType.java @@ -38,7 +38,7 @@ public void setType(String type) { @Override public ConfigMapping asConfiguration() { var config = ConfigMapping.empty(); - config.put(this.config); + config.putAll(this.config); config.rootValue(type); return config; } diff --git a/src/main/java/io/debezium/operator/model/Predicate.java b/src/main/java/io/debezium/operator/model/Predicate.java index bd9df09..2271d28 100644 --- a/src/main/java/io/debezium/operator/model/Predicate.java +++ b/src/main/java/io/debezium/operator/model/Predicate.java @@ -10,7 +10,6 @@ public class Predicate implements ConfigMappable { - private String name; private String type; private ConfigProperties config; @@ -18,14 +17,6 @@ public Predicate() { this.config = new ConfigProperties(); } - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - public String getType() { return type; } @@ -44,9 +35,9 @@ public void setConfig(ConfigProperties config) { @Override public ConfigMapping asConfiguration() { - var config = ConfigMapping.prefixed(name); + var config = ConfigMapping.empty(); config.put("type", type); - config.put(this.config); + config.putAll(this.config); return config; } diff --git a/src/main/java/io/debezium/operator/model/Sink.java b/src/main/java/io/debezium/operator/model/Sink.java index f98ffd2..07c9157 100644 --- a/src/main/java/io/debezium/operator/model/Sink.java +++ b/src/main/java/io/debezium/operator/model/Sink.java @@ -37,7 +37,7 @@ public void setConfig(ConfigProperties config) { public ConfigMapping asConfiguration() { var config = ConfigMapping.empty(); config.put("type", type); - config.put(type, this.config); + config.putAll(type, this.config); return config; } } diff --git a/src/main/java/io/debezium/operator/model/Source.java b/src/main/java/io/debezium/operator/model/Source.java index 8f5def7..3124574 100644 --- a/src/main/java/io/debezium/operator/model/Source.java +++ b/src/main/java/io/debezium/operator/model/Source.java @@ -40,7 +40,7 @@ public void setConfig(ConfigProperties config) { public ConfigMapping asConfiguration() { var config = ConfigMapping.empty(); config.put("connector.class", sourceClass); - config.put(this.config); + config.putAll(this.config); return config; } } diff --git a/src/main/java/io/debezium/operator/model/Transformation.java b/src/main/java/io/debezium/operator/model/Transformation.java index f1d67a1..db7106f 100644 --- a/src/main/java/io/debezium/operator/model/Transformation.java +++ b/src/main/java/io/debezium/operator/model/Transformation.java @@ -10,20 +10,11 @@ public class Transformation implements ConfigMappable { - private String name; private String type; private String predicate; private boolean negate = false; private ConfigProperties config; - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - public String getType() { return type; } @@ -62,7 +53,7 @@ public ConfigMapping asConfiguration() { config.put("type", type); config.put("predicate", predicate); config.put("negate", negate); - config.put(this.config); + config.putAll(this.config); return config; } }