Skip to content

Commit

Permalink
DBZ-6493 refactored structure of predicates and transforms to reflect…
Browse files Browse the repository at this point in the history
… core code base and cleaned up ConfigMapping
  • Loading branch information
jcechace committed Jun 2, 2023
1 parent e51ab27 commit 6af3a3e
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 66 deletions.
23 changes: 15 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@
Debezium operator provides an easy way to run the Debezium Server on Kubernetes or Openshift.

## Installation steps
The debezium operator currently support per namespace installation. To install the operator to your kubernetes cluster, simply create the descriptors available in the `k8` directory.

The debezium operator currently support per namespace installation. To install the operator to your kubernetes cluster,
simply create the descriptors available in the `k8` directory.

```bash
kubectl create -f k8/ -n $NAMESPACE
```

### Quickstart Example
The `exmaples/postgres` directory contains an example deployment of debezium server with PostgreSQL source and kafka sink.

The `exmaples/postgres` directory contains an example deployment of debezium server with PostgreSQL source and kafka
sink.

```bash
# Install Strimzi Kafka operator
Expand All @@ -21,6 +25,7 @@ kubectl create -f examples/postgres/ -n $NAMESPACE
```

## DebeziumServerSpec Reference

```yaml
spec:
version: String
Expand All @@ -42,21 +47,23 @@ spec:
header:
type: String
transforms:
- name: String
type: String
- type: String
predicate: String
negate: Boolean
config:
# other transformation properties
predicates:
- name: String
name:
type: String
config:
# other preticate properties
sink:
type: String
config:
# other sink properties
# other sink properties
source:
class: String
config:
# other source connector properties
```
# other source connector properties
```

64 changes: 44 additions & 20 deletions src/main/java/io/debezium/operator/config/ConfigMapping.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

/**
Expand All @@ -27,7 +27,7 @@ public final class ConfigMapping {

public static ConfigMapping from(Map<String, ?> properties) {
var config = ConfigMapping.empty();
config.put(properties);
config.putAll(properties);
return config;
}

Expand Down Expand Up @@ -56,38 +56,62 @@ public String getAsString() {
}

public void rootValue(Object value) {
put(null, value);
}

public void put(String key, ConfigMappable resource) {
var config = resource.asConfiguration();
put(key, config.getAsMap());
putInternal(value);
}

public void put(String key, Object value) {
putInternal(value, key);
}

public void put(ConfigMappable resource) {
var resourceConfig = resource.asConfiguration();
put(resourceConfig.getAsMap());
public void putAll(ConfigMappable resource) {
putAll(resource.asConfiguration());
}

public void put(Map<String, ?> props) {
props.forEach(this::put);
public void putAll(String key, ConfigMappable resource) {
putAll(key, resource.asConfiguration());
}

public void put(String key, Map<String, ?> props) {
props.forEach((subKey, value) -> putInternal(value, key, subKey));
public void putAll(ConfigMapping config) {
config.getAsMap().forEach((key, value) -> putInternal(value, key));
}

public void putAll(String key, ConfigMapping config) {
config.getAsMap().forEach((subKey, value) -> putInternal(value, key, subKey));
}

public void putAll(Map<String, ?> props) {
props.forEach((key, value) -> putInternal(value, key));
}


public <T extends ConfigMappable> void putList(String key, List<T> items, String name) {
if (items.isEmpty()) {
return;
}

record NamedItem(String name, ConfigMappable item) {}

var named = IntStream.
range(0, items.size())
.mapToObj(i -> new NamedItem(name + i, items.get(i)))
.toList();

named.stream()
.map(NamedItem::name)
.reduce((x, y) -> String.join(","))
.ifPresent(names ->put(key, names ));


named.forEach(item -> putAll(key + "." + item.name, item.item));

}

public <T extends ConfigMappable> void put(String key, Collection<T> items, Function<T, String> nameExtractor) {
items.stream()
.map(nameExtractor)
public <T extends ConfigMappable> void putMap(String key, Map<String, T> items) {
items.keySet().stream()
.reduce((x, y) -> String.join(","))
.ifPresent(names -> put(key, names));

items.forEach(item -> put(key, item));
items.forEach( (name, item) -> putAll(key + "." + name, item));

}

Expand Down
23 changes: 12 additions & 11 deletions src/main/java/io/debezium/operator/model/DebeziumServerSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.debezium.operator.model;

import java.util.List;
import java.util.Map;

import io.debezium.operator.config.ConfigMappable;
import io.debezium.operator.config.ConfigMapping;
Expand All @@ -20,7 +21,7 @@ public class DebeziumServerSpec implements ConfigMappable {
private Quarkus quarkus;
private Runtime runtime;
private List<Transformation> transforms;
private List<Predicate> predicates;
private Map<String, Predicate> predicates;

public DebeziumServerSpec() {
this.storage = new Storage();
Expand All @@ -30,7 +31,7 @@ public DebeziumServerSpec() {
this.quarkus = new Quarkus();
this.runtime = new Runtime();
this.transforms = List.of();
this.predicates = List.of();
this.predicates = Map.of();
}

public String getImage() {
Expand Down Expand Up @@ -105,26 +106,26 @@ public void setTransforms(List<Transformation> transforms) {
this.transforms = transforms;
}

public List<Predicate> getPredicates() {
public Map<String, Predicate> getPredicates() {
return predicates;
}

public void setPredicates(List<Predicate> predicates) {
public void setPredicates(Map<String, Predicate> predicates) {
this.predicates = predicates;
}

@Override
public ConfigMapping asConfiguration() {
var dbzConfig = ConfigMapping.prefixed("debezium");
dbzConfig.put("source", source);
dbzConfig.put("sink", sink);
dbzConfig.put("format", format);
dbzConfig.put("transforms", transforms, Transformation::getName);
dbzConfig.put("predicates", predicates, Predicate::getName);
dbzConfig.putAll("source", source);
dbzConfig.putAll("sink", sink);
dbzConfig.putAll("format", format);
dbzConfig.putList("transforms", transforms, "t");
dbzConfig.putMap("predicates", predicates);

var config = ConfigMapping.empty();
config.put("quarkus", quarkus);
config.put(dbzConfig.getAsMap());
config.putAll("quarkus", quarkus);
config.putAll(dbzConfig);

return config;
}
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/debezium/operator/model/Format.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public void setHeader(FormatType header) {
@Override
public ConfigMapping asConfiguration() {
var config = ConfigMapping.empty();
config.put("key", key);
config.put("value", value);
config.put("header", header);
config.putAll("key", key);
config.putAll("value", value);
config.putAll("header", header);
return config;
}
}
2 changes: 1 addition & 1 deletion src/main/java/io/debezium/operator/model/FormatType.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void setType(String type) {
@Override
public ConfigMapping asConfiguration() {
var config = ConfigMapping.empty();
config.put(this.config);
config.putAll(this.config);
config.rootValue(type);
return config;
}
Expand Down
13 changes: 2 additions & 11 deletions src/main/java/io/debezium/operator/model/Predicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,13 @@

public class Predicate implements ConfigMappable {

private String name;
private String type;
private ConfigProperties config;

public Predicate() {
this.config = new ConfigProperties();
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getType() {
return type;
}
Expand All @@ -44,9 +35,9 @@ public void setConfig(ConfigProperties config) {

@Override
public ConfigMapping asConfiguration() {
var config = ConfigMapping.prefixed(name);
var config = ConfigMapping.empty();
config.put("type", type);
config.put(this.config);
config.putAll(this.config);
return config;

}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/debezium/operator/model/Sink.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void setConfig(ConfigProperties config) {
public ConfigMapping asConfiguration() {
var config = ConfigMapping.empty();
config.put("type", type);
config.put(type, this.config);
config.putAll(type, this.config);
return config;
}
}
2 changes: 1 addition & 1 deletion src/main/java/io/debezium/operator/model/Source.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void setConfig(ConfigProperties config) {
public ConfigMapping asConfiguration() {
var config = ConfigMapping.empty();
config.put("connector.class", sourceClass);
config.put(this.config);
config.putAll(this.config);
return config;
}
}
11 changes: 1 addition & 10 deletions src/main/java/io/debezium/operator/model/Transformation.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,11 @@

public class Transformation implements ConfigMappable {

private String name;
private String type;
private String predicate;
private boolean negate = false;
private ConfigProperties config;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getType() {
return type;
}
Expand Down Expand Up @@ -62,7 +53,7 @@ public ConfigMapping asConfiguration() {
config.put("type", type);
config.put("predicate", predicate);
config.put("negate", negate);
config.put(this.config);
config.putAll(this.config);
return config;
}
}

0 comments on commit 6af3a3e

Please sign in to comment.