diff --git a/Dockerfile b/Dockerfile index 2847fe02..7bc57144 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ FROM --platform=linux/amd64 registry.fedoraproject.org/fedora-minimal:40 -RUN microdnf install -y curl findutils gzip iputils less postgresql procps shadow-utils tar time which \ +RUN microdnf install -y curl findutils gzip hostname iputils less postgresql procps shadow-utils tar time which \ && microdnf clean all RUN GRPC_HEALTH_PROBE_VERSION=v0.4.23 \ @@ -19,7 +19,9 @@ WORKDIR /home/main RUN mkdir -p /home/main/secrets VOLUME /home/main/secrets/tls VOLUME /home/main/secrets/paseto +VOLUME /home/main/varlib +COPY release/client_linux_amd64 client COPY release/server_linux_amd64 server COPY migrations migrations COPY entrypoint.sh entrypoint.sh diff --git a/Makefile b/Makefile index 52fc95c3..95cd5655 100644 --- a/Makefile +++ b/Makefile @@ -21,12 +21,18 @@ PROTO_FILES := $(shell find internal/pb/ -type f -name '*.proto') MIGRATE_DIR := ./migrations SERVICE := $(PROJECT).server +K8S_NS := dateilager +K8S_CTX := docker-desktop +KC := kubectl --context $(K8S_CTX) -n $(K8S_NS) + .PHONY: install migrate migrate-create clean build lint release .PHONY: test test-one test-fuzz test-js lint-js build-js .PHONY: reset-db setup-local server server-profile install-js .PHONY: client-update client-large-update client-get client-rebuild client-rebuild-with-cache .PHONY: client-getcache client-gc-contents client-gc-project client-gc-random-projects -.PHONY: health upload-container-image run-container gen-docs +.PHONY: start-agent gen-docs health +.PHONY: upload-container-image upload-prerelease-container-image build-local-container run-container +.PHONY: setup-k8s reset-k8s deploy-k8s deploy-k8s-sandbox .PHONY: load-test-new load-test-get load-test-update install: @@ -189,6 +195,14 @@ client-gc-random-projects: export DL_SKIP_SSL_VERIFICATION=1 client-gc-random-projects: go run cmd/client/main.go gc --host $(GRPC_HOST) --mode random-projects --sample 25 --keep 1 
# Create the namespace and the secrets referenced by the k8s manifests.
#
# Each secret is rendered client-side (--dry-run=client -o yaml) and piped into
# `kubectl apply`, so the target is idempotent: re-running it refreshes the
# secret values instead of aborting with "AlreadyExists" on the first
# `create secret` call.
setup-k8s:
	kubectl --context $(K8S_CTX) apply -f k8s/namespace.yaml
	$(KC) create secret tls dl-tls-secret --cert=development/server.crt --key=development/server.key --dry-run=client -o yaml | $(KC) apply -f -
	$(KC) create secret generic dl-paseto-secret --from-file=paseto.pub=development/paseto.pub --dry-run=client -o yaml | $(KC) apply -f -
	$(KC) create secret generic dl-app-secrets --from-literal="DATABASE_URL=postgres://$(DB_USER):$(DB_PASS)@host.docker.internal:5432/dl" --dry-run=client -o yaml | $(KC) apply -f -
	$(KC) create secret generic dl-agent-secrets --from-literal="DL_TOKEN=$(DEV_TOKEN_ADMIN)" --dry-run=client -o yaml | $(KC) apply -f -
