From e2fdadb6535e2c69d4fcfb7d6a8df54eb4bbd604 Mon Sep 17 00:00:00 2001 From: Alex Angelini Date: Wed, 8 May 2024 14:11:07 +0200 Subject: [PATCH] Split clients and consistency changes --- Makefile | 10 ++- internal/pb/cache.pb.go | 24 +++---- internal/pb/cache.proto | 5 +- internal/pb/cache_grpc.pb.go | 62 +++++++++---------- internal/testutil/context.go | 9 +++ pkg/api/cached.go | 12 +++- pkg/cached/cached.go | 15 ++--- pkg/cli/{cachedaemon.go => cached.go} | 26 +++++--- pkg/cli/client.go | 20 ++++-- .../{getcache-from-daemon.go => getcached.go} | 17 +++-- pkg/cli/server.go | 2 +- pkg/client/client.go | 59 ++++++++++++++---- pkg/client/context.go | 13 ++++ pkg/server/server.go | 1 - test/shared_test.go | 18 +++--- 15 files changed, 188 insertions(+), 105 deletions(-) rename pkg/cli/{cachedaemon.go => cached.go} (92%) rename pkg/cli/{getcache-from-daemon.go => getcached.go} (61%) diff --git a/Makefile b/Makefile index 67b7b22b..b120fcdc 100644 --- a/Makefile +++ b/Makefile @@ -70,8 +70,6 @@ build: internal/pb/fs.pb.go internal/pb/fs_grpc.pb.go internal/pb/cache.pb.go in lint: golangci-lint run - - release/%_linux_amd64: cmd/%/main.go $(PKG_GO_FILES) $(INTERNAL_GO_FILES) go.sum CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build $(BUILD_FLAGS) -o $@ $< @@ -177,10 +175,10 @@ client-getcache: export DL_SKIP_SSL_VERIFICATION=1 client-getcache: go run cmd/client/main.go getcache --host $(GRPC_HOST) --path input/cache -client-getcache-from-daemon: export DL_TOKEN=$(DEV_TOKEN_ADMIN) -client-getcache-from-daemon: export DL_SKIP_SSL_VERIFICATION=1 -client-getcache-from-daemon: - mkdir -p tmp/pods/test-pod/volumes/example && go run cmd/client/main.go getcache-from-daemon --host $(GRPC_HOST) --port $(GRPC_CACHED_PORT) input/cache +client-getcached: export DL_TOKEN=$(DEV_TOKEN_ADMIN) +client-getcached: export DL_SKIP_SSL_VERIFICATION=1 +client-getcached: + go run cmd/client/main.go getcached --host $(GRPC_HOST) --port $(GRPC_CACHED_PORT) --path input/cache client-gc-contents: export DL_TOKEN=$(DEV_TOKEN_ADMIN) client-gc-contents: export DL_SKIP_SSL_VERIFICATION=1 diff --git a/internal/pb/cache.pb.go b/internal/pb/cache.pb.go index 458fb1c4..09453708 100644 --- a/internal/pb/cache.pb.go +++ b/internal/pb/cache.pb.go @@ -125,16 +125,16 @@ var file_internal_pb_cache_proto_rawDesc = []byte{ 0x19, 0x50, 0x6f, 0x70, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x44, 0x69, 0x73, 0x6b, 0x43, 0x61, 0x63, 0x68, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x76, 0x65, 0x72, - 0x73, 0x69, 0x6f, 0x6e, 0x32, 0x59, 0x0a, 0x05, 0x43, 0x61, 0x63, 0x68, 0x65, 0x12, 0x50, 0x0a, - 0x11, 0x50, 0x6f, 0x70, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x44, 0x69, 0x73, 0x6b, 0x43, 0x61, 0x63, - 0x68, 0x65, 0x12, 0x1c, 0x2e, 0x70, 0x62, 0x2e, 0x50, 0x6f, 0x70, 0x75, 0x6c, 0x61, 0x74, 0x65, - 0x44, 0x69, 0x73, 0x6b, 0x43, 0x61, 0x63, 0x68, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x1d, 0x2e, 0x70, 0x62, 0x2e, 0x50, 0x6f, 0x70, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x44, 0x69, - 0x73, 0x6b, 0x43, 0x61, 0x63, 0x68, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, - 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x61, - 0x64, 0x67, 0x65, 0x74, 0x2d, 0x69, 0x6e, 0x63, 0x2f, 0x64, 0x61, 0x74, 0x65, 0x69, 0x6c, 0x61, - 0x67, 0x65, 0x72, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x73, 0x69, 0x6f, 0x6e, 0x32, 0x5a, 0x0a, 0x06, 0x43, 0x61, 0x63, 0x68, 0x65, 0x64, 0x12, 0x50, + 0x0a, 0x11, 0x50, 0x6f, 0x70, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x44, 0x69, 0x73, 0x6b, 0x43, 0x61, + 0x63, 0x68, 0x65, 0x12, 0x1c, 0x2e, 0x70, 0x62, 0x2e, 0x50, 0x6f, 0x70, 0x75, 0x6c, 0x61, 0x74, + 0x65, 0x44, 0x69, 0x73, 0x6b, 0x43, 0x61, 0x63, 0x68, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x1d, 0x2e, 0x70, 0x62, 0x2e, 0x50, 0x6f, 0x70, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x44, + 0x69, 0x73, 0x6b, 0x43, 0x61, 0x63, 0x68, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, + 0x61, 0x64, 0x67, 0x65, 0x74, 0x2d, 0x69, 0x6e, 0x63, 0x2f, 0x64, 0x61, 0x74, 0x65, 0x69, 0x6c, + 0x61, 0x67, 0x65, 0x72, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var ( @@ -155,8 +155,8 @@ var file_internal_pb_cache_proto_goTypes = []interface{}{ (*PopulateDiskCacheResponse)(nil), // 1: pb.PopulateDiskCacheResponse } var file_internal_pb_cache_proto_depIdxs = []int32{ - 0, // 0: pb.Cache.PopulateDiskCache:input_type -> pb.PopulateDiskCacheRequest - 1, // 1: pb.Cache.PopulateDiskCache:output_type -> pb.PopulateDiskCacheResponse + 0, // 0: pb.Cached.PopulateDiskCache:input_type -> pb.PopulateDiskCacheRequest + 1, // 1: pb.Cached.PopulateDiskCache:output_type -> pb.PopulateDiskCacheResponse 1, // [1:2] is the sub-list for method output_type 0, // [0:1] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name diff --git a/internal/pb/cache.proto b/internal/pb/cache.proto index df257cd7..c4468590 100644 --- a/internal/pb/cache.proto +++ b/internal/pb/cache.proto @@ -4,9 +4,8 @@ package pb; option go_package = "github.com/gadget-inc/dateilager/pkg/pb"; -service Cache { - rpc PopulateDiskCache(PopulateDiskCacheRequest) - returns (PopulateDiskCacheResponse); +service Cached { + rpc PopulateDiskCache(PopulateDiskCacheRequest) returns (PopulateDiskCacheResponse); } message PopulateDiskCacheRequest { string path = 1; } diff --git a/internal/pb/cache_grpc.pb.go b/internal/pb/cache_grpc.pb.go index 23a7751c..656b7015 100644 --- a/internal/pb/cache_grpc.pb.go +++ b/internal/pb/cache_grpc.pb.go @@ -19,89 +19,89 @@ import ( const _ = grpc.SupportPackageIsVersion7 const ( - Cache_PopulateDiskCache_FullMethodName = "/pb.Cache/PopulateDiskCache" + Cached_PopulateDiskCache_FullMethodName = "/pb.Cached/PopulateDiskCache" ) -// CacheClient is the client API for Cache service. +// CachedClient is the client API for Cached service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type CacheClient interface { +type CachedClient interface { PopulateDiskCache(ctx context.Context, in *PopulateDiskCacheRequest, opts ...grpc.CallOption) (*PopulateDiskCacheResponse, error) } -type cacheClient struct { +type cachedClient struct { cc grpc.ClientConnInterface } -func NewCacheClient(cc grpc.ClientConnInterface) CacheClient { - return &cacheClient{cc} +func NewCachedClient(cc grpc.ClientConnInterface) CachedClient { + return &cachedClient{cc} } -func (c *cacheClient) PopulateDiskCache(ctx context.Context, in *PopulateDiskCacheRequest, opts ...grpc.CallOption) (*PopulateDiskCacheResponse, error) { +func (c *cachedClient) PopulateDiskCache(ctx context.Context, in *PopulateDiskCacheRequest, opts ...grpc.CallOption) (*PopulateDiskCacheResponse, error) { out := new(PopulateDiskCacheResponse) - err := c.cc.Invoke(ctx, Cache_PopulateDiskCache_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, Cached_PopulateDiskCache_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } -// CacheServer is the server API for Cache service. -// All implementations must embed UnimplementedCacheServer +// CachedServer is the server API for Cached service. +// All implementations must embed UnimplementedCachedServer // for forward compatibility -type CacheServer interface { +type CachedServer interface { PopulateDiskCache(context.Context, *PopulateDiskCacheRequest) (*PopulateDiskCacheResponse, error) - mustEmbedUnimplementedCacheServer() + mustEmbedUnimplementedCachedServer() } -// UnimplementedCacheServer must be embedded to have forward compatible implementations. -type UnimplementedCacheServer struct { +// UnimplementedCachedServer must be embedded to have forward compatible implementations. +type UnimplementedCachedServer struct { } -func (UnimplementedCacheServer) PopulateDiskCache(context.Context, *PopulateDiskCacheRequest) (*PopulateDiskCacheResponse, error) { +func (UnimplementedCachedServer) PopulateDiskCache(context.Context, *PopulateDiskCacheRequest) (*PopulateDiskCacheResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method PopulateDiskCache not implemented") } -func (UnimplementedCacheServer) mustEmbedUnimplementedCacheServer() {} +func (UnimplementedCachedServer) mustEmbedUnimplementedCachedServer() {} -// UnsafeCacheServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to CacheServer will +// UnsafeCachedServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to CachedServer will // result in compilation errors. -type UnsafeCacheServer interface { - mustEmbedUnimplementedCacheServer() +type UnsafeCachedServer interface { + mustEmbedUnimplementedCachedServer() } -func RegisterCacheServer(s grpc.ServiceRegistrar, srv CacheServer) { - s.RegisterService(&Cache_ServiceDesc, srv) +func RegisterCachedServer(s grpc.ServiceRegistrar, srv CachedServer) { + s.RegisterService(&Cached_ServiceDesc, srv) } -func _Cache_PopulateDiskCache_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _Cached_PopulateDiskCache_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(PopulateDiskCacheRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(CacheServer).PopulateDiskCache(ctx, in) + return srv.(CachedServer).PopulateDiskCache(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: Cache_PopulateDiskCache_FullMethodName, + FullMethod: Cached_PopulateDiskCache_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(CacheServer).PopulateDiskCache(ctx, req.(*PopulateDiskCacheRequest)) + return srv.(CachedServer).PopulateDiskCache(ctx, req.(*PopulateDiskCacheRequest)) } return interceptor(ctx, in, info, handler) } -// Cache_ServiceDesc is the grpc.ServiceDesc for Cache service. +// Cached_ServiceDesc is the grpc.ServiceDesc for Cached service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) -var Cache_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "pb.Cache", - HandlerType: (*CacheServer)(nil), +var Cached_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "pb.Cached", + HandlerType: (*CachedServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "PopulateDiskCache", - Handler: _Cache_PopulateDiskCache_Handler, + Handler: _Cached_PopulateDiskCache_Handler, }, }, Streams: []grpc.StreamDesc{}, diff --git a/internal/testutil/context.go b/internal/testutil/context.go index 517b0aaa..238bc2d7 100644 --- a/internal/testutil/context.go +++ b/internal/testutil/context.go @@ -9,6 +9,7 @@ import ( "github.com/gadget-inc/dateilager/internal/db" "github.com/gadget-inc/dateilager/internal/environment" "github.com/gadget-inc/dateilager/pkg/api" + "github.com/gadget-inc/dateilager/pkg/client" "github.com/jackc/pgx/v5" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -90,3 +91,11 @@ func (tc *TestCtx) FsApi() *api.Fs { ContentLookup: tc.ContentLookup(), } } + +func (tc *TestCtx) CachedApi(cl *client.Client, stagingPath string) *api.Cached { + return &api.Cached{ + Env: environment.Test, + Client: cl, + StagingPath: stagingPath, + } +} diff --git a/pkg/api/cached.go b/pkg/api/cached.go index abcc2931..9c833d7b 100644 --- a/pkg/api/cached.go +++ b/pkg/api/cached.go @@ -10,18 +10,24 @@ import ( "path" "time" + "github.com/gadget-inc/dateilager/internal/environment" "github.com/gadget-inc/dateilager/internal/files" "github.com/gadget-inc/dateilager/internal/key" "github.com/gadget-inc/dateilager/internal/logger" "github.com/gadget-inc/dateilager/internal/pb" "github.com/gadget-inc/dateilager/pkg/client" "golang.org/x/sys/unix" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type Cached struct { - pb.UnimplementedCacheServer + pb.UnimplementedCachedServer + + Env environment.Env Client *client.Client StagingPath string + // the current directory holding a fully formed downloaded cache currentDir string // the current version of the cache on disk at currentDir @@ -29,6 +35,10 @@ type Cached struct { } func (c *Cached) PopulateDiskCache(ctx context.Context, req *pb.PopulateDiskCacheRequest) (*pb.PopulateDiskCacheResponse, error) { + if c.Env != environment.Dev && c.Env != environment.Test { + return nil, status.Errorf(codes.Unimplemented, "Cached populateDiskCache only implemented in dev and test environments") + } + err := requireAdminAuth(ctx) if err != nil { return nil, err diff --git a/pkg/cached/cached.go b/pkg/cached/cached.go index 59710c23..3bc36b27 100644 --- a/pkg/cached/cached.go +++ b/pkg/cached/cached.go @@ -10,7 +10,6 @@ import ( "github.com/gadget-inc/dateilager/internal/logger" "github.com/gadget-inc/dateilager/internal/pb" "github.com/gadget-inc/dateilager/pkg/api" - "github.com/gadget-inc/dateilager/pkg/client" "github.com/gadget-inc/dateilager/pkg/server" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" @@ -24,10 +23,9 @@ import ( type CacheServer struct { Grpc *grpc.Server Health *health.Server - Cached *api.Cached } -func NewServer(ctx context.Context, client *client.Client, cert *tls.Certificate, stagingPath string, pasetoKey ed25519.PublicKey) *CacheServer { +func NewServer(ctx context.Context, cert *tls.Certificate, pasetoKey ed25519.PublicKey) *CacheServer { creds := credentials.NewServerTLSFromCert(cert) validator := auth.NewAuthValidator(pasetoKey) @@ -53,21 +51,18 @@ func NewServer(ctx context.Context, client *client.Client, cert *tls.Certificate healthServer := health.NewServer() healthpb.RegisterHealthServer(grpcServer, healthServer) - cached := &api.Cached{ - Client: client, - StagingPath: stagingPath, - } - pb.RegisterCacheServer(grpcServer, cached) - server := &CacheServer{ Grpc: grpcServer, Health: healthServer, - Cached: cached, } return server } +func (s *CacheServer) RegisterCachedServer(ctx context.Context, cached *api.Cached) { + pb.RegisterCachedServer(s.Grpc, cached) +} + func (s *CacheServer) Serve(lis net.Listener) error { return s.Grpc.Serve(lis) } diff --git a/pkg/cli/cachedaemon.go b/pkg/cli/cached.go similarity index 92% rename from pkg/cli/cachedaemon.go rename to pkg/cli/cached.go index 2e0777c8..6958a31c 100644 --- a/pkg/cli/cachedaemon.go +++ b/pkg/cli/cached.go @@ -15,6 +15,7 @@ import ( "github.com/gadget-inc/dateilager/internal/key" "github.com/gadget-inc/dateilager/internal/logger" "github.com/gadget-inc/dateilager/internal/telemetry" + "github.com/gadget-inc/dateilager/pkg/api" "github.com/gadget-inc/dateilager/pkg/cached" "github.com/gadget-inc/dateilager/pkg/client" "github.com/gadget-inc/dateilager/pkg/version" @@ -109,7 +110,14 @@ func NewCacheDaemonCommand() *cobra.Command { return fmt.Errorf("failed to listen on TCP port %d: %w", port, err) } - s := cached.NewServer(ctx, cl, &cert, stagingPath, pasetoKey) + s := cached.NewServer(ctx, &cert, pasetoKey) + logger.Info(ctx, "register Cached") + cached := &api.Cached{ + Env: env, + Client: cl, + StagingPath: stagingPath, + } + s.RegisterCachedServer(ctx, cached) osSignals := make(chan os.Signal, 1) signal.Notify(osSignals, os.Interrupt, syscall.SIGTERM) @@ -118,12 +126,12 @@ func NewCacheDaemonCommand() *cobra.Command { s.Grpc.GracefulStop() }() - err = s.Cached.Prepare(ctx) + err = cached.Prepare(ctx) if err != nil { return fmt.Errorf("failed to prepare cache daemon in %s: %w", stagingPath, err) } - logger.Info(ctx, "start server", key.Port.Field(port), key.Environment.Field(env.String())) + logger.Info(ctx, "start cached server", key.Port.Field(port), key.Environment.Field(env.String())) return s.Serve(listen) }, PostRunE: func(cmd *cobra.Command, _ []string) error { @@ -146,17 +154,19 @@ func NewCacheDaemonCommand() *cobra.Command { flags.StringVar(&encoding, "log-encoding", "console", "Log encoding (console | json)") flags.BoolVar(&tracing, "tracing", false, "Whether tracing is enabled") flags.StringVar(&profilePath, "profile", "", "CPU profile output path (profiling enabled if set)") - flags.StringVar(&certFile, "cert", "development/server.crt", "TLS cert file") - flags.StringVar(&keyFile, "key", "development/server.key", "TLS key file") - flags.StringVar(&pasetoFile, "paseto", "development/paseto.pub", "Paseto public key file") + + flags.IntVar(&port, "port", 5053, "cache API port") flags.StringVar(&upstreamHost, "upstream-host", "localhost", "GRPC server hostname") flags.Uint16Var(&upstreamPort, "upstream-port", 5051, "GRPC server port") flags.StringVar(&headlessHost, "headless-host", "", "Alternative headless hostname to use for round robin connections") + flags.StringVar(&certFile, "cert", "development/server.crt", "TLS cert file") + flags.StringVar(&keyFile, "key", "development/server.key", "TLS key file") + flags.StringVar(&pasetoFile, "paseto", "development/paseto.pub", "Paseto public key file") flags.UintVar(&timeout, "timeout", 0, "GRPC client timeout (ms)") - flags.IntVar(&port, "port", 5053, "cache API port") - flags.StringVar(&stagingPath, "staging-path", "", "path for staging downloaded caches") + flags.StringVar(&stagingPath, "staging-path", "", "path for staging downloaded caches") _ = cmd.MarkPersistentFlagRequired("staging-path") + return cmd } diff --git a/pkg/cli/client.go b/pkg/cli/client.go index 21d2dc5f..eb1bfe4e 100644 --- a/pkg/cli/client.go +++ b/pkg/cli/client.go @@ -5,6 +5,7 @@ import ( "encoding/json" "flag" "fmt" + "slices" "strings" "time" @@ -21,8 +22,9 @@ import ( ) var ( - shutdownTelemetry func() - span trace.Span + shutdownTelemetry func() + span trace.Span + requiresCachedClient = []string{"getcached"} ) func NewClientCommand() *cobra.Command { @@ -88,9 +90,16 @@ func NewClientCommand() *cobra.Command { if err != nil { return err } - ctx = client.IntoContext(ctx, cl) + if slices.Contains(requiresCachedClient, cmd.CalledAs()) { + cachedClient, err := client.NewCachedClient(ctx, host, port, client.WithheadlessHost(headlessHost)) + if err != nil { + return err + } + ctx = client.CachedIntoContext(ctx, cachedClient) + } + cmd.SetContext(ctx) return nil @@ -109,11 +118,14 @@ func NewClientCommand() *cobra.Command { flags.AddGoFlag(flag.CommandLine.Lookup("log-level")) flags.StringVar(&encoding, "log-encoding", "console", "Log encoding (console | json)") flags.BoolVar(&tracing, "tracing", false, "Whether tracing is enabled") + flags.StringVar(&otelContext, "otel-context", "", "Open Telemetry context") flags.StringVar(&host, "host", "", "GRPC server hostname") flags.Uint16Var(&port, "port", 5051, "GRPC server port") - flags.UintVar(&timeout, "timeout", 0, "GRPC client timeout (ms)") flags.StringVar(&headlessHost, "headless-host", "", "Alternative headless hostname to use for round robin connections") + flags.UintVar(&timeout, "timeout", 0, "GRPC client timeout (ms)") + + _ = cmd.MarkFlagRequired("host") cmd.AddCommand(NewCmdGet()) cmd.AddCommand(NewCmdInspect()) diff --git a/pkg/cli/getcache-from-daemon.go b/pkg/cli/getcached.go similarity index 61% rename from pkg/cli/getcache-from-daemon.go rename to pkg/cli/getcached.go index 54f6ba29..2e810284 100644 --- a/pkg/cli/getcache-from-daemon.go +++ b/pkg/cli/getcached.go @@ -8,14 +8,17 @@ import ( ) func NewCmdGetCacheFromDaemon() *cobra.Command { + var ( + path string + ) + cmd := &cobra.Command{ - Use: "getcache-from-daemon ", - Args: cobra.MatchAll(cobra.ExactArgs(1), cobra.OnlyValidArgs), - RunE: func(cmd *cobra.Command, args []string) error { + Use: "getcached", + RunE: func(cmd *cobra.Command, _ []string) error { ctx := cmd.Context() - c := client.FromContext(ctx) + c := client.CachedFromContext(ctx) - version, err := c.PopulateDiskCache(ctx, args[0]) + version, err := c.PopulateDiskCache(ctx, path) if err != nil { return err } @@ -26,5 +29,9 @@ func NewCmdGetCacheFromDaemon() *cobra.Command { }, } + cmd.Flags().StringVar(&path, "path", "", "Cache directory") + + _ = cmd.MarkFlagRequired("path") + return cmd } diff --git a/pkg/cli/server.go b/pkg/cli/server.go index 94daa53d..c3d856ad 100644 --- a/pkg/cli/server.go +++ b/pkg/cli/server.go @@ -131,7 +131,7 @@ func NewServerCommand() *cobra.Command { s.Grpc.GracefulStop() }() - logger.Info(ctx, "start server", key.Port.Field(port), key.Environment.Field(env.String())) + logger.Info(ctx, "start fs server", key.Port.Field(port), key.Environment.Field(env.String())) return s.Serve(listen) }, PostRunE: func(cmd *cobra.Command, _ []string) error { diff --git a/pkg/client/client.go b/pkg/client/client.go index 7da2d7ef..f2587add 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -46,13 +46,21 @@ type VersionRange struct { } type Client struct { - conn *grpc.ClientConn - fs pb.FsClient - cache pb.CacheClient + conn *grpc.ClientConn + fs pb.FsClient +} + +type CachedClient struct { + conn *grpc.ClientConn + cached pb.CachedClient } func NewClientConn(conn *grpc.ClientConn) *Client { - return &Client{conn: conn, fs: pb.NewFsClient(conn), cache: pb.NewCacheClient(conn)} + return &Client{conn: conn, fs: pb.NewFsClient(conn)} +} + +func NewCachedClientConn(conn *grpc.ClientConn) *CachedClient { + return &CachedClient{conn: conn, cached: pb.NewCachedClient(conn)} } type options struct { @@ -72,12 +80,7 @@ func WithheadlessHost(host string) func(*options) { } } -func NewClient(ctx context.Context, host string, port uint16, opts ...func(*options)) (*Client, error) { - ctx, span := telemetry.Start(ctx, "client.new", trace.WithAttributes( - key.Server.Attribute(host), - )) - defer span.End() - +func grpcClientConn(ctx context.Context, host string, port uint16, opts ...func(*options)) (*grpc.ClientConn, error) { pool, err := x509.SystemCertPool() if err != nil { return nil, fmt.Errorf("load system cert pool: %w", err) @@ -116,7 +119,7 @@ func NewClient(ctx context.Context, host string, port uint16, opts ...func(*opti server = fmt.Sprintf("%s:%d", o.headlessHost, port) } - conn, err := grpc.DialContext(connectCtx, server, + return grpc.DialContext(connectCtx, server, grpc.WithTransportCredentials(creds), grpc.WithPerRPCCredentials(auth), grpc.WithReadBufferSize(BUFFER_SIZE), @@ -156,6 +159,15 @@ func NewClient(ctx context.Context, host string, port uint16, opts ...func(*opti } `), ) +} + +func NewClient(ctx context.Context, host string, port uint16, opts ...func(*options)) (*Client, error) { + ctx, span := telemetry.Start(ctx, "client.new", trace.WithAttributes( + key.Server.Attribute(host), + )) + defer span.End() + + conn, err := grpcClientConn(ctx, host, port, opts...) if err != nil { return nil, err } @@ -951,7 +963,28 @@ func (c *Client) CloneToProject(ctx context.Context, source int64, target int64, return &response.LatestVersion, nil } -func (c *Client) PopulateDiskCache(ctx context.Context, destination string) (int64, error) { +func NewCachedClient(ctx context.Context, host string, port uint16, opts ...func(*options)) (*CachedClient, error) { + ctx, span := telemetry.Start(ctx, "cached-client.new", trace.WithAttributes( + key.Server.Attribute(host), + )) + defer span.End() + + conn, err := grpcClientConn(ctx, host, port, opts...) + if err != nil { + return nil, err + } + + return NewCachedClientConn(conn), nil +} + +func (c *CachedClient) Close() { + // Give a chance for the upstream socket to finish writing it's response + // https://github.com/grpc/grpc-go/issues/2869#issuecomment-503310136 + time.Sleep(1 * time.Millisecond) + c.conn.Close() +} + +func (c *CachedClient) PopulateDiskCache(ctx context.Context, destination string) (int64, error) { ctx, span := telemetry.Start(ctx, "client.populate-disk-cache", trace.WithAttributes( key.CachePath.Attribute(destination), )) @@ -961,7 +994,7 @@ func (c *Client) PopulateDiskCache(ctx context.Context, destination string) (int Path: destination, } - response, err := c.cache.PopulateDiskCache(ctx, request) + response, err := c.cached.PopulateDiskCache(ctx, request) if err != nil { return 0, fmt.Errorf("populate disk cache for %s: %w", destination, err) } diff --git a/pkg/client/context.go b/pkg/client/context.go index 4f03ddcd..37d56f37 100644 --- a/pkg/client/context.go +++ b/pkg/client/context.go @@ -5,6 +5,7 @@ import ( ) type clientCtxKey struct{} +type cachedCtxKey struct{} func FromContext(ctx context.Context) *Client { client, ok := ctx.Value(clientCtxKey{}).(*Client) @@ -17,3 +18,15 @@ func FromContext(ctx context.Context) *Client { func IntoContext(ctx context.Context, client *Client) context.Context { return context.WithValue(ctx, clientCtxKey{}, client) } + +func CachedFromContext(ctx context.Context) *CachedClient { + client, ok := ctx.Value(cachedCtxKey{}).(*CachedClient) + if !ok { + return nil + } + return client +} + +func CachedIntoContext(ctx context.Context, client *CachedClient) context.Context { + return context.WithValue(ctx, cachedCtxKey{}, client) +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 799e1d54..d14d2bd6 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -130,7 +130,6 @@ type Server struct { func NewServer(ctx context.Context, dbConn *DbPoolConnector, cert *tls.Certificate, pasetoKey ed25519.PublicKey) *Server { creds := credentials.NewServerTLSFromCert(cert) - validator := auth.NewAuthValidator(pasetoKey) grpcServer := grpc.NewServer( diff --git a/test/shared_test.go b/test/shared_test.go index 794fd71b..7458b9b3 100644 --- a/test/shared_test.go +++ b/test/shared_test.go @@ -409,8 +409,10 @@ func createTestGRPCServer(tc util.TestCtx, reqAuth auth.Auth) (*bufconn.Listener func createTestClient(tc util.TestCtx) (*client.Client, *api.Fs, func()) { lis, s, getConn := createTestGRPCServer(tc, tc.Context().Value(auth.AuthCtxKey).(auth.Auth)) + fs := tc.FsApi() pb.RegisterFsServer(s, fs) + go func() { err := s.Serve(lis) require.NoError(tc.T(), err, "Server exited") @@ -423,27 +425,23 @@ func createTestClient(tc util.TestCtx) (*client.Client, *api.Fs, func()) { // Make a new client that connects to a test cached server // Under the hood, this creates a test storage server and connects to that -func createTestCachedClient(tc util.TestCtx) (*client.Client, *api.Cached, func()) { +func createTestCachedClient(tc util.TestCtx) (*client.CachedClient, *api.Cached, func()) { lis, s, getConn := createTestGRPCServer(tc, tc.Context().Value(auth.AuthCtxKey).(auth.Auth)) - storageClient, _, closeServerClient := createTestClient(tc) + cl, _, closeClient := createTestClient(tc) stagingPath := emptyTmpDir(tc.T()) - cached := &api.Cached{ - Client: storageClient, - StagingPath: stagingPath, - } - - pb.RegisterCacheServer(s, cached) + cached := tc.CachedApi(cl, stagingPath) + pb.RegisterCachedServer(s, cached) go func() { err := s.Serve(lis) require.NoError(tc.T(), err, "Server exited") }() - c := client.NewClientConn(getConn()) + cachedClient := client.NewCachedClientConn(getConn()) - return c, cached, func() { c.Close(); s.Stop(); closeServerClient() } + return cachedClient, cached, func() { cachedClient.Close(); closeClient(); s.Stop() } } func rebuild(tc util.TestCtx, c *client.Client, project int64, toVersion *int64, dir string, cacheDir *string, expected expectedResponse) {