From 7ee43f7b16852120fb38a2e0e20f0b93302acd77 Mon Sep 17 00:00:00 2001
From: Harry Brundage
Date: Sat, 4 May 2024 20:06:20 -0400
Subject: [PATCH] Implement a cache daemon that can move the cache into place
 quickly on demand

- a long-running background process we'll deploy as a daemonset on k8s nodes
- moves the cache into place via hardlinking the golden copy from a shared
  location
- operates using the host filesystem, and thus has pretty privileged access
---
 Contributing.md              |   4 +
 Dockerfile                   |  18 +--
 Makefile                     |  43 +++++--
 cmd/cached/main.go           |   7 ++
 development/paseto.key       |   3 +
 development/paseto.pub       |   4 +-
 flake.nix                    |   2 +
 internal/key/key.go          |  14 +++
 internal/pb/cache.pb.go      | 216 +++++++++++++++++++++++++++++++++++
 internal/pb/cache.proto      |  14 +++
 internal/pb/cache_grpc.pb.go | 109 ++++++++++++++++++
 internal/testutil/context.go |   5 +-
 js/spec/util.ts              |   2 +-
 js/src/pb/cache_pb.client.ts |  39 +++++++
 js/src/pb/cache_pb.ts        | 133 +++++++++++++++++++++
 pkg/api/cached.go            | 106 +++++++++++++++++
 pkg/cached/cached.go         |  69 +++++++++++
 pkg/cli/cachedaemon.go       | 169 +++++++++++++++++++++++++++
 pkg/cli/client.go            |   1 +
 pkg/cli/populatediskcache.go |  30 +++++
 pkg/client/client.go         |  25 +++-
 test/cached_test.go          | 104 +++++++++++++++++
 test/client_rebuild_test.go  |   4 +-
 test/shared_test.go          |  34 +++++-
 24 files changed, 1121 insertions(+), 34 deletions(-)
 create mode 100644 cmd/cached/main.go
 create mode 100644 development/paseto.key
 create mode 100644 internal/pb/cache.pb.go
 create mode 100644 internal/pb/cache.proto
 create mode 100644 internal/pb/cache_grpc.pb.go
 create mode 100644 js/src/pb/cache_pb.client.ts
 create mode 100644 js/src/pb/cache_pb.ts
 create mode 100644 pkg/api/cached.go
 create mode 100644 pkg/cached/cached.go
 create mode 100644 pkg/cli/cachedaemon.go
 create mode 100644 pkg/cli/populatediskcache.go
 create mode 100644 test/cached_test.go

diff --git a/Contributing.md b/Contributing.md
index 2916506f..d2c00a46 100644
--- a/Contributing.md
+++ b/Contributing.md
@@ -182,3 +182,7 @@ We also need to build the server docker image and push it to Gadget's container
 ```bash
 make upload-container-image version=0.0.x
 ```
+
+### Getting PASETO tokens locally
+
+You can sign PASETO tokens locally with this handy online tool: https://token.dev/paseto/. Ensure you use the V2 algorithm in public mode, and copy the PASETO public and private keys from the `development` folder.
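If token.dev is unavailable, the same v2.public tokens can also be minted programmatically. The following is a sketch, not part of this patch: it assumes the github.com/o1egl/paseto library and its Sign API, and reuses the checked-in dev key; the claims mirror the Makefile's DEV_TOKEN_ADMIN.

```go
package main

import (
	"crypto/ed25519"
	"crypto/x509"
	"encoding/pem"
	"fmt"
	"os"

	"github.com/o1egl/paseto"
)

func main() {
	// development/paseto.key is the PKCS#8-wrapped Ed25519 key added by this patch
	raw, err := os.ReadFile("development/paseto.key")
	if err != nil {
		panic(err)
	}
	block, _ := pem.Decode(raw)
	if block == nil {
		panic("development/paseto.key is not valid PEM")
	}
	parsed, err := x509.ParsePKCS8PrivateKey(block.Bytes)
	if err != nil {
		panic(err)
	}
	key := parsed.(ed25519.PrivateKey)

	// v2.public token with the same claims as DEV_TOKEN_ADMIN ({"sub": "admin"});
	// the paseto.NewV2().Sign call is an assumption about the o1egl/paseto API
	token, err := paseto.NewV2().Sign(key, map[string]string{"sub": "admin"}, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(token)
}
```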
diff --git a/Dockerfile b/Dockerfile
index 3f55edbd..52ab79a1 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -8,6 +8,10 @@ EOF
 
 WORKDIR /app
 
+RUN GRPC_HEALTH_PROBE_VERSION=v0.4.23 \
+    && curl -Lfso /bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-${TARGETARCH} \
+    && chmod +x /bin/grpc_health_probe
+
 COPY flake.nix flake.lock ./
 COPY development ./development
 
@@ -20,15 +24,11 @@ RUN nix develop -c go mod download
 
 # copy everything else and build the project
 COPY . ./
-RUN nix develop -c make release/server_linux_$TARGETARCH
+RUN nix develop -c make release/server_linux_$TARGETARCH release/cached_linux_$TARGETARCH
 
-FROM buildpack-deps:bullseye AS build-release-stage
+FROM debian:bullseye-slim AS release-stage
 ARG TARGETARCH
 
-RUN GRPC_HEALTH_PROBE_VERSION=v0.4.23 \
-    && curl -Lfso /bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-${TARGETARCH} \
-    && chmod +x /bin/grpc_health_probe
-
 RUN useradd -ms /bin/bash main
 USER main
 WORKDIR /home/main
@@ -37,11 +37,15 @@ RUN mkdir -p /home/main/secrets
 VOLUME /home/main/secrets/tls
 VOLUME /home/main/secrets/paseto
 
+COPY --from=build-stage /bin/grpc_health_probe /bin/grpc_health_probe
+COPY --from=build-stage /app/release/cached_linux_${TARGETARCH} cached
 COPY --from=build-stage /app/release/server_linux_${TARGETARCH} server
+
 COPY migrations migrations
 COPY entrypoint.sh entrypoint.sh
 
-# smoke test -- ensure the server command can run
+# smoke test -- ensure the commands can run
 RUN ./server --help
+RUN ./cached --help
 
 ENTRYPOINT ["./entrypoint.sh"]
diff --git a/Makefile b/Makefile
index 4655a4c3..cf69fa6c 100644
--- a/Makefile
+++ b/Makefile
@@ -8,11 +8,13 @@ DB_USER ?= postgres
 DB_PASS ?= password
 DB_URI := postgres://$(DB_USER):$(DB_PASS)@$(DB_HOST):5432/dl
 
-GRPC_PORT ?= 5051
 GRPC_HOST ?= localhost
+GRPC_PORT ?= 5051
+GRPC_CACHED_PORT ?= 5053
 
-DEV_TOKEN_ADMIN ?= v2.public.eyJzdWIiOiJhZG1pbiIsImlhdCI6IjIwMjEtMTAtMTVUMTE6MjA6MDAuMDM0WiJ9WtEey8KfQQRy21xoHq1C5KQatEevk8RxS47k4bRfMwVCPHumZmVuk6ADcfDHTmSnMtEGfFXdxnYOhRP6Clb_Dw
-DEV_TOKEN_PROJECT_1 ?= v2.public.eyJzdWIiOiIxIiwiaWF0IjoiMjAyMS0xMC0xNVQxMToyMDowMC4wMzVaIn2MQ14RfIGpoEycCuvRu9J3CZp6PppUXf5l5w8uKKydN3C31z6f6GgOEPNcnwODqBnX7Pjarpz4i2uzWEqLgQYD
+DEV_TOKEN_ADMIN ?= v2.public.eyJzdWIiOiJhZG1pbiJ9yt40HNkcyOUtDeFa_WPS6vi0WiE4zWngDGJLh17TuYvssTudCbOdQEkVDRD-mSNTXLgSRDXUkO-AaEr4ZLO4BQ
+DEV_TOKEN_PROJECT_1 ?= v2.public.eyJzdWIiOiIxIn2jV7FOdEXafKDtAnVyDgI4fmIbqU7C1iuhKiL0lDnG1Z5-j6_ObNDd75sZvLZ159-X98_mP4qvwzui0w8pjt8F
+DEV_SHARED_READER_TOKEN ?= v2.public.eyJzdWIiOiJzaGFyZWQtcmVhZGVyIn1CxWdB02s9el0Wt7qReARZ-7JtIb4Zj3D4Oiji1yXHqj0orkpbcVlswVUiekECJC16d1NrHwD2FWSwRORZn8gK
 
 PKG_GO_FILES := $(shell find pkg/ -type f -name '*.go')
 INTERNAL_GO_FILES := $(shell find internal/ -type f -name '*.go')
@@ -63,7 +65,7 @@ development/server.key:
 development/server.crt: development/server.key
 
-build: internal/pb/fs.pb.go internal/pb/fs_grpc.pb.go bin/server bin/client development/server.crt
+build: internal/pb/fs.pb.go internal/pb/fs_grpc.pb.go internal/pb/cache.pb.go internal/pb/cache_grpc.pb.go bin/server bin/client bin/cached development/server.crt
 
 lint:
 	golangci-lint run
@@ -86,8 +88,9 @@ release/migrations.tar.gz: migrations/*
 	tar -zcf $@ migrations
 
 release: build
-release: release/server_linux_amd64 release/server_macos_amd64 release/server_macos_arm64
-release: release/client_linux_amd64 release/client_macos_amd64 release/client_macos_arm64
+release: release/server_linux_amd64 release/server_macos_amd64 release/server_macos_arm64 release/server_linux_arm64
+release: release/client_linux_amd64 release/client_macos_amd64 release/client_macos_arm64 release/client_linux_arm64
+release: release/cached_linux_amd64 release/cached_macos_amd64 release/cached_macos_arm64 release/cached_linux_arm64
 release: release/migrations.tar.gz
 
 test: export DB_URI = postgres://$(DB_USER):$(DB_PASS)@$(DB_HOST):5432/dl_tests
@@ -121,6 +124,11 @@ server-profile: export DL_ENV=dev
 server-profile: 
internal/pb/fs.pb.go internal/pb/fs_grpc.pb.go go run cmd/server/main.go --dburi $(DB_URI) --port $(GRPC_PORT) --profile cpu.prof --log-level info +cached: export DL_ENV=dev +cached: export DL_TOKEN=$(DEV_SHARED_READER_TOKEN) +cached: internal/pb/cache.pb.go internal/pb/cache_grpc.pb.go + go run cmd/cached/main.go --upstream-host $(GRPC_HOST) --upstream-port $(GRPC_PORT) --staging-path tmp/cache-stage + client-update: export DL_TOKEN=$(DEV_TOKEN_PROJECT_1) client-update: export DL_SKIP_SSL_VERIFICATION=1 client-update: @@ -169,6 +177,16 @@ client-getcache: export DL_SKIP_SSL_VERIFICATION=1 client-getcache: go run cmd/client/main.go getcache --host $(GRPC_HOST) --path input/cache +client-populate-disk-cache: export DL_TOKEN=$(DEV_TOKEN_ADMIN) +client-populate-disk-cache: export DL_SKIP_SSL_VERIFICATION=1 +client-populate-disk-cache: + mkdir -p tmp/pods/test-pod/volumes/example && go run cmd/client/main.go populate-disk-cache --host $(GRPC_HOST) --port $(GRPC_CACHED_PORT) --id test-pod + +client-rebuild-with-cached-cache: export DL_TOKEN=$(DEV_TOKEN_ADMIN) +client-rebuild-with-cached-cache: export DL_SKIP_SSL_VERIFICATION=1 +client-rebuild-with-cached-cache: + go run cmd/client/main.go rebuild --host $(GRPC_HOST) --project 1 --prefix "$(prefix)" --dir tmp/pods/test-pod/workdir --cachedir pods/test-pod/volumes/example + client-gc-contents: export DL_TOKEN=$(DEV_TOKEN_ADMIN) client-gc-contents: export DL_SKIP_SSL_VERIFICATION=1 client-gc-contents: @@ -197,15 +215,14 @@ else docker push gcr.io/gadget-core-production/dateilager:latest endif -upload-prerelease-container-image: release - docker build -t gcr.io/gadget-core-production/dateilager:$(GIT_COMMIT) . - docker push gcr.io/gadget-core-production/dateilager:$(GIT_COMMIT) +upload-prerelease-container-image: + docker build --platform linux/arm64,linux/amd64 --push -t us-central1-docker.pkg.dev/gadget-core-production/core-production/dateilager:pre-$(GIT_COMMIT) . build-local-container: - docker build -t dl-local:latest . + docker build --load -t dl-local:dev . 
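The client-populate-disk-cache target above is a thin wrapper over the daemon's one RPC. The same round-trip in Go, against a locally running `make cached`, looks roughly like this — a sketch, not part of the patch; it assumes the dev defaults (cached listening on GRPC_CACHED_PORT 5053, DL_TOKEN and DL_SKIP_SSL_VERIFICATION exported as the Makefile targets do) and uses the client API this patch adds in pkg/client/client.go:

```go
package main

import (
	"context"
	"log"

	"github.com/gadget-inc/dateilager/pkg/client"
)

func main() {
	ctx := context.Background()

	// NewClient reads DL_TOKEN from the environment, per the Makefile targets above
	cl, err := client.NewClient(ctx, "localhost", 5053)
	if err != nil {
		log.Fatal(err)
	}

	// ask the daemon to hardlink its golden copy into this pod volume
	version, err := cl.PopulateDiskCache(ctx, "tmp/pods/test-pod/volumes/example")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("cache populated at version %d", version)
}
```

Note that the daemon requires the admin role (api/cached.go calls requireAdminAuth), which is why these targets export DL_TOKEN=$(DEV_TOKEN_ADMIN).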
run-container: release build-local-container - docker run --rm -it -p 127.0.0.1:$(GRPC_PORT):$(GRPC_PORT)/tcp -v ./development:/home/main/secrets/tls -v ./development:/home/main/secrets/paseto dl-local:latest $(GRPC_PORT) "postgres://$(DB_USER):$(DB_PASS)@host.docker.internal:5432" dl + docker run --rm -it -p 127.0.0.1:$(GRPC_PORT):$(GRPC_PORT)/tcp -v ./development:/home/main/secrets/tls -v ./development:/home/main/secrets/paseto dl-local:dev $(GRPC_PORT) "postgres://$(DB_USER):$(DB_PASS)@host.docker.internal:5432" dl gen-docs: go run cmd/gen-docs/main.go @@ -244,7 +261,9 @@ else endif js/src/pb: $(PROTO_FILES) - cd js && mkdir -p ./src/pb && npx protoc --experimental_allow_proto3_optional --ts_out ./src/pb --ts_opt long_type_bigint,ts_nocheck,eslint_disable,add_pb_suffix --proto_path ../internal/pb/ ../$^ + cd js && mkdir -p ./src/pb && for proto in $^; do \ + npx protoc --experimental_allow_proto3_optional --ts_out ./src/pb --ts_opt long_type_bigint,ts_nocheck,eslint_disable,add_pb_suffix --proto_path ../internal/pb/ ../$$proto; \ + done js/dist: js/node_modules js/src/pb cd js && npm run build diff --git a/cmd/cached/main.go b/cmd/cached/main.go new file mode 100644 index 00000000..20a7b13e --- /dev/null +++ b/cmd/cached/main.go @@ -0,0 +1,7 @@ +package main + +import "github.com/gadget-inc/dateilager/pkg/cli" + +func main() { + cli.CacheDaemonExecute() +} diff --git a/development/paseto.key b/development/paseto.key new file mode 100644 index 00000000..cf19319d --- /dev/null +++ b/development/paseto.key @@ -0,0 +1,3 @@ +-----BEGIN PRIVATE KEY----- +MC4CAQAwBQYDK2VwBCIEILTL+0PfTOIQcn2VPkpxMwf6Gbt9n4UEFDjZ4RuUKjd0 +-----END PRIVATE KEY----- \ No newline at end of file diff --git a/development/paseto.pub b/development/paseto.pub index aee98703..98bc10bf 100644 --- a/development/paseto.pub +++ b/development/paseto.pub @@ -1,3 +1,3 @@ -----BEGIN PUBLIC KEY----- -MCowBQYDK2VwAyEASKQkA/AxlNCdOHTnp5McesmQ+y756VTtGz8Xrt1G0fs= ------END PUBLIC KEY----- +MCowBQYDK2VwAyEAHrnbu7wEfAP9cGBOAHHwmH4Wsot1ciXBHwBBXQ4gsaI= +-----END PUBLIC KEY----- \ No newline at end of file diff --git a/flake.nix b/flake.nix index 07a0d818..4324bd72 100644 --- a/flake.nix +++ b/flake.nix @@ -54,6 +54,7 @@ postgresql = pkgs.postgresql_14; + glibcLocales = pkgs.glibcLocales; ## DateiLager outputs dateilager = callPackage ./. 
{ @@ -72,6 +73,7 @@ flake.packages.postgresql flake.packages.dev flake.packages.clean + flake.packages.glibcLocales git protobuf protoc-gen-go diff --git a/internal/key/key.go b/internal/key/key.go index 3d3a1d9b..70c90506 100644 --- a/internal/key/key.go +++ b/internal/key/key.go @@ -1,6 +1,8 @@ package key import ( + "time" + "github.com/gadget-inc/dateilager/pkg/stringutil" "go.opentelemetry.io/otel/attribute" "go.uber.org/zap" @@ -36,7 +38,9 @@ const ( Worker = IntKey("dl.worker") WorkerCount = IntKey("dl.worker_count") Ignores = StringSliceKey("dl.ignores") + DurationMS = DurationKey("dl.duration_ms") CloneToProject = Int64Key("dl.clone_to_project") + CachePath = StringKey("dl.cache_path") ) var ( @@ -148,3 +152,13 @@ func (isk Int64SliceKey) Field(value []int64) zap.Field { func (isk Int64SliceKey) Attribute(value []int64) attribute.KeyValue { return attribute.Int64Slice(string(isk), value) } + +type DurationKey string + +func (dk DurationKey) Field(value time.Duration) zap.Field { + return zap.Duration(string(dk), value) +} + +func (dk DurationKey) Attribute(value time.Duration) attribute.KeyValue { + return attribute.Float64(string(dk), float64(value.Milliseconds())) +} diff --git a/internal/pb/cache.pb.go b/internal/pb/cache.pb.go new file mode 100644 index 00000000..458fb1c4 --- /dev/null +++ b/internal/pb/cache.pb.go @@ -0,0 +1,216 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.33.0 +// protoc v4.24.4 +// source: internal/pb/cache.proto + +package pb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type PopulateDiskCacheRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` +} + +func (x *PopulateDiskCacheRequest) Reset() { + *x = PopulateDiskCacheRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_pb_cache_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PopulateDiskCacheRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PopulateDiskCacheRequest) ProtoMessage() {} + +func (x *PopulateDiskCacheRequest) ProtoReflect() protoreflect.Message { + mi := &file_internal_pb_cache_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PopulateDiskCacheRequest.ProtoReflect.Descriptor instead. 
+func (*PopulateDiskCacheRequest) Descriptor() ([]byte, []int) { + return file_internal_pb_cache_proto_rawDescGZIP(), []int{0} +} + +func (x *PopulateDiskCacheRequest) GetPath() string { + if x != nil { + return x.Path + } + return "" +} + +type PopulateDiskCacheResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Version int64 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"` +} + +func (x *PopulateDiskCacheResponse) Reset() { + *x = PopulateDiskCacheResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_pb_cache_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PopulateDiskCacheResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PopulateDiskCacheResponse) ProtoMessage() {} + +func (x *PopulateDiskCacheResponse) ProtoReflect() protoreflect.Message { + mi := &file_internal_pb_cache_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PopulateDiskCacheResponse.ProtoReflect.Descriptor instead. +func (*PopulateDiskCacheResponse) Descriptor() ([]byte, []int) { + return file_internal_pb_cache_proto_rawDescGZIP(), []int{1} +} + +func (x *PopulateDiskCacheResponse) GetVersion() int64 { + if x != nil { + return x.Version + } + return 0 +} + +var File_internal_pb_cache_proto protoreflect.FileDescriptor + +var file_internal_pb_cache_proto_rawDesc = []byte{ + 0x0a, 0x17, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x62, 0x2f, 0x63, 0x61, + 0x63, 0x68, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0x2e, 0x0a, + 0x18, 0x50, 0x6f, 0x70, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x44, 0x69, 0x73, 0x6b, 0x43, 0x61, 0x63, + 0x68, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, + 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x22, 0x35, 0x0a, + 0x19, 0x50, 0x6f, 0x70, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x44, 0x69, 0x73, 0x6b, 0x43, 0x61, 0x63, + 0x68, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x76, 0x65, 0x72, + 0x73, 0x69, 0x6f, 0x6e, 0x32, 0x59, 0x0a, 0x05, 0x43, 0x61, 0x63, 0x68, 0x65, 0x12, 0x50, 0x0a, + 0x11, 0x50, 0x6f, 0x70, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x44, 0x69, 0x73, 0x6b, 0x43, 0x61, 0x63, + 0x68, 0x65, 0x12, 0x1c, 0x2e, 0x70, 0x62, 0x2e, 0x50, 0x6f, 0x70, 0x75, 0x6c, 0x61, 0x74, 0x65, + 0x44, 0x69, 0x73, 0x6b, 0x43, 0x61, 0x63, 0x68, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x1d, 0x2e, 0x70, 0x62, 0x2e, 0x50, 0x6f, 0x70, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x44, 0x69, + 0x73, 0x6b, 0x43, 0x61, 0x63, 0x68, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, + 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x61, + 0x64, 0x67, 0x65, 0x74, 0x2d, 0x69, 0x6e, 0x63, 0x2f, 0x64, 0x61, 0x74, 0x65, 0x69, 0x6c, 0x61, + 0x67, 0x65, 0x72, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, +} + +var ( + file_internal_pb_cache_proto_rawDescOnce sync.Once + file_internal_pb_cache_proto_rawDescData = file_internal_pb_cache_proto_rawDesc +) + +func file_internal_pb_cache_proto_rawDescGZIP() []byte { 
+ file_internal_pb_cache_proto_rawDescOnce.Do(func() { + file_internal_pb_cache_proto_rawDescData = protoimpl.X.CompressGZIP(file_internal_pb_cache_proto_rawDescData) + }) + return file_internal_pb_cache_proto_rawDescData +} + +var file_internal_pb_cache_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_internal_pb_cache_proto_goTypes = []interface{}{ + (*PopulateDiskCacheRequest)(nil), // 0: pb.PopulateDiskCacheRequest + (*PopulateDiskCacheResponse)(nil), // 1: pb.PopulateDiskCacheResponse +} +var file_internal_pb_cache_proto_depIdxs = []int32{ + 0, // 0: pb.Cache.PopulateDiskCache:input_type -> pb.PopulateDiskCacheRequest + 1, // 1: pb.Cache.PopulateDiskCache:output_type -> pb.PopulateDiskCacheResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_internal_pb_cache_proto_init() } +func file_internal_pb_cache_proto_init() { + if File_internal_pb_cache_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_internal_pb_cache_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PopulateDiskCacheRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_pb_cache_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PopulateDiskCacheResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_internal_pb_cache_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_internal_pb_cache_proto_goTypes, + DependencyIndexes: file_internal_pb_cache_proto_depIdxs, + MessageInfos: file_internal_pb_cache_proto_msgTypes, + }.Build() + File_internal_pb_cache_proto = out.File + file_internal_pb_cache_proto_rawDesc = nil + file_internal_pb_cache_proto_goTypes = nil + file_internal_pb_cache_proto_depIdxs = nil +} diff --git a/internal/pb/cache.proto b/internal/pb/cache.proto new file mode 100644 index 00000000..df257cd7 --- /dev/null +++ b/internal/pb/cache.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package pb; + +option go_package = "github.com/gadget-inc/dateilager/pkg/pb"; + +service Cache { + rpc PopulateDiskCache(PopulateDiskCacheRequest) + returns (PopulateDiskCacheResponse); +} + +message PopulateDiskCacheRequest { string path = 1; } + +message PopulateDiskCacheResponse { int64 version = 1; }; diff --git a/internal/pb/cache_grpc.pb.go b/internal/pb/cache_grpc.pb.go new file mode 100644 index 00000000..23a7751c --- /dev/null +++ b/internal/pb/cache_grpc.pb.go @@ -0,0 +1,109 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.24.4 +// source: internal/pb/cache.proto + +package pb + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. 
+const _ = grpc.SupportPackageIsVersion7 + +const ( + Cache_PopulateDiskCache_FullMethodName = "/pb.Cache/PopulateDiskCache" +) + +// CacheClient is the client API for Cache service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type CacheClient interface { + PopulateDiskCache(ctx context.Context, in *PopulateDiskCacheRequest, opts ...grpc.CallOption) (*PopulateDiskCacheResponse, error) +} + +type cacheClient struct { + cc grpc.ClientConnInterface +} + +func NewCacheClient(cc grpc.ClientConnInterface) CacheClient { + return &cacheClient{cc} +} + +func (c *cacheClient) PopulateDiskCache(ctx context.Context, in *PopulateDiskCacheRequest, opts ...grpc.CallOption) (*PopulateDiskCacheResponse, error) { + out := new(PopulateDiskCacheResponse) + err := c.cc.Invoke(ctx, Cache_PopulateDiskCache_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// CacheServer is the server API for Cache service. +// All implementations must embed UnimplementedCacheServer +// for forward compatibility +type CacheServer interface { + PopulateDiskCache(context.Context, *PopulateDiskCacheRequest) (*PopulateDiskCacheResponse, error) + mustEmbedUnimplementedCacheServer() +} + +// UnimplementedCacheServer must be embedded to have forward compatible implementations. +type UnimplementedCacheServer struct { +} + +func (UnimplementedCacheServer) PopulateDiskCache(context.Context, *PopulateDiskCacheRequest) (*PopulateDiskCacheResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method PopulateDiskCache not implemented") +} +func (UnimplementedCacheServer) mustEmbedUnimplementedCacheServer() {} + +// UnsafeCacheServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to CacheServer will +// result in compilation errors. +type UnsafeCacheServer interface { + mustEmbedUnimplementedCacheServer() +} + +func RegisterCacheServer(s grpc.ServiceRegistrar, srv CacheServer) { + s.RegisterService(&Cache_ServiceDesc, srv) +} + +func _Cache_PopulateDiskCache_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PopulateDiskCacheRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CacheServer).PopulateDiskCache(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Cache_PopulateDiskCache_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CacheServer).PopulateDiskCache(ctx, req.(*PopulateDiskCacheRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// Cache_ServiceDesc is the grpc.ServiceDesc for Cache service. 
+// It's only intended for direct use with grpc.RegisterService,
+// and not to be introspected or modified (even as a copy)
+var Cache_ServiceDesc = grpc.ServiceDesc{
+	ServiceName: "pb.Cache",
+	HandlerType: (*CacheServer)(nil),
+	Methods: []grpc.MethodDesc{
+		{
+			MethodName: "PopulateDiskCache",
+			Handler:    _Cache_PopulateDiskCache_Handler,
+		},
+	},
+	Streams:  []grpc.StreamDesc{},
+	Metadata: "internal/pb/cache.proto",
+}
diff --git a/internal/testutil/context.go b/internal/testutil/context.go
index 44e72252..517b0aaa 100644
--- a/internal/testutil/context.go
+++ b/internal/testutil/context.go
@@ -34,6 +34,9 @@ func NewTestCtx(t *testing.T, role auth.Role, projects ...int64) TestCtx {
 		Project: project,
 	})
 
+	log := zaptest.NewLogger(t)
+	zap.ReplaceGlobals(log)
+
 	dbConn, err := newDbTestConnector(ctx, os.Getenv("DB_URI"))
 	require.NoError(t, err, "connecting to DB")
 
@@ -42,7 +45,7 @@ func NewTestCtx(t *testing.T, role auth.Role, projects ...int64) TestCtx {
 
 	return TestCtx{
 		t:      t,
-		log:    zaptest.NewLogger(t),
+		log:    log,
 		dbConn: dbConn,
 		lookup: lookup,
 		ctx:    ctx,
diff --git a/js/spec/util.ts b/js/spec/util.ts
index 922fe249..a7581dcb 100644
--- a/js/spec/util.ts
+++ b/js/spec/util.ts
@@ -3,7 +3,7 @@ import path from "path";
 import { DateiLagerBinaryClient, DateiLagerGrpcClient } from "../src";
 
 export const devAdminToken =
-  "v2.public.eyJzdWIiOiJhZG1pbiIsImlhdCI6IjIwMjEtMTAtMTVUMTE6MjA6MDAuMDM0WiJ9WtEey8KfQQRy21xoHq1C5KQatEevk8RxS47k4bRfMwVCPHumZmVuk6ADcfDHTmSnMtEGfFXdxnYOhRP6Clb_Dw";
+  "v2.public.eyJzdWIiOiJhZG1pbiJ9yt40HNkcyOUtDeFa_WPS6vi0WiE4zWngDGJLh17TuYvssTudCbOdQEkVDRD-mSNTXLgSRDXUkO-AaEr4ZLO4BQ";
 
 export const grpcClient = new DateiLagerGrpcClient({
   server: "localhost:5051",
diff --git a/js/src/pb/cache_pb.client.ts b/js/src/pb/cache_pb.client.ts
new file mode 100644
index 00000000..c84df578
--- /dev/null
+++ b/js/src/pb/cache_pb.client.ts
@@ -0,0 +1,39 @@
+/* eslint-disable */
+// @generated by protobuf-ts 2.9.4 with parameter long_type_bigint,ts_nocheck,eslint_disable,add_pb_suffix
+// @generated from protobuf file "cache.proto" (package "pb", syntax proto3)
+// tslint:disable
+// @ts-nocheck
+import type { RpcTransport } from "@protobuf-ts/runtime-rpc";
+import type { ServiceInfo } from "@protobuf-ts/runtime-rpc";
+import { Cache } from "./cache_pb";
+import { stackIntercept } from "@protobuf-ts/runtime-rpc";
+import type { PopulateDiskCacheResponse } from "./cache_pb";
+import type { PopulateDiskCacheRequest } from "./cache_pb";
+import type { UnaryCall } from "@protobuf-ts/runtime-rpc";
+import type { RpcOptions } from "@protobuf-ts/runtime-rpc";
+/**
+ * @generated from protobuf service pb.Cache
+ */
+export interface ICacheClient {
+    /**
+     * @generated from protobuf rpc: PopulateDiskCache(pb.PopulateDiskCacheRequest) returns (pb.PopulateDiskCacheResponse);
+     */
+    populateDiskCache(input: PopulateDiskCacheRequest, options?: RpcOptions): UnaryCall<PopulateDiskCacheRequest, PopulateDiskCacheResponse>;
+}
+/**
+ * @generated from protobuf service pb.Cache
+ */
+export class CacheClient implements ICacheClient, ServiceInfo {
+    typeName = Cache.typeName;
+    methods = Cache.methods;
+    options = Cache.options;
+    constructor(private readonly _transport: RpcTransport) {
+    }
+    /**
+     * @generated from protobuf rpc: PopulateDiskCache(pb.PopulateDiskCacheRequest) returns (pb.PopulateDiskCacheResponse);
+     */
+    populateDiskCache(input: PopulateDiskCacheRequest, options?: RpcOptions): UnaryCall<PopulateDiskCacheRequest, PopulateDiskCacheResponse> {
+        const method = this.methods[0], opt = this._transport.mergeOptions(options);
+        return stackIntercept<PopulateDiskCacheRequest, PopulateDiskCacheResponse>("unary", this._transport, method, opt, input);
+    }
+}
diff --git a/js/src/pb/cache_pb.ts b/js/src/pb/cache_pb.ts
new file mode 100644
index 00000000..80a6a2c2
--- /dev/null
+++ b/js/src/pb/cache_pb.ts
@@ -0,0 +1,133 @@
+/* eslint-disable */
+// @generated by protobuf-ts 2.9.4 with parameter long_type_bigint,ts_nocheck,eslint_disable,add_pb_suffix
+// @generated from protobuf file "cache.proto" (package "pb", syntax proto3)
+// tslint:disable
+// @ts-nocheck
+import { ServiceType } from "@protobuf-ts/runtime-rpc";
+import type { BinaryWriteOptions } from "@protobuf-ts/runtime";
+import type { IBinaryWriter } from "@protobuf-ts/runtime";
+import { WireType } from "@protobuf-ts/runtime";
+import type { BinaryReadOptions } from "@protobuf-ts/runtime";
+import type { IBinaryReader } from "@protobuf-ts/runtime";
+import { UnknownFieldHandler } from "@protobuf-ts/runtime";
+import type { PartialMessage } from "@protobuf-ts/runtime";
+import { reflectionMergePartial } from "@protobuf-ts/runtime";
+import { MessageType } from "@protobuf-ts/runtime";
+/**
+ * @generated from protobuf message pb.PopulateDiskCacheRequest
+ */
+export interface PopulateDiskCacheRequest {
+    /**
+     * @generated from protobuf field: string path = 1;
+     */
+    path: string;
+}
+/**
+ * @generated from protobuf message pb.PopulateDiskCacheResponse
+ */
+export interface PopulateDiskCacheResponse {
+    /**
+     * @generated from protobuf field: int64 version = 1;
+     */
+    version: bigint;
+}
+// @generated message type with reflection information, may provide speed optimized methods
+class PopulateDiskCacheRequest$Type extends MessageType<PopulateDiskCacheRequest> {
+    constructor() {
+        super("pb.PopulateDiskCacheRequest", [
+            { no: 1, name: "path", kind: "scalar", T: 9 /*ScalarType.STRING*/ }
+        ]);
+    }
+    create(value?: PartialMessage<PopulateDiskCacheRequest>): PopulateDiskCacheRequest {
+        const message = globalThis.Object.create((this.messagePrototype!));
+        message.path = "";
+        if (value !== undefined)
+            reflectionMergePartial<PopulateDiskCacheRequest>(this, message, value);
+        return message;
+    }
+    internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: PopulateDiskCacheRequest): PopulateDiskCacheRequest {
+        let message = target ?? this.create(), end = reader.pos + length;
+        while (reader.pos < end) {
+            let [fieldNo, wireType] = reader.tag();
+            switch (fieldNo) {
+                case /* string path */ 1:
+                    message.path = reader.string();
+                    break;
+                default:
+                    let u = options.readUnknownField;
+                    if (u === "throw")
+                        throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`);
+                    let d = reader.skip(wireType);
+                    if (u !== false)
+                        (u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d);
+            }
+        }
+        return message;
+    }
+    internalBinaryWrite(message: PopulateDiskCacheRequest, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter {
+        /* string path = 1; */
+        if (message.path !== "")
+            writer.tag(1, WireType.LengthDelimited).string(message.path);
+        let u = options.writeUnknownFields;
+        if (u !== false)
+            (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
+        return writer;
+    }
+}
+/**
+ * @generated MessageType for protobuf message pb.PopulateDiskCacheRequest
+ */
+export const PopulateDiskCacheRequest = new PopulateDiskCacheRequest$Type();
+// @generated message type with reflection information, may provide speed optimized methods
+class PopulateDiskCacheResponse$Type extends MessageType<PopulateDiskCacheResponse> {
+    constructor() {
+        super("pb.PopulateDiskCacheResponse", [
+            { no: 1, name: "version", kind: "scalar", T: 3 /*ScalarType.INT64*/, L: 0 /*LongType.BIGINT*/ }
+        ]);
+    }
+    create(value?: PartialMessage<PopulateDiskCacheResponse>): PopulateDiskCacheResponse {
+        const message = globalThis.Object.create((this.messagePrototype!));
+        message.version = 0n;
+        if (value !== undefined)
+            reflectionMergePartial<PopulateDiskCacheResponse>(this, message, value);
+        return message;
+    }
+    internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: PopulateDiskCacheResponse): PopulateDiskCacheResponse {
+        let message = target ?? this.create(), end = reader.pos + length;
+        while (reader.pos < end) {
+            let [fieldNo, wireType] = reader.tag();
+            switch (fieldNo) {
+                case /* int64 version */ 1:
+                    message.version = reader.int64().toBigInt();
+                    break;
+                default:
+                    let u = options.readUnknownField;
+                    if (u === "throw")
+                        throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`);
+                    let d = reader.skip(wireType);
+                    if (u !== false)
+                        (u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d);
+            }
+        }
+        return message;
+    }
+    internalBinaryWrite(message: PopulateDiskCacheResponse, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter {
+        /* int64 version = 1; */
+        if (message.version !== 0n)
+            writer.tag(1, WireType.Varint).int64(message.version);
+        let u = options.writeUnknownFields;
+        if (u !== false)
+            (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
+        return writer;
+    }
+}
+/**
+ * @generated MessageType for protobuf message pb.PopulateDiskCacheResponse
+ */
+export const PopulateDiskCacheResponse = new PopulateDiskCacheResponse$Type();
+/**
+ * @generated ServiceType for protobuf service pb.Cache
+ */
+export const Cache = new ServiceType("pb.Cache", [
+    { name: "PopulateDiskCache", options: {}, I: PopulateDiskCacheRequest, O: PopulateDiskCacheResponse }
+]);
diff --git a/pkg/api/cached.go b/pkg/api/cached.go
new file mode 100644
index 00000000..cd3ea31f
--- /dev/null
+++ b/pkg/api/cached.go
@@ -0,0 +1,106 @@
+package api
+
+import (
+	"context"
+	"crypto/rand"
+	"encoding/base64"
+	"errors"
+	"fmt"
+	"os"
+	"path"
+	"time"
+
+	"github.com/gadget-inc/dateilager/internal/files"
+	"github.com/gadget-inc/dateilager/internal/key"
+	"github.com/gadget-inc/dateilager/internal/logger"
+	"github.com/gadget-inc/dateilager/internal/pb"
+	"github.com/gadget-inc/dateilager/pkg/client"
+	"golang.org/x/sys/unix"
+)
+
+type Cached struct {
+	pb.UnimplementedCacheServer
+	Client      *client.Client
+	StagingPath string
+	// the current directory holding a fully formed downloaded cache
+	currentDir string
+	// the current version of the cache on disk at currentDir
+	currentVersion int64
+}
+
+func (c *Cached) PopulateDiskCache(ctx context.Context, req *pb.PopulateDiskCacheRequest) (*pb.PopulateDiskCacheResponse, error) {
+	err := requireAdminAuth(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	destination := req.Path
+	logger.Debug(ctx, "cached.PopulateDiskCache[Init]",
+		key.CachePath.Field(req.Path),
+	)
+
+	version, err := c.WriteCache(destination)
+	if err != nil {
+		return nil, err
+	}
+
+	logger.Debug(ctx, "cached.PopulateDiskCache[Written]",
+		key.CachePath.Field(destination),
+	)
+
+	return &pb.PopulateDiskCacheResponse{Version: version}, nil
+}
+
+// WriteCache checks that the destination exists and is writable (if present), then
+// hardlinks the golden copy into this downstream's destination, creating it if need be.
+func (c *Cached) WriteCache(destination string) (int64, error) {
+	if c.currentDir == "" {
+		return -1, errors.New("no cache prepared, currentDir is nil")
+	}
+
+	stat, err := os.Stat(destination)
+	if !os.IsNotExist(err) {
+		if err != nil {
+			return -1, fmt.Errorf("failed to stat cache destination %s: %v", destination, err)
+		}
+
+		if !stat.IsDir() {
+			return -1, fmt.Errorf("failed to open cache destination %s for writing -- it is already a file", destination)
+		}
+
+		if unix.Access(destination, unix.W_OK) != nil {
+			return -1, fmt.Errorf("failed to open cache destination %s for writing -- write permission denied", destination)
+		}
+	}
+
+	err = files.HardlinkDir(c.currentDir, destination)
+	if err != nil {
+		return -1, fmt.Errorf("failed to hardlink cache to destination %s: %v", destination, err)
+	}
+	return c.currentVersion, nil
+}
+
+// Prepare fetches the cache into a fresh spot in the staging dir.
+func (c *Cached) Prepare(ctx context.Context) error {
+	start := time.Now()
+	newDir := path.Join(c.StagingPath, randomString())
+	version, count, err := c.Client.GetCache(ctx, newDir)
+	if err != nil {
+		return err
+	}
+
+	c.currentDir = newDir
+	c.currentVersion = version
+
+	logger.Info(ctx, "downloaded golden copy", key.Directory.Field(newDir), key.DurationMS.Field(time.Since(start)), key.Version.Field(version), key.Count.Field(int64(count)))
+	return nil
+}
+
+func randomString() string {
+	// generate a secure random suffix for the staging directory name
+	randBytes := make([]byte, 10) // 10 bytes of entropy is plenty for a unique dir name
+	if _, err := rand.Read(randBytes); err != nil {
+		panic(err) // crypto/rand read failures are not recoverable here
+	}
+	return base64.URLEncoding.EncodeToString(randBytes)
+}
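WriteCache above delegates the actual copy to files.HardlinkDir, which this diff doesn't include. For intuition, a minimal stand-in — an illustration under the same assumptions, not the repo's implementation — walks the golden tree and hardlinks every file into the destination:

```go
package files

import (
	"io/fs"
	"os"
	"path/filepath"
)

// hardlinkDir mirrors src into dst: directories are recreated (they cannot be
// hardlinked), and every regular file becomes a hardlink to the golden copy.
func hardlinkDir(src, dst string) error {
	return filepath.WalkDir(src, func(path string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		rel, err := filepath.Rel(src, path)
		if err != nil {
			return err
		}
		target := filepath.Join(dst, rel)
		if d.IsDir() {
			return os.MkdirAll(target, 0o755)
		}
		// a hardlink adds a directory entry for an existing inode; no data is copied
		return os.Link(path, target)
	})
}
```

Because hardlinks share the underlying inode, populating a pod volume costs roughly one syscall per file with no data copied, which is what lets the daemon "move the cache into place quickly" as the commit message promises.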
diff --git a/pkg/cached/cached.go b/pkg/cached/cached.go
new file mode 100644
index 00000000..80cf16ac
--- /dev/null
+++ b/pkg/cached/cached.go
@@ -0,0 +1,69 @@
+package cached
+
+import (
+	"context"
+	"crypto/tls"
+	"net"
+
+	"github.com/gadget-inc/dateilager/internal/logger"
+	"github.com/gadget-inc/dateilager/internal/pb"
+	"github.com/gadget-inc/dateilager/pkg/api"
+	"github.com/gadget-inc/dateilager/pkg/client"
+	"github.com/gadget-inc/dateilager/pkg/server"
+	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
+	grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
+	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials"
+	"google.golang.org/grpc/health"
+	healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+type CacheServer struct {
+	Grpc   *grpc.Server
+	Health *health.Server
+	Cached *api.Cached
+}
+
+func NewServer(ctx context.Context, client *client.Client, cert *tls.Certificate, stagingPath string) *CacheServer {
+	creds := credentials.NewServerTLSFromCert(cert)
+
+	grpcServer := grpc.NewServer(
+		grpc.UnaryInterceptor(
+			grpc_middleware.ChainUnaryServer(
+				grpc_recovery.UnaryServerInterceptor(),
+				otelgrpc.UnaryServerInterceptor(),
+				logger.UnaryServerInterceptor(),
+			),
+		),
+		grpc.ReadBufferSize(server.BUFFER_SIZE),
+		grpc.WriteBufferSize(server.BUFFER_SIZE),
+		grpc.InitialConnWindowSize(server.INITIAL_CONN_WINDOW_SIZE),
+		grpc.InitialWindowSize(server.INITIAL_WINDOW_SIZE),
+		grpc.MaxRecvMsgSize(server.MAX_MESSAGE_SIZE),
+		grpc.MaxSendMsgSize(server.MAX_MESSAGE_SIZE),
+		grpc.Creds(creds),
+	)
+
+	logger.Info(ctx, "register HealthServer")
+	healthServer := health.NewServer()
+	healthpb.RegisterHealthServer(grpcServer, healthServer)
+
+	cached := &api.Cached{
+		Client:      client,
+		StagingPath: stagingPath,
+	}
+	pb.RegisterCacheServer(grpcServer, cached)
+
+	server := &CacheServer{
+		Grpc:   grpcServer,
+		Health: healthServer,
+		Cached: cached,
+	}
+
+	return server
+}
+
+func (s *CacheServer) Serve(lis net.Listener) error {
+	return s.Grpc.Serve(lis)
+}
diff --git a/pkg/cli/cachedaemon.go b/pkg/cli/cachedaemon.go
new file mode 100644
index 00000000..1393f4ab
--- /dev/null
+++ b/pkg/cli/cachedaemon.go
@@ -0,0 +1,169 @@
+package cli
+
+import (
+	"context"
+	"crypto/tls"
+	"flag"
+	"fmt"
+	"net"
+	"os"
+	"os/signal"
+	"runtime/pprof"
+	"syscall"
+
+	"github.com/gadget-inc/dateilager/internal/environment"
+	"github.com/gadget-inc/dateilager/internal/key"
+	"github.com/gadget-inc/dateilager/internal/logger"
+	"github.com/gadget-inc/dateilager/internal/telemetry"
+	"github.com/gadget-inc/dateilager/pkg/cached"
+	"github.com/gadget-inc/dateilager/pkg/client"
+	"github.com/gadget-inc/dateilager/pkg/version"
+	"github.com/spf13/cobra"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+)
+
+func NewCacheDaemonCommand() *cobra.Command {
+	var (
+		profilerEnabled   bool = false
+		shutdownTelemetry func()
+	)
+
+	var (
+		level        *zapcore.Level
+		encoding     string
+		tracing      bool
+		profilePath  string
+		upstreamHost string
+		upstreamPort uint16
+		certFile     string
+		keyFile      string
+		port         int
+		timeout      uint
+		headlessHost string
+		stagingPath  string
+	)
+
+	cmd := &cobra.Command{
+		Use:               "cached",
+		Short:             "DateiLager cache daemon",
+		DisableAutoGenTag: true,
+		Version:           
version.Version, + RunE: func(cmd *cobra.Command, _ []string) error { + cmd.SilenceUsage = true // silence usage when an error occurs after flags have been parsed + + env, err := environment.LoadEnvironment() + if err != nil { + return fmt.Errorf("could not load environment: %w", err) + } + + var config zap.Config + if env == environment.Prod { + config = zap.NewProductionConfig() + } else { + config = zap.NewDevelopmentConfig() + } + + config.Encoding = encoding + config.Level = zap.NewAtomicLevelAt(*level) + config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder + + err = logger.Init(config) + if err != nil { + return fmt.Errorf("could not initialize logger: %w", err) + } + + ctx := cmd.Context() + + if profilePath != "" { + file, err := os.Create(profilePath) + if err != nil { + return fmt.Errorf("cannot open profile path %s: %w", profilePath, err) + } + _ = pprof.StartCPUProfile(file) + profilerEnabled = true + } + + if tracing { + shutdownTelemetry = telemetry.Init(ctx, telemetry.Server) + } + + cert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return fmt.Errorf("cannot open TLS cert and key files (%s, %s): %w", certFile, keyFile, err) + } + + cl, err := client.NewClient(ctx, upstreamHost, upstreamPort, client.WithheadlessHost(headlessHost)) + if err != nil { + return err + } + ctx = client.IntoContext(ctx, cl) + + listen, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + return fmt.Errorf("failed to listen on TCP port %d: %w", port, err) + } + + s := cached.NewServer(ctx, cl, &cert, stagingPath) + + osSignals := make(chan os.Signal, 1) + signal.Notify(osSignals, os.Interrupt, syscall.SIGTERM) + go func() { + <-osSignals + s.Grpc.GracefulStop() + }() + + err = s.Cached.Prepare(ctx) + if err != nil { + return fmt.Errorf("failed to prepare cache daemon in %s: %w", stagingPath, err) + } + + logger.Info(ctx, "start server", key.Port.Field(port), key.Environment.Field(env.String())) + return s.Serve(listen) + }, + PostRunE: func(cmd *cobra.Command, _ []string) error { + if shutdownTelemetry != nil { + shutdownTelemetry() + } + + if profilerEnabled { + pprof.StopCPUProfile() + } + + return nil + }, + } + + flags := cmd.PersistentFlags() + + level = zap.LevelFlag("log-level", zap.DebugLevel, "Log level") + flags.AddGoFlag(flag.CommandLine.Lookup("log-level")) + flags.StringVar(&encoding, "log-encoding", "console", "Log encoding (console | json)") + flags.BoolVar(&tracing, "tracing", false, "Whether tracing is enabled") + flags.StringVar(&profilePath, "profile", "", "CPU profile output path (profiling enabled if set)") + flags.StringVar(&certFile, "cert", "development/server.crt", "TLS cert file") + flags.StringVar(&keyFile, "key", "development/server.key", "TLS key file") + flags.StringVar(&upstreamHost, "upstream-host", "localhost", "GRPC server hostname") + flags.Uint16Var(&upstreamPort, "upstream-port", 5051, "GRPC server port") + flags.StringVar(&headlessHost, "headless-host", "", "Alternative headless hostname to use for round robin connections") + flags.UintVar(&timeout, "timeout", 0, "GRPC client timeout (ms)") + flags.IntVar(&port, "port", 5053, "cache API port") + flags.StringVar(&stagingPath, "staging-path", "", "path for staging downloaded caches") + + _ = cmd.MarkPersistentFlagRequired("staging-path") + return cmd +} + +func CacheDaemonExecute() { + ctx := context.Background() + cmd := NewCacheDaemonCommand() + + err := cmd.ExecuteContext(ctx) + + logger.Info(ctx, "shut down server") + _ = logger.Sync() + + if err != nil { + 
logger.Fatal(ctx, "server failed", zap.Error(err))
+	}
+}
diff --git a/pkg/cli/client.go b/pkg/cli/client.go
index 9b74b308..200ded83 100644
--- a/pkg/cli/client.go
+++ b/pkg/cli/client.go
@@ -124,6 +124,7 @@ func NewClientCommand() *cobra.Command {
 	cmd.AddCommand(NewCmdUpdate())
 	cmd.AddCommand(NewCmdGc())
 	cmd.AddCommand(NewCmdGetCache())
+	cmd.AddCommand(NewCmdPopulateDiskCache())
 
 	return cmd
 }
diff --git a/pkg/cli/populatediskcache.go b/pkg/cli/populatediskcache.go
new file mode 100644
index 00000000..7f456eea
--- /dev/null
+++ b/pkg/cli/populatediskcache.go
@@ -0,0 +1,30 @@
+package cli
+
+import (
+	"github.com/gadget-inc/dateilager/internal/key"
+	"github.com/gadget-inc/dateilager/internal/logger"
+	"github.com/gadget-inc/dateilager/pkg/client"
+	"github.com/spf13/cobra"
+)
+
+func NewCmdPopulateDiskCache() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:  "populate-disk-cache <path>",
+		Args: cobra.MatchAll(cobra.ExactArgs(1), cobra.OnlyValidArgs),
+		RunE: func(cmd *cobra.Command, args []string) error {
+			ctx := cmd.Context()
+			c := client.FromContext(ctx)
+
+			version, err := c.PopulateDiskCache(ctx, args[0])
+			if err != nil {
+				return err
+			}
+
+			logger.Info(ctx, "cache populated", key.Version.Field(version))
+
+			return nil
+		},
+	}
+
+	return cmd
+}
diff --git a/pkg/client/client.go b/pkg/client/client.go
index 805b0856..7da2d7ef 100644
--- a/pkg/client/client.go
+++ b/pkg/client/client.go
@@ -46,12 +46,13 @@ type VersionRange struct {
 }
 
 type Client struct {
-	conn *grpc.ClientConn
-	fs   pb.FsClient
+	conn  *grpc.ClientConn
+	fs    pb.FsClient
+	cache pb.CacheClient
 }
 
 func NewClientConn(conn *grpc.ClientConn) *Client {
-	return &Client{conn: conn, fs: pb.NewFsClient(conn)}
+	return &Client{conn: conn, fs: pb.NewFsClient(conn), cache: pb.NewCacheClient(conn)}
 }
 
 type options struct {
@@ -950,6 +951,24 @@ func (c *Client) CloneToProject(ctx context.Context, source int64, target int64,
 	return &response.LatestVersion, nil
 }
 
+func (c *Client) PopulateDiskCache(ctx context.Context, destination string) (int64, error) {
+	ctx, span := telemetry.Start(ctx, "client.populate-disk-cache", trace.WithAttributes(
+		key.CachePath.Attribute(destination),
+	))
+	defer span.End()
+
+	request := &pb.PopulateDiskCacheRequest{
+		Path: destination,
+	}
+
+	response, err := c.cache.PopulateDiskCache(ctx, request)
+	if err != nil {
+		return 0, fmt.Errorf("populate disk cache for %s: %w", destination, err)
+	}
+
+	return response.Version, nil
+}
+
 func parallelWorkerCount() int {
 	envCount := os.Getenv("DL_WRITE_WORKERS")
 	if envCount != "" {
diff --git a/test/cached_test.go b/test/cached_test.go
new file mode 100644
index 00000000..34b9c8a2
--- /dev/null
+++ b/test/cached_test.go
@@ -0,0 +1,104 @@
+package test
+
+import (
+	"fmt"
+	"os"
+	"path"
+	"testing"
+
+	"github.com/gadget-inc/dateilager/internal/auth"
+	"github.com/gadget-inc/dateilager/internal/db"
+	"github.com/gadget-inc/dateilager/internal/pb"
+	util "github.com/gadget-inc/dateilager/internal/testutil"
+	"github.com/gadget-inc/dateilager/pkg/cached"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestPopulateCache(t *testing.T) {
+	tc := util.NewTestCtx(t, auth.Admin, 1)
+	defer tc.Close()
+
+	writeProject(tc, 1, 2)
+	writeObject(tc, 1, 1, i(2), "a", "a v1")
+	aHash := writePackedFiles(tc, 1, 1, nil, "pack/a")
+	bHash := writePackedFiles(tc, 1, 1, nil, "pack/b")
+	version, err := db.CreateCache(tc.Context(), tc.Connect(), "", 100)
+	require.NoError(t, err)
+
+	c, _, close := createTestClient(tc)
+	defer close()
+
+	
tmpDir := emptyTmpDir(t) + defer os.RemoveAll(tmpDir) + + s := cached.NewServer(tc.Context(), c, testCert(), path.Join(tmpDir, "staging")) + require.NoError(t, s.Cached.Prepare(tc.Context()), "cached.Prepare must succeed") + + _, err = s.Cached.PopulateDiskCache(tc.Context(), &pb.PopulateDiskCacheRequest{Path: path.Join(tmpDir, "test")}) + require.NoError(t, err, "Cached.PopulateDiskCache") + + verifyDir(t, path.Join(tmpDir, "test"), -1, map[string]expectedFile{ + fmt.Sprintf("objects/%v/pack/a/1", aHash): {content: "pack/a/1 v1"}, + fmt.Sprintf("objects/%v/pack/a/2", aHash): {content: "pack/a/2 v1"}, + fmt.Sprintf("objects/%v/pack/b/1", bHash): {content: "pack/b/1 v1"}, + fmt.Sprintf("objects/%v/pack/b/2", bHash): {content: "pack/b/2 v1"}, + "versions": {content: fmt.Sprintf("%v\n", version)}, + }) +} + +func TestPopulateEmptyCache(t *testing.T) { + tc := util.NewTestCtx(t, auth.Admin, 1) + defer tc.Close() + + writeProject(tc, 1, 2) + writeObject(tc, 1, 1, i(2), "a", "a v1") + // no packed files, so no cache + version, err := db.CreateCache(tc.Context(), tc.Connect(), "", 100) + require.NoError(t, err) + assert.NotEqual(t, int64(-1), version) + + c, _, close := createTestClient(tc) + defer close() + + tmpDir := emptyTmpDir(t) + defer os.RemoveAll(tmpDir) + + s := cached.NewServer(tc.Context(), c, testCert(), path.Join(tmpDir, "staging")) + require.NoError(t, s.Cached.Prepare(tc.Context()), "cached.Prepare must succeed") + + _, err = s.Cached.PopulateDiskCache(tc.Context(), &pb.PopulateDiskCacheRequest{Path: path.Join(tmpDir, "test")}) + require.NoError(t, err, "Cached.PopulateDiskCache") + + verifyDir(t, path.Join(tmpDir, "test"), -1, map[string]expectedFile{ + "objects/": {content: "", fileType: typeDirectory}, + }) +} + +func TestPopulateCacheToPathWithNoWritePermissions(t *testing.T) { + tc := util.NewTestCtx(t, auth.Admin, 1) + defer tc.Close() + + writeProject(tc, 1, 2) + writeObject(tc, 1, 1, i(2), "a", "a v1") + writePackedFiles(tc, 1, 1, nil, "pack/a") + writePackedFiles(tc, 1, 1, nil, "pack/b") + _, err := db.CreateCache(tc.Context(), tc.Connect(), "", 100) + require.NoError(t, err) + + c, _, close := createTestClient(tc) + defer close() + + tmpDir := emptyTmpDir(t) + defer os.RemoveAll(tmpDir) + + s := cached.NewServer(tc.Context(), c, testCert(), path.Join(tmpDir, "staging")) + require.NoError(t, s.Cached.Prepare(tc.Context()), "cached.Prepare must succeed") + + // Create a directory with no write permissions + err = os.Mkdir(path.Join(tmpDir, "test"), 0000) + require.NoError(t, err) + + _, err = s.Cached.PopulateDiskCache(tc.Context(), &pb.PopulateDiskCacheRequest{Path: path.Join(tmpDir, "test")}) + require.Error(t, err, "populating cache to a path with no write permissions must fail") +} diff --git a/test/client_rebuild_test.go b/test/client_rebuild_test.go index e8812865..fd13b03d 100644 --- a/test/client_rebuild_test.go +++ b/test/client_rebuild_test.go @@ -277,8 +277,8 @@ func TestRebuildWithCache(t *testing.T) { count: 2, }) - aCachePath := filepath.Join(client.CacheObjectsDir(cacheDir), ha.Hex(), "pack/a") - bCachePath := filepath.Join(client.CacheObjectsDir(cacheDir), hb.Hex(), "pack/b") + aCachePath := filepath.Join(client.CacheObjectsDir(cacheDir), ha, "pack/a") + bCachePath := filepath.Join(client.CacheObjectsDir(cacheDir), hb, "pack/b") verifyDir(t, tmpDir, 1, map[string]expectedFile{ "pack/a/1": {content: "pack/a/1 v1"}, diff --git a/test/shared_test.go b/test/shared_test.go index 18c8723a..640e0232 100644 --- a/test/shared_test.go +++ b/test/shared_test.go @@ 
-4,11 +4,13 @@ import ( "archive/tar" "bytes" "context" + "crypto/tls" "fmt" "io" "io/fs" "net" "os" + "path" "path/filepath" "sort" "strings" @@ -213,11 +215,12 @@ func writePackedObjects(tc util.TestCtx, project int64, start int64, stop *int64 return hash } -func writePackedFiles(tc util.TestCtx, project int64, start int64, stop *int64, path string) db.Hash { - return writePackedObjects(tc, project, start, stop, path, map[string]expectedObject{ +func writePackedFiles(tc util.TestCtx, project int64, start int64, stop *int64, path string) string { + hash := writePackedObjects(tc, project, start, stop, path, map[string]expectedObject{ filepath.Join(path, "1"): {content: fmt.Sprintf("%s v%d", filepath.Join(path, "1"), start)}, filepath.Join(path, "2"): {content: fmt.Sprintf("%s v%d", filepath.Join(path, "2"), start)}, }) + return hash.Hex() } func packObjects(tc util.TestCtx, objects map[string]expectedObject) []byte { @@ -338,11 +341,13 @@ func verifyDir(t *testing.T, dir string, version int64, files map[string]expecte dirEntries[fmt.Sprintf("%s/", *maybeEmptyDir)] = *maybeEmptyInfo } - fileVersion, err := client.ReadVersionFile(dir) - require.NoError(t, err, "read version file") + if version != -1 { + fileVersion, err := client.ReadVersionFile(dir) + require.NoError(t, err, "read version file") - assert.Equal(t, version, fileVersion, "expected file version %v", version) - assert.Equal(t, len(files), len(dirEntries), "expected %v files in %v", len(files), dir) + assert.Equal(t, version, fileVersion, "expected file version %v", version) + assert.Equal(t, len(files), len(dirEntries), "expected %v files in %v", len(files), dir) + } for name, file := range files { path := filepath.Join(dir, name) @@ -646,6 +651,23 @@ func verifyTarResults(t *testing.T, results [][]byte, expected map[string]expect // Use debugProjects(tc) and debugObjects(tc) within a failing test to log the state of the DB +func testCert() *tls.Certificate { + ex, err := os.Getwd() + if err != nil { + panic(err) + } + exPath := filepath.Dir(ex) // hop up from ./test to root of project + + certFile := path.Join(exPath, "development/server.crt") + keyFile := path.Join(exPath, "development/server.key") + cert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + panic(fmt.Errorf("cannot open TLS cert and key files (%s, %s): %w", certFile, keyFile, err)) + } + + return &cert +} + //lint:ignore U1000 debug utility func debugProjects(tc util.TestCtx) { conn := tc.Connect()