--ignore-not-found dl-sandbox + +deploy-k8s: build-local-container reset-k8s + $(KC) apply -f k8s/server.yaml + $(KC) apply -f k8s/agent.yaml + +deploy-k8s-sandbox: + $(KC) apply -f k8s/sandbox.yaml define load-test ghz --cert=development/server.crt --key=development/server.key \ diff --git a/internal/files/writer.go b/internal/files/writer.go index 5425d046..41b5a059 100644 --- a/internal/files/writer.go +++ b/internal/files/writer.go @@ -90,7 +90,7 @@ func writeObject(rootDir string, cacheObjectsDir string, reader *db.TarReader, h return err } hashHex := hex.EncodeToString(content) - return hardlinkDir(filepath.Join(cacheObjectsDir, hashHex, header.Name), path) + return HardlinkDir(filepath.Join(cacheObjectsDir, hashHex, header.Name), path) case tar.TypeReg: dir := filepath.Dir(path) @@ -193,7 +193,7 @@ func makeSymlink(oldname, newname string) error { return nil } -func hardlinkDir(olddir, newdir string) error { +func HardlinkDir(olddir, newdir string) error { if fileExists(newdir) { err := os.RemoveAll(newdir) if err != nil { diff --git a/k8s/agent.yaml b/k8s/agent.yaml new file mode 100644 index 00000000..843ccf98 --- /dev/null +++ b/k8s/agent.yaml @@ -0,0 +1,76 @@ +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: dl-agent + labels: + app: dl-agent +spec: + selector: + matchLabels: + app: dl-agent + template: + metadata: + name: dl-agent + labels: + app: dl-agent + spec: + containers: + - name: agent + image: local/dateilager:latest + imagePullPolicy: Never + securityContext: # FIXME + allowPrivilegeEscalation: false + runAsUser: 0 + command: ["./client"] + args: + [ + "agent", + "--host=dl-headless.dateilager.svc.cluster.local", + "--dir=/tmp/dl_cache_k8s", + "--log-level=info", + "--log-encoding=json", + ] + ports: + - name: http + containerPort: 8080 + protocol: TCP + readinessProbe: + httpGet: + path: /healthz + port: 8080 + initialDelaySeconds: 3 + livenessProbe: + httpGet: + path: /healthz + port: 8080 + initialDelaySeconds: 5 + periodSeconds: 2 + 
failureThreshold: 2 + env: + - name: DL_SKIP_SSL_VERIFICATION + value: "1" + envFrom: + - secretRef: + name: dl-agent-secrets + volumeMounts: + - name: varlib + mountPath: /home/main/varlib + volumes: + - name: varlib + hostPath: + path: /var/lib +--- +apiVersion: v1 +kind: Service +metadata: + name: dl-agent + labels: + app: dl-agent +spec: + internalTrafficPolicy: Local + selector: + app: dl-agent + ports: + - name: http + port: 8080 + targetPort: 8080 diff --git a/k8s/namespace.yaml b/k8s/namespace.yaml new file mode 100644 index 00000000..8f609a1c --- /dev/null +++ b/k8s/namespace.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: dateilager diff --git a/k8s/sandbox.yaml b/k8s/sandbox.yaml new file mode 100644 index 00000000..bda6eb3f --- /dev/null +++ b/k8s/sandbox.yaml @@ -0,0 +1,40 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: dl-sandbox + labels: + app: dl-sandbox +spec: + replicas: 1 + selector: + matchLabels: + app: dl-sandbox + template: + metadata: + name: dl-sandbox + labels: + app: dl-sandbox + spec: + containers: + - name: sandbox + image: local/dateilager:latest + imagePullPolicy: Never + command: ["bash", "-c", "--"] + args: ["while true; do sleep 30; done;"] + # command: ["bash"] + # args: + # [ + # "-c", + # 'curl -XPOST -H ''Content-Type: application/json'' -d "{\"uid\":\"${K8S_CONTAINER_ID}\", \"volume\": \"appdir\"}" dl-agent.dateilager.svc.cluster.local:8080/link_cache', + # ] + env: + - name: K8S_CONTAINER_ID + valueFrom: + fieldRef: + fieldPath: metadata.uid + volumeMounts: + - name: appdir + mountPath: /tmp/appdir + volumes: + - name: appdir + emptyDir: {} diff --git a/k8s/server.yaml b/k8s/server.yaml new file mode 100644 index 00000000..71676a76 --- /dev/null +++ b/k8s/server.yaml @@ -0,0 +1,94 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: dl-server + labels: + app: dl-server +spec: + replicas: 1 + selector: + matchLabels: + app: dl-server + template: + metadata: + name: dl-server + 
labels: + app: dl-server + spec: + containers: + - name: server + image: local/dateilager:latest + imagePullPolicy: Never + command: ["./server"] + args: + [ + "--log-level=info", + "--log-encoding=json", + "--port=5051", + "--dburi=$(DATABASE_URL)", + "--cert=secrets/tls/tls.crt", + "--key=secrets/tls/tls.key", + "--paseto=secrets/paseto/paseto.pub", + ] + ports: + - name: api + containerPort: 5051 + protocol: TCP + readinessProbe: + exec: + command: + [ + "/bin/grpc_health_probe", + "-addr=:5051", + "-service=dateilager.server", + "-tls", + "-tls-no-verify", + ] + initialDelaySeconds: 1 + livenessProbe: + exec: + command: + [ + "/bin/grpc_health_probe", + "-addr=:5051", + "-service=dateilager.server", + "-tls", + "-tls-no-verify", + ] + initialDelaySeconds: 2 + periodSeconds: 2 + failureThreshold: 2 + env: + - name: DL_ENV + value: "dev" + envFrom: + - secretRef: + name: dl-app-secrets + volumeMounts: + - mountPath: "/home/main/secrets/tls" + name: tls-secret + - mountPath: "/home/main/secrets/paseto" + name: paseto-secret + volumes: + - name: tls-secret + secret: + secretName: dl-tls-secret + - name: paseto-secret + secret: + secretName: dl-paseto-secret +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app: dl-headless + name: dl-headless +spec: + clusterIP: None + selector: + app: dl-server + ports: + - name: grpc + protocol: TCP + port: 5051 + targetPort: 5051 diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go new file mode 100644 index 00000000..6f7566b6 --- /dev/null +++ b/pkg/agent/agent.go @@ -0,0 +1,81 @@ +package agent + +import ( + "context" + "encoding/json" + "net" + "net/http" + "path/filepath" + "strconv" + + "github.com/gadget-inc/dateilager/internal/files" + "github.com/gadget-inc/dateilager/internal/logger" + "go.uber.org/zap" +) + +type Agent struct { + varlibDir string + cacheDir string + port int +} + +func NewAgent(cacheDir, varlibDir string, port int) Agent { + return Agent{varlibDir, cacheDir, port} +} + +func (a *Agent) 
Server(ctx context.Context) *http.Server { + http.HandleFunc("GET /healthz", a.healthCheck) + http.HandleFunc("POST /link_cache", a.linkCache) + + server := &http.Server{ + Addr: ":" + strconv.Itoa(a.port), + BaseContext: func(net.Listener) context.Context { return ctx }, + } + return server +} + +type healthStatus struct { + Status string `json:"status"` +} + +func (a *Agent) healthCheck(resp http.ResponseWriter, req *http.Request) { + err := json.NewEncoder(resp).Encode(healthStatus{Status: "OK"}) + if err != nil { + httpErr(req.Context(), resp, err, "failed to encode status") + return + } +} + +type linkRequest struct { + Uid string `json:"uid"` + Volume string `json:"volume"` +} + +func (a *Agent) linkCache(resp http.ResponseWriter, req *http.Request) { + linkReq := linkRequest{} + err := json.NewDecoder(req.Body).Decode(&linkReq) + if err != nil { + httpReqErr(req.Context(), resp, err, "failed to decode link request") + return + } + + volumePath := filepath.Join(a.varlibDir, "kubelet/pods", linkReq.Uid, "volumes/kubernetes.io~projected", linkReq.Volume) + + err = files.HardlinkDir(a.cacheDir, filepath.Join(volumePath, "dl_cache")) + if err != nil { + httpErr(req.Context(), resp, err, "failed to link cache director") + return + } + + resp.WriteHeader(http.StatusCreated) +} + +func httpReqErr(ctx context.Context, resp http.ResponseWriter, err error, message string) { + logger.Warn(ctx, message, zap.Error(err)) + http.Error(resp, err.Error(), http.StatusBadRequest) +} + +func httpErr(ctx context.Context, resp http.ResponseWriter, err error, message string) { + logger.Error(ctx, message, zap.Error(err)) + http.Error(resp, err.Error(), http.StatusInternalServerError) +} diff --git a/pkg/cli/agent.go b/pkg/cli/agent.go new file mode 100644 index 00000000..481e7bc8 --- /dev/null +++ b/pkg/cli/agent.go @@ -0,0 +1,86 @@ +package cli + +import ( + "context" + "errors" + "os" + "os/signal" + "syscall" + "time" + + "github.com/gadget-inc/dateilager/internal/key" + 
"github.com/gadget-inc/dateilager/internal/logger" + "github.com/gadget-inc/dateilager/pkg/agent" + "github.com/gadget-inc/dateilager/pkg/client" + "github.com/spf13/cobra" + "go.uber.org/zap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func NewCmdAgent() *cobra.Command { + var ( + dir string + port int + ) + + cmd := &cobra.Command{ + Use: "agent", + RunE: func(cmd *cobra.Command, _ []string) error { + ctx := cmd.Context() + c := client.FromContext(ctx) + + // Wait for the server to be available + version := int64(-1) + var err error + + for i := 0; i < 10; i++ { + version, err = c.GetCache(ctx, dir) + if err != nil { + parentErr := errors.Unwrap(err) + statusErr, ok := status.FromError(parentErr) + if ok { + if statusErr.Code() == codes.Unavailable { + time.Sleep(500 * time.Millisecond) + continue + } + } + return err + } + break + } + + logger.Info(ctx, "cache downloaded", key.Version.Field(version), key.Directory.Field(dir)) + + a := agent.NewAgent("/home/main/varlib", dir, port) + + backgroundCtx, cancel := context.WithCancel(ctx) + server := a.Server(backgroundCtx) + + osSignals := make(chan os.Signal, 1) + signal.Notify(osSignals, os.Interrupt, syscall.SIGTERM) + + go func() { + <-osSignals + logger.Info(ctx, "received interrupt signal") + + cancel() + + err := server.Shutdown(ctx) + if err != nil { + logger.Error(ctx, "error shutting down server", zap.Error(err)) + } + }() + + logger.Info(ctx, "start agent", zap.Int("port", port), key.Directory.Field(dir)) + return server.ListenAndServe() + }, + } + + cmd.Flags().StringVar(&dir, "dir", "", "Cache directory") + cmd.Flags().IntVar(&port, "port", 8080, "API server port") + + _ = cmd.MarkFlagRequired("path") + + return cmd +} diff --git a/pkg/cli/client.go b/pkg/cli/client.go index 9b74b308..094de887 100644 --- a/pkg/cli/client.go +++ b/pkg/cli/client.go @@ -124,6 +124,7 @@ func NewClientCommand() *cobra.Command { cmd.AddCommand(NewCmdUpdate()) cmd.AddCommand(NewCmdGc()) 
cmd.AddCommand(NewCmdGetCache()) + cmd.AddCommand(NewCmdAgent()) return cmd }