Skip to content

Commit

Permalink
DBZ-8015 Test offset store configuration and reconcilation
Browse files Browse the repository at this point in the history
  • Loading branch information
obabec committed Jul 29, 2024
1 parent e33a58b commit e1da7c3
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 42 deletions.
30 changes: 10 additions & 20 deletions systemtests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
<fabric8.version>6.10.0</fabric8.version>
<junit.jupiter.version>5.10.2</junit.jupiter.version>
<junit.platform.launcher.version>1.10.2</junit.platform.launcher.version>
<log4j.version>2.17.2</log4j.version>
<slf4j.version>1.7.36</slf4j.version>
<log4j2.version>2.23.1</log4j2.version>
<assertj.version>3.25.3</assertj.version>
<testframe.version>0.1.1</testframe.version>
<testframe.version>0.2.1</testframe.version>
<dmt.version>0.0.1-alpha1</dmt.version>
<okhttp.version>5.0.0-alpha.12</okhttp.version>
<awaitility.version>4.2.1</awaitility.version>
<jackson.version>2.16.1</jackson.version>
<lombok.version>1.18.34</lombok.version>
<apache.commons.lang.version>3.14.0</apache.commons.lang.version>
<google.json.simple.version>1.1.1</google.json.simple.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -75,23 +75,8 @@
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<artifactId>log4j-slf4j2-impl</artifactId>
<version>${log4j2.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
Expand Down Expand Up @@ -139,6 +124,11 @@
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>${google.json.simple.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ public final class ConfigProperties {
public static final String BUNDLE_PATH = System.getProperty("test.bundle.path", System.getProperty("user.dir") + "/../k8/");
public static final Integer HTTP_POLL_TIMEOUT = Integer.valueOf(System.getProperty("test.http.poll.timeout", "20"));
public static final Integer HTTP_POLL_INTERVAL = Integer.valueOf(System.getProperty("test.http.poll.interval", "200"));
public static final Integer FABRIC8_POLL_INTERVAL = Integer.valueOf(System.getProperty("test.fabric8.poll.interval", "2"));
public static final Integer FABRIC8_POLL_TIMEOUT = Integer.valueOf(System.getProperty("test.fabric8.poll.timeout", "60"));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.operator.systemtests;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.operator.api.model.DebeziumServer;
import io.debezium.operator.api.model.source.Offset;
import io.debezium.operator.api.model.source.OffsetBuilder;
import io.debezium.operator.systemtests.resources.NamespaceHolder;
import io.debezium.operator.systemtests.resources.dmt.DmtClient;
import io.debezium.operator.systemtests.resources.operator.DebeziumOperatorBundleResource;
import io.debezium.operator.systemtests.resources.server.DebeziumServerGenerator;
import io.debezium.operator.systemtests.resources.sinks.RedisResource;
import io.fabric8.kubernetes.client.LocalPortForward;
import io.skodjob.testframe.resources.KubeResourceManager;

public class OffsetStorageTest extends TestBase {
private final Logger logger = LoggerFactory.getLogger(this.getClass());

@Test
void testRedisOffsetStorage() {
String namespace = NamespaceHolder.INSTANCE.getCurrentNamespace();
DebeziumOperatorBundleResource operatorBundleResource = new DebeziumOperatorBundleResource();
operatorBundleResource.configureAsDefault(namespace);
logger.info("Deploying Operator");
operatorBundleResource.deploy();
logger.info("Deploying Debezium Server");
DebeziumServer server = DebeziumServerGenerator.generateDefaultMysqlToRedis(namespace);

Offset offset = new OffsetBuilder()
.withNewRedis()
.withAddress(RedisResource.getDefaultRedisAddress())
.endRedis()
.withFlushMs(10)
.build();
server.getSpec().getSource().setOffset(offset);

KubeResourceManager.getInstance().createResourceWithWait(server);
assertStreamingWorks();

try (LocalPortForward lcp = dmtResource.portForward(portForwardPort, namespace)) {
String redis_offset = DmtClient.readRedisOffsets(portForwardHost, portForwardPort);
assertThat(redis_offset).contains("file");
assertThat(redis_offset).contains("pos");
}
catch (IOException e) {
throw new RuntimeException(e);
}

server.getSpec().getSource().getOffset().getRedis().setKey("metadata:debezium_n:offsets");
KubeResourceManager.getInstance().createOrUpdateResourceWithWait(server);
assertStreamingWorks(10, 20);

try (LocalPortForward lcp = dmtResource.portForward(portForwardPort, namespace)) {
String redis_offset = DmtClient.readRedisOffsets(portForwardHost, portForwardPort, "metadata:debezium_n:offsets");
assertThat(redis_offset).contains("file");
assertThat(redis_offset).contains("pos");
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.io.IOException;
import java.time.Duration;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;
Expand All @@ -34,9 +35,9 @@
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class TestBase {
private static final Logger logger = LoggerFactory.getLogger(TestBase.class);
private final DmtResource dmtResource = new DmtResource();
private final String portForwardHost = "127.0.0.1";
private int portForwardPort = 8080;
protected final DmtResource dmtResource = new DmtResource();
protected final String portForwardHost = "127.0.0.1";
protected int portForwardPort = 8080;

@BeforeAll
void initDefault() {
Expand Down Expand Up @@ -70,18 +71,27 @@ void cleanUp() {
}

public void assertStreamingWorks() {
assertStreamingWorks(10, 10);
}

public void assertStreamingWorks(int messagesToDatabase, int expectedMessages) {
String namespace = NamespaceHolder.INSTANCE.getCurrentNamespace();
try (LocalPortForward lcp = dmtResource.portForward(8080, namespace)) {
DmtClient.waitForDmt(portForwardHost, portForwardPort, Duration.ofSeconds(HTTP_POLL_TIMEOUT));
DmtClient.insertTestDataToDatabase(portForwardHost, portForwardPort, 10);
DmtClient.waitForFilledRedis(portForwardHost, portForwardPort, Duration.ofSeconds(40), "inventory.inventory.operator_test");
DmtClient.insertTestDataToDatabase(portForwardHost, portForwardPort, messagesToDatabase);
DmtClient.waitForFilledRedis(portForwardHost, portForwardPort, Duration.ofSeconds(60), "inventory.inventory.operator_test");
await().atMost(Duration.ofMinutes(HTTP_POLL_TIMEOUT))
.pollInterval(Duration.ofMillis(HTTP_POLL_INTERVAL))
.until(() -> DmtClient.digStreamedData(portForwardHost, portForwardPort, 10) == 10);
.until(() -> DmtClient.digStreamedData(portForwardHost, portForwardPort, expectedMessages) == expectedMessages);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

@AfterAll
void resetNamespace() {
NamespaceHolder.INSTANCE.resetNamespace();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public String getCurrentNamespace() {
return currentNamespace;
}

public void resetNamespace() {
this.currentNamespace = null;
}

public DmtResource getNamespacedDmt() {
return namespacedDmt;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -47,8 +51,30 @@ private static OkHttpClient defaultClient() {
.build();
}

public static String readRedisOffsets(String host, int port) {
return readRedisOffsets(host, port, "metadata:debezium:offsets");
}

public static String readRedisOffsets(String host, int port, String key) {
AtomicReference<String> offset = new AtomicReference<>();
Map<String, String> params = Map.of("hashKey", key);
await().atMost(Duration.ofSeconds(HTTP_POLL_TIMEOUT))
.pollInterval(Duration.ofMillis(HTTP_POLL_INTERVAL))
.until(() -> {
try (Response response = DmtClient.sendGetRequest(host, port, "/Redis/readHash", params)) {
offset.set(response.body().string());
return response.isSuccessful();
}
catch (Exception e) {
return false;
}
});
return offset.get();
}

public static void resetRedis(String host, int port) {
await().atMost(Duration.ofSeconds(HTTP_POLL_TIMEOUT)).pollInterval(Duration.ofMillis(200))
await().atMost(Duration.ofSeconds(HTTP_POLL_TIMEOUT))
.pollInterval(Duration.ofMillis(HTTP_POLL_INTERVAL))
.until(() -> {
try (Response response = DmtClient.sendGetRequest(host, port, "/Redis/reset")) {
return response.isSuccessful();
Expand All @@ -60,7 +86,8 @@ public static void resetRedis(String host, int port) {
}

public static void resetMysql(String host, int port) {
await().atMost(Duration.ofSeconds(HTTP_POLL_TIMEOUT)).pollInterval(Duration.ofMillis(200))
await().atMost(Duration.ofSeconds(HTTP_POLL_TIMEOUT))
.pollInterval(Duration.ofMillis(HTTP_POLL_INTERVAL))
.until(() -> {
try (Response response = DmtClient.sendGetRequest(host, port, "/Main/ResetDatabase")) {
return response.isSuccessful();
Expand Down Expand Up @@ -92,17 +119,24 @@ public static void waitForFilledRedis(String host, int port, Duration atMost, St
}

public static int digStreamedData(String host, int port, int number) {
String jsonRespo = readRedisChannel(host, port, "inventory.inventory.operator_test", number);
final String CHANNEL = "inventory.inventory.operator_test";
JSONParser parser = new JSONParser();
String jsonRespo = readRedisChannel(host, port, CHANNEL, number);

if (Objects.isNull(jsonRespo)) {
return 0;
}
int count = 0;
for (int i = 0; i < number; i++) {
if (jsonRespo.contains("name" + i)) {
count++;
}

try {
JSONArray response = (JSONArray) parser.parse(jsonRespo);
JSONObject topic = (JSONObject) response.get(0);
JSONArray responses = (JSONArray) topic.get(CHANNEL);
return responses.size();
}
catch (ParseException e) {
LOGGER.error("Cannot parse JSON response from DMT: {}", e.getMessage());
return 0;
}
return count;
}

public static String readRedisChannel(String host, int port, String channel, int limit) {
Expand Down Expand Up @@ -242,4 +276,24 @@ public static Response sendGetRequest(String host, int port, String command) thr
Call call = client.newCall(request);
return call.execute();
}

public static Response sendGetRequest(String host, int port, String command, Map<String, String> params) throws IOException {
OkHttpClient client = defaultClient();

HttpUrl.Builder builder = Objects.requireNonNull(HttpUrl.parse("http://" + host + ":" + port + command))
.newBuilder();

if (!Objects.isNull(params)) {
for (Map.Entry<String, String> entry : params.entrySet()) {
builder = builder.addQueryParameter(entry.getKey(), entry.getValue());
}
}

Request request = new Request.Builder()
.url(builder.build())
.build();

Call call = client.newCall(request);
return call.execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,29 @@
*/
package io.debezium.operator.systemtests.resources.server;

import static io.debezium.operator.systemtests.ConfigProperties.FABRIC8_POLL_INTERVAL;
import static io.debezium.operator.systemtests.ConfigProperties.FABRIC8_POLL_TIMEOUT;
import static org.awaitility.Awaitility.await;

import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import java.time.Duration;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.operator.api.model.DebeziumServer;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperationsImpl;
import io.skodjob.testframe.interfaces.ResourceType;
import io.skodjob.testframe.resources.DeploymentType;
import io.skodjob.testframe.resources.KubeResourceManager;

public class DebeziumServerResource implements ResourceType<DebeziumServer> {

private final MixedOperation<DebeziumServer, DebeziumServerList, Resource<DebeziumServer>> client;
private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());

public DebeziumServerResource() {
this.client = KubeResourceManager.getKubeClient().getClient().resources(DebeziumServer.class, DebeziumServerList.class);
Expand Down Expand Up @@ -65,11 +72,22 @@ public void replace(String s, Consumer<DebeziumServer> editor) {

@Override
public boolean waitForReadiness(DebeziumServer debeziumServer) {
new DeploymentType().getClient()
.inNamespace(debeziumServer.getMetadata().getNamespace())
.withName(debeziumServer.getMetadata().getName()).waitUntilReady(1, TimeUnit.MINUTES);

return client.resource(debeziumServer).isReady();
await().atMost(Duration.ofSeconds(FABRIC8_POLL_TIMEOUT)).pollInterval(Duration.ofSeconds(FABRIC8_POLL_INTERVAL))
.until(() -> {
DebeziumServer dbzServer = client.inNamespace(debeziumServer.getMetadata().getNamespace())
.withName(debeziumServer.getMetadata().getName()).get();

boolean ready = dbzServer.getStatus().getConditions().stream()
.anyMatch(condition -> condition.getType().equals("Ready") && condition.getStatus().equals("True"));
if (ready) {
return true;
}
else {
logger.info("Waiting for readiness of Debezium Server...");
return false;
}
});
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ public void configureAsDefault(String namespace) {
}
}

public static String getDefaultRedisAddress() {
return "redis-service:6379";
}

@Override
public void deploy() {
KubeResourceManager.getInstance().createResourceWithoutWait(configMap, service, persistentVolumeClaim);
Expand Down

0 comments on commit e1da7c3

Please sign in to comment.