Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 7.61.x] Performance improvements of External Metrics controller and allow multiple workers #31737

Merged
merged 1 commit into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/secrethelper/secret_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ const (
)

// NewKubeClient returns a new kubernetes.Interface
type NewKubeClient func(timeout time.Duration) (kubernetes.Interface, error)
type NewKubeClient func(timeout time.Duration, qps float32, burst int) (kubernetes.Interface, error)

// cliParams are the command-line arguments for this subcommand
type cliParams struct {
Expand Down Expand Up @@ -175,7 +175,7 @@ func readSecretsUsingPrefixes(secretsList []string, rootPath string, newKubeClie
case filePrefix:
res[secretID] = providers.ReadSecretFile(id)
case k8sSecretPrefix:
kubeClient, err := newKubeClientFunc(10 * time.Second)
kubeClient, err := newKubeClientFunc(10*time.Second, 0, 0) // Default QPS and burst to Kube client defaults using 0
if err != nil {
res[secretID] = secrets.SecretVal{Value: "", ErrorMsg: err.Error()}
} else {
Expand Down
2 changes: 1 addition & 1 deletion cmd/secrethelper/secret_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

func TestReadSecrets(t *testing.T) {
newKubeClientFunc := func(_ time.Duration) (kubernetes.Interface, error) {
newKubeClientFunc := func(_ time.Duration, _ float32, _ int) (kubernetes.Interface, error) {
return fake.NewSimpleClientset(&v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "some_name",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import (
"time"

"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/externalmetrics/model"
le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
"github.com/DataDog/datadog-agent/pkg/util/log"

datadoghq "github.com/DataDog/datadog-operator/api/datadoghq/v1alpha1"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/tools/cache"
Expand All @@ -33,6 +33,15 @@ const (
ddmControllerStoreID string = "ddmc"
)

// controllerOperation labels the kind of reconcile action taken for a
// DatadogMetric object; it is emitted as a tag value on the reconcile
// duration telemetry (see the reconcileElapsed.Observe call in process).
type controllerOperation string

const (
	// createControllerOperation: a DatadogMetric object was created in Kubernetes (autogen flow).
	createControllerOperation controllerOperation = "create"
	// updateControllerOperation: the Kubernetes object status was updated from the local store.
	updateControllerOperation controllerOperation = "update"
	// deleteControllerOperation: the Kubernetes object was deleted.
	deleteControllerOperation controllerOperation = "delete"
	// noopControllerOperation: the event required no write to Kubernetes
	// (follower flow, cache refresh, or error before any action).
	noopControllerOperation controllerOperation = "none"
)

var (
gvrDDM = datadoghq.GroupVersion.WithResource("datadogmetrics")
metaDDM = metav1.TypeMeta{
Expand Down Expand Up @@ -92,30 +101,37 @@ func NewDatadogMetricController(client dynamic.Interface, informer dynamicinform
}

// Run starts the controller to handle DatadogMetrics
func (c *DatadogMetricController) Run(ctx context.Context) {
func (c *DatadogMetricController) Run(ctx context.Context, numWorkers int) {
if ctx == nil {
log.Errorf("Cannot run with a nil context")
return
}
c.context = ctx

defer c.workqueue.ShutDown()

log.Infof("Starting DatadogMetric Controller (waiting for cache sync)")
if !cache.WaitForCacheSync(ctx.Done(), c.synced) {
log.Errorf("Failed to wait for DatadogMetric caches to sync")
return
}

go wait.Until(c.worker, time.Second, ctx.Done())
for i := 0; i < numWorkers; i++ {
go c.worker(i)
}

log.Infof("Started DatadogMetric Controller (cache sync finished)")
<-ctx.Done()
log.Infof("Stopping DatadogMetric Controller")
if c.isLeader() {
c.workqueue.ShutDownWithDrain()
} else {
c.workqueue.ShutDown()
}
log.Infof("DatadogMetric Controller stopped")
}

func (c *DatadogMetricController) worker() {
for c.process() {
// worker is a long-running processing loop for a single controller worker.
// It repeatedly drains items from the shared workqueue via process and
// returns once process reports that the queue has been shut down.
func (c *DatadogMetricController) worker(workerID int) {
	log.Debugf("Starting DatadogMetric worker: %d", workerID)
	for {
		if !c.process(workerID) {
			return
		}
	}
}

Expand All @@ -135,16 +151,25 @@ func (c *DatadogMetricController) enqueueID(id, sender string) {
}
}

func (c *DatadogMetricController) process() bool {
func (c *DatadogMetricController) process(workerID int) bool {
key, shutdown := c.workqueue.Get()
if shutdown {
log.Infof("DatadogMetric Controller: Caught stop signal in workqueue")
return false
}

// We start the timer after waiting on the queue itself to have actual processing time.
startTime := time.Now()
operation := noopControllerOperation
var err error

defer func() {
reconcileElapsed.Observe(time.Since(startTime).Seconds(), string(operation), inErrorLabelValue(err), le.JoinLeaderValue)
}()

defer c.workqueue.Done(key)

err := c.processDatadogMetric(key)
operation, err = c.processDatadogMetric(workerID, key)
if err == nil {
c.workqueue.Forget(key)
} else {
Expand All @@ -158,13 +183,13 @@ func (c *DatadogMetricController) process() bool {
return true
}

func (c *DatadogMetricController) processDatadogMetric(key interface{}) error {
func (c *DatadogMetricController) processDatadogMetric(workerID int, key interface{}) (controllerOperation, error) {
datadogMetricKey := key.(string)
log.Debugf("Processing DatadogMetric: %s", datadogMetricKey)
log.Tracef("Processing DatadogMetric: %s - worker %d", datadogMetricKey, workerID)

ns, name, err := cache.SplitMetaNamespaceKey(datadogMetricKey)
if err != nil {
return fmt.Errorf("Could not split the key: %v", err)
return noopControllerOperation, fmt.Errorf("Could not split the key: %v", err)
}

datadogMetricCached := &datadoghq.DatadogMetric{}
Expand All @@ -178,34 +203,30 @@ func (c *DatadogMetricController) processDatadogMetric(key interface{}) error {
// We ignore not found here as we may need to create a DatadogMetric later
datadogMetricCached = nil
case err != nil:
return fmt.Errorf("Unable to retrieve DatadogMetric: %w", err)
case datadogMetricCached == nil:
return fmt.Errorf("Could not parse empty DatadogMetric from local cache")
return noopControllerOperation, fmt.Errorf("Unable to retrieve DatadogMetric: %w", err)
}

// No error path, check what to do with this event
if c.isLeader() {
err = c.syncDatadogMetric(ns, name, datadogMetricKey, datadogMetricCached)
if err != nil {
return err
}
return c.syncDatadogMetric(ns, name, datadogMetricKey, datadogMetricCached)
}

// Follower flow
if datadogMetricCached != nil {
// Feeding local cache with DatadogMetric information
c.store.Set(datadogMetricKey, model.NewDatadogMetricInternal(datadogMetricKey, *datadogMetricCached), ddmControllerStoreID)
setDatadogMetricTelemetry(datadogMetricCached)
} else {
if datadogMetricCached != nil {
// Feeding local cache with DatadogMetric information
c.store.Set(datadogMetricKey, model.NewDatadogMetricInternal(datadogMetricKey, *datadogMetricCached), ddmControllerStoreID)
setDatadogMetricTelemetry(datadogMetricCached)
} else {
c.store.Delete(datadogMetricKey, ddmControllerStoreID)
unsetDatadogMetricTelemetry(ns, name)
}
c.store.Delete(datadogMetricKey, ddmControllerStoreID)
unsetDatadogMetricTelemetry(ns, name)
}

return nil
return noopControllerOperation, nil
}

// Synchronize DatadogMetric state between internal store and Kubernetes objects
// Make sure any `return` has the proper store Unlock
func (c *DatadogMetricController) syncDatadogMetric(ns, name, datadogMetricKey string, datadogMetric *datadoghq.DatadogMetric) error {
func (c *DatadogMetricController) syncDatadogMetric(ns, name, datadogMetricKey string, datadogMetric *datadoghq.DatadogMetric) (controllerOperation, error) {
datadogMetricInternal := c.store.LockRead(datadogMetricKey, true)
if datadogMetricInternal == nil {
if datadogMetric != nil {
Expand All @@ -216,20 +237,20 @@ func (c *DatadogMetricController) syncDatadogMetric(ns, name, datadogMetricKey s
c.store.Unlock(datadogMetricKey)
}

return nil
return noopControllerOperation, nil
}

// If DatadogMetric object is not present in Kubernetes, we need to clear our store (removed by user) or create it (autogen)
if datadogMetric == nil {
if datadogMetricInternal.Autogen && !datadogMetricInternal.Deleted {
err := c.createDatadogMetric(ns, name, datadogMetricInternal)
c.store.Unlock(datadogMetricKey)
return err
return createControllerOperation, err
}

// Already deleted in Kube, cleaning internal store
c.store.UnlockDelete(datadogMetricKey, ddmControllerStoreID)
return nil
return noopControllerOperation, nil
}

// Objects exists in both places (local store and K8S), we need to sync them
Expand All @@ -241,20 +262,19 @@ func (c *DatadogMetricController) syncDatadogMetric(ns, name, datadogMetricKey s
c.store.Unlock(datadogMetricKey)
// We add a requeue in case the deleted event is lost
c.workqueue.AddAfter(datadogMetricKey, time.Duration(requeueDelaySeconds)*time.Second)
return c.deleteDatadogMetric(ns, name)
return deleteControllerOperation, c.deleteDatadogMetric(ns, name)
}

// After this `Unlock`, datadogMetricInternal cannot be modified
datadogMetricInternal.UpdateFrom(*datadogMetric)
defer c.store.UnlockSet(datadogMetricInternal.ID, *datadogMetricInternal, ddmControllerStoreID)
c.store.UnlockSet(datadogMetricKey, *datadogMetricInternal, ddmControllerStoreID)

if datadogMetricInternal.IsNewerThan(datadogMetric.Status) {
err := c.updateDatadogMetric(ns, name, datadogMetricInternal, datadogMetric)
if err != nil {
return err
}
return updateControllerOperation, err
}

return nil
return noopControllerOperation, nil
}

func (c *DatadogMetricController) createDatadogMetric(ns, name string, datadogMetricInternal *model.DatadogMetricInternal) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (f *fixture) runControllerSync(leader bool, datadogMetricID string, expecte
defer close(stopCh)
informer.Start(stopCh)

err := controller.processDatadogMetric(datadogMetricID)
_, err := controller.processDatadogMetric(0, datadogMetricID)
assert.Equal(f.t, expectedError, err)

actions := autoscaling.FilterInformerActions(f.client.Actions(), "datadogmetrics")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/externalmetrics/model"
"github.com/DataDog/datadog-agent/pkg/util/backoff"
le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/autoscalers"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
Expand Down Expand Up @@ -59,7 +60,7 @@ func (mr *MetricsRetriever) Run(stopCh <-chan struct{}) {
if mr.isLeader() {
startTime := time.Now()
mr.retrieveMetricsValues()
retrieverElapsed.Observe(time.Since(startTime).Seconds())
retrieverElapsed.Observe(time.Since(startTime).Seconds(), le.JoinLeaderValue)
}
case <-stopCh:
log.Infof("Stopping MetricsRetriever")
Expand Down Expand Up @@ -196,7 +197,6 @@ func (mr *MetricsRetriever) retrieveMetricsValuesSlice(datadogMetrics []model.Da
}

datadogMetricFromStore.UpdateTime = currentTime

mr.store.UnlockSet(datadogMetric.ID, *datadogMetricFromStore, metricRetrieverStoreID)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ type metricsFixture struct {
expected []ddmWithQuery
}

//nolint:revive // TODO(CINT) Fix revive linter
func (f *metricsFixture) run(t *testing.T, testTime time.Time) {
func (f *metricsFixture) run(t *testing.T) {
t.Helper()

// Create and fill store
Expand Down Expand Up @@ -174,7 +173,7 @@ func TestRetrieveMetricsBasic(t *testing.T) {

for i, fixture := range fixtures {
t.Run(fmt.Sprintf("#%d %s", i, fixture.desc), func(t *testing.T) {
fixture.run(t, defaultTestTime)
fixture.run(t)
})
}
}
Expand Down Expand Up @@ -500,7 +499,7 @@ func TestRetrieveMetricsErrorCases(t *testing.T) {

for i, fixture := range fixtures {
t.Run(fmt.Sprintf("#%d %s", i, fixture.desc), func(t *testing.T) {
fixture.run(t, defaultTestTime)
fixture.run(t)
})
}
}
Expand Down Expand Up @@ -639,7 +638,7 @@ func TestRetrieveMetricsNotActive(t *testing.T) {

for i, fixture := range fixtures {
t.Run(fmt.Sprintf("#%d %s", i, fixture.desc), func(t *testing.T) {
fixture.run(t, defaultTestTime)
fixture.run(t)
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (d *DatadogMetricInternal) resolveQuery(query string) {
return
}
if resolvedQuery != "" {
log.Infof("DatadogMetric query %q was resolved successfully, new query: %q", query, resolvedQuery)
log.Debugf("DatadogMetric query %q was resolved successfully, new query: %q", query, resolvedQuery)
d.resolvedQuery = &resolvedQuery
return
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusteragent/autoscaling/externalmetrics/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func NewDatadogMetricProvider(ctx context.Context, apiCl *apiserver.APIClient, d
autogenNamespace := common.GetResourcesNamespace()
autogenEnabled := pkgconfigsetup.Datadog().GetBool("external_metrics_provider.enable_datadogmetric_autogen")
wpaEnabled := pkgconfigsetup.Datadog().GetBool("external_metrics_provider.wpa_controller")
numWorkers := pkgconfigsetup.Datadog().GetInt("external_metrics_provider.num_workers")

provider := &datadogMetricProvider{
apiCl: apiCl,
Expand Down Expand Up @@ -117,7 +118,7 @@ func NewDatadogMetricProvider(ctx context.Context, apiCl *apiserver.APIClient, d
apiCl.InformerFactory.Start(ctx.Done())

go autoscalerWatcher.Run(ctx.Done())
go controller.Run(ctx)
go controller.Run(ctx, numWorkers)

return provider, nil
}
Expand All @@ -133,7 +134,7 @@ func (p *datadogMetricProvider) GetExternalMetric(_ context.Context, namespace s
}
}

setQueryTelemtry("get", namespace, startTime, err)
setQueryTelemtry("get", startTime, err)
return res, err
}

Expand Down
Loading
Loading