Skip to content

Commit

Permalink
cloud output: fix for indefinite run of FinishJobs step
Browse files Browse the repository at this point in the history
No matter the status of the pods, FinishJobs must be able to a) finalize the cloud
output test and b) proceed to the next stage of controller.
  • Loading branch information
yorugac committed May 13, 2022
1 parent 515daa7 commit 0a72b7c
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 34 deletions.
84 changes: 54 additions & 30 deletions controllers/k6_finish.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package controllers
import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
"github.com/grafana/k6-operator/api/v1alpha1"
"github.com/grafana/k6-operator/pkg/cloud"
"github.com/grafana/k6-operator/pkg/types"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand All @@ -18,46 +20,68 @@ import (
func FinishJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reconciler) (ctrl.Result, error) {
log.Info("Waiting for pods to finish")

selector := labels.SelectorFromSet(map[string]string{
"app": "k6",
"k6_cr": k6.Name,
"runner": "true",
})
// Here we assume that the test runs for some time and there is no need to
// check it more often than twice in a minute.
//
// The total timeout for the test is set to duration of the test + 2 min.
// These 2 min are meant to cover the time needed to start the pods: sometimes
// pods are ready a bit later than operator reaches this stage so from the
// viewpoint of operator it takes longer. This behaviour depends on the setup of
// cluster. 2 min are meant to be a sufficient safeguard for such cases.

opts := &client.ListOptions{LabelSelector: selector, Namespace: k6.Namespace}
jl := &batchv1.JobList{}
testDuration := inspectOutput.TotalDuration.TimeDuration()

if err := r.List(ctx, jl, opts); err != nil {
log.Error(err, "Could not list jobs")
return ctrl.Result{}, err
}
err := wait.PollImmediate(time.Second*30, testDuration+time.Minute*2, func() (done bool, err error) {
selector := labels.SelectorFromSet(map[string]string{
"app": "k6",
"k6_cr": k6.Name,
"runner": "true",
})

opts := &client.ListOptions{LabelSelector: selector, Namespace: k6.Namespace}
jl := &batchv1.JobList{}

//TODO: We should distinguish between Suceeded/Failed/Unknown
var finished int32
for _, job := range jl.Items {
if job.Status.Active != 0 {
continue
if err := r.List(ctx, jl, opts); err != nil {
log.Error(err, "Could not list jobs")
return false, nil
}

// TODO: We should distinguish between Suceeded/Failed/Unknown
var finished int32
for _, job := range jl.Items {
if job.Status.Active != 0 {
continue
}
finished++
}
finished++
}

log.Info(fmt.Sprintf("%d/%d jobs complete", finished, k6.Spec.Parallelism))
log.Info(fmt.Sprintf("%d/%d jobs complete", finished, k6.Spec.Parallelism))

if finished >= k6.Spec.Parallelism {
k6.Status.Stage = "finished"
if err := r.Client.Status().Update(ctx, k6); err != nil {
log.Error(err, "Could not update status of custom resource")
return ctrl.Result{}, err
if finished >= k6.Spec.Parallelism {
return true, nil
}

if cli := types.ParseCLI(&k6.Spec); cli.HasCloudOut {
if err := cloud.FinishTestRun(testRunId); err != nil {
log.Error(err, "Could not finish test run with cloud output")
return ctrl.Result{}, err
}
return false, nil
})

if err != nil {
log.Error(err, "Waiting for pods to finish ended with error")
}

// If this is a test run with cloud output, try to finalize it regardless.
if cli := types.ParseCLI(&k6.Spec); cli.HasCloudOut {
if err = cloud.FinishTestRun(testRunId); err != nil {
log.Error(err, "Could not finalize the test run with cloud output")
} else {
log.Info(fmt.Sprintf("Cloud test run %s was finalized succesfully", testRunId))
}
}

log.Info(fmt.Sprintf("Cloud test run %s was finished", testRunId))
k6.Status.Stage = "finished"
if err = r.Client.Status().Update(ctx, k6); err != nil {
log.Error(err, "Could not update status of custom resource")
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}
7 changes: 3 additions & 4 deletions controllers/k6_initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import (
// k6 Cloud related vars
// Right now operator works with one test at a time so these should be safe.
var (
testRunId string
token string
testRunId string
token string
inspectOutput cloud.InspectOutput
)

// InitializeJobs creates jobs that will run initial checks for distributed test if any are necessary
Expand Down Expand Up @@ -153,8 +154,6 @@ func InitializeJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6
return false, err
}

var inspectOutput cloud.InspectOutput

if err := json.Unmarshal(buf.Bytes(), &inspectOutput); err != nil {
// this shouldn't normally happen but if it does, let's log output by default
log.Error(err, fmt.Sprintf("unable to marshal: `%s`", buf.String()))
Expand Down

0 comments on commit 0a72b7c

Please sign in to comment.