Skip to content

Commit

Permalink
DBZ-7953 Ability to stop DS via debezium.io/stop annotation by scalin…
Browse files Browse the repository at this point in the history
…g replicas to 0
  • Loading branch information
jcechace committed Jun 27, 2024
1 parent 77c48d6 commit bf7d336
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.operator.api.model;

public final class CommonAnnotations {
public static final String KEY_DBZ_STOP = "debezium.io/stop";
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
*/
package io.debezium.operator.api.model;

import com.fasterxml.jackson.annotation.JsonIgnore;

import io.debezium.operator.api.config.ConfigMappable;
import io.debezium.operator.api.config.ConfigMapping;
import io.debezium.operator.api.model.status.Condition;
import io.debezium.operator.api.model.status.DebeziumServerStatus;
import io.debezium.operator.commons.OperatorConstants;
import io.debezium.operator.docs.annotations.Documented;
Expand Down Expand Up @@ -42,4 +45,19 @@ protected DebeziumServerSpec initSpec() {
public ConfigMapping asConfiguration() {
return spec.asConfiguration();
}

@JsonIgnore
public boolean isStopped() {
var annotation = getMetadata().getAnnotations().getOrDefault(CommonAnnotations.KEY_DBZ_STOP, Condition.FALSE);
return annotation.equalsIgnoreCase(Condition.TRUE);
}

public void setStopped(boolean stopped) {
if (stopped) {
getMetadata().getAnnotations().put(CommonAnnotations.KEY_DBZ_STOP, Condition.TRUE);
}
else {
getMetadata().getAnnotations().remove(CommonAnnotations.KEY_DBZ_STOP);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,31 @@
import com.fasterxml.jackson.annotation.JsonPropertyDescription;

import io.debezium.operator.docs.annotations.Documented;
import io.sundr.builder.annotations.Buildable;

@Buildable(editableEnabled = false, builderPackage = "io.fabric8.kubernetes.api.builder", lazyCollectionInitEnabled = false)
@Documented
public class Condition {

public static final String TRUE = "True";
public static final String FALSE = "False";

@JsonPropertyDescription("The status of the condition, either True, False or Unknown.")
private String status;
@JsonPropertyDescription("Human-readable message indicating details about the condition’s last transition.")
private String message;
@JsonPropertyDescription("Unique identifier of a condition.")
private String type;

public Condition() {
}

public Condition(String type, String status, String message) {
this.status = status;
this.message = message;
this.type = type;
}

public String getStatus() {
return status;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.operator.api.model.status;

public final class ServerNotReadyCondition extends Condition {

public static final String MESSAGE_TEMPLATE = "Server %s deployment in progress";

public ServerNotReadyCondition(String name) {
super(ServerReadyCondition.TYPE, Condition.FALSE, MESSAGE_TEMPLATE.formatted(name));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.operator.api.model.status;

public final class ServerReadyCondition extends Condition {

public static final String TYPE = "Ready";
private static final String MESSAGE_TEMPLATE = "Server %s is ready";

public ServerReadyCondition(String name) {
super(TYPE, Condition.TRUE, MESSAGE_TEMPLATE.formatted(name));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.operator.api.model.status;

public class ServerRunningCondition extends Condition {
public static final String TYPE = "Running";
public static final String MESSAGE_TEMPLATE = "Server %s is running";

public ServerRunningCondition(String name) {
super(TYPE, Condition.TRUE, MESSAGE_TEMPLATE.formatted(name));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.operator.api.model.status;

public class ServerStoppedCondition extends Condition {
public static final String TYPE = "Running";
public static final String MESSAGE_TEMPLATE = "Server %s is stopped";

public ServerStoppedCondition(String name) {
super(TYPE, Condition.FALSE, MESSAGE_TEMPLATE.formatted(name));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
import io.debezium.operator.api.model.DebeziumServer;
import io.debezium.operator.api.model.status.Condition;
import io.debezium.operator.api.model.status.DebeziumServerStatus;
import io.debezium.operator.api.model.status.ServerNotReadyCondition;
import io.debezium.operator.api.model.status.ServerReadyCondition;
import io.debezium.operator.api.model.status.ServerRunningCondition;
import io.debezium.operator.api.model.status.ServerStoppedCondition;
import io.debezium.operator.commons.OperatorConstants;
import io.debezium.operator.core.dependent.ConfigMapDependent;
import io.debezium.operator.core.dependent.DeploymentDependent;
Expand Down Expand Up @@ -71,28 +75,17 @@ public UpdateControl<DebeziumServer> reconcile(DebeziumServer debeziumServer, Co
else {
var delay = Duration.ofSeconds(10);
Log.infof("Server %s not ready yet, rescheduling after %ds", name, delay.toSeconds());
initializeNotReadyStatus(debeziumServer);
initializeConditions(debeziumServer, new ServerNotReadyCondition(name));
return UpdateControl.patchStatus(debeziumServer).rescheduleAfter(delay);
}
}).orElseThrow();
}

private void initializeReadyStatus(DebeziumServer debeziumServer) {
var condition = new Condition();
condition.setType("Ready");
condition.setStatus("True");
condition.setMessage("Server %s is ready".formatted(debeziumServer.getMetadata().getName()));

initializeConditions(debeziumServer, condition);
}

private void initializeNotReadyStatus(DebeziumServer debeziumServer) {
var condition = new Condition();
condition.setType("Ready");
condition.setStatus("False");
condition.setMessage("Server %s deployment in progress".formatted(debeziumServer.getMetadata().getName()));

initializeConditions(debeziumServer, condition);
var name = debeziumServer.getMetadata().getName();
var ready = new ServerReadyCondition(name);
var running = debeziumServer.isStopped() ? new ServerStoppedCondition(name) : new ServerRunningCondition(name);
initializeConditions(debeziumServer, ready, running);
}

private void initializeConditions(DebeziumServer debeziumServer, Condition... conditions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import jakarta.inject.Inject;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;

import io.debezium.operator.api.model.CommonLabels;
import io.debezium.operator.api.model.DebeziumServer;
Expand Down Expand Up @@ -79,6 +80,9 @@ public class DeploymentDependent extends CRUDKubernetesDependentResource<Deploym
@Inject
ServerImageProvider imageProvider;

@Inject
Logger logger;

public DeploymentDependent() {
super(Deployment.class);
}
Expand All @@ -90,15 +94,18 @@ protected Deployment desired(DebeziumServer primary, Context<DebeziumServer> con
var templates = runtime.getTemplates();
var name = primary.getMetadata().getName();
var labels = CommonLabels.serverComponent(name).getMap();
var primaryLabels = primary.getMetadata().getLabels();
var annotations = Map.of(CONFIG_MD5_ANNOTATION, primary.asConfiguration().md5Sum());
var replicas = desiredReplicas(primary);

var sa = ServiceAccountDependent.serviceAccountNameFor(primary);

var desiredContainer = desiredServerContainer(primary);

var pod = new PodTemplateSpecBuilder()
.withMetadata(new ObjectMetaBuilder()
.withLabels(labels)
.withLabels(primaryLabels)
.addToLabels(labels)
.withAnnotations(annotations)
.build())
.withSpec(new PodSpecBuilder()
Expand All @@ -122,10 +129,12 @@ protected Deployment desired(DebeziumServer primary, Context<DebeziumServer> con
.withMetadata(new ObjectMetaBuilder()
.withNamespace(primary.getMetadata().getNamespace())
.withName(name)
.withLabels(labels)
.withLabels(primaryLabels)
.addToLabels(labels)
.withAnnotations(annotations)
.build())
.withSpec(new DeploymentSpecBuilder()
.withReplicas(replicas)
.withSelector(new LabelSelectorBuilder()
.addToMatchLabels(labels)
.build())
Expand All @@ -134,6 +143,10 @@ protected Deployment desired(DebeziumServer primary, Context<DebeziumServer> con
.build();
}

private int desiredReplicas(DebeziumServer primary) {
return primary.isStopped() ? 0 : 1;
}

private void addMetricConfigurationToPod(DebeziumServer primary, PodTemplateSpec pod) {
var metrics = primary.getSpec().getRuntime().getMetrics();
var jmxExporter = metrics.getJmxExporter();
Expand Down

0 comments on commit bf7d336

Please sign in to comment.