Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-7992 Improve HTTP calls and figuring bundle path #62

Merged
merged 1 commit into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.operator.systemtests;

public final class ConfigProperties {
public static final String BUNDLE_PATH = System.getProperty("test.bundle.path", System.getProperty("user.dir") + "/../k8/");
public static final Integer HTTP_POLL_TIMEOUT = Integer.valueOf(System.getProperty("test.http.poll.timeout", "20"));
public static final Integer HTTP_POLL_INTERVAL = Integer.valueOf(System.getProperty("test.http.poll.interval", "200"));
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
*/
package io.debezium.operator.systemtests;

import static org.assertj.core.api.Assertions.assertThat;
import static io.debezium.operator.systemtests.ConfigProperties.HTTP_POLL_INTERVAL;
import static io.debezium.operator.systemtests.ConfigProperties.HTTP_POLL_TIMEOUT;
import static org.awaitility.Awaitility.await;

import java.io.IOException;
Expand All @@ -27,8 +28,6 @@
import io.skodjob.testframe.annotations.ResourceManager;
import io.skodjob.testframe.annotations.TestVisualSeparator;

import okhttp3.Response;

@ResourceManager
@TestVisualSeparator
@DebeziumResourceTypes
Expand Down Expand Up @@ -61,13 +60,9 @@ void initDefault() {
void cleanUp() {
String namespace = NamespaceHolder.INSTANCE.getCurrentNamespace();
try (LocalPortForward lcp = dmtResource.portForward(portForwardPort, namespace)) {
DmtClient.waitForDmt(portForwardHost, portForwardPort, Duration.ofSeconds(5));
Response redis = DmtClient.resetRedis(portForwardHost, portForwardPort);
assertThat(redis.code()).isEqualTo(200);
redis.close();
Response mysql = DmtClient.resetMysql(portForwardHost, portForwardPort);
assertThat(mysql.code()).isEqualTo(200);
mysql.close();
DmtClient.waitForDmt(portForwardHost, portForwardPort, Duration.ofSeconds(HTTP_POLL_TIMEOUT));
DmtClient.resetRedis(portForwardHost, portForwardPort);
DmtClient.resetMysql(portForwardHost, portForwardPort);
}
catch (IOException e) {
throw new RuntimeException(e);
Expand All @@ -77,10 +72,11 @@ void cleanUp() {
public void assertStreamingWorks() {
String namespace = NamespaceHolder.INSTANCE.getCurrentNamespace();
try (LocalPortForward lcp = dmtResource.portForward(8080, namespace)) {
DmtClient.waitForDmt(portForwardHost, portForwardPort, Duration.ofSeconds(5));
DmtClient.waitForDmt(portForwardHost, portForwardPort, Duration.ofSeconds(HTTP_POLL_TIMEOUT));
DmtClient.insertTestDataToDatabase(portForwardHost, portForwardPort, 10);
DmtClient.waitForFilledRedis(portForwardHost, portForwardPort, Duration.ofSeconds(40), "inventory.inventory.operator_test");
await().atMost(Duration.ofMinutes(5)).pollInterval(Duration.ofMillis(500))
await().atMost(Duration.ofMinutes(HTTP_POLL_TIMEOUT))
.pollInterval(Duration.ofMillis(HTTP_POLL_INTERVAL))
.until(() -> DmtClient.digStreamedData(portForwardHost, portForwardPort, 10) == 10);
}
catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
package io.debezium.operator.systemtests.resources.dmt;

import static io.debezium.operator.systemtests.ConfigProperties.HTTP_POLL_INTERVAL;
import static io.debezium.operator.systemtests.ConfigProperties.HTTP_POLL_TIMEOUT;
import static org.awaitility.Awaitility.await;

import java.io.IOException;
Expand All @@ -13,6 +15,10 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -30,28 +36,58 @@

public class DmtClient {
private static final MediaType MEDIATYPE_JSON = MediaType.parse("application/json; charset=utf-8");
private static final Logger LOGGER = LoggerFactory.getLogger(DmtClient.class);

private static OkHttpClient defaultClient() {
return new OkHttpClient.Builder()
.writeTimeout(Duration.ofSeconds(10))
.callTimeout(Duration.ofSeconds(10))
.connectTimeout(Duration.ofSeconds(10))
.readTimeout(Duration.ofSeconds(10))
.build();
}

public static Response resetRedis(String host, int port) {
return sendGetRequest(host, port, "/Redis/reset");
public static void resetRedis(String host, int port) {
await().atMost(Duration.ofSeconds(HTTP_POLL_TIMEOUT)).pollInterval(Duration.ofMillis(200))
.until(() -> {
try (Response response = DmtClient.sendGetRequest(host, port, "/Redis/reset")) {
return response.isSuccessful();
}
catch (Exception e) {
return false;
}
});
}

public static Response resetMysql(String host, int port) {
return sendGetRequest(host, port, "/Main/ResetDatabase");
public static void resetMysql(String host, int port) {
await().atMost(Duration.ofSeconds(HTTP_POLL_TIMEOUT)).pollInterval(Duration.ofMillis(200))
.until(() -> {
try (Response response = DmtClient.sendGetRequest(host, port, "/Main/ResetDatabase")) {
return response.isSuccessful();
}
catch (Exception e) {
return false;
}
});
}

public static void waitForDmt(String host, int port, Duration atMost) {
await().atMost(atMost)
.pollInterval(Duration.ofMillis(100))
.pollInterval(Duration.ofMillis(HTTP_POLL_INTERVAL))
.until(() -> {
try (Response response = DmtClient.sendGetRequest(host, port, "/")) {
return response.isSuccessful();
}
catch (Exception e) {
LOGGER.trace("DMT is not ready yet!");
return false;
}
});
}

public static void waitForFilledRedis(String host, int port, Duration atMost, String channel) {
await().atMost(atMost)
.pollInterval(Duration.ofMillis(200))
.pollInterval(Duration.ofMillis(HTTP_POLL_INTERVAL))
.until(() -> readRedisChannel(host, port, channel, 100).length() > 100);
}

Expand Down Expand Up @@ -128,22 +164,35 @@ public static Response insertDataToDatabase(String host, int port, DatabaseEntry
}

public static Response sendPostRequest(String host, int port, String command, String body) {
OkHttpClient client = new OkHttpClient();
OkHttpClient client = defaultClient();
Request request = new Request.Builder()
.url("http://" + host + ":" + port + command)
.post(RequestBody.create(body, MEDIATYPE_JSON))
.build();
Call call = client.newCall(request);
try (Response response = call.execute()) {
return response;
}
catch (IOException e) {
throw new RuntimeException(e);
}

AtomicReference<Response> responseAtomicReference = new AtomicReference<>();
await().atMost(Duration.ofSeconds(HTTP_POLL_TIMEOUT))
.pollInterval(Duration.ofMillis(HTTP_POLL_INTERVAL))
.until(() -> {
try (Response response = call.execute()) {
if (response.isSuccessful()) {
responseAtomicReference.set(response);
return true;
}
else {
return false;
}
}
catch (Exception e) {
return false;
}
});
return responseAtomicReference.get();
}

public static String sendPostRequest(String host, int port, String command, Map<String, String> params, String body) {
OkHttpClient client = new OkHttpClient();
OkHttpClient client = defaultClient();

HttpUrl.Builder builder = new HttpUrl.Builder()
.scheme("http")
Expand All @@ -165,28 +214,32 @@ public static String sendPostRequest(String host, int port, String command, Map<
.method("POST", requestBody)
.build();
Call call = client.newCall(request);
try (Response response = call.execute()) {
if (response.isSuccessful()) {
return response.body().string();
}
return null;
}
catch (IOException e) {
throw new RuntimeException(e);
}
AtomicReference<String> responseAtomicReference = new AtomicReference<>();
await().atMost(Duration.ofSeconds(HTTP_POLL_TIMEOUT))
.pollInterval(Duration.ofMillis(HTTP_POLL_INTERVAL))
.until(() -> {
try (Response response = call.execute()) {
if (response.isSuccessful()) {
responseAtomicReference.set(response.body().string());
return true;
}
else {
return false;
}
}
catch (Exception e) {
return false;
}
});
return responseAtomicReference.get();
}

public static Response sendGetRequest(String host, int port, String command) {
OkHttpClient client = new OkHttpClient();
public static Response sendGetRequest(String host, int port, String command) throws IOException {
OkHttpClient client = defaultClient();
Request request = new Request.Builder()
.url("http://" + host + ":" + port + command)
.build();
Call call = client.newCall(request);
try (Response response = call.execute()) {
return response;
}
catch (IOException e) {
throw new RuntimeException(e);
}
return call.execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
package io.debezium.operator.systemtests.resources.operator;

import static io.debezium.operator.systemtests.ConfigProperties.BUNDLE_PATH;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;
Expand All @@ -20,7 +22,6 @@
import io.skodjob.testframe.resources.KubeResourceManager;

public class DebeziumOperatorBundleResource implements DeployableResourceGroup {
private static final String DEFAULT_BUNDLE_PATH = System.getProperty("user.dir") + "/../k8/";
CustomResourceDefinition crd;
ServiceAccount serviceAccount;
ClusterRole clusterRole;
Expand All @@ -32,7 +33,7 @@ public class DebeziumOperatorBundleResource implements DeployableResourceGroup {
@Override
public void configureAsDefault(String namespace) {
try {
List<HasMetadata> res = KubeResourceManager.getKubeClient().readResourcesFromFile(Paths.get(DEFAULT_BUNDLE_PATH + "kubernetes.yml"));
List<HasMetadata> res = KubeResourceManager.getKubeClient().readResourcesFromFile(Paths.get(BUNDLE_PATH + "kubernetes.yml"));
for (HasMetadata object : res) {
object.getMetadata().setNamespace(namespace);
switch (object.getKind()) {
Expand All @@ -59,7 +60,7 @@ public void configureAsDefault(String namespace) {
break;
}
}
res = KubeResourceManager.getKubeClient().readResourcesFromFile(Paths.get(DEFAULT_BUNDLE_PATH + "/debeziumservers.debezium.io-v1.yml"));
res = KubeResourceManager.getKubeClient().readResourcesFromFile(Paths.get(BUNDLE_PATH + "/debeziumservers.debezium.io-v1.yml"));
if (res.size() != 1) {
throw new IOException("Specified file cannot be found or is in wrong format!");
}
Expand Down