From fe2959083340bfb9e92be93f65c3387bcda9e223 Mon Sep 17 00:00:00 2001 From: Istvan Kispal Date: Fri, 22 Nov 2024 16:59:08 +0100 Subject: [PATCH 1/6] Fix sending large packages via grpc between components --- func/internal/podevaluator.go | 28 ++++++++++++++++++++-------- func/server/server.go | 8 ++++++-- func/wrapper-server/main.go | 11 ++++++++--- pkg/apiserver/apiserver.go | 3 ++- pkg/cmd/server/start.go | 7 ++++--- pkg/engine/clone_test.go | 2 +- pkg/engine/grpcruntime.go | 10 ++++++++-- pkg/engine/options.go | 4 ++-- 8 files changed, 51 insertions(+), 22 deletions(-) diff --git a/func/internal/podevaluator.go b/func/internal/podevaluator.go index e4b7368b..2085b186 100644 --- a/func/internal/podevaluator.go +++ b/func/internal/podevaluator.go @@ -87,6 +87,7 @@ func NewPodEvaluator( registryAuthSecretName string, enablePrivateRegistriesTls bool, tlsSecretPath string, + maxGrpcMessageSize int, ) (Evaluator, error) { restCfg, err := config.GetConfig() @@ -137,6 +138,7 @@ func NewPodEvaluator( functionPodTemplateName: functionPodTemplateName, podReadyTimeout: 60 * time.Second, managerNamespace: managerNs, + maxGrpcMessageSize: maxGrpcMessageSize, }, }, } @@ -154,9 +156,9 @@ func NewPodEvaluator( func (pe *podEvaluator) EvaluateFunction(ctx context.Context, req *evaluator.EvaluateFunctionRequest) (*evaluator.EvaluateFunctionResponse, error) { starttime := time.Now() defer func() { - klog.Infof("evaluating %v in pod took %v", req.Image, time.Now().Sub(starttime)) + klog.Infof("evaluating %v in pod took %v", req.Image, time.Since(starttime)) }() - // make a buffer for the channel to prevent unnecessary blocking when the pod cache manager sends it to multiple waiting gorouthine in batch. + // make a buffer for the channel to prevent unnecessary blocking when the pod cache manager sends it to multiple waiting goroutine in batch. ccChan := make(chan *clientConnAndError, 1) // Send a request to request a grpc client. pe.requestCh <- &clientConnRequest{ @@ -207,7 +209,7 @@ type podCacheManager struct { // cache is a mapping from image name to . cache map[string]*podAndGRPCClient // waitlists is a mapping from image name to a list of channels that are - // waiting for the GPRC client connections. + // waiting for the GRPC client connections. waitlists map[string][]chan<- *clientConnAndError podManager *podManager @@ -240,7 +242,7 @@ type imagePodAndGRPCClient struct { func (pcm *podCacheManager) warmupCache(podTTLConfig string) error { start := time.Now() defer func() { - klog.Infof("cache warning is completed and it took %v", time.Now().Sub(start)) + klog.Infof("cache warning is completed and it took %v", time.Since(start)) }() content, err := os.ReadFile(podTTLConfig) if err != nil { @@ -303,7 +305,7 @@ func (pcm *podCacheManager) podCacheManager() { case req := <-pcm.requestCh: podAndCl, found := pcm.cache[req.image] if found && podAndCl != nil { - // Ensure the pod still exists and is not being deleted before sending the gprc client back to the channel. + // Ensure the pod still exists and is not being deleted before sending the grpc client back to the channel. // We can't simply return grpc client from the cache and let evaluator try to connect to the pod. // If the pod is deleted by others, it will take ~10 seconds for the evaluator to fail. // Wasting 10 second is so much, so we check if the pod still exist first. @@ -390,7 +392,7 @@ func (pcm *podCacheManager) garbageCollector() { go patchPodWithUnixTimeAnnotation(pcm.podManager.kubeClient, client.ObjectKeyFromObject(&pod), pcm.podTTL) continue } - // If the current time is after the reclaim-ater annotation in the pod, we delete the pod and remove the corresponding cache entry. + // If the current time is after the reclaim-later annotation in the pod, we delete the pod and remove the corresponding cache entry. if time.Now().After(time.Unix(reclaimAfter, 0)) { podIP := pod.Status.PodIP go func(po corev1.Pod) { @@ -453,6 +455,9 @@ type podManager struct { // of the main container, which must be called "function". // Pod manager will replace the image functionPodTemplateName string + + // The maximum size of grpc messages sent to KRM function evaluator pods + maxGrpcMessageSize int } type digestAndEntrypoint struct { @@ -481,7 +486,13 @@ func (pm *podManager) getFuncEvalPodClient(ctx context.Context, image string, tt return nil, fmt.Errorf("pod %s/%s did not have podIP", podKey.Namespace, podKey.Name) } address := net.JoinHostPort(podIP, defaultWrapperServerPort) - cc, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.Dial(address, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(pm.maxGrpcMessageSize), + grpc.MaxCallSendMsgSize(pm.maxGrpcMessageSize), + ), + ) if err != nil { return nil, fmt.Errorf("failed to dial grpc function evaluator on %q for pod %s/%s: %w", address, podKey.Namespace, podKey.Name, err) } @@ -902,6 +913,7 @@ func (pm *podManager) patchNewPodContainer(pod *corev1.Pod, de digestAndEntrypoi if container.Name == functionContainerName { container.Args = append(container.Args, "--port", defaultWrapperServerPort, + "--max-request-body-size", strconv.Itoa(pm.maxGrpcMessageSize), "--", ) container.Args = append(container.Args, de.entrypoint...) @@ -954,7 +966,7 @@ func (pm *podManager) podIpIfRunningAndReady(ctx context.Context, podKey client. } return false, nil }); e != nil { - return "", fmt.Errorf("error occured when waiting the pod to be ready. If the error is caused by timeout, you may want to examine the pods in namespace %q. Error: %w", pm.namespace, e) + return "", fmt.Errorf("error occurred when waiting the pod to be ready. If the error is caused by timeout, you may want to examine the pods in namespace %q. Error: %w", pm.namespace, e) } return pod.Status.PodIP, nil } diff --git a/func/server/server.go b/func/server/server.go index b722d8ea..8010902b 100644 --- a/func/server/server.go +++ b/func/server/server.go @@ -52,6 +52,7 @@ var ( scanInterval = flag.Duration("scan-interval", time.Minute, "The interval of GC between scans.") disableRuntimes = flag.String("disable-runtimes", "", fmt.Sprintf("The runtime(s) to disable. Multiple runtimes should separated by `,`. Available runtimes: `%v`, `%v`.", execRuntime, podRuntime)) functionPodTemplateName = flag.String("function-pod-template", "", "Configmap that contains a pod specification") + maxGrpcMessageSize = flag.Int("max-request-body-size", 6*1024*1024, "Maximum size of grpc messages in bytes.") ) func main() { @@ -94,7 +95,7 @@ func run() error { if wrapperServerImage == "" { return fmt.Errorf("environment variable %v must be set to use pod function evaluator runtime", wrapperServerImageEnv) } - podEval, err := internal.NewPodEvaluator(*podNamespace, wrapperServerImage, *scanInterval, *podTTL, *podCacheConfig, *functionPodTemplateName, *enablePrivateRegistries, *registryAuthSecretPath, *registryAuthSecretName, *enablePrivateRegistriesTls, *tlsSecretPath) + podEval, err := internal.NewPodEvaluator(*podNamespace, wrapperServerImage, *scanInterval, *podTTL, *podCacheConfig, *functionPodTemplateName, *enablePrivateRegistries, *registryAuthSecretPath, *registryAuthSecretName, *enablePrivateRegistriesTls, *tlsSecretPath, *maxGrpcMessageSize) if err != nil { return fmt.Errorf("failed to initialize pod evaluator: %w", err) } @@ -109,7 +110,10 @@ func run() error { klog.Infof("Listening on %s", address) // Start the gRPC server - server := grpc.NewServer() + server := grpc.NewServer( + grpc.MaxRecvMsgSize(*maxGrpcMessageSize), + grpc.MaxSendMsgSize(*maxGrpcMessageSize), + ) pb.RegisterFunctionEvaluatorServer(server, evaluator) healthService := healthchecker.NewHealthChecker() grpc_health_v1.RegisterHealthServer(server, healthService) diff --git a/func/wrapper-server/main.go b/func/wrapper-server/main.go index fda99d80..065d9b6d 100644 --- a/func/wrapper-server/main.go +++ b/func/wrapper-server/main.go @@ -48,6 +48,7 @@ func main() { }, } cmd.Flags().IntVar(&op.port, "port", 9446, "The server port") + cmd.Flags().IntVar(&op.maxGrpcMessageSize, "max-request-body-size", 6*1024*1024, "Maximum size of grpc messages in bytes.") if err := cmd.Execute(); err != nil { fmt.Fprintf(os.Stderr, "unexpected error: %v\n", err) os.Exit(1) @@ -55,8 +56,9 @@ func main() { } type options struct { - port int - entrypoint []string + port int + maxGrpcMessageSize int + entrypoint []string } func (o *options) run() error { @@ -73,7 +75,10 @@ func (o *options) run() error { klog.Infof("Listening on %s", address) // Start the gRPC server - server := grpc.NewServer() + server := grpc.NewServer( + grpc.MaxRecvMsgSize(o.maxGrpcMessageSize), + grpc.MaxSendMsgSize(o.maxGrpcMessageSize), + ) pb.RegisterFunctionEvaluatorServer(server, evaluator) healthService := healthchecker.NewHealthChecker() grpc_health_v1.RegisterHealthServer(server, healthService) diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 08b9accf..e1c38b40 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -81,6 +81,7 @@ type ExtraConfig struct { DefaultImagePrefix string RepoSyncFrequency time.Duration UseGitCaBundle bool + MaxGrpcMessageSize int } // Config defines the config for the apiserver @@ -247,7 +248,7 @@ func (c completedConfig) New() (*PorchServer, error) { // evaluating a function, the runtimes will be tried in the same // order as they are registered. engine.WithBuiltinFunctionRuntime(), - engine.WithGRPCFunctionRuntime(c.ExtraConfig.FunctionRunnerAddress), + engine.WithGRPCFunctionRuntime(c.ExtraConfig.FunctionRunnerAddress, c.ExtraConfig.MaxGrpcMessageSize), engine.WithCredentialResolver(credentialResolver), engine.WithRunnerOptionsResolver(runnerOptionsResolver), engine.WithReferenceResolver(referenceResolver), diff --git a/pkg/cmd/server/start.go b/pkg/cmd/server/start.go index 8a495544..01b98275 100644 --- a/pkg/cmd/server/start.go +++ b/pkg/cmd/server/start.go @@ -58,7 +58,7 @@ type PorchServerOptions struct { RepoSyncFrequency time.Duration UseGitCaBundle bool DisableValidatingAdmissionPolicy bool - MaxRequestBodySize int64 + MaxRequestBodySize int SharedInformerFactory informers.SharedInformerFactory StdOut io.Writer @@ -185,7 +185,7 @@ func (o *PorchServerOptions) Config() (*apiserver.Config, error) { serverConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(sampleopenapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(apiserver.Scheme)) serverConfig.OpenAPIConfig.Info.Title = OpenAPITitle serverConfig.OpenAPIConfig.Info.Version = OpenAPIVersion - serverConfig.MaxRequestBodyBytes = o.MaxRequestBodySize + serverConfig.MaxRequestBodyBytes = int64(o.MaxRequestBodySize) if err := o.RecommendedOptions.ApplyTo(serverConfig); err != nil { return nil, err @@ -200,6 +200,7 @@ func (o *PorchServerOptions) Config() (*apiserver.Config, error) { FunctionRunnerAddress: o.FunctionRunnerAddress, DefaultImagePrefix: o.DefaultImagePrefix, UseGitCaBundle: o.UseGitCaBundle, + MaxGrpcMessageSize: o.MaxRequestBodySize, }, } return config, nil @@ -245,7 +246,7 @@ func (o *PorchServerOptions) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.FunctionRunnerAddress, "function-runner", "", "Address of the function runner gRPC service.") fs.StringVar(&o.DefaultImagePrefix, "default-image-prefix", "gcr.io/kpt-fn/", "Default prefix for unqualified function names") fs.StringVar(&o.CacheDirectory, "cache-directory", "", "Directory where Porch server stores repository and package caches.") - fs.Int64Var(&o.MaxRequestBodySize, "max-request-body-size", 6*1024*1024, "Maximum size of the request body in bytes.") + fs.IntVar(&o.MaxRequestBodySize, "max-request-body-size", 6*1024*1024, "Maximum size of the request body in bytes.") fs.BoolVar(&o.UseGitCaBundle, "use-git-cabundle", false, "Determine whether to use a user-defined CaBundle for TLS towards git.") fs.BoolVar(&o.DisableValidatingAdmissionPolicy, "disable-validating-admissions-policy", true, "Determine whether to (dis|en)able the Validating Admission Policy, which requires k8s version >= v1.30") fs.DurationVar(&o.RepoSyncFrequency, "repo-sync-frequency", 10*time.Minute, "Frequency in seconds at which registered repositories will be synced and the background job repository refresh runs.") diff --git a/pkg/engine/clone_test.go b/pkg/engine/clone_test.go index fc715260..e9930ce5 100644 --- a/pkg/engine/clone_test.go +++ b/pkg/engine/clone_test.go @@ -159,7 +159,7 @@ func startGitServer(t *testing.T, repo *git.Repo, _ ...git.GitServerOption) stri return fmt.Sprintf("http://%s/%s", address, key) } -// TODO(mortent): See if we can restruture the packages to +// TODO(mortent): See if we can restructure the packages to // avoid having to create separate implementations of the auth // interfaces here. type credentialResolver struct { diff --git a/pkg/engine/grpcruntime.go b/pkg/engine/grpcruntime.go index 7032de66..b21de2e6 100644 --- a/pkg/engine/grpcruntime.go +++ b/pkg/engine/grpcruntime.go @@ -33,14 +33,20 @@ type grpcRuntime struct { client evaluator.FunctionEvaluatorClient } -func newGRPCFunctionRuntime(address string) (*grpcRuntime, error) { +func newGRPCFunctionRuntime(address string, maxGrpcMessageSize int) (*grpcRuntime, error) { if address == "" { return nil, fmt.Errorf("address is required to instantiate gRPC function runtime") } klog.Infof("Dialing grpc function runner %q", address) - cc, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.Dial(address, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(maxGrpcMessageSize), + grpc.MaxCallSendMsgSize(maxGrpcMessageSize), + ), + ) if err != nil { return nil, fmt.Errorf("failed to dial grpc function evaluator: %w", err) } diff --git a/pkg/engine/options.go b/pkg/engine/options.go index 18f33e93..5c25448d 100644 --- a/pkg/engine/options.go +++ b/pkg/engine/options.go @@ -58,9 +58,9 @@ func WithBuiltinFunctionRuntime() EngineOption { }) } -func WithGRPCFunctionRuntime(address string) EngineOption { +func WithGRPCFunctionRuntime(address string, maxGrpcMessageSize int) EngineOption { return EngineOptionFunc(func(engine *cadEngine) error { - runtime, err := newGRPCFunctionRuntime(address) + runtime, err := newGRPCFunctionRuntime(address, maxGrpcMessageSize) if err != nil { return fmt.Errorf("failed to create function runtime: %w", err) } From 3dd19d691ffb21f9e3cf2e80dc3fb56617ec1411 Mon Sep 17 00:00:00 2001 From: Istvan Kispal Date: Fri, 22 Nov 2024 18:50:39 +0100 Subject: [PATCH 2/6] Refactor unnecessarily long argument lists --- func/internal/executableevaluator.go | 17 ++- func/internal/podevaluator.go | 131 +++++++++--------- func/internal/podevaluator_podmanager_test.go | 13 +- func/server/server.go | 70 ++++++---- 4 files changed, 133 insertions(+), 98 deletions(-) diff --git a/func/internal/executableevaluator.go b/func/internal/executableevaluator.go index b4ea8198..aeed82f9 100644 --- a/func/internal/executableevaluator.go +++ b/func/internal/executableevaluator.go @@ -31,6 +31,11 @@ import ( "k8s.io/klog/v2" ) +type ExecutableEvaluatorOptions struct { + ConfigFileName string // Path to the config file + FunctionCacheDir string // Path to cached functions +} + type executableEvaluator struct { // Fast-path function cache cache map[string]string @@ -47,17 +52,17 @@ type function struct { var _ Evaluator = &executableEvaluator{} -func NewExecutableEvaluator(functions string, config string) (Evaluator, error) { +func NewExecutableEvaluator(o ExecutableEvaluatorOptions) (Evaluator, error) { cache := map[string]string{} - if config != "" { - bytes, err := os.ReadFile(config) + if o.ConfigFileName != "" { + bytes, err := os.ReadFile(o.ConfigFileName) if err != nil { - return nil, fmt.Errorf("failed to read configuration file %q: %w", config, err) + return nil, fmt.Errorf("failed to read configuration file %q: %w", o.ConfigFileName, err) } var cfg configuration if err := yaml.Unmarshal(bytes, &cfg); err != nil { - return nil, fmt.Errorf("failed to parse configuration file %q: %w", config, err) + return nil, fmt.Errorf("failed to parse configuration file %q: %w", o.ConfigFileName, err) } for _, fn := range cfg.Functions { @@ -65,7 +70,7 @@ func NewExecutableEvaluator(functions string, config string) (Evaluator, error) if _, exists := cache[img]; exists { klog.Warningf("Ignoring duplicate image %q (%s)", img, fn.Function) } else { - abs, err := filepath.Abs(filepath.Join(functions, fn.Function)) + abs, err := filepath.Abs(filepath.Join(o.FunctionCacheDir, fn.Function)) if err != nil { return nil, fmt.Errorf("failed to determine path to the cached function %q: %w", img, err) } diff --git a/func/internal/podevaluator.go b/func/internal/podevaluator.go index 2085b186..31673e91 100644 --- a/func/internal/podevaluator.go +++ b/func/internal/podevaluator.go @@ -73,22 +73,24 @@ type podEvaluator struct { podCacheManager *podCacheManager } +type PodEvaluatorOptions struct { + PodNamespace string // Namespace to run KRM functions pods in + WrapperServerImage string // Container image name of the wrapper server + GcScanInterval time.Duration // Time interval between Garbage Collector scans + PodTTL time.Duration // Time-to-live for pods before GC + PodCacheConfigFileName string // Path to the pod cache config file. The file is map of function name to TTL. + FunctionPodTemplateName string // Configmap that contains a pod specification + EnablePrivateRegistries bool // If true enables the use of private registries and their authentication + RegistryAuthSecretPath string // The path of the secret used for authenticating to custom registries + RegistryAuthSecretName string // The name of the secret used for authenticating to custom registries + EnablePrivateRegistriesTls bool // If enabled, will prioritize use of user provided TLS secret when accessing registries + TlsSecretPath string // The path of the secret used in tls configuration + MaxGrpcMessageSize int // Maximum size of grpc messages in bytes +} + var _ Evaluator = &podEvaluator{} -func NewPodEvaluator( - namespace, - wrapperServerImage string, - interval, - ttl time.Duration, - podTTLConfig string, - functionPodTemplateName string, - enablePrivateRegistries bool, - registryAuthSecretPath string, - registryAuthSecretName string, - enablePrivateRegistriesTls bool, - tlsSecretPath string, - maxGrpcMessageSize int, -) (Evaluator, error) { +func NewPodEvaluator(o PodEvaluatorOptions) (Evaluator, error) { restCfg, err := config.GetConfig() if err != nil { @@ -118,34 +120,35 @@ func NewPodEvaluator( pe := &podEvaluator{ requestCh: reqCh, podCacheManager: &podCacheManager{ - gcScanInternal: interval, - podTTL: ttl, - enablePrivateRegistries: enablePrivateRegistries, - registryAuthSecretPath: registryAuthSecretPath, - registryAuthSecretName: registryAuthSecretName, - enablePrivateRegistriesTls: enablePrivateRegistriesTls, - tlsSecretPath: tlsSecretPath, - requestCh: reqCh, - podReadyCh: readyCh, - cache: map[string]*podAndGRPCClient{}, - waitlists: map[string][]chan<- *clientConnAndError{}, + gcScanInterval: o.GcScanInterval, + podTTL: o.PodTTL, + requestCh: reqCh, + podReadyCh: readyCh, + cache: map[string]*podAndGRPCClient{}, + waitlists: map[string][]chan<- *clientConnAndError{}, podManager: &podManager{ kubeClient: cl, - namespace: namespace, - wrapperServerImage: wrapperServerImage, + namespace: o.PodNamespace, + wrapperServerImage: o.WrapperServerImage, podReadyCh: readyCh, - functionPodTemplateName: functionPodTemplateName, + functionPodTemplateName: o.FunctionPodTemplateName, podReadyTimeout: 60 * time.Second, managerNamespace: managerNs, - maxGrpcMessageSize: maxGrpcMessageSize, + maxGrpcMessageSize: o.MaxGrpcMessageSize, + + enablePrivateRegistries: o.EnablePrivateRegistries, + registryAuthSecretPath: o.RegistryAuthSecretPath, + registryAuthSecretName: o.RegistryAuthSecretName, + enablePrivateRegistriesTls: o.EnablePrivateRegistriesTls, + tlsSecretPath: o.TlsSecretPath, }, }, } go pe.podCacheManager.podCacheManager() // TODO(mengqiy): add watcher that support reloading the cache when the config file was changed. - err = pe.podCacheManager.warmupCache(podTTLConfig) + err = pe.podCacheManager.warmupCache(o.PodCacheConfigFileName) // If we can't warm up the cache, we can still proceed without it. if err != nil { klog.Warningf("unable to warm up the pod cache: %v", err) @@ -191,16 +194,9 @@ func (pe *podEvaluator) EvaluateFunction(ctx context.Context, req *evaluator.Eva // It also listens to the podReadyCh channel. If a pod is ready, it notifies the // goroutines by sending back the GRPC client by lookup the waitlists mapping. type podCacheManager struct { - gcScanInternal time.Duration + gcScanInterval time.Duration podTTL time.Duration - enablePrivateRegistries bool - registryAuthSecretPath string - registryAuthSecretName string - - enablePrivateRegistriesTls bool - tlsSecretPath string - // requestCh is a receive-only channel to receive requestCh <-chan *clientConnRequest // podReadyCh is a channel to receive the information when a pod is ready. @@ -269,7 +265,7 @@ func (pcm *podCacheManager) warmupCache(podTTLConfig string) error { // We invoke the function with useGenerateName=false so that the pod name is fixed, // since we want to ensure only one pod is created for each function. - pcm.podManager.getFuncEvalPodClient(ctx, fnImage, ttl, false, pcm.enablePrivateRegistries, pcm.registryAuthSecretPath, pcm.registryAuthSecretName, pcm.enablePrivateRegistriesTls, pcm.tlsSecretPath) + pcm.podManager.getFuncEvalPodClient(ctx, fnImage, ttl, false) klog.Infof("preloaded pod cache for function %v", fnImage) }) @@ -299,7 +295,7 @@ func forEachConcurrently(m map[string]string, fn func(k string, v string)) { // We must run this method in one single goroutine. Doing it this way simplify // design around concurrency. func (pcm *podCacheManager) podCacheManager() { - tick := time.Tick(pcm.gcScanInternal) + tick := time.Tick(pcm.gcScanInterval) for { select { case req := <-pcm.requestCh: @@ -337,7 +333,7 @@ func (pcm *podCacheManager) podCacheManager() { pcm.waitlists[req.image] = append(list, req.grpcClientCh) // We invoke the function with useGenerateName=true to avoid potential name collision, since if pod foo is // being deleted and we can't use the same name. - go pcm.podManager.getFuncEvalPodClient(context.Background(), req.image, pcm.podTTL, true, pcm.enablePrivateRegistries, pcm.registryAuthSecretPath, pcm.registryAuthSecretName, pcm.enablePrivateRegistriesTls, pcm.tlsSecretPath) + go pcm.podManager.getFuncEvalPodClient(context.Background(), req.image, pcm.podTTL, true) case resp := <-pcm.podReadyCh: if resp.err != nil { klog.Warningf("received error from the pod manager: %v", resp.err) @@ -458,6 +454,17 @@ type podManager struct { // The maximum size of grpc messages sent to KRM function evaluator pods maxGrpcMessageSize int + + // If true enables the use of private registries and their authentication + enablePrivateRegistries bool + // The path of the secret used for authenticating to custom registries + registryAuthSecretPath string + // The name of the secret used for authenticating to custom registries + registryAuthSecretName string + // If enabled, will prioritize use of user provided TLS secret when accessing registries + enablePrivateRegistriesTls bool + // The path of the secret used in tls configuration + tlsSecretPath string } type digestAndEntrypoint struct { @@ -472,9 +479,9 @@ type digestAndEntrypoint struct { // time-to-live period for the pod. If useGenerateName is false, it will try to // create a pod with a fixed name. Otherwise, it will create a pod and let the // apiserver to generate the name from a template. -func (pm *podManager) getFuncEvalPodClient(ctx context.Context, image string, ttl time.Duration, useGenerateName bool, enablePrivateRegistries bool, registryAuthSecretPath string, registryAuthSecretName string, enablePrivateRegistriesTls bool, tlsSecretPath string) { +func (pm *podManager) getFuncEvalPodClient(ctx context.Context, image string, ttl time.Duration, useGenerateName bool) { c, err := func() (*podAndGRPCClient, error) { - podKey, err := pm.retrieveOrCreatePod(ctx, image, ttl, useGenerateName, enablePrivateRegistries, registryAuthSecretPath, registryAuthSecretName, enablePrivateRegistriesTls, tlsSecretPath) + podKey, err := pm.retrieveOrCreatePod(ctx, image, ttl, useGenerateName) if err != nil { return nil, err } @@ -572,7 +579,7 @@ type DockerConfig struct { } // imageDigestAndEntrypoint gets the entrypoint of a container image by looking at its metadata. -func (pm *podManager) imageDigestAndEntrypoint(ctx context.Context, image string, enablePrivateRegistries bool, registryAuthSecretPath string, registryAuthSecretName string, enablePrivateRegistriesTls bool, tlsSecretPath string) (*digestAndEntrypoint, error) { +func (pm *podManager) imageDigestAndEntrypoint(ctx context.Context, image string) (*digestAndEntrypoint, error) { start := time.Now() defer func() { klog.Infof("getting image metadata for %v took %v", image, time.Since(start)) @@ -585,12 +592,12 @@ func (pm *podManager) imageDigestAndEntrypoint(ctx context.Context, image string } var auth authn.Authenticator - if enablePrivateRegistries && !strings.HasPrefix(image, defaultRegistry) { - if err := pm.ensureCustomAuthSecret(ctx, registryAuthSecretPath, registryAuthSecretName); err != nil { + if pm.enablePrivateRegistries && !strings.HasPrefix(image, defaultRegistry) { + if err := pm.ensureCustomAuthSecret(ctx, pm.registryAuthSecretPath, pm.registryAuthSecretName); err != nil { return nil, err } - auth, err = pm.getCustomAuth(ref, registryAuthSecretPath) + auth, err = pm.getCustomAuth(ref, pm.registryAuthSecretPath) if err != nil { return nil, err } @@ -602,7 +609,7 @@ func (pm *podManager) imageDigestAndEntrypoint(ctx context.Context, image string } } - return pm.getImageMetadata(ctx, ref, auth, image, enablePrivateRegistries, enablePrivateRegistriesTls, tlsSecretPath) + return pm.getImageMetadata(ctx, ref, auth, image) } // ensureCustomAuthSecret ensures that, if an image from a custom registry is requested, the appropriate credentials are passed into a secret for function pods to use when pulling. If the secret does not already exist, it is created. @@ -631,8 +638,8 @@ func (pm *podManager) getCustomAuth(ref name.Reference, registryAuthSecretPath s } // getImageMetadata retrieves the image digest and entrypoint. -func (pm *podManager) getImageMetadata(ctx context.Context, ref name.Reference, auth authn.Authenticator, image string, enablePrivateRegistries bool, enablePrivateRegistriesTls bool, tlsSecretPath string) (*digestAndEntrypoint, error) { - img, err := getImage(ctx, ref, auth, image, enablePrivateRegistries, enablePrivateRegistriesTls, tlsSecretPath) +func (pm *podManager) getImageMetadata(ctx context.Context, ref name.Reference, auth authn.Authenticator, image string) (*digestAndEntrypoint, error) { + img, err := pm.getImage(ctx, ref, auth, image) if err != nil { return nil, err } @@ -658,24 +665,24 @@ func (pm *podManager) getImageMetadata(ctx context.Context, ref name.Reference, return de, nil } -func getImage(ctx context.Context, ref name.Reference, auth authn.Authenticator, image string, enablePrivateRegistries bool, enablePrivateRegistriesTls bool, tlsSecretPath string) (containerregistry.Image, error) { +func (pm *podManager) getImage(ctx context.Context, ref name.Reference, auth authn.Authenticator, image string) (containerregistry.Image, error) { // if private registries or their appropriate tls configuration are disabled in the config we pull image with default operation otherwise try and use their tls cert's - if !enablePrivateRegistries || strings.HasPrefix(image, defaultRegistry) || !enablePrivateRegistriesTls { + if !pm.enablePrivateRegistries || strings.HasPrefix(image, defaultRegistry) || !pm.enablePrivateRegistriesTls { return remote.Image(ref, remote.WithAuth(auth), remote.WithContext(ctx)) } tlsFile := "ca.crt" // Check if mounted secret location contains CA file. - if _, err := os.Stat(tlsSecretPath); os.IsNotExist(err) { + if _, err := os.Stat(pm.tlsSecretPath); os.IsNotExist(err) { return nil, err } - if _, errCRT := os.Stat(filepath.Join(tlsSecretPath, "ca.crt")); os.IsNotExist(errCRT) { - if _, errPEM := os.Stat(filepath.Join(tlsSecretPath, "ca.pem")); os.IsNotExist(errPEM) { + if _, errCRT := os.Stat(filepath.Join(pm.tlsSecretPath, "ca.crt")); os.IsNotExist(errCRT) { + if _, errPEM := os.Stat(filepath.Join(pm.tlsSecretPath, "ca.pem")); os.IsNotExist(errPEM) { return nil, fmt.Errorf("ca.crt not found: %v, and ca.pem also not found: %v", errCRT, errPEM) } tlsFile = "ca.pem" } // Load the custom TLS configuration - tlsConfig, err := loadTLSConfig(filepath.Join(tlsSecretPath, tlsFile)) + tlsConfig, err := loadTLSConfig(filepath.Join(pm.tlsSecretPath, tlsFile)) if err != nil { return nil, err } @@ -718,14 +725,14 @@ func createTransport(tlsConfig *tls.Config) *http.Transport { } // retrieveOrCreatePod retrieves or creates a pod for an image. -func (pm *podManager) retrieveOrCreatePod(ctx context.Context, image string, ttl time.Duration, useGenerateName bool, enablePrivateRegistries bool, registryAuthSecretPath string, registryAuthSecretName string, enablePrivateRegistriesTls bool, tlsSecretPath string) (client.ObjectKey, error) { +func (pm *podManager) retrieveOrCreatePod(ctx context.Context, image string, ttl time.Duration, useGenerateName bool) (client.ObjectKey, error) { var de *digestAndEntrypoint var replacePod bool var currentPod *corev1.Pod var err error val, found := pm.imageMetadataCache.Load(image) if !found { - de, err = pm.imageDigestAndEntrypoint(ctx, image, enablePrivateRegistries, registryAuthSecretPath, registryAuthSecretName, enablePrivateRegistriesTls, tlsSecretPath) + de, err = pm.imageDigestAndEntrypoint(ctx, image) if err != nil { return client.ObjectKey{}, fmt.Errorf("unable to get the entrypoint for %v: %w", image, err) } @@ -748,7 +755,7 @@ func (pm *podManager) retrieveOrCreatePod(ctx context.Context, image string, ttl klog.Errorf("failed to generate a base pod template: %v", err) return client.ObjectKey{}, fmt.Errorf("failed to generate a base pod template: %w", err) } - pm.appendImagePullSecret(image, enablePrivateRegistries, registryAuthSecretName, podTemplate) + pm.appendImagePullSecret(image, podTemplate) err = pm.kubeClient.List(ctx, podList, client.InNamespace(pm.namespace), client.MatchingLabels(map[string]string{krmFunctionLabel: podId})) if err != nil { klog.Warningf("error when listing pods for %q: %v", image, err) @@ -897,10 +904,10 @@ func (pm *podManager) getBasePodTemplate(ctx context.Context) (*corev1.Pod, stri } // if a custom image is requested, use the secret provided to authenticate -func (pm *podManager) appendImagePullSecret(image string, enablePrivateRegistries bool, registryAuthSecretName string, podTemplate *corev1.Pod) { - if enablePrivateRegistries && !strings.HasPrefix(image, defaultRegistry) { +func (pm *podManager) appendImagePullSecret(image string, podTemplate *corev1.Pod) { + if pm.enablePrivateRegistries && !strings.HasPrefix(image, defaultRegistry) { podTemplate.Spec.ImagePullSecrets = []corev1.LocalObjectReference{ - {Name: registryAuthSecretName}, + {Name: pm.registryAuthSecretName}, } } } diff --git a/func/internal/podevaluator_podmanager_test.go b/func/internal/podevaluator_podmanager_test.go index f85724b2..43e22d66 100644 --- a/func/internal/podevaluator_podmanager_test.go +++ b/func/internal/podevaluator_podmanager_test.go @@ -635,6 +635,15 @@ func TestPodManager(t *testing.T) { podReadyTimeout: 1 * time.Second, functionPodTemplateName: tt.functionPodTemplateName, managerNamespace: tt.managerNamespace, + + maxGrpcMessageSize: 4 * 1024 * 1024, + + enablePrivateRegistries: false, + registryAuthSecretPath: "/var/tmp/config-secret/.dockerconfigjson", + registryAuthSecretName: "auth-secret", + + enablePrivateRegistriesTls: false, + tlsSecretPath: "/var/tmp/tls-secret/", } for k, v := range tt.imageMetadataCache { @@ -644,7 +653,7 @@ func TestPodManager(t *testing.T) { fakeServer.evalFunc = tt.evalFunc //Execute the function under test - go pm.getFuncEvalPodClient(ctx, tt.functionImage, time.Hour, tt.useGenerateName, false, "/var/tmp/config-secret/.dockerconfigjson", "auth-secret", false, "/var/tmp/tls-secret/") + go pm.getFuncEvalPodClient(ctx, tt.functionImage, time.Hour, tt.useGenerateName) if tt.podPatch != nil { go func() { @@ -683,7 +692,7 @@ func TestPodManager(t *testing.T) { } if !strings.HasPrefix(pod.Labels[krmFunctionLabel], tt.functionImage) { - t.Errorf("Expected pod to have label starting wiht %s, got %s", tt.functionImage, pod.Labels[krmFunctionLabel]) + t.Errorf("Expected pod to have label starting with %s, got %s", tt.functionImage, pod.Labels[krmFunctionLabel]) } if pod.Spec.Containers[0].Image != tt.functionImage { t.Errorf("Expected pod to have image %s, got %s", tt.functionImage, pod.Spec.Containers[0].Image) diff --git a/func/server/server.go b/func/server/server.go index 8010902b..56513f91 100644 --- a/func/server/server.go +++ b/func/server/server.go @@ -37,35 +37,49 @@ const ( wrapperServerImageEnv = "WRAPPER_SERVER_IMAGE" ) -var ( - port = flag.Int("port", 9445, "The server port") - functions = flag.String("functions", "./functions", "Path to cached functions.") - config = flag.String("config", "./config.yaml", "Path to the config file.") - enablePrivateRegistries = flag.Bool("enable-private-registries", false, "if true enables the use of private registries and their authentication") - registryAuthSecretPath = flag.String("registry-auth-secret-path", "/var/tmp/config-secret/.dockerconfigjson", "The path of the secret used for authenticating to custom registries") - registryAuthSecretName = flag.String("registry-auth-secret-name", "auth-secret", "The name of the secret used for authenticating to custom registries") - enablePrivateRegistriesTls = flag.Bool("enable-private-registries-tls", false, "if enabled, will prioritize use of user provided TLS secret when accessing registries") - tlsSecretPath = flag.String("tls-secret-path", "/var/tmp/tls-secret/", "The path of the secret used in tls configuration") - podCacheConfig = flag.String("pod-cache-config", "/pod-cache-config/pod-cache-config.yaml", "Path to the pod cache config file. The file is map of function name to TTL.") - podNamespace = flag.String("pod-namespace", "porch-fn-system", "Namespace to run KRM functions pods.") - podTTL = flag.Duration("pod-ttl", 30*time.Minute, "TTL for pods before GC.") - scanInterval = flag.Duration("scan-interval", time.Minute, "The interval of GC between scans.") - disableRuntimes = flag.String("disable-runtimes", "", fmt.Sprintf("The runtime(s) to disable. Multiple runtimes should separated by `,`. Available runtimes: `%v`, `%v`.", execRuntime, podRuntime)) - functionPodTemplateName = flag.String("function-pod-template", "", "Configmap that contains a pod specification") - maxGrpcMessageSize = flag.Int("max-request-body-size", 6*1024*1024, "Maximum size of grpc messages in bytes.") -) +type options struct { + // The server port + port int + // The runtime(s) to disable. Multiple runtimes should separated by `,`. + disableRuntimes string + + // Parameters of ExecEvaluator + exec internal.ExecutableEvaluatorOptions + // Parameters of PodEvaluator + pod internal.PodEvaluatorOptions +} func main() { + o := &options{} + // generic flags + flag.IntVar(&o.port, "port", 9445, "The server port") + flag.StringVar(&o.disableRuntimes, "disable-runtimes", "", fmt.Sprintf("The runtime(s) to disable. Multiple runtimes should separated by `,`. Available runtimes: `%v`, `%v`.", execRuntime, podRuntime)) + // flags for the exec runtime + flag.StringVar(&o.exec.FunctionCacheDir, "functions", "./functions", "Path to cached functions.") + flag.StringVar(&o.exec.ConfigFileName, "config", "./config.yaml", "Path to the config file of the exec runtime.") + // flags for the pod runtime + flag.StringVar(&o.pod.PodCacheConfigFileName, "pod-cache-config", "/pod-cache-config/pod-cache-config.yaml", "Path to the pod cache config file. The file is map of function name to TTL.") + flag.StringVar(&o.pod.PodNamespace, "pod-namespace", "porch-fn-system", "Namespace to run KRM functions pods.") + flag.DurationVar(&o.pod.PodTTL, "pod-ttl", 30*time.Minute, "TTL for pods before GC.") + flag.DurationVar(&o.pod.GcScanInterval, "scan-interval", time.Minute, "The interval of GC between scans.") + flag.StringVar(&o.pod.FunctionPodTemplateName, "function-pod-template", "", "Configmap that contains a pod specification") + flag.BoolVar(&o.pod.EnablePrivateRegistries, "enable-private-registries", false, "if true enables the use of private registries and their authentication") + flag.StringVar(&o.pod.RegistryAuthSecretPath, "registry-auth-secret-path", "/var/tmp/config-secret/.dockerconfigjson", "The path of the secret used for authenticating to custom registries") + flag.StringVar(&o.pod.RegistryAuthSecretName, "registry-auth-secret-name", "auth-secret", "The name of the secret used for authenticating to custom registries") + flag.BoolVar(&o.pod.EnablePrivateRegistriesTls, "enable-private-registries-tls", false, "if enabled, will prioritize use of user provided TLS secret when accessing registries") + flag.StringVar(&o.pod.TlsSecretPath, "tls-secret-path", "/var/tmp/tls-secret/", "The path of the secret used in tls configuration") + flag.IntVar(&o.pod.MaxGrpcMessageSize, "max-request-body-size", 6*1024*1024, "Maximum size of grpc messages in bytes.") + flag.Parse() - if err := run(); err != nil { + if err := run(o); err != nil { fmt.Fprintf(os.Stderr, "unexpected error: %v\n", err) os.Exit(1) } } -func run() error { - address := fmt.Sprintf(":%d", *port) +func run(o *options) error { + address := fmt.Sprintf(":%d", o.port) lis, err := net.Listen("tcp", address) if err != nil { return fmt.Errorf("failed to listen: %w", err) @@ -75,8 +89,8 @@ func run() error { execRuntime: {}, podRuntime: {}, } - if disableRuntimes != nil { - runtimesFromFlag := strings.Split(*disableRuntimes, ",") + if o.disableRuntimes != "" { + runtimesFromFlag := strings.Split(o.disableRuntimes, ",") for _, rt := range runtimesFromFlag { delete(availableRuntimes, rt) } @@ -85,17 +99,17 @@ func run() error { for rt := range availableRuntimes { switch rt { case execRuntime: - execEval, err := internal.NewExecutableEvaluator(*functions, *config) + execEval, err := internal.NewExecutableEvaluator(o.exec) if err != nil { return fmt.Errorf("failed to initialize executable evaluator: %w", err) } runtimes = append(runtimes, execEval) case podRuntime: - wrapperServerImage := os.Getenv(wrapperServerImageEnv) - if wrapperServerImage == "" { + o.pod.WrapperServerImage = os.Getenv(wrapperServerImageEnv) + if o.pod.WrapperServerImage == "" { return fmt.Errorf("environment variable %v must be set to use pod function evaluator runtime", wrapperServerImageEnv) } - podEval, err := internal.NewPodEvaluator(*podNamespace, wrapperServerImage, *scanInterval, *podTTL, *podCacheConfig, *functionPodTemplateName, *enablePrivateRegistries, *registryAuthSecretPath, *registryAuthSecretName, *enablePrivateRegistriesTls, *tlsSecretPath, *maxGrpcMessageSize) + podEval, err := internal.NewPodEvaluator(o.pod) if err != nil { return fmt.Errorf("failed to initialize pod evaluator: %w", err) } @@ -111,8 +125,8 @@ func run() error { // Start the gRPC server server := grpc.NewServer( - grpc.MaxRecvMsgSize(*maxGrpcMessageSize), - grpc.MaxSendMsgSize(*maxGrpcMessageSize), + grpc.MaxRecvMsgSize(o.pod.MaxGrpcMessageSize), + grpc.MaxSendMsgSize(o.pod.MaxGrpcMessageSize), ) pb.RegisterFunctionEvaluatorServer(server, evaluator) healthService := healthchecker.NewHealthChecker() From 294f815ccfdee7b7ef8d9a9b54ff16eb79e0be98 Mon Sep 17 00:00:00 2001 From: Istvan Kispal Date: Sun, 24 Nov 2024 12:57:22 +0100 Subject: [PATCH 3/6] Add end-to-end test for rendering large packages with pod evaluator (function runner) --- .vscode/launch.json | 16 ++++--- Makefile | 2 +- test/e2e/e2e_test.go | 104 ++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 113 insertions(+), 9 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index a72ff53f..980b6f76 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -18,8 +18,8 @@ "--repo-sync-frequency=60s" ], "cwd": "${workspaceFolder}", - "env": { - "CERT_STORAGE_DIR": "${workspaceFolder}/.build/pki/tmp", + "env": { + "CERT_STORAGE_DIR": "${workspaceFolder}/.build/pki/tmp", "WEBHOOK_HOST": "localhost", "GOOGLE_API_GO_EXPERIMENTAL_DISABLE_NEW_AUTH_LIB": "true" } @@ -31,9 +31,9 @@ "mode": "auto", "program": "${workspaceFolder}/controllers", "cwd": "${workspaceFolder}", - "env": { - "ENABLE_PACKAGEVARIANTS": "true", - "ENABLE_PACKAGEVARIANTSETS": "true" + "env": { + "ENABLE_PACKAGEVARIANTS": "true", + "ENABLE_PACKAGEVARIANTSETS": "true" } }, { @@ -45,9 +45,11 @@ "args": [ "-test.v", "-test.run", - "TestE2E/PorchSuite/TestGitRepositoryWithReleaseTagsAndDirectory" + "TestE2E/PorchSuite/TestPodEvaluatorWithLargeObjects" ], - "env": { "E2E": "1"} + "env": { + "E2E": "1" + } }, { "name": "Launch Func Client", diff --git a/Makefile b/Makefile index c2c4247a..d410908a 100644 --- a/Makefile +++ b/Makefile @@ -362,5 +362,5 @@ test-e2e: ## Run end-to-end tests E2E=1 go test -v -failfast ./test/e2e/cli .PHONY: test-e2e-clean -test-e2e-clean: porchctl ## Run end-to-end tests aginst a newly deployed porch in a newly created kind cluster +test-e2e-clean: porchctl ## Run end-to-end tests against a newly deployed porch in a newly created kind cluster ./scripts/clean-e2e-test.sh diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 34f51ebf..85daeb03 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -2016,7 +2016,7 @@ func (t *PorchSuite) TestPodEvaluator(ctx context.Context) { const ( generateFolderImage = "gcr.io/kpt-fn/generate-folders:v0.1.1" // This function is a TS based function. - setAnnotationsImage = "gcr.io/kpt-fn/set-annotations:v0.1.3" // set-annotations:v0.1.3 is an older version that porch maps it neither to built-in nor exec. + setAnnotationsImage = "gcr.io/kpt-fn/set-annotations:v0.1.3" // set-annotations:v0.1.3 is an older version that porch maps neither to built-in nor exec. ) // Register the repository as 'git-fn' @@ -2217,6 +2217,108 @@ func (t *PorchSuite) TestPodEvaluatorWithFailure(ctx context.Context) { } } +func (t *PorchSuite) TestPodEvaluatorWithLargeObjects(ctx context.Context) { + if t.TestRunnerIsLocal { + t.Skipf("Skipping due to not having pod evaluator in local mode") + } + const ( + setAnnotationsImage = "gcr.io/kpt-fn/set-annotations:v0.1.3" // set-annotations:v0.1.3 is an older version that porch maps neither to built-in nor exec. + ) + + t.RegisterMainGitRepositoryF(ctx, "git-fn-pod-large") + + // Create Package Revision + pr := &porchapi.PackageRevision{ + TypeMeta: metav1.TypeMeta{ + Kind: porchapi.PackageRevisionGVR.Resource, + APIVersion: porchapi.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: t.Namespace, + }, + Spec: porchapi.PackageRevisionSpec{ + PackageName: "new-package", + WorkspaceName: "workspace", + RepositoryName: "git-fn-pod-large", + Tasks: []porchapi.Task{ + { + Type: porchapi.TaskTypeInit, + Init: &porchapi.PackageInitTaskSpec{ + Description: "this is a test", + }, + }, + }, + }, + } + + t.CreateF(ctx, pr) + + var prr porchapi.PackageRevisionResources + t.GetF(ctx, client.ObjectKeyFromObject(pr), &prr) + + testDataSize := 5 * 1024 * 1024 + + // set-annotations:v0.1.3 is an older version that porch maps neither to built-in nor exec runtimes. + prr.Spec.Resources["Kptfile"] += ` +pipeline: + mutators: + - image: ` + setAnnotationsImage + ` + configMap: + test-key: test-val + selectors: + - name: test-data +` + prr.Spec.Resources["largefile.yaml"] = ` +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-data + labels: + something: somewhere +data: + value: "` + strings.Repeat("a", testDataSize) + `" +` + + t.UpdateF(ctx, &prr) + + rs := prr.Status.RenderStatus + if rs.Err != "" || rs.Result.ExitCode != 0 { + t.Fatalf("Couldn't render large package! exit code: %v,\n%v", rs.Result.ExitCode, rs.Err) + } + + // Get package resources + t.GetF(ctx, client.ObjectKeyFromObject(pr), &prr) + + for name, obj := range prr.Spec.Resources { + if !strings.HasSuffix(name, ".yaml") { + continue + } + node, err := yaml.Parse(obj) + if err != nil { + t.Errorf("failed to parse object: %v", err) + } + switch node.GetName() { + case "kptfile.kpt.dev": + continue + case "test-data": + f := node.Field("data") + if f.IsNilOrEmpty() { + t.Fatalf("couldn't find data field in test-data") + } + long_string, err := f.Value.GetString("value") + if err != nil { + t.Fatalf("couldn't find large string in test-data: %v", err) + } + if len(long_string) != testDataSize { + t.Fatalf("large string size mismatch. want: %v, got: %v", testDataSize, len(long_string)) + } + if node.GetAnnotations()["test-key"] != "test-val" { + t.Errorf("Object (%s %q) should contain annotation `test-key:test-val`, but we got: %v", node.GetKind(), node.GetName(), node.GetAnnotations()) + } + } + } +} + func (t *PorchSuite) TestRepositoryError(ctx context.Context) { const ( repositoryName = "repo-with-error" From c7b73983e4972d1dc35494f3d2ebb58bddc1899f Mon Sep 17 00:00:00 2001 From: Istvan Kispal Date: Sun, 24 Nov 2024 13:20:41 +0100 Subject: [PATCH 4/6] Explicitly mention --max-request-body-size in the deployment package for better awareness --- deployments/porch/2-function-runner.yaml | 1 + deployments/porch/3-porch-server.yaml | 19 ++++++++++--------- func/server/server.go | 2 +- pkg/cmd/server/start.go | 2 +- 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/deployments/porch/2-function-runner.yaml b/deployments/porch/2-function-runner.yaml index ca384233..8024624c 100644 --- a/deployments/porch/2-function-runner.yaml +++ b/deployments/porch/2-function-runner.yaml @@ -44,6 +44,7 @@ spec: - --config=/config.yaml - --functions=/functions - --pod-namespace=porch-fn-system + - --max-request-body-size=6291456 # Keep this in sync with porch-server's corresponding argument env: - name: WRAPPER_SERVER_IMAGE value: docker.io/nephio/porch-wrapper-server:latest diff --git a/deployments/porch/3-porch-server.yaml b/deployments/porch/3-porch-server.yaml index 65fb9381..b17053e6 100644 --- a/deployments/porch/3-porch-server.yaml +++ b/deployments/porch/3-porch-server.yaml @@ -61,15 +61,15 @@ spec: - name: api-server-certs mountPath: /tmp/certs env: - # Uncomment to enable trace-reporting to jaeger - #- name: OTEL - # value: otel://jaeger-oltp:4317 - - name: OTEL_SERVICE_NAME - value: porch-server - - name: CERT_STORAGE_DIR - value: "/etc/webhook/certs" - - name: GOOGLE_API_GO_EXPERIMENTAL_DISABLE_NEW_AUTH_LIB - value: "true" + # Uncomment to enable trace-reporting to jaeger + #- name: OTEL + # value: otel://jaeger-oltp:4317 + - name: OTEL_SERVICE_NAME + value: porch-server + - name: CERT_STORAGE_DIR + value: "/etc/webhook/certs" + - name: GOOGLE_API_GO_EXPERIMENTAL_DISABLE_NEW_AUTH_LIB + value: "true" args: - --function-runner=function-runner:9445 - --cache-directory=/cache @@ -77,6 +77,7 @@ spec: - --secure-port=4443 - --repo-sync-frequency=10m - --disable-validating-admissions-policy=true + - --max-request-body-size=6291456 # Keep this in sync with function-runner's corresponding argument --- apiVersion: v1 diff --git a/func/server/server.go b/func/server/server.go index 56513f91..fb936c32 100644 --- a/func/server/server.go +++ b/func/server/server.go @@ -68,7 +68,7 @@ func main() { flag.StringVar(&o.pod.RegistryAuthSecretName, "registry-auth-secret-name", "auth-secret", "The name of the secret used for authenticating to custom registries") flag.BoolVar(&o.pod.EnablePrivateRegistriesTls, "enable-private-registries-tls", false, "if enabled, will prioritize use of user provided TLS secret when accessing registries") flag.StringVar(&o.pod.TlsSecretPath, "tls-secret-path", "/var/tmp/tls-secret/", "The path of the secret used in tls configuration") - flag.IntVar(&o.pod.MaxGrpcMessageSize, "max-request-body-size", 6*1024*1024, "Maximum size of grpc messages in bytes.") + flag.IntVar(&o.pod.MaxGrpcMessageSize, "max-request-body-size", 6*1024*1024, "Maximum size of grpc messages in bytes. Keep this in sync with porch-server's corresponding argument.") flag.Parse() diff --git a/pkg/cmd/server/start.go b/pkg/cmd/server/start.go index 01b98275..a7588adb 100644 --- a/pkg/cmd/server/start.go +++ b/pkg/cmd/server/start.go @@ -246,7 +246,7 @@ func (o *PorchServerOptions) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.FunctionRunnerAddress, "function-runner", "", "Address of the function runner gRPC service.") fs.StringVar(&o.DefaultImagePrefix, "default-image-prefix", "gcr.io/kpt-fn/", "Default prefix for unqualified function names") fs.StringVar(&o.CacheDirectory, "cache-directory", "", "Directory where Porch server stores repository and package caches.") - fs.IntVar(&o.MaxRequestBodySize, "max-request-body-size", 6*1024*1024, "Maximum size of the request body in bytes.") + fs.IntVar(&o.MaxRequestBodySize, "max-request-body-size", 6*1024*1024, "Maximum size of the request body in bytes. Keep this in sync with function-runner's corresponding argument.") fs.BoolVar(&o.UseGitCaBundle, "use-git-cabundle", false, "Determine whether to use a user-defined CaBundle for TLS towards git.") fs.BoolVar(&o.DisableValidatingAdmissionPolicy, "disable-validating-admissions-policy", true, "Determine whether to (dis|en)able the Validating Admission Policy, which requires k8s version >= v1.30") fs.DurationVar(&o.RepoSyncFrequency, "repo-sync-frequency", 10*time.Minute, "Frequency in seconds at which registered repositories will be synced and the background job repository refresh runs.") From 60b3fd15f6c309c09964b7841ec1d62657cfa872 Mon Sep 17 00:00:00 2001 From: Istvan Kispal Date: Sun, 24 Nov 2024 13:43:35 +0100 Subject: [PATCH 5/6] Minor fix in e2e test "TestPodEvaluatorWithLargeObjects" --- test/e2e/e2e_test.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 85daeb03..afe5798b 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -2297,10 +2297,7 @@ data: if err != nil { t.Errorf("failed to parse object: %v", err) } - switch node.GetName() { - case "kptfile.kpt.dev": - continue - case "test-data": + if node.GetName() == "test-data" { f := node.Field("data") if f.IsNilOrEmpty() { t.Fatalf("couldn't find data field in test-data") From 4b77ee0bf844f4978b88b90f044c13d54d39efac Mon Sep 17 00:00:00 2001 From: Istvan Kispal Date: Mon, 25 Nov 2024 13:42:17 +0100 Subject: [PATCH 6/6] Merge TestLargePackageRevision and TestPodEvaluatorWithLargeObjects test cases --- test/e2e/e2e_test.go | 57 +++++++------------------------------------- 1 file changed, 8 insertions(+), 49 deletions(-) diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index afe5798b..d2f45d22 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -2217,12 +2217,10 @@ func (t *PorchSuite) TestPodEvaluatorWithFailure(ctx context.Context) { } } -func (t *PorchSuite) TestPodEvaluatorWithLargeObjects(ctx context.Context) { - if t.TestRunnerIsLocal { - t.Skipf("Skipping due to not having pod evaluator in local mode") - } +func (t *PorchSuite) TestLargePackageRevision(ctx context.Context) { const ( setAnnotationsImage = "gcr.io/kpt-fn/set-annotations:v0.1.3" // set-annotations:v0.1.3 is an older version that porch maps neither to built-in nor exec. + testDataSize = 5 * 1024 * 1024 ) t.RegisterMainGitRepositoryF(ctx, "git-fn-pod-large") @@ -2256,10 +2254,9 @@ func (t *PorchSuite) TestPodEvaluatorWithLargeObjects(ctx context.Context) { var prr porchapi.PackageRevisionResources t.GetF(ctx, client.ObjectKeyFromObject(pr), &prr) - testDataSize := 5 * 1024 * 1024 - - // set-annotations:v0.1.3 is an older version that porch maps neither to built-in nor exec runtimes. - prr.Spec.Resources["Kptfile"] += ` + if !t.TestRunnerIsLocal { + // pod evaluator is not available in local test mode, skip testing it + prr.Spec.Resources["Kptfile"] += ` pipeline: mutators: - image: ` + setAnnotationsImage + ` @@ -2268,6 +2265,8 @@ pipeline: selectors: - name: test-data ` + } + prr.Spec.Resources["largefile.yaml"] = ` apiVersion: v1 kind: ConfigMap @@ -2309,7 +2308,7 @@ data: if len(long_string) != testDataSize { t.Fatalf("large string size mismatch. want: %v, got: %v", testDataSize, len(long_string)) } - if node.GetAnnotations()["test-key"] != "test-val" { + if !t.TestRunnerIsLocal && (node.GetAnnotations()["test-key"] != "test-val") { t.Errorf("Object (%s %q) should contain annotation `test-key:test-val`, but we got: %v", node.GetKind(), node.GetName(), node.GetAnnotations()) } } @@ -2969,43 +2968,3 @@ func (t *PorchSuite) TestPackageRevisionFieldSelectors(ctx context.Context) { } } } - -func (t *PorchSuite) TestLargePackageRevision(ctx context.Context) { - const ( - repository = "large-pkg-rev" - ) - - t.RegisterMainGitRepositoryF(ctx, repository) - pr := porchapi.PackageRevision{ - TypeMeta: metav1.TypeMeta{ - Kind: porchapi.PackageRevisionGVR.Resource, - APIVersion: porchapi.SchemeGroupVersion.String(), - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: t.Namespace, - }, - Spec: porchapi.PackageRevisionSpec{ - PackageName: "new-package", - WorkspaceName: "workspace", - RepositoryName: repository, - Tasks: []porchapi.Task{ - { - Type: porchapi.TaskTypeInit, - Init: &porchapi.PackageInitTaskSpec{ - Description: "this is a test", - }, - }, - }, - }, - } - - t.CreateE(ctx, &pr) - - var prr porchapi.PackageRevisionResources - - t.GetE(ctx, client.ObjectKey{Name: pr.Name, Namespace: pr.Namespace}, &prr) - - prr.Spec.Resources["largefile.txt"] = strings.Repeat("a", 4*1024*1024) - - t.UpdateE(ctx, &prr) -}