Skip to content

Commit

Permalink
Add dc_util
Browse files Browse the repository at this point in the history
  • Loading branch information
WalBeh committed Nov 28, 2024
1 parent 80a90f2 commit 2327ca8
Show file tree
Hide file tree
Showing 7 changed files with 537 additions and 6 deletions.
12 changes: 7 additions & 5 deletions crate/operator/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,11 +387,13 @@ def get_statefulset_containers(
command=[
"/bin/sh",
"-c",
"ARCH=$(uname -m) && "
"curl -O "
"https://cdn.crate.io/downloads/dc_util_$ARCH && " # noqa
"chmod u+x ./dc_util_$ARCH && \n"
"./dc_util_$ARCH -min-availability PRIMARIES "
"curl -sLO "
"https://raw.githubusercontent.com/crate/crate-operator/master/dc_util && " # noqa
"curl -sLO "
"https://raw.githubusercontent.com/crate/crate-operator/master/dc_util && " # noqa
"sha256sum -c dc_util.sha256 && "
"chmod u+x ./dc_util && \n"
"./dc_util -min-availability PRIMARIES "
f"-timeout {DECOMMISSION_TIMEOUT}",
]
)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -1641,7 +1641,7 @@ def test_get_cluster_resource_limits(node_spec, expected_limits_cpu):

@pytest.mark.asyncio
async def test_download_dc_util():
url = "https://cdn.crate.io/downloads/dc_util_x86_64"
url = "https://raw.githubusercontent.com/crate/crate-operator/master/dc_util"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
assert response.status == 200, f"Expected status 200, got {response.status}"
87 changes: 87 additions & 0 deletions utils/dc_util/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Rolling restart with `alter cluster decommission`

While working on a cloud issue a small tool was created
to not only _terminate_ a POD by`kubelet` sending a SIGTERM, but by having the ability
to use a preStop Hook and issue a `alter cluster decommission` for that node.

# What does the tool do?

First the decommission settings are configured for the cluster. We assume that
we always want to _force_ decommission - in terms of: If cratedb would come to the
decision that the decommission failed, it would roll it back. In context of terminating
the POD/process in kubernetes, the shutdown cannot be canceled - therefore _force_ is set
on cratedb side.

Before doing that, the STS is checked for the number of replicas configured. This is done
to figure out whether a FULL stop of all PODS in the cratedb Cluster is _scheduled_. In
case of a FULL restart there is **NO** decommission sent to the cluster and the k8s shutdown
continues by sending `SIGTERM`.

For having access to the number of replicas on the sts, additional permission need to be granted
to the ServiceAccount:

```yaml
- apiGroups: ["apps"]
resources: ["statefulsets"]
verbs: ["get", "list", "watch"]
```
This needs to be created/setup manually, or by the crate-operator.
When the decommission is sent to the cluster the command almost immediately returns. Nevertheless
cratedb started the decommissioning in the background andwe need to wait until cratedb
exit's.
After that control is _returned_ to`kubelet` which continues by sending SIGTERM.
`Termination Grace Period` needs to be set longer than the decomission timeout, as
kubelet is monitoring this timer and would eventually assume the preStop process _hangs_
and continue with _TERMINATING_ the containers/POD.

# How to configure it?
The preStop Hook needs to be configured on the _Statefulset_ by adding something like this
to the cratedb containers configuration:

```yaml
image: crate:5.8.3
lifecycle:
preStop:
exec:
command:
- /bin/sh
- -c
- |
curl -sLO https://raw.githubusercontent.com/crate/crate-operator/master/dc_util && \
curl -sLO https://raw.githubusercontent.com/crate/crate-operator/master/dc_util.sha256 && \
sha256sum -c dc_util.sha256 && \
chmod u+x ./dc_util && \
./dc_util -min-availability PRIMARIES
terminationGracePeriodSeconds: 7230
```

In this example the binary is loaded from the GH repo as part of the PODs termination process. In case one of this commands fails, `kubelet` continues with the SIGTERM immediately.

Upload the binary to the CDN: `scp ./dc_util root@web:/mnt/data/www/cdn.crate.io/downloads`. It seems the STDOUT messages written in the `prestop` hook do not
end up in the PODs log, which requires to check the cratedb Logs.

# How to build it?

```shell
GOOS=linux go build -o dc_util dc_util.go
shasum -a 256 dc_util
```

This builds the binary for `Linux` in case you are building it on MacOS. For convenience and to test locally - without running cratedb - as tiny `http_server` is available.

# Command Line
There are a bunch of CLI parameters that can be set to fine-tune the behavior. Most
are used for testing purpose:

| Paramter | setting |
| --------------------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `--crate-node-prefix` | allows to customize the cratedb node names in the statefulset in case it is not the default `data-hot`. This is not to be confused with the _hostname_! |
| `--timemout` | crateDBs default timeout is 7200s - this needs to be correlated to `TerminationGracePeriod` |
| `--pid` | For testing locally only |
| `--hostname` | Is used to derive the name of the kubernetes statefulset, the _replica number_ of the pod is _stripped_ from it, which returns the sts name. eg. `crate-data-hot-eadf76b5-c634-4f0f-abcc-7442d01cb7dd-0 -> crate-data-hot-eadf76b5-c634-4f0f-abcc-7442d01cb7dd` |
| `--min-availability` | Either `PRIMARIES`or `FULL`. Please refer to the crateDB documentation. |
216 changes: 216 additions & 0 deletions utils/dc_util/dc_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package main

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
"syscall"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

const (
defaultCrateNodePrefix = "data-hot"
defaultPID = 1
defaultProto = "https"
defaultMinAvailability = "FULL"
defaultTimeout = "7200s"
)

