Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-7953 Ability to stop DS via debezium.io/stop annotation #57

Merged
merged 1 commit into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.operator.api.model;

public final class CommonAnnotations {
public static final String KEY_DBZ_STOP = "debezium.io/stop";
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
*/
package io.debezium.operator.api.model;

import com.fasterxml.jackson.annotation.JsonIgnore;

import io.debezium.operator.api.config.ConfigMappable;
import io.debezium.operator.api.config.ConfigMapping;
import io.debezium.operator.api.model.status.Condition;
import io.debezium.operator.api.model.status.DebeziumServerStatus;
import io.debezium.operator.commons.OperatorConstants;
import io.debezium.operator.docs.annotations.Documented;
Expand Down Expand Up @@ -42,4 +45,19 @@ protected DebeziumServerSpec initSpec() {
public ConfigMapping asConfiguration() {
return spec.asConfiguration();
}

@JsonIgnore
public boolean isStopped() {
var annotation = getMetadata().getAnnotations().getOrDefault(CommonAnnotations.KEY_DBZ_STOP, Condition.FALSE);
return annotation.equalsIgnoreCase(Condition.TRUE);
}

public void setStopped(boolean stopped) {
if (stopped) {
getMetadata().getAnnotations().put(CommonAnnotations.KEY_DBZ_STOP, Condition.TRUE);
}
else {
getMetadata().getAnnotations().remove(CommonAnnotations.KEY_DBZ_STOP);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,31 @@
import com.fasterxml.jackson.annotation.JsonPropertyDescription;

import io.debezium.operator.docs.annotations.Documented;
import io.sundr.builder.annotations.Buildable;

@Buildable(editableEnabled = false, builderPackage = "io.fabric8.kubernetes.api.builder", lazyCollectionInitEnabled = false)
@Documented
public class Condition {

public static final String TRUE = "True";
public static final String FALSE = "False";

@JsonPropertyDescription("The status of the condition, either True, False or Unknown.")
private String status;
@JsonPropertyDescription("Human-readable message indicating details about the condition’s last transition.")
private String message;
@JsonPropertyDescription("Unique identifier of a condition.")
private String type;

public Condition() {
}

public Condition(String type, String status, String message) {
this.status = status;
this.message = message;
this.type = type;
}

public String getStatus() {
return status;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.operator.api.model.status;

public final class ServerNotReadyCondition extends Condition {

public static final String MESSAGE_TEMPLATE = "Server %s deployment in progress";

public ServerNotReadyCondition(String name) {
super(ServerReadyCondition.TYPE, Condition.FALSE, MESSAGE_TEMPLATE.formatted(name));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.operator.api.model.status;

public final class ServerReadyCondition extends Condition {

public static final String TYPE = "Ready";
private static final String MESSAGE_TEMPLATE = "Server %s is ready";

public ServerReadyCondition(String name) {
super(TYPE, Condition.TRUE, MESSAGE_TEMPLATE.formatted(name));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.operator.api.model.status;

public class ServerRunningCondition extends Condition {
public static final String TYPE = "Running";
public static final String MESSAGE_TEMPLATE = "Server %s is running";

public ServerRunningCondition(String name) {
super(TYPE, Condition.TRUE, MESSAGE_TEMPLATE.formatted(name));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.operator.api.model.status;

public class ServerStoppedCondition extends Condition {
public static final String TYPE = "Running";
public static final String MESSAGE_TEMPLATE = "Server %s is stopped";

public ServerStoppedCondition(String name) {
super(TYPE, Condition.FALSE, MESSAGE_TEMPLATE.formatted(name));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
import io.debezium.operator.api.model.DebeziumServer;
import io.debezium.operator.api.model.status.Condition;
import io.debezium.operator.api.model.status.DebeziumServerStatus;
import io.debezium.operator.api.model.status.ServerNotReadyCondition;
import io.debezium.operator.api.model.status.ServerReadyCondition;
import io.debezium.operator.api.model.status.ServerRunningCondition;
import io.debezium.operator.api.model.status.ServerStoppedCondition;
import io.debezium.operator.commons.OperatorConstants;
import io.debezium.operator.core.dependent.ConfigMapDependent;
import io.debezium.operator.core.dependent.DeploymentDependent;
Expand Down Expand Up @@ -71,28 +75,17 @@ public UpdateControl<DebeziumServer> reconcile(DebeziumServer debeziumServer, Co
else {
var delay = Duration.ofSeconds(10);
Log.infof("Server %s not ready yet, rescheduling after %ds", name, delay.toSeconds());
initializeNotReadyStatus(debeziumServer);
initializeConditions(debeziumServer, new ServerNotReadyCondition(name));
return UpdateControl.patchStatus(debeziumServer).rescheduleAfter(delay);
}
}).orElseThrow();
}

private void initializeReadyStatus(DebeziumServer debeziumServer) {
var condition = new Condition();
condition.setType("Ready");
condition.setStatus("True");
condition.setMessage("Server %s is ready".formatted(debeziumServer.getMetadata().getName()));

initializeConditions(debeziumServer, condition);
}

private void initializeNotReadyStatus(DebeziumServer debeziumServer) {
var condition = new Condition();
condition.setType("Ready");
condition.setStatus("False");
condition.setMessage("Server %s deployment in progress".formatted(debeziumServer.getMetadata().getName()));

initializeConditions(debeziumServer, condition);
var name = debeziumServer.getMetadata().getName();
var ready = new ServerReadyCondition(name);
var running = debeziumServer.isStopped() ? new ServerStoppedCondition(name) : new ServerRunningCondition(name);
initializeConditions(debeziumServer, ready, running);
}

private void initializeConditions(DebeziumServer debeziumServer, Condition... conditions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import jakarta.inject.Inject;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;

import io.debezium.operator.api.model.CommonLabels;
import io.debezium.operator.api.model.DebeziumServer;
Expand Down Expand Up @@ -79,6 +80,9 @@ public class DeploymentDependent extends CRUDKubernetesDependentResource<Deploym
@Inject
ServerImageProvider imageProvider;

@Inject
Logger logger;

public DeploymentDependent() {
super(Deployment.class);
}
Expand All @@ -90,15 +94,18 @@ protected Deployment desired(DebeziumServer primary, Context<DebeziumServer> con
var templates = runtime.getTemplates();
var name = primary.getMetadata().getName();
var labels = CommonLabels.serverComponent(name).getMap();
var primaryLabels = primary.getMetadata().getLabels();
var annotations = Map.of(CONFIG_MD5_ANNOTATION, primary.asConfiguration().md5Sum());
var replicas = desiredReplicas(primary);

var sa = ServiceAccountDependent.serviceAccountNameFor(primary);

var desiredContainer = desiredServerContainer(primary);

var pod = new PodTemplateSpecBuilder()
.withMetadata(new ObjectMetaBuilder()
.withLabels(labels)
.withLabels(primaryLabels)
.addToLabels(labels)
.withAnnotations(annotations)
.build())
.withSpec(new PodSpecBuilder()
Expand All @@ -122,10 +129,12 @@ protected Deployment desired(DebeziumServer primary, Context<DebeziumServer> con
.withMetadata(new ObjectMetaBuilder()
.withNamespace(primary.getMetadata().getNamespace())
.withName(name)
.withLabels(labels)
.withLabels(primaryLabels)
.addToLabels(labels)
.withAnnotations(annotations)
.build())
.withSpec(new DeploymentSpecBuilder()
.withReplicas(replicas)
.withSelector(new LabelSelectorBuilder()
.addToMatchLabels(labels)
.build())
Expand All @@ -134,6 +143,10 @@ protected Deployment desired(DebeziumServer primary, Context<DebeziumServer> con
.build();
}

private int desiredReplicas(DebeziumServer primary) {
return primary.isStopped() ? 0 : 1;
}

private void addMetricConfigurationToPod(DebeziumServer primary, PodTemplateSpec pod) {
var metrics = primary.getSpec().getRuntime().getMetrics();
var jmxExporter = metrics.getJmxExporter();
Expand Down