fix: execute inspect step for all test runs to have timeouts always
yorugac committed Jun 3, 2022
1 parent 6256831 commit a7c4a3f
233 changes: 118 additions & 115 deletions controllers/k6_initialize.go
@@ -42,7 +42,118 @@ func InitializeJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6
         return
     }
 
-    if cli := types.ParseCLI(&k6.Spec); cli.HasCloudOut {
+    cli := types.ParseCLI(&k6.Spec)
+
+    var initializer *batchv1.Job
+    if initializer, err = jobs.NewInitializerJob(k6, cli.ArchiveArgs); err != nil {
+        return res, err
+    }
+
+    log.Info(fmt.Sprintf("Initializer job is ready to start with image `%s` and command `%s`",
+        initializer.Spec.Template.Spec.Containers[0].Image, initializer.Spec.Template.Spec.Containers[0].Command))
+
+    if err = ctrl.SetControllerReference(k6, initializer, r.Scheme); err != nil {
+        log.Error(err, "Failed to set controller reference for the initialize job")
+        return
+    }
+
+    if err = r.Create(ctx, initializer); err != nil {
+        log.Error(err, "Failed to launch k6 test initializer")
+        return
+    }
+    err = wait.PollImmediate(time.Second*5, time.Second*60, func() (done bool, err error) {
+        var (
+            listOpts = &client.ListOptions{
+                Namespace: k6.Namespace,
+                LabelSelector: labels.SelectorFromSet(map[string]string{
+                    "app":      "k6",
+                    "k6_cr":    k6.Name,
+                    "job-name": fmt.Sprintf("%s-initializer", k6.Name),
+                }),
+            }
+            podList = &corev1.PodList{}
+        )
+        if err := r.List(ctx, podList, listOpts); err != nil {
+            log.Error(err, "Could not list pods")
+            return false, err
+        }
+        if len(podList.Items) < 1 {
+            log.Info("No initializing pod found yet")
+            return false, nil
+        }
+
+        // there should be only 1 initializer pod
+        if podList.Items[0].Status.Phase != "Succeeded" {
+            log.Info("Waiting for initializing pod to finish")
+            return false, nil
+        }
+
+        // Here we need to get the output of the pod
+        // pods/log is not currently supported by controller-runtime client and it is officially
+        // recommended to use REST client instead:
+        // https://github.com/kubernetes-sigs/controller-runtime/issues/1229
+
+        var opts corev1.PodLogOptions
+        config, err := rest.InClusterConfig()
+        if err != nil {
+            log.Error(err, "unable to fetch in-cluster REST config")
+            // don't return here
+            return false, nil
+        }
+
+        clientset, err := kubernetes.NewForConfig(config)
+        if err != nil {
+            log.Error(err, "unable to get access to clientset")
+            // don't return here
+            return false, nil
+        }
+        req := clientset.CoreV1().Pods(k6.Namespace).GetLogs(podList.Items[0].Name, &opts)
+        ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
+        defer cancel()
+
+        podLogs, err := req.Stream(ctx)
+        if err != nil {
+            log.Error(err, "unable to stream logs from the pod")
+            // don't return here
+            return false, nil
+        }
+        defer podLogs.Close()
+
+        buf := new(bytes.Buffer)
+        _, err = io.Copy(buf, podLogs)
+        if err != nil {
+            log.Error(err, "unable to copy logs from the pod")
+            return false, err
+        }
+
+        if err := json.Unmarshal(buf.Bytes(), &inspectOutput); err != nil {
+            // this shouldn't normally happen but if it does, let's log output by default
+            log.Error(err, fmt.Sprintf("unable to marshal: `%s`", buf.String()))
+            return true, err
+        }
+
+        log.Info(fmt.Sprintf("k6 inspect: %+v", inspectOutput))
+
+        if int32(inspectOutput.MaxVUs) < k6.Spec.Parallelism {
+            err = fmt.Errorf("number of instances > number of VUs")
+            // TODO maybe change this to a warning and simply set parallelism = maxVUs and proceed with execution?
+            // But logr doesn't seem to have warning level by default, only with V() method...
+            // It makes sense to return to this after / during logr VS logrus issue https://github.com/grafana/k6-operator/issues/84
+            log.Error(err, "Parallelism argument cannot be larger than maximum VUs in the script",
+                "maxVUs", inspectOutput.MaxVUs,
+                "parallelism", k6.Spec.Parallelism)
+            return false, err
+        }
+
+        return true, nil
+    })
+
+    if err != nil {
+        log.Error(err, "Failed to initialize the script")
+        return
+    }
+
+    if cli.HasCloudOut {
         var (
             secrets    corev1.SecretList
             secretOpts = &client.ListOptions{
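
The step moved above relies on wait.PollImmediate from k8s.io/apimachinery/pkg/util/wait: the condition function runs once immediately and then every 5 seconds until it reports done, fails with an error, or the 60-second budget runs out, which is what now bounds the inspect step for every test run. Below is a minimal, self-contained sketch of that contract; the InspectOutput struct and its maxVUs JSON field are illustrative assumptions standing in for the operator's actual types, not code from this repository.

package main

import (
    "encoding/json"
    "errors"
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

// InspectOutput is a hypothetical stand-in for the struct the operator
// unmarshals `k6 inspect` output into; the field name is an assumption.
type InspectOutput struct {
    MaxVUs int `json:"maxVUs"`
}

func main() {
    var out InspectOutput
    raw := []byte(`{"maxVUs": 10}`) // stands in for the initializer pod's logs

    // The condition runs immediately, then every 5s, for at most 60s:
    // return (false, nil) to keep polling, (true, nil) when done,
    // or a non-nil error to abort early.
    err := wait.PollImmediate(5*time.Second, 60*time.Second, func() (bool, error) {
        if err := json.Unmarshal(raw, &out); err != nil {
            return false, nil // output not complete yet; try again on the next tick
        }
        if out.MaxVUs < 1 {
            return true, errors.New("script declares no VUs") // fatal: stop polling
        }
        return true, nil
    })
    if err != nil {
        fmt.Println("initialization failed:", err)
        return
    }
    fmt.Println("maxVUs:", out.MaxVUs)
}
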
@@ -73,122 +184,14 @@ func InitializeJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6
         }
         log.Info("Token for k6 Cloud was loaded.")
 
-        var initializer *batchv1.Job
-        if initializer, err = jobs.NewInitializerJob(k6, cli.ArchiveArgs); err != nil {
-            return res, err
-        }
-
-        log.Info(fmt.Sprintf("Initializer job is ready to start with image `%s` and command `%s`",
-            initializer.Spec.Template.Spec.Containers[0].Image, initializer.Spec.Template.Spec.Containers[0].Command))
-
-        if err = ctrl.SetControllerReference(k6, initializer, r.Scheme); err != nil {
-            log.Error(err, "Failed to set controller reference for the initialize job")
-            return
-        }
+        host := getEnvVar(k6.Spec.Runner.Env, "K6_CLOUD_HOST")
 
-        if err = r.Create(ctx, initializer); err != nil {
-            log.Error(err, "Failed to launch k6 test initializer")
-            return
+        if refID, err := cloud.CreateTestRun(inspectOutput, k6.Spec.Parallelism, host, token, log); err != nil {
+            return res, err
+        } else {
+            testRunId = refID
+            log.Info(fmt.Sprintf("Created cloud test run: %s", testRunId))
         }
-        err = wait.PollImmediate(time.Second*5, time.Second*60, func() (done bool, err error) {
-            var (
-                listOpts = &client.ListOptions{
-                    Namespace: k6.Namespace,
-                    LabelSelector: labels.SelectorFromSet(map[string]string{
-                        "app":      "k6",
-                        "k6_cr":    k6.Name,
-                        "job-name": fmt.Sprintf("%s-initializer", k6.Name),
-                    }),
-                }
-                podList = &corev1.PodList{}
-            )
-            if err := r.List(ctx, podList, listOpts); err != nil {
-                log.Error(err, "Could not list pods")
-                return false, err
-            }
-            if len(podList.Items) < 1 {
-                log.Info("No initializing pod found yet")
-                return false, nil
-            }
-
-            // there should be only 1 initializer pod
-            if podList.Items[0].Status.Phase != "Succeeded" {
-                log.Info("Waiting for initializing pod to finish")
-                return false, nil
-            }
-
-            // Here we need to get the output of the pod
-            // pods/log is not currently supported by controller-runtime client and it is officially
-            // recommended to use REST client instead:
-            // https://github.com/kubernetes-sigs/controller-runtime/issues/1229
-
-            var opts corev1.PodLogOptions
-            config, err := rest.InClusterConfig()
-            if err != nil {
-                log.Error(err, "unable to fetch in-cluster REST config")
-                // don't return here
-                return false, nil
-            }
-
-            clientset, err := kubernetes.NewForConfig(config)
-            if err != nil {
-                log.Error(err, "unable to get access to clientset")
-                // don't return here
-                return false, nil
-            }
-            req := clientset.CoreV1().Pods(k6.Namespace).GetLogs(podList.Items[0].Name, &opts)
-            ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
-            defer cancel()
-
-            podLogs, err := req.Stream(ctx)
-            if err != nil {
-                log.Error(err, "unable to stream logs from the pod")
-                // don't return here
-                return false, nil
-            }
-            defer podLogs.Close()
-
-            buf := new(bytes.Buffer)
-            _, err = io.Copy(buf, podLogs)
-            if err != nil {
-                log.Error(err, "unable to copy logs from the pod")
-                return false, err
-            }
-
-            if err := json.Unmarshal(buf.Bytes(), &inspectOutput); err != nil {
-                // this shouldn't normally happen but if it does, let's log output by default
-                log.Error(err, fmt.Sprintf("unable to marshal: `%s`", buf.String()))
-                return true, err
-            }
-
-            log.Info(fmt.Sprintf("k6 inspect: %+v", inspectOutput))
-
-            if int32(inspectOutput.MaxVUs) < k6.Spec.Parallelism {
-                err = fmt.Errorf("number of instances > number of VUs")
-                // TODO maybe change this to a warning and simply set parallelism = maxVUs and proceed with execution?
-                // But logr doesn't seem to have warning level by default, only with V() method...
-                // It makes sense to return to this after / during logr VS logrus issue https://github.com/grafana/k6-operator/issues/84
-                log.Error(err, "Parallelism argument cannot be larger than maximum VUs in the script",
-                    "maxVUs", inspectOutput.MaxVUs,
-                    "parallelism", k6.Spec.Parallelism)
-                return false, err
-            }
-
-            host := getEnvVar(k6.Spec.Runner.Env, "K6_CLOUD_HOST")
-
-            if refID, err := cloud.CreateTestRun(inspectOutput, k6.Spec.Parallelism, host, token, log); err != nil {
-                return true, err
-            } else {
-                testRunId = refID
-                log.Info(fmt.Sprintf("Created cloud test run: %s", testRunId))
-                return true, nil
-            }
-        })
     }
 
-    if err != nil {
-        log.Error(err, "Failed to initialize script with cloud output")
-        return
-    }
-
     log.Info("Changing stage of K6 status to initialized")