func sendSQLStatement(proto, stmt string) error {
payload := map[string]string{"stmt": stmt}
payloadBytes, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("failed to marshal JSON payload: %w", err)
}
log.Printf("Payload: %s", string(payloadBytes))

// Create an HTTP client with TLS configuration to skip certificate verification
httpClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}
// Make the HTTP POST request
url := fmt.Sprintf("%s://127.0.0.1:4200/_sql", proto)
resp, err := httpClient.Post(url, "application/json", bytes.NewBuffer(payloadBytes))
if err != nil {
return fmt.Errorf("failed to make HTTP POST request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("HTTP request failed with status: %s", resp.Status)
}

// Read and print the response body
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response body: %w", err)
}
log.Printf("Response from server: %s", string(respBody))

return nil
}

func isProcessRunning(pid int) bool {
err := syscall.Kill(pid, syscall.Signal(0))
return err == nil
}

func getNamespace(inCluster bool, kubeconfig clientcmd.ClientConfig) (string, error) {
if inCluster {
namespaceBytes, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
if err != nil {
return "", fmt.Errorf("failed to read namespace: %w", err)
}
return strings.TrimSpace(string(namespaceBytes)), nil
}
namespace, _, err := kubeconfig.Namespace()
if err != nil {
return "", fmt.Errorf("failed to get namespace from kubeconfig: %w", err)
}
return namespace, nil
}

func run() error {
log.SetPrefix("Decommissioner: ")
log.SetOutput(os.Stdout)

envHostname := os.Getenv("HOSTNAME")

var (
crateNodePrefix string
decommissionTimeout string
pid int
proto string
hostname string
minAvailability string
)

flag.StringVar(&crateNodePrefix, "crate-node-prefix", defaultCrateNodePrefix, "Prefix of the CrateDB node name")
flag.StringVar(&decommissionTimeout, "timeout", defaultTimeout, "Timeout for decommission statement in seconds")
flag.IntVar(&pid, "pid", defaultPID, "PID of the process to check")
flag.StringVar(&proto, "proto", defaultProto, "Protocol to use for the HTTP server")
flag.StringVar(&hostname, "hostname", envHostname, "Hostname of the pod")
flag.StringVar(&minAvailability, "min-availability", defaultMinAvailability, "Minimum availability during decommission (FULL/PRIMARIES)")
flag.Parse()

if hostname == "" {
return fmt.Errorf("hostname is required")
}

// Determine if we are running in-cluster or using kubeconfig
kubeconfigPath := os.Getenv("KUBECONFIG")
inClusterConfig := kubeconfigPath == ""

var (
config *rest.Config
kubeconfig clientcmd.ClientConfig
err error
)

if inClusterConfig {
log.Println("Using in-cluster configuration")
config, err = rest.InClusterConfig()
if err != nil {
return fmt.Errorf("failed to create in-cluster config: %w", err)
}
} else {
log.Printf("Using kubeconfig from %s", kubeconfigPath)
kubeconfig = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
clientcmd.NewDefaultClientConfigLoadingRules(),
&clientcmd.ConfigOverrides{},
)
config, err = kubeconfig.ClientConfig()
if err != nil {
return fmt.Errorf("failed to load kubeconfig: %w", err)
}
}

// Create a Kubernetes client
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return fmt.Errorf("failed to create Kubernetes client: %w", err)
}

// Get the namespace
namespace, err := getNamespace(inClusterConfig, kubeconfig)
if err != nil {
return err
}

// Construct the StatefulSet name from HOSTNAME
hostnameParts := strings.Split(hostname, "-")
if len(hostnameParts) < 2 {
return fmt.Errorf("invalid HOSTNAME format: %s", hostname)
}
statefulSetName := strings.Join(hostnameParts[:len(hostnameParts)-1], "-")

ctx := context.Background()
statefulSet, err := clientset.AppsV1().StatefulSets(namespace).Get(ctx, statefulSetName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get StatefulSet: %w", err)
}
replicas := int(*statefulSet.Spec.Replicas)

log.Printf("StatefulSet has %d replicas configured", replicas)

time.Sleep(2 * time.Second) // Sleep to catch up with the replica settings

if replicas > 0 {
podNumber := hostnameParts[len(hostnameParts)-1]

// Send the SQL statements to decommission the node
log.Printf("Decommissioning node %s with graceful_stop.timeout of %s", podNumber, decommissionTimeout)

statements := []string{
fmt.Sprintf(`set global transient "cluster.graceful_stop.timeout" = '%s';`, decommissionTimeout),
`set global transient "cluster.graceful_stop.force" = True;`,
fmt.Sprintf(`set global transient "cluster.graceful_stop.min_availability"='%s';`, minAvailability),
fmt.Sprintf(`alter cluster decommission '%s-%s'`, crateNodePrefix, podNumber),
}

for _, stmt := range statements {
if err := sendSQLStatement(proto, stmt); err != nil {
return err
}
}

log.Println("Decommission command sent successfully")

// Loop to check if the process is running
counter := 0
for isProcessRunning(pid) {
if counter%10 == 0 {
log.Printf("Process %d is still running (check count: %d)", pid, counter)
}
counter++
time.Sleep(2 * time.Second)
}

log.Printf("Process %d has stopped", pid)
} else {
log.Printf("No replicas are configured -- Skipping decommission")
}

return nil
}

func main() {
if err := run(); err != nil {
log.Fatalf("Error: %v", err)
}
}
50 changes: 50 additions & 0 deletions utils/dc_util/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
module decommission_util

go 1.23.2

require (
k8s.io/apimachinery v0.31.2
k8s.io/client-go v0.31.2
)

require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/x448/float16 v0.8.4 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/term v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.31.2 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)
Loading

0 comments on commit 2327ca8

Please sign in to comment.