Skip to content

Commit

Permalink
Prefer autoscaling based on jobs rather than workflows if available (#…
Browse files Browse the repository at this point in the history
…114)

Adds the ability to autoscale on jobs in addition to workflows. We fall back to using workflow metrics if job details are not present.

Resolves #89
  • Loading branch information
dlobue authored Oct 8, 2020
1 parent 1bc6809 commit a638600
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 11 deletions.
43 changes: 38 additions & 5 deletions controllers/autoscaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"context"
"errors"
"fmt"
"github.com/summerwind/actions-runner-controller/api/v1alpha1"
"strings"

"github.com/summerwind/actions-runner-controller/api/v1alpha1"
)

func (r *HorizontalRunnerAutoscalerReconciler) determineDesiredReplicas(rd v1alpha1.RunnerDeployment, hra v1alpha1.HorizontalRunnerAutoscaler) (*int, error) {
Expand Down Expand Up @@ -44,6 +45,38 @@ func (r *HorizontalRunnerAutoscalerReconciler) determineDesiredReplicas(rd v1alp
}

var total, inProgress, queued, completed, unknown int
type callback func()
listWorkflowJobs := func(user string, repoName string, runID int64, fallback_cb callback) {
if runID == 0 {
fallback_cb()
return
}
jobs, _, err := r.GitHubClient.Actions.ListWorkflowJobs(context.TODO(), user, repoName, runID, nil)
if err != nil {
r.Log.Error(err, "Error listing workflow jobs")
fallback_cb()
} else if len(jobs.Jobs) == 0 {
fallback_cb()
} else {
for _, job := range jobs.Jobs {
switch job.GetStatus() {

This comment has been minimized.

Copy link
@tuomoa

tuomoa Feb 28, 2022

@mumoshu I think we could filter out jobs that do not have self-hosted label here? WDYT? Also I'm not sure why we need the fallback_cb when there are zero jobs (which increases queued or in_progress counters). If there is a workflow which does not have jobs, there are no jobs that the runner which is scaled up could run?

This comment has been minimized.

Copy link
@mumoshu

mumoshu Feb 28, 2022

Collaborator

I have not touched code around this for a year or so. Please let me give some time to recall what we're doing here

This comment has been minimized.

Copy link
@tuomoa

tuomoa Feb 28, 2022

Yeah no worries take your time. My reasoning here was just that jobs meant for managed runners are now increasing calculation of required replicas (because we don't check labels here). And also the fallback logic looked a bit odd to me.

This comment has been minimized.

Copy link
@tuomoa

tuomoa Mar 22, 2022

@mumoshu did you have some time to think about this? We are currently using the pull based scaling and the jobs running in managed runners are now causing our self-hosted runners to scale up. :)

case "completed":
// We add a case for `completed` so it is not counted in `unknown`.
// And we do not increment the counter for completed because
// that counter only refers to workflows. The reason for
// this is because we do not get a list of jobs for
// completed workflows in order to keep the number of API
// calls to a minimum.
case "in_progress":
inProgress++
case "queued":
queued++
default:
unknown++
}
}
}
}

for _, repo := range repos {
user, repoName := repo[0], repo[1]
Expand All @@ -52,20 +85,20 @@ func (r *HorizontalRunnerAutoscalerReconciler) determineDesiredReplicas(rd v1alp
return nil, err
}

for _, r := range list.WorkflowRuns {
for _, run := range list.WorkflowRuns {
total++

// In May 2020, there are only 3 statuses.
// Follow the below links for more details:
// - https://developer.github.com/v3/actions/workflow-runs/#list-repository-workflow-runs
// - https://developer.github.com/v3/checks/runs/#create-a-check-run
switch r.GetStatus() {
switch run.GetStatus() {
case "completed":
completed++
case "in_progress":
inProgress++
listWorkflowJobs(user, repoName, run.GetID(), func() { inProgress++ })
case "queued":
queued++
listWorkflowJobs(user, repoName, run.GetID(), func() { queued++ })
default:
unknown++
}
Expand Down
45 changes: 40 additions & 5 deletions controllers/autoscaling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ package controllers

import (
"fmt"
"net/http/httptest"
"net/url"
"testing"

"github.com/summerwind/actions-runner-controller/api/v1alpha1"
"github.com/summerwind/actions-runner-controller/github"
"github.com/summerwind/actions-runner-controller/github/fake"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"net/http/httptest"
"net/url"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"testing"
)

func newGithubClient(server *httptest.Server) *github.Client {
Expand Down Expand Up @@ -44,9 +45,11 @@ func TestDetermineDesiredReplicas_RepositoryRunner(t *testing.T) {
sReplicas *int
sTime *metav1.Time
workflowRuns string
workflowJobs map[int]string
want int
err string
}{
// Legacy functionality
// 3 demanded, max at 3
{
repo: "test/valid",
Expand Down Expand Up @@ -122,6 +125,21 @@ func TestDetermineDesiredReplicas_RepositoryRunner(t *testing.T) {
workflowRuns: `{"total_count": 4, "workflow_runs":[{"status":"in_progress"}, {"status":"in_progress"}, {"status":"in_progress"}, {"status":"completed"}]}"`,
want: 3,
},

// Job-level autoscaling
// 5 requested from 3 workflows
{
repo: "test/valid",
min: intPtr(2),
max: intPtr(10),
workflowRuns: `{"total_count": 4, "workflow_runs":[{"id": 1, "status":"queued"}, {"id": 2, "status":"in_progress"}, {"id": 3, "status":"in_progress"}, {"status":"completed"}]}"`,
workflowJobs: map[int]string{
1: `{"jobs": [{"status":"queued"}, {"status":"queued"}]}`,
2: `{"jobs": [{"status": "in_progress"}, {"status":"completed"}]}`,
3: `{"jobs": [{"status": "in_progress"}, {"status":"queued"}]}`,
},
want: 5,
},
}

for i := range testcases {
Expand All @@ -136,7 +154,7 @@ func TestDetermineDesiredReplicas_RepositoryRunner(t *testing.T) {
_ = v1alpha1.AddToScheme(scheme)

t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
server := fake.NewServer(fake.WithListRepositoryWorkflowRunsResponse(200, tc.workflowRuns))
server := fake.NewServer(fake.WithListRepositoryWorkflowRunsResponse(200, tc.workflowRuns), fake.WithListWorkflowJobsResponse(200, tc.workflowJobs))
defer server.Close()
client := newGithubClient(server)

Expand Down Expand Up @@ -211,6 +229,7 @@ func TestDetermineDesiredReplicas_OrganizationalRunner(t *testing.T) {
sReplicas *int
sTime *metav1.Time
workflowRuns string
workflowJobs map[int]string
want int
err string
}{
Expand Down Expand Up @@ -316,6 +335,22 @@ func TestDetermineDesiredReplicas_OrganizationalRunner(t *testing.T) {
workflowRuns: `{"total_count": 2, "workflow_runs":[{"status":"in_progress"}, {"status":"completed"}]}"`,
err: "validating autoscaling metrics: spec.autoscaling.metrics[].repositoryNames is required and must have one more more entries for organizational runner deployment",
},

// Job-level autoscaling
// 5 requested from 3 workflows
{
org: "test",
repos: []string{"valid"},
min: intPtr(2),
max: intPtr(10),
workflowRuns: `{"total_count": 4, "workflow_runs":[{"id": 1, "status":"queued"}, {"id": 2, "status":"in_progress"}, {"id": 3, "status":"in_progress"}, {"status":"completed"}]}"`,
workflowJobs: map[int]string{
1: `{"jobs": [{"status":"queued"}, {"status":"queued"}]}`,
2: `{"jobs": [{"status": "in_progress"}, {"status":"completed"}]}`,
3: `{"jobs": [{"status": "in_progress"}, {"status":"queued"}]}`,
},
want: 5,
},
}

for i := range testcases {
Expand All @@ -330,7 +365,7 @@ func TestDetermineDesiredReplicas_OrganizationalRunner(t *testing.T) {
_ = v1alpha1.AddToScheme(scheme)

t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
server := fake.NewServer(fake.WithListRepositoryWorkflowRunsResponse(200, tc.workflowRuns))
server := fake.NewServer(fake.WithListRepositoryWorkflowRunsResponse(200, tc.workflowRuns), fake.WithListWorkflowJobsResponse(200, tc.workflowJobs))
defer server.Close()
client := newGithubClient(server)

Expand Down
26 changes: 25 additions & 1 deletion github/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"time"
"unicode"
)

const (
Expand All @@ -31,6 +34,24 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
fmt.Fprintf(w, h.Body)
}

type MapHandler struct {
Status int
Bodies map[int]string
}

func (h *MapHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// Parse out int key from URL path
key, err := strconv.Atoi(strings.TrimFunc(req.URL.Path, func(r rune) bool { return !unicode.IsNumber(r) }))
if err != nil {
w.WriteHeader(400)
} else if body := h.Bodies[key]; len(body) == 0 {
w.WriteHeader(404)
} else {
w.WriteHeader(h.Status)
fmt.Fprintf(w, body)
}
}

type ServerConfig struct {
*FixedResponses
}
Expand All @@ -45,7 +66,7 @@ func NewServer(opts ...Option) *httptest.Server {
o(&config)
}

routes := map[string]*Handler{
routes := map[string]http.Handler{
// For CreateRegistrationToken
"/repos/test/valid/actions/runners/registration-token": &Handler{
Status: http.StatusCreated,
Expand Down Expand Up @@ -126,6 +147,9 @@ func NewServer(opts ...Option) *httptest.Server {

// For auto-scaling based on the number of queued(pending) workflow runs
"/repos/test/valid/actions/runs": config.FixedResponses.ListRepositoryWorkflowRuns,

// For auto-scaling based on the number of queued(pending) workflow jobs
"/repos/test/valid/actions/runs/": config.FixedResponses.ListWorkflowJobs,
}

mux := http.NewServeMux()
Expand Down
10 changes: 10 additions & 0 deletions github/fake/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fake

type FixedResponses struct {
ListRepositoryWorkflowRuns *Handler
ListWorkflowJobs *MapHandler
}

type Option func(*ServerConfig)
Expand All @@ -15,6 +16,15 @@ func WithListRepositoryWorkflowRunsResponse(status int, body string) Option {
}
}

func WithListWorkflowJobsResponse(status int, bodies map[int]string) Option {
return func(c *ServerConfig) {
c.FixedResponses.ListWorkflowJobs = &MapHandler{
Status: status,
Bodies: bodies,
}
}
}

func WithFixedResponses(responses *FixedResponses) Option {
return func(c *ServerConfig) {
c.FixedResponses = responses
Expand Down

0 comments on commit a638600

Please sign in to comment.