From 0a72b7cab3d31fcb2f3ddbcea586519c2c6dc412 Mon Sep 17 00:00:00 2001 From: Olha Yevtushenko Date: Fri, 29 Apr 2022 18:51:32 +0300 Subject: [PATCH] cloud output: fix for indefinite run of FinishJobs step No matter the status of the pods, FinishJobs must be able to a) finalize the cloud output test and b) proceed to the next stage of controller. --- controllers/k6_finish.go | 84 +++++++++++++++++++++++------------- controllers/k6_initialize.go | 7 ++- 2 files changed, 57 insertions(+), 34 deletions(-) diff --git a/controllers/k6_finish.go b/controllers/k6_finish.go index 4378405e..44671e18 100644 --- a/controllers/k6_finish.go +++ b/controllers/k6_finish.go @@ -3,6 +3,7 @@ package controllers import ( "context" "fmt" + "time" "github.com/go-logr/logr" "github.com/grafana/k6-operator/api/v1alpha1" @@ -10,6 +11,7 @@ import ( "github.com/grafana/k6-operator/pkg/types" batchv1 "k8s.io/api/batch/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -18,46 +20,68 @@ import ( func FinishJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reconciler) (ctrl.Result, error) { log.Info("Waiting for pods to finish") - selector := labels.SelectorFromSet(map[string]string{ - "app": "k6", - "k6_cr": k6.Name, - "runner": "true", - }) + // Here we assume that the test runs for some time and there is no need to + // check it more often than twice in a minute. + // + // The total timeout for the test is set to duration of the test + 2 min. + // These 2 min are meant to cover the time needed to start the pods: sometimes + // pods are ready a bit later than operator reaches this stage so from the + // viewpoint of operator it takes longer. This behaviour depends on the setup of + // cluster. 2 min are meant to be a sufficient safeguard for such cases. - opts := &client.ListOptions{LabelSelector: selector, Namespace: k6.Namespace} - jl := &batchv1.JobList{} + testDuration := inspectOutput.TotalDuration.TimeDuration() - if err := r.List(ctx, jl, opts); err != nil { - log.Error(err, "Could not list jobs") - return ctrl.Result{}, err - } + err := wait.PollImmediate(time.Second*30, testDuration+time.Minute*2, func() (done bool, err error) { + selector := labels.SelectorFromSet(map[string]string{ + "app": "k6", + "k6_cr": k6.Name, + "runner": "true", + }) + + opts := &client.ListOptions{LabelSelector: selector, Namespace: k6.Namespace} + jl := &batchv1.JobList{} - //TODO: We should distinguish between Suceeded/Failed/Unknown - var finished int32 - for _, job := range jl.Items { - if job.Status.Active != 0 { - continue + if err := r.List(ctx, jl, opts); err != nil { + log.Error(err, "Could not list jobs") + return false, nil + } + + // TODO: We should distinguish between Suceeded/Failed/Unknown + var finished int32 + for _, job := range jl.Items { + if job.Status.Active != 0 { + continue + } + finished++ } - finished++ - } - log.Info(fmt.Sprintf("%d/%d jobs complete", finished, k6.Spec.Parallelism)) + log.Info(fmt.Sprintf("%d/%d jobs complete", finished, k6.Spec.Parallelism)) - if finished >= k6.Spec.Parallelism { - k6.Status.Stage = "finished" - if err := r.Client.Status().Update(ctx, k6); err != nil { - log.Error(err, "Could not update status of custom resource") - return ctrl.Result{}, err + if finished >= k6.Spec.Parallelism { + return true, nil } - if cli := types.ParseCLI(&k6.Spec); cli.HasCloudOut { - if err := cloud.FinishTestRun(testRunId); err != nil { - log.Error(err, "Could not finish test run with cloud output") - return ctrl.Result{}, err - } + return false, nil + }) + + if err != nil { + log.Error(err, "Waiting for pods to finish ended with error") + } + + // If this is a test run with cloud output, try to finalize it regardless. + if cli := types.ParseCLI(&k6.Spec); cli.HasCloudOut { + if err = cloud.FinishTestRun(testRunId); err != nil { + log.Error(err, "Could not finalize the test run with cloud output") + } else { + log.Info(fmt.Sprintf("Cloud test run %s was finalized succesfully", testRunId)) } + } - log.Info(fmt.Sprintf("Cloud test run %s was finished", testRunId)) + k6.Status.Stage = "finished" + if err = r.Client.Status().Update(ctx, k6); err != nil { + log.Error(err, "Could not update status of custom resource") + return ctrl.Result{}, err } + return ctrl.Result{}, nil } diff --git a/controllers/k6_initialize.go b/controllers/k6_initialize.go index 6684cf00..5d0513d0 100644 --- a/controllers/k6_initialize.go +++ b/controllers/k6_initialize.go @@ -26,8 +26,9 @@ import ( // k6 Cloud related vars // Right now operator works with one test at a time so these should be safe. var ( - testRunId string - token string + testRunId string + token string + inspectOutput cloud.InspectOutput ) // InitializeJobs creates jobs that will run initial checks for distributed test if any are necessary @@ -153,8 +154,6 @@ func InitializeJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6 return false, err } - var inspectOutput cloud.InspectOutput - if err := json.Unmarshal(buf.Bytes(), &inspectOutput); err != nil { // this shouldn't normally happen but if it does, let's log output by default log.Error(err, fmt.Sprintf("unable to marshal: `%s`", buf.String()))