From a7c4a3f2ea7341f2ffaa9c28af70f82ebaf53c84 Mon Sep 17 00:00:00 2001 From: Olha Yevtushenko Date: Fri, 3 Jun 2022 17:02:59 +0300 Subject: [PATCH] fix: execute inspect step for all test runs to have timeouts always --- controllers/k6_initialize.go | 233 ++++++++++++++++++----------------- 1 file changed, 118 insertions(+), 115 deletions(-) diff --git a/controllers/k6_initialize.go b/controllers/k6_initialize.go index 8d804a12..34a9dc6a 100644 --- a/controllers/k6_initialize.go +++ b/controllers/k6_initialize.go @@ -42,7 +42,118 @@ func InitializeJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6 return } - if cli := types.ParseCLI(&k6.Spec); cli.HasCloudOut { + cli := types.ParseCLI(&k6.Spec) + + var initializer *batchv1.Job + if initializer, err = jobs.NewInitializerJob(k6, cli.ArchiveArgs); err != nil { + return res, err + } + + log.Info(fmt.Sprintf("Initializer job is ready to start with image `%s` and command `%s`", + initializer.Spec.Template.Spec.Containers[0].Image, initializer.Spec.Template.Spec.Containers[0].Command)) + + if err = ctrl.SetControllerReference(k6, initializer, r.Scheme); err != nil { + log.Error(err, "Failed to set controller reference for the initialize job") + return + } + + if err = r.Create(ctx, initializer); err != nil { + log.Error(err, "Failed to launch k6 test initializer") + return + } + err = wait.PollImmediate(time.Second*5, time.Second*60, func() (done bool, err error) { + var ( + listOpts = &client.ListOptions{ + Namespace: k6.Namespace, + LabelSelector: labels.SelectorFromSet(map[string]string{ + "app": "k6", + "k6_cr": k6.Name, + "job-name": fmt.Sprintf("%s-initializer", k6.Name), + }), + } + podList = &corev1.PodList{} + ) + if err := r.List(ctx, podList, listOpts); err != nil { + log.Error(err, "Could not list pods") + return false, err + } + if len(podList.Items) < 1 { + log.Info("No initializing pod found yet") + return false, nil + } + + // there should be only 1 initializer pod + if podList.Items[0].Status.Phase != "Succeeded" { + log.Info("Waiting for initializing pod to finish") + return false, nil + } + + // Here we need to get the output of the pod + // pods/log is not currently supported by controller-runtime client and it is officially + // recommended to use REST client instead: + // https://github.com/kubernetes-sigs/controller-runtime/issues/1229 + + var opts corev1.PodLogOptions + config, err := rest.InClusterConfig() + if err != nil { + log.Error(err, "unable to fetch in-cluster REST config") + // don't return here + return false, nil + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + log.Error(err, "unable to get access to clientset") + // don't return here + return false, nil + } + req := clientset.CoreV1().Pods(k6.Namespace).GetLogs(podList.Items[0].Name, &opts) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*60) + defer cancel() + + podLogs, err := req.Stream(ctx) + if err != nil { + log.Error(err, "unable to stream logs from the pod") + // don't return here + return false, nil + } + defer podLogs.Close() + + buf := new(bytes.Buffer) + _, err = io.Copy(buf, podLogs) + if err != nil { + log.Error(err, "unable to copy logs from the pod") + return false, err + } + + if err := json.Unmarshal(buf.Bytes(), &inspectOutput); err != nil { + // this shouldn't normally happen but if it does, let's log output by default + log.Error(err, fmt.Sprintf("unable to marshal: `%s`", buf.String())) + return true, err + } + + log.Info(fmt.Sprintf("k6 inspect: %+v", inspectOutput)) + + if int32(inspectOutput.MaxVUs) < k6.Spec.Parallelism { + err = fmt.Errorf("number of instances > number of VUs") + // TODO maybe change this to a warning and simply set parallelism = maxVUs and proceed with execution? + // But logr doesn't seem to have warning level by default, only with V() method... + // It makes sense to return to this after / during logr VS logrus issue https://github.com/grafana/k6-operator/issues/84 + log.Error(err, "Parallelism argument cannot be larger than maximum VUs in the script", + "maxVUs", inspectOutput.MaxVUs, + "parallelism", k6.Spec.Parallelism) + return false, err + } + + return true, nil + }) + + if err != nil { + log.Error(err, "Failed to initialize the script") + return + } + + if cli.HasCloudOut { var ( secrets corev1.SecretList secretOpts = &client.ListOptions{ @@ -73,122 +184,14 @@ func InitializeJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6 } log.Info("Token for k6 Cloud was loaded.") - var initializer *batchv1.Job - if initializer, err = jobs.NewInitializerJob(k6, cli.ArchiveArgs); err != nil { - return res, err - } - - log.Info(fmt.Sprintf("Initializer job is ready to start with image `%s` and command `%s`", - initializer.Spec.Template.Spec.Containers[0].Image, initializer.Spec.Template.Spec.Containers[0].Command)) - - if err = ctrl.SetControllerReference(k6, initializer, r.Scheme); err != nil { - log.Error(err, "Failed to set controller reference for the initialize job") - return - } + host := getEnvVar(k6.Spec.Runner.Env, "K6_CLOUD_HOST") - if err = r.Create(ctx, initializer); err != nil { - log.Error(err, "Failed to launch k6 test initializer") - return + if refID, err := cloud.CreateTestRun(inspectOutput, k6.Spec.Parallelism, host, token, log); err != nil { + return res, err + } else { + testRunId = refID + log.Info(fmt.Sprintf("Created cloud test run: %s", testRunId)) } - err = wait.PollImmediate(time.Second*5, time.Second*60, func() (done bool, err error) { - var ( - listOpts = &client.ListOptions{ - Namespace: k6.Namespace, - LabelSelector: labels.SelectorFromSet(map[string]string{ - "app": "k6", - "k6_cr": k6.Name, - "job-name": fmt.Sprintf("%s-initializer", k6.Name), - }), - } - podList = &corev1.PodList{} - ) - if err := r.List(ctx, podList, listOpts); err != nil { - log.Error(err, "Could not list pods") - return false, err - } - if len(podList.Items) < 1 { - log.Info("No initializing pod found yet") - return false, nil - } - - // there should be only 1 initializer pod - if podList.Items[0].Status.Phase != "Succeeded" { - log.Info("Waiting for initializing pod to finish") - return false, nil - } - - // Here we need to get the output of the pod - // pods/log is not currently supported by controller-runtime client and it is officially - // recommended to use REST client instead: - // https://github.com/kubernetes-sigs/controller-runtime/issues/1229 - - var opts corev1.PodLogOptions - config, err := rest.InClusterConfig() - if err != nil { - log.Error(err, "unable to fetch in-cluster REST config") - // don't return here - return false, nil - } - - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - log.Error(err, "unable to get access to clientset") - // don't return here - return false, nil - } - req := clientset.CoreV1().Pods(k6.Namespace).GetLogs(podList.Items[0].Name, &opts) - ctx, cancel := context.WithTimeout(context.Background(), time.Second*60) - defer cancel() - - podLogs, err := req.Stream(ctx) - if err != nil { - log.Error(err, "unable to stream logs from the pod") - // don't return here - return false, nil - } - defer podLogs.Close() - - buf := new(bytes.Buffer) - _, err = io.Copy(buf, podLogs) - if err != nil { - log.Error(err, "unable to copy logs from the pod") - return false, err - } - - if err := json.Unmarshal(buf.Bytes(), &inspectOutput); err != nil { - // this shouldn't normally happen but if it does, let's log output by default - log.Error(err, fmt.Sprintf("unable to marshal: `%s`", buf.String())) - return true, err - } - - log.Info(fmt.Sprintf("k6 inspect: %+v", inspectOutput)) - - if int32(inspectOutput.MaxVUs) < k6.Spec.Parallelism { - err = fmt.Errorf("number of instances > number of VUs") - // TODO maybe change this to a warning and simply set parallelism = maxVUs and proceed with execution? - // But logr doesn't seem to have warning level by default, only with V() method... - // It makes sense to return to this after / during logr VS logrus issue https://github.com/grafana/k6-operator/issues/84 - log.Error(err, "Parallelism argument cannot be larger than maximum VUs in the script", - "maxVUs", inspectOutput.MaxVUs, - "parallelism", k6.Spec.Parallelism) - return false, err - } - - host := getEnvVar(k6.Spec.Runner.Env, "K6_CLOUD_HOST") - - if refID, err := cloud.CreateTestRun(inspectOutput, k6.Spec.Parallelism, host, token, log); err != nil { - return true, err - } else { - testRunId = refID - log.Info(fmt.Sprintf("Created cloud test run: %s", testRunId)) - return true, nil - } - }) - } - - if err != nil { - log.Error(err, "Failed to initialize script with cloud output") - return } log.Info("Changing stage of K6 status to initialized")