From bf6864395c47ffa63af623fdf920aa956d5c1c90 Mon Sep 17 00:00:00 2001 From: Alex Angelini Date: Sun, 5 May 2024 15:06:28 +0200 Subject: [PATCH] Add DL agent --- Makefile | 4 +++ internal/files/writer.go | 4 +-- pkg/agent/agent.go | 78 ++++++++++++++++++++++++++++++++++++++++ pkg/cli/agent.go | 67 ++++++++++++++++++++++++++++++++++ 4 files changed, 151 insertions(+), 2 deletions(-) create mode 100644 pkg/agent/agent.go create mode 100644 pkg/cli/agent.go diff --git a/Makefile b/Makefile index 52fc95c3..2ec1e149 100644 --- a/Makefile +++ b/Makefile @@ -26,6 +26,7 @@ SERVICE := $(PROJECT).server .PHONY: reset-db setup-local server server-profile install-js .PHONY: client-update client-large-update client-get client-rebuild client-rebuild-with-cache .PHONY: client-getcache client-gc-contents client-gc-project client-gc-random-projects +.PHONY: start-agent .PHONY: health upload-container-image run-container gen-docs .PHONY: load-test-new load-test-get load-test-update @@ -189,6 +190,9 @@ client-gc-random-projects: export DL_SKIP_SSL_VERIFICATION=1 client-gc-random-projects: go run cmd/client/main.go gc --host $(GRPC_HOST) --mode random-projects --sample 25 --keep 1 +start-agent: + go run cmd/client/main.go agent --dir /tmp/dl_agent + health: grpc-health-probe -addr $(GRPC_SERVER) grpc-health-probe -addr $(GRPC_SERVER) -service $(SERVICE) diff --git a/internal/files/writer.go b/internal/files/writer.go index 5425d046..41b5a059 100644 --- a/internal/files/writer.go +++ b/internal/files/writer.go @@ -90,7 +90,7 @@ func writeObject(rootDir string, cacheObjectsDir string, reader *db.TarReader, h return err } hashHex := hex.EncodeToString(content) - return hardlinkDir(filepath.Join(cacheObjectsDir, hashHex, header.Name), path) + return HardlinkDir(filepath.Join(cacheObjectsDir, hashHex, header.Name), path) case tar.TypeReg: dir := filepath.Dir(path) @@ -193,7 +193,7 @@ func makeSymlink(oldname, newname string) error { return nil } -func hardlinkDir(olddir, newdir string) error { +func HardlinkDir(olddir, newdir string) error { if fileExists(newdir) { err := os.RemoveAll(newdir) if err != nil { diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go new file mode 100644 index 00000000..33386dee --- /dev/null +++ b/pkg/agent/agent.go @@ -0,0 +1,78 @@ +package agent + +import ( + "context" + "encoding/json" + "net" + "net/http" + "path/filepath" + "strconv" + + "github.com/gadget-inc/dateilager/internal/files" + "github.com/gadget-inc/dateilager/internal/logger" + "go.uber.org/zap" +) + +type Agent struct { + cacheDir string + port int +} + +func NewAgent(cacheDir string, port int) Agent { + return Agent{cacheDir, port} +} + +func (a *Agent) Server(ctx context.Context) *http.Server { + http.HandleFunc("GET /healthz", a.healthCheck) + http.HandleFunc("POST /link_cache", a.linkCache) + + server := &http.Server{ + Addr: ":" + strconv.Itoa(a.port), + BaseContext: func(net.Listener) context.Context { return ctx }, + } + return server +} + +type healthStatus struct { + Status string `json:"status"` +} + +func (a *Agent) healthCheck(resp http.ResponseWriter, req *http.Request) { + err := json.NewEncoder(resp).Encode(healthStatus{Status: "OK"}) + if err != nil { + httpErr(req.Context(), resp, err, "failed to encode status") + return + } + resp.WriteHeader(http.StatusOK) +} + +type linkRequest struct { + Dir string `json:"dir"` +} + +func (a *Agent) linkCache(resp http.ResponseWriter, req *http.Request) { + linkReq := linkRequest{} + err := json.NewDecoder(req.Body).Decode(&linkReq) + if err != nil { + httpReqErr(req.Context(), resp, err, "failed to decode link request") + return + } + + err = files.HardlinkDir(a.cacheDir, filepath.Join(linkReq.Dir, "cache")) + if err != nil { + httpErr(req.Context(), resp, err, "failed to link cache director") + return + } + + resp.WriteHeader(http.StatusCreated) +} + +func httpReqErr(ctx context.Context, resp http.ResponseWriter, err error, message string) { + logger.Warn(ctx, message, zap.Error(err)) + http.Error(resp, err.Error(), http.StatusBadRequest) +} + +func httpErr(ctx context.Context, resp http.ResponseWriter, err error, message string) { + logger.Error(ctx, message, zap.Error(err)) + http.Error(resp, err.Error(), http.StatusInternalServerError) +} diff --git a/pkg/cli/agent.go b/pkg/cli/agent.go new file mode 100644 index 00000000..447682a2 --- /dev/null +++ b/pkg/cli/agent.go @@ -0,0 +1,67 @@ +package cli + +import ( + "context" + "os" + "os/signal" + "syscall" + + "github.com/gadget-inc/dateilager/internal/key" + "github.com/gadget-inc/dateilager/internal/logger" + "github.com/gadget-inc/dateilager/pkg/agent" + "github.com/gadget-inc/dateilager/pkg/client" + "github.com/spf13/cobra" + "go.uber.org/zap" +) + +func NewCmdAgent() *cobra.Command { + var ( + dir string + port int + ) + + cmd := &cobra.Command{ + Use: "agent", + RunE: func(cmd *cobra.Command, _ []string) error { + ctx := cmd.Context() + c := client.FromContext(ctx) + + version, err := c.GetCache(ctx, dir) + if err != nil { + return err + } + + logger.Info(ctx, "cache downloaded", key.Version.Field(version), key.Directory.Field(dir)) + + a := agent.NewAgent(dir, port) + + backgroundCtx, cancel := context.WithCancel(ctx) + server := a.Server(backgroundCtx) + + osSignals := make(chan os.Signal, 1) + signal.Notify(osSignals, os.Interrupt, syscall.SIGTERM) + + go func() { + <-osSignals + logger.Info(ctx, "received interrupt signal") + + cancel() + + err := server.Shutdown(ctx) + if err != nil { + logger.Error(ctx, "error shutting down server", zap.Error(err)) + } + }() + + logger.Info(ctx, "start agent", zap.Int("port", port), key.Directory.Field(dir)) + return server.ListenAndServe() + }, + } + + cmd.Flags().StringVar(&dir, "dir", "", "Cache directory") + cmd.Flags().IntVar(&port, "port", 8080, "API server port") + + _ = cmd.MarkFlagRequired("path") + + return cmd +}