diff --git a/controllers/k6_finish.go b/controllers/k6_finish.go
index 4378405e..44671e18 100644
--- a/controllers/k6_finish.go
+++ b/controllers/k6_finish.go
@@ -3,6 +3,7 @@ package controllers
 import (
 	"context"
 	"fmt"
+	"time"
 
 	"github.com/go-logr/logr"
 	"github.com/grafana/k6-operator/api/v1alpha1"
@@ -10,6 +11,7 @@ import (
 	"github.com/grafana/k6-operator/pkg/types"
 	batchv1 "k8s.io/api/batch/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/util/wait"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
@@ -18,46 +20,68 @@
 func FinishJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reconciler) (ctrl.Result, error) {
 	log.Info("Waiting for pods to finish")
 
-	selector := labels.SelectorFromSet(map[string]string{
-		"app":    "k6",
-		"k6_cr":  k6.Name,
-		"runner": "true",
-	})
+	// Here we assume that the test runs for some time and there is no need to
+	// check it more often than twice a minute.
+	//
+	// The total timeout for the test is set to the duration of the test + 2 min.
+	// These 2 min are meant to cover the time needed to start the pods: sometimes
+	// pods become ready a bit later than the operator reaches this stage, so from
+	// the operator's viewpoint the test takes longer. This behaviour depends on
+	// the cluster setup, and 2 min should be a sufficient safeguard for such cases.
 
-	opts := &client.ListOptions{LabelSelector: selector, Namespace: k6.Namespace}
-	jl := &batchv1.JobList{}
+	testDuration := inspectOutput.TotalDuration.TimeDuration()
 
-	if err := r.List(ctx, jl, opts); err != nil {
-		log.Error(err, "Could not list jobs")
-		return ctrl.Result{}, err
-	}
+	err := wait.PollImmediate(time.Second*30, testDuration+time.Minute*2, func() (done bool, err error) {
+		selector := labels.SelectorFromSet(map[string]string{
+			"app":    "k6",
+			"k6_cr":  k6.Name,
+			"runner": "true",
+		})
+
+		opts := &client.ListOptions{LabelSelector: selector, Namespace: k6.Namespace}
+		jl := &batchv1.JobList{}
 
-	//TODO: We should distinguish between Suceeded/Failed/Unknown
-	var finished int32
-	for _, job := range jl.Items {
-		if job.Status.Active != 0 {
-			continue
+		if err := r.List(ctx, jl, opts); err != nil {
+			log.Error(err, "Could not list jobs")
+			return false, nil
+		}
+
+		// TODO: We should distinguish between Succeeded/Failed/Unknown
+		var finished int32
+		for _, job := range jl.Items {
+			if job.Status.Active != 0 {
+				continue
+			}
+			finished++
 		}
-		finished++
-	}
 
-	log.Info(fmt.Sprintf("%d/%d jobs complete", finished, k6.Spec.Parallelism))
+		log.Info(fmt.Sprintf("%d/%d jobs complete", finished, k6.Spec.Parallelism))
 
-	if finished >= k6.Spec.Parallelism {
-		k6.Status.Stage = "finished"
-		if err := r.Client.Status().Update(ctx, k6); err != nil {
-			log.Error(err, "Could not update status of custom resource")
-			return ctrl.Result{}, err
+		if finished >= k6.Spec.Parallelism {
+			return true, nil
 		}
 
-		if cli := types.ParseCLI(&k6.Spec); cli.HasCloudOut {
-			if err := cloud.FinishTestRun(testRunId); err != nil {
-				log.Error(err, "Could not finish test run with cloud output")
-				return ctrl.Result{}, err
-			}
+		return false, nil
+	})
+
+	if err != nil {
+		log.Error(err, "Waiting for pods to finish ended with error")
+	}
+
+	// If this is a test run with cloud output, try to finalize it regardless of how the wait ended.
+	if cli := types.ParseCLI(&k6.Spec); cli.HasCloudOut {
+		if err = cloud.FinishTestRun(testRunId); err != nil {
+			log.Error(err, "Could not finalize the test run with cloud output")
+		} else {
+			log.Info(fmt.Sprintf("Cloud test run %s was finalized successfully", testRunId))
 		}
+	}
 
-		log.Info(fmt.Sprintf("Cloud test run %s was finished", testRunId))
+	k6.Status.Stage = "finished"
+	if err = r.Client.Status().Update(ctx, k6); err != nil {
+		log.Error(err, "Could not update status of custom resource")
+		return ctrl.Result{}, err
 	}
+
 	return ctrl.Result{}, nil
 }
diff --git a/controllers/k6_initialize.go b/controllers/k6_initialize.go
index 6684cf00..5d0513d0 100644
--- a/controllers/k6_initialize.go
+++ b/controllers/k6_initialize.go
@@ -26,8 +26,9 @@ import (
 // k6 Cloud related vars
 // Right now operator works with one test at a time so these should be safe.
 var (
-	testRunId string
-	token     string
+	testRunId     string
+	token         string
+	inspectOutput cloud.InspectOutput
 )
 
 // InitializeJobs creates jobs that will run initial checks for distributed test if any are necessary
@@ -153,8 +154,6 @@ func InitializeJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6
 		return false, err
 	}
 
-	var inspectOutput cloud.InspectOutput
-
 	if err := json.Unmarshal(buf.Bytes(), &inspectOutput); err != nil {
 		// this shouldn't normally happen but if it does, let's log output by default
 		log.Error(err, fmt.Sprintf("unable to marshal: `%s`", buf.String()))
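
Note (illustration only, not part of the patch): the rewritten FinishJobs leans on wait.PollImmediate from k8s.io/apimachinery/pkg/util/wait. Below is a minimal standalone sketch of the same polling pattern; jobsFinished is a hypothetical stand-in for the real job listing against the cluster, and the durations mirror the 30 s interval and 2 min safeguard used above.

    package main

    import (
        "fmt"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    func main() {
        // In the patch this comes from inspectOutput.TotalDuration.TimeDuration().
        testDuration := 2 * time.Minute

        // Hypothetical stand-in for listing runner jobs and counting inactive ones.
        start := time.Now()
        jobsFinished := func() bool { return time.Since(start) > time.Minute }

        // Poll every 30s; give up after the test duration plus a 2 min safeguard.
        err := wait.PollImmediate(time.Second*30, testDuration+time.Minute*2, func() (done bool, err error) {
            // (false, nil) keeps polling, (true, nil) stops successfully;
            // a non-nil error would abort the wait early, which is why the
            // patch logs List errors and returns (false, nil) instead.
            return jobsFinished(), nil
        })
        if err != nil {
            // wait.ErrWaitTimeout is returned if the deadline passes first.
            fmt.Println("polling ended with error:", err)
        }
    }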