From 0e7e11b9138bac291e1d1cc609f8be02740f1245 Mon Sep 17 00:00:00 2001 From: Jakub Cechacek Date: Mon, 17 Jun 2024 13:39:08 +0200 Subject: [PATCH] DBZ-7953 Ability to stop DS via debezium.io/stop annotation by scaling replicas to 0 --- .../operator/api/model/CommonAnnotations.java | 10 ++++++++ .../operator/api/model/DebeziumServer.java | 18 +++++++++++++ .../operator/api/model/status/Condition.java | 14 +++++++++++ .../model/status/ServerNotReadyCondition.java | 15 +++++++++++ .../model/status/ServerReadyCondition.java | 16 ++++++++++++ .../model/status/ServerRunningCondition.java | 15 +++++++++++ .../model/status/ServerStoppedCondition.java | 15 +++++++++++ .../core/DebeziumServerReconciler.java | 25 +++++++------------ .../core/dependent/DeploymentDependent.java | 17 +++++++++++-- 9 files changed, 127 insertions(+), 18 deletions(-) create mode 100644 debezium-operator-api/src/main/java/io/debezium/operator/api/model/CommonAnnotations.java create mode 100644 debezium-operator-api/src/main/java/io/debezium/operator/api/model/status/ServerNotReadyCondition.java create mode 100644 debezium-operator-api/src/main/java/io/debezium/operator/api/model/status/ServerReadyCondition.java create mode 100644 debezium-operator-api/src/main/java/io/debezium/operator/api/model/status/ServerRunningCondition.java create mode 100644 debezium-operator-api/src/main/java/io/debezium/operator/api/model/status/ServerStoppedCondition.java diff --git a/debezium-operator-api/src/main/java/io/debezium/operator/api/model/CommonAnnotations.java b/debezium-operator-api/src/main/java/io/debezium/operator/api/model/CommonAnnotations.java new file mode 100644 index 0000000..776dda1 --- /dev/null +++ b/debezium-operator-api/src/main/java/io/debezium/operator/api/model/CommonAnnotations.java @@ -0,0 +1,10 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.operator.api.model; + +public final class CommonAnnotations { + public static final String KEY_DBZ_STOP = "debezium.io/stop"; +} diff --git a/debezium-operator-api/src/main/java/io/debezium/operator/api/model/DebeziumServer.java b/debezium-operator-api/src/main/java/io/debezium/operator/api/model/DebeziumServer.java index 40dc88f..961d1a9 100644 --- a/debezium-operator-api/src/main/java/io/debezium/operator/api/model/DebeziumServer.java +++ b/debezium-operator-api/src/main/java/io/debezium/operator/api/model/DebeziumServer.java @@ -5,8 +5,11 @@ */ package io.debezium.operator.api.model; +import com.fasterxml.jackson.annotation.JsonIgnore; + import io.debezium.operator.api.config.ConfigMappable; import io.debezium.operator.api.config.ConfigMapping; +import io.debezium.operator.api.model.status.Condition; import io.debezium.operator.api.model.status.DebeziumServerStatus; import io.debezium.operator.commons.OperatorConstants; import io.debezium.operator.docs.annotations.Documented; @@ -42,4 +45,19 @@ protected DebeziumServerSpec initSpec() { public ConfigMapping asConfiguration() { return spec.asConfiguration(); } + + @JsonIgnore + public boolean isStopped() { + var annotation = getMetadata().getAnnotations().getOrDefault(CommonAnnotations.KEY_DBZ_STOP, Condition.FALSE); + return annotation.equalsIgnoreCase(Condition.TRUE); + } + + public void setStopped(boolean stopped) { + if (stopped) { + getMetadata().getAnnotations().put(CommonAnnotations.KEY_DBZ_STOP, Condition.TRUE); + } + else { + getMetadata().getAnnotations().remove(CommonAnnotations.KEY_DBZ_STOP); + } + } } diff --git a/debezium-operator-api/src/main/java/io/debezium/operator/api/model/status/Condition.java b/debezium-operator-api/src/main/java/io/debezium/operator/api/model/status/Condition.java index 54a9076..50deee4 100644 --- a/debezium-operator-api/src/main/java/io/debezium/operator/api/model/status/Condition.java +++ b/debezium-operator-api/src/main/java/io/debezium/operator/api/model/status/Condition.java @@ -8,10 +8,15 @@ import com.fasterxml.jackson.annotation.JsonPropertyDescription; import io.debezium.operator.docs.annotations.Documented; +import io.sundr.builder.annotations.Buildable; +@Buildable(editableEnabled = false, builderPackage = "io.fabric8.kubernetes.api.builder", lazyCollectionInitEnabled = false) @Documented public class Condition { + public static final String TRUE = "True"; + public static final String FALSE = "False"; + @JsonPropertyDescription("The status of the condition, either True, False or Unknown.") private String status; @JsonPropertyDescription("Human-readable message indicating details about the condition’s last transition.") @@ -19,6 +24,15 @@ public class Condition { @JsonPropertyDescription("Unique identifier of a condition.") private String type; + public Condition() { + } + + public Condition(String type, String status, String message) { + this.status = status; + this.message = message; + this.type = type; + } + public String getStatus() { return status; } diff --git a/debezium-operator-api/src/main/java/io/debezium/operator/api/model/status/ServerNotReadyCondition.java b/debezium-operator-api/src/main/java/io/debezium/operator/api/model/status/ServerNotReadyCondition.java new file mode 100644 index 0000000..c9a0452 --- /dev/null +++ b/debezium-operator-api/src/main/java/io/debezium/operator/api/model/status/ServerNotReadyCondition.java @@ -0,0 +1,15 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.operator.api.model.status; + +public final class ServerNotReadyCondition extends Condition { + + public static final String MESSAGE_TEMPLATE = "Server %s deployment in progress"; + + public ServerNotReadyCondition(String name) { + super(ServerReadyCondition.TYPE, Condition.FALSE, MESSAGE_TEMPLATE.formatted(name)); + } +} diff --git a/debezium-operator-api/src/main/java/io/debezium/operator/api/model/status/ServerReadyCondition.java b/debezium-operator-api/src/main/java/io/debezium/operator/api/model/status/ServerReadyCondition.java new file mode 100644 index 0000000..aa1777e --- /dev/null +++ b/debezium-operator-api/src/main/java/io/debezium/operator/api/model/status/ServerReadyCondition.java @@ -0,0 +1,16 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.operator.api.model.status; + +public final class ServerReadyCondition extends Condition { + + public static final String TYPE = "Ready"; + private static final String MESSAGE_TEMPLATE = "Server %s is ready"; + + public ServerReadyCondition(String name) { + super(TYPE, Condition.TRUE, MESSAGE_TEMPLATE.formatted(name)); + } +} diff --git a/debezium-operator-api/src/main/java/io/debezium/operator/api/model/status/ServerRunningCondition.java b/debezium-operator-api/src/main/java/io/debezium/operator/api/model/status/ServerRunningCondition.java new file mode 100644 index 0000000..e97f53a --- /dev/null +++ b/debezium-operator-api/src/main/java/io/debezium/operator/api/model/status/ServerRunningCondition.java @@ -0,0 +1,15 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.operator.api.model.status; + +public class ServerRunningCondition extends Condition { + public static final String TYPE = "Running"; + public static final String MESSAGE_TEMPLATE = "Server %s is running"; + + public ServerRunningCondition(String name) { + super(TYPE, Condition.TRUE, MESSAGE_TEMPLATE.formatted(name)); + } +} diff --git a/debezium-operator-api/src/main/java/io/debezium/operator/api/model/status/ServerStoppedCondition.java b/debezium-operator-api/src/main/java/io/debezium/operator/api/model/status/ServerStoppedCondition.java new file mode 100644 index 0000000..e749b8b --- /dev/null +++ b/debezium-operator-api/src/main/java/io/debezium/operator/api/model/status/ServerStoppedCondition.java @@ -0,0 +1,15 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.operator.api.model.status; + +public class ServerStoppedCondition extends Condition { + public static final String TYPE = "Running"; + public static final String MESSAGE_TEMPLATE = "Server %s is stopped"; + + public ServerStoppedCondition(String name) { + super(TYPE, Condition.FALSE, MESSAGE_TEMPLATE.formatted(name)); + } +} diff --git a/debezium-operator-core/src/main/java/io/debezium/operator/core/DebeziumServerReconciler.java b/debezium-operator-core/src/main/java/io/debezium/operator/core/DebeziumServerReconciler.java index 0377e90..7ce7af1 100644 --- a/debezium-operator-core/src/main/java/io/debezium/operator/core/DebeziumServerReconciler.java +++ b/debezium-operator-core/src/main/java/io/debezium/operator/core/DebeziumServerReconciler.java @@ -12,6 +12,10 @@ import io.debezium.operator.api.model.DebeziumServer; import io.debezium.operator.api.model.status.Condition; import io.debezium.operator.api.model.status.DebeziumServerStatus; +import io.debezium.operator.api.model.status.ServerNotReadyCondition; +import io.debezium.operator.api.model.status.ServerReadyCondition; +import io.debezium.operator.api.model.status.ServerRunningCondition; +import io.debezium.operator.api.model.status.ServerStoppedCondition; import io.debezium.operator.commons.OperatorConstants; import io.debezium.operator.core.dependent.ConfigMapDependent; import io.debezium.operator.core.dependent.DeploymentDependent; @@ -71,28 +75,17 @@ public UpdateControl reconcile(DebeziumServer debeziumServer, Co else { var delay = Duration.ofSeconds(10); Log.infof("Server %s not ready yet, rescheduling after %ds", name, delay.toSeconds()); - initializeNotReadyStatus(debeziumServer); + initializeConditions(debeziumServer, new ServerNotReadyCondition(name)); return UpdateControl.patchStatus(debeziumServer).rescheduleAfter(delay); } }).orElseThrow(); } private void initializeReadyStatus(DebeziumServer debeziumServer) { - var condition = new Condition(); - condition.setType("Ready"); - condition.setStatus("True"); - condition.setMessage("Server %s is ready".formatted(debeziumServer.getMetadata().getName())); - - initializeConditions(debeziumServer, condition); - } - - private void initializeNotReadyStatus(DebeziumServer debeziumServer) { - var condition = new Condition(); - condition.setType("Ready"); - condition.setStatus("False"); - condition.setMessage("Server %s deployment in progress".formatted(debeziumServer.getMetadata().getName())); - - initializeConditions(debeziumServer, condition); + var name = debeziumServer.getMetadata().getName(); + var ready = new ServerReadyCondition(name); + var running = debeziumServer.isStopped() ? new ServerStoppedCondition(name) : new ServerRunningCondition(name); + initializeConditions(debeziumServer, ready, running); } private void initializeConditions(DebeziumServer debeziumServer, Condition... conditions) { diff --git a/debezium-operator-core/src/main/java/io/debezium/operator/core/dependent/DeploymentDependent.java b/debezium-operator-core/src/main/java/io/debezium/operator/core/dependent/DeploymentDependent.java index 6cdefe5..89c1934 100644 --- a/debezium-operator-core/src/main/java/io/debezium/operator/core/dependent/DeploymentDependent.java +++ b/debezium-operator-core/src/main/java/io/debezium/operator/core/dependent/DeploymentDependent.java @@ -14,6 +14,7 @@ import jakarta.inject.Inject; import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.jboss.logging.Logger; import io.debezium.operator.api.model.CommonLabels; import io.debezium.operator.api.model.DebeziumServer; @@ -79,6 +80,9 @@ public class DeploymentDependent extends CRUDKubernetesDependentResource con var templates = runtime.getTemplates(); var name = primary.getMetadata().getName(); var labels = CommonLabels.serverComponent(name).getMap(); + var primaryLabels = primary.getMetadata().getLabels(); var annotations = Map.of(CONFIG_MD5_ANNOTATION, primary.asConfiguration().md5Sum()); + var replicas = desiredReplicas(primary); var sa = ServiceAccountDependent.serviceAccountNameFor(primary); @@ -98,7 +104,8 @@ protected Deployment desired(DebeziumServer primary, Context con var pod = new PodTemplateSpecBuilder() .withMetadata(new ObjectMetaBuilder() - .withLabels(labels) + .withLabels(primaryLabels) + .addToLabels(labels) .withAnnotations(annotations) .build()) .withSpec(new PodSpecBuilder() @@ -122,10 +129,12 @@ protected Deployment desired(DebeziumServer primary, Context con .withMetadata(new ObjectMetaBuilder() .withNamespace(primary.getMetadata().getNamespace()) .withName(name) - .withLabels(labels) + .withLabels(primaryLabels) + .addToLabels(labels) .withAnnotations(annotations) .build()) .withSpec(new DeploymentSpecBuilder() + .withReplicas(replicas) .withSelector(new LabelSelectorBuilder() .addToMatchLabels(labels) .build()) @@ -134,6 +143,10 @@ protected Deployment desired(DebeziumServer primary, Context con .build(); } + private int desiredReplicas(DebeziumServer primary) { + return primary.isStopped() ? 0 : 1; + } + private void addMetricConfigurationToPod(DebeziumServer primary, PodTemplateSpec pod) { var metrics = primary.getSpec().getRuntime().getMetrics(); var jmxExporter = metrics.getJmxExporter();