From 45194b76729f0323bd005fcd420ec15aaa6b1c8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wilson=20J=C3=BAnior?= Date: Tue, 12 Mar 2024 11:38:00 -0300 Subject: [PATCH] Add initial draft to allow limit number of changes on cluster --- api/v1alpha1/rpaasinstance_types.go | 9 +- .../extensions.tsuru.io_rpaasinstances.yaml | 9 +- controllers/controller.go | 229 ++++++--- controllers/controller_test.go | 478 +++++++++++++++++- controllers/rpaasinstance_controller.go | 80 ++- controllers/system_rate_limit.go | 50 ++ main.go | 13 +- 7 files changed, 768 insertions(+), 100 deletions(-) create mode 100644 controllers/system_rate_limit.go diff --git a/api/v1alpha1/rpaasinstance_types.go b/api/v1alpha1/rpaasinstance_types.go index 441bbf785..bcca273d6 100644 --- a/api/v1alpha1/rpaasinstance_types.go +++ b/api/v1alpha1/rpaasinstance_types.go @@ -340,7 +340,14 @@ type TLSSessionTicket struct { // RpaasInstanceStatus defines the observed state of RpaasInstance type RpaasInstanceStatus struct { - // Revision hash calculated for the current spec. + //Revision hash calculated for the current spec of rpaasinstance + RevisionHash string `json:"revisionHash,omitempty"` + + // NginxUpdated is true if the wanted nginx revision hash equals the + // observed nginx revision hash. + ReconcileDelayed bool `json:"reconcileDelayed,omitempty"` + + // Revision hash calculated for the current spec of nginx. WantedNginxRevisionHash string `json:"wantedNginxRevisionHash,omitempty"` // The revision hash observed by the controller in the nginx object. diff --git a/config/crd/bases/extensions.tsuru.io_rpaasinstances.yaml b/config/crd/bases/extensions.tsuru.io_rpaasinstances.yaml index bf7af8cd2..3d839a329 100644 --- a/config/crd/bases/extensions.tsuru.io_rpaasinstances.yaml +++ b/config/crd/bases/extensions.tsuru.io_rpaasinstances.yaml @@ -6476,8 +6476,15 @@ spec: podSelector: description: PodSelector is the NGINX's pod label selector. type: string + reconcileDelayed: + description: NginxUpdated is true if the wanted nginx revision hash + equals the observed nginx revision hash. + type: boolean + revisionHash: + description: Revision hash calculated for the current spec of rpaasinstance + type: string wantedNginxRevisionHash: - description: Revision hash calculated for the current spec. + description: Revision hash calculated for the current spec of nginx. type: string required: - nginxUpdated diff --git a/controllers/controller.go b/controllers/controller.go index cf6d8eb13..13910a5b4 100644 --- a/controllers/controller.go +++ b/controllers/controller.go @@ -240,20 +240,26 @@ func (r *RpaasInstanceReconciler) listDefaultFlavors(ctx context.Context, instan return result, nil } -func (r *RpaasInstanceReconciler) reconcileTLSSessionResumption(ctx context.Context, instance *v1alpha1.RpaasInstance) error { - if err := r.reconcileSecretForSessionTickets(ctx, instance); err != nil { - return err +func (r *RpaasInstanceReconciler) reconcileTLSSessionResumption(ctx context.Context, instance *v1alpha1.RpaasInstance) (changed bool, err error) { + secretChanged, err := r.reconcileSecretForSessionTickets(ctx, instance) + if err != nil { + return false, err + } + + cronJobChanged, err := r.reconcileCronJobForSessionTickets(ctx, instance) + if err != nil { + return false, err } - return r.reconcileCronJobForSessionTickets(ctx, instance) + return cronJobChanged || secretChanged, nil } -func (r *RpaasInstanceReconciler) reconcileSecretForSessionTickets(ctx context.Context, instance *v1alpha1.RpaasInstance) error { +func (r *RpaasInstanceReconciler) reconcileSecretForSessionTickets(ctx context.Context, instance *v1alpha1.RpaasInstance) (changed bool, err error) { enabled := isTLSSessionTicketEnabled(instance) newSecret, err := newSecretForTLSSessionTickets(instance) if err != nil { - return err + return false, err } var secret corev1.Secret @@ -264,30 +270,44 @@ func (r *RpaasInstanceReconciler) reconcileSecretForSessionTickets(ctx context.C err = r.Client.Get(ctx, secretName, &secret) if err != nil && k8sErrors.IsNotFound(err) { if !enabled { - return nil + return false, nil } - return r.Client.Create(ctx, newSecret) - } + err = r.Client.Create(ctx, newSecret) + if err != nil { + return false, err + } - if err != nil { - return err + return true, nil + } else if err != nil { + return false, err } if !enabled { - return r.Client.Delete(ctx, &secret) + err = r.Client.Delete(ctx, &secret) + if err != nil { + return false, err + } + + return true, nil } newData := newSessionTicketData(secret.Data, newSecret.Data) if !reflect.DeepEqual(newData, secret.Data) { secret.Data = newData - return r.Client.Update(ctx, &secret) + err = r.Client.Update(ctx, &secret) + + if err != nil { + return false, err + } + + return true, nil } - return nil + return false, nil } -func (r *RpaasInstanceReconciler) reconcileCronJobForSessionTickets(ctx context.Context, instance *v1alpha1.RpaasInstance) error { +func (r *RpaasInstanceReconciler) reconcileCronJobForSessionTickets(ctx context.Context, instance *v1alpha1.RpaasInstance) (changed bool, err error) { enabled := isTLSSessionTicketEnabled(instance) newCronJob := newCronJobForSessionTickets(instance) @@ -297,29 +317,41 @@ func (r *RpaasInstanceReconciler) reconcileCronJobForSessionTickets(ctx context. Name: newCronJob.Name, Namespace: newCronJob.Namespace, } - err := r.Client.Get(ctx, cjName, &cj) + err = r.Client.Get(ctx, cjName, &cj) if err != nil && k8sErrors.IsNotFound(err) { if !enabled { - return nil + return false, nil } - return r.Client.Create(ctx, newCronJob) - } + err = r.Client.Create(ctx, newCronJob) + if err != nil { + return false, err + } - if err != nil { - return err + return true, nil + } else if err != nil { + return false, err } if !enabled { - return r.Client.Delete(ctx, &cj) + err = r.Client.Delete(ctx, &cj) + if err != nil { + return false, err + } + return true, nil } if equality.Semantic.DeepDerivative(newCronJob.Spec, cj.Spec) { - return nil + return false, nil } newCronJob.ResourceVersion = cj.ResourceVersion - return r.Client.Update(ctx, newCronJob) + err = r.Client.Update(ctx, newCronJob) + if err != nil { + return false, err + } + + return true, nil } func newCronJobForSessionTickets(instance *v1alpha1.RpaasInstance) *batchv1.CronJob { @@ -522,13 +554,14 @@ func newSessionTicketData(old, new map[string][]byte) map[string][]byte { return newest } -func (r *RpaasInstanceReconciler) reconcileHPA(ctx context.Context, instance *v1alpha1.RpaasInstance, nginx *nginxv1alpha1.Nginx) error { +func (r *RpaasInstanceReconciler) reconcileHPA(ctx context.Context, instance *v1alpha1.RpaasInstance, nginx *nginxv1alpha1.Nginx) (changed bool, err error) { if isKEDAHandlingHPA(instance) { return r.reconcileKEDA(ctx, instance, nginx) } - if err := r.cleanUpKEDAScaledObject(ctx, instance); err != nil { - return err + cleanedKeda, err := r.cleanUpKEDAScaledObject(ctx, instance) + if err != nil { + return false, err } logger := r.Log.WithName("reconcileHPA"). @@ -548,26 +581,26 @@ func (r *RpaasInstanceReconciler) reconcileHPA(ctx context.Context, instance *v1 desired := newHPA(instance, nginx) var observed autoscalingv2.HorizontalPodAutoscaler - err := r.Client.Get(ctx, types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, &observed) + err = r.Client.Get(ctx, types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, &observed) if k8sErrors.IsNotFound(err) { if !isAutoscaleEnabled(instance.Spec.Autoscale) { logger.V(4).Info("Skipping HorizontalPodAutoscaler reconciliation: both HPA resource and desired RpaasAutoscaleSpec not found") - return nil + return cleanedKeda, nil } logger.V(4).Info("Creating HorizontalPodAutoscaler resource") if err = r.Client.Create(ctx, desired); err != nil { logger.Error(err, "Unable to create the HorizontalPodAutoscaler resource") - return err + return false, err } - return nil + return true, nil } if err != nil { logger.Error(err, "Unable to get the HorizontalPodAutoscaler resource") - return err + return false, err } logger = logger.WithValues("HorizontalPodAutoscaler", types.NamespacedName{Name: observed.Name, Namespace: observed.Namespace}) @@ -576,10 +609,10 @@ func (r *RpaasInstanceReconciler) reconcileHPA(ctx context.Context, instance *v1 logger.V(4).Info("Deleting HorizontalPodAutoscaler resource") if err = r.Client.Delete(ctx, &observed); err != nil { logger.Error(err, "Unable to delete the HorizontalPodAutoscaler resource") - return err + return false, err } - return nil + return true, nil } if !reflect.DeepEqual(desired.Spec, observed.Spec) { @@ -588,25 +621,31 @@ func (r *RpaasInstanceReconciler) reconcileHPA(ctx context.Context, instance *v1 observed.Spec = desired.Spec if err = r.Client.Update(ctx, &observed); err != nil { logger.Error(err, "Unable to update the HorizontalPodAustoscaler resource") - return err + return false, err } + + return true, nil } - return nil + return cleanedKeda, nil } -func (r *RpaasInstanceReconciler) cleanUpKEDAScaledObject(ctx context.Context, instance *v1alpha1.RpaasInstance) error { +func (r *RpaasInstanceReconciler) cleanUpKEDAScaledObject(ctx context.Context, instance *v1alpha1.RpaasInstance) (cleaned bool, err error) { var so kedav1alpha1.ScaledObject - err := r.Client.Get(ctx, types.NamespacedName{Name: instance.Name, Namespace: instance.Namespace}, &so) + err = r.Client.Get(ctx, types.NamespacedName{Name: instance.Name, Namespace: instance.Namespace}, &so) if k8sErrors.IsNotFound(err) { - return nil + return false, nil } if err != nil { - return nil // custom resource does likely not exist in the cluster, so we should ignore it + return false, nil // custom resource does likely not exist in the cluster, so we should ignore it } - return r.Client.Delete(ctx, &so) + err = r.Client.Delete(ctx, &so) + if err != nil { + return false, err + } + return true, nil } func isKEDAHandlingHPA(instance *v1alpha1.RpaasInstance) bool { @@ -616,36 +655,48 @@ func isKEDAHandlingHPA(instance *v1alpha1.RpaasInstance) bool { instance.Spec.Autoscale.KEDAOptions.Enabled } -func (r *RpaasInstanceReconciler) reconcileKEDA(ctx context.Context, instance *v1alpha1.RpaasInstance, nginx *nginxv1alpha1.Nginx) error { +func (r *RpaasInstanceReconciler) reconcileKEDA(ctx context.Context, instance *v1alpha1.RpaasInstance, nginx *nginxv1alpha1.Nginx) (changed bool, err error) { desired, err := newKEDAScaledObject(instance, nginx) if err != nil { - return err + return false, err } var observed kedav1alpha1.ScaledObject err = r.Client.Get(ctx, types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, &observed) if k8sErrors.IsNotFound(err) { if !isAutoscaleEnabled(instance.Spec.Autoscale) { - return nil // nothing to do + return false, nil // nothing to do } - return r.Client.Create(ctx, desired) + err = r.Client.Create(ctx, desired) + if err != nil { + return false, err + } + return true, nil } if err != nil { - return err + return false, err } if !isAutoscaleEnabled(instance.Spec.Autoscale) { - return r.Client.Delete(ctx, &observed) + err = r.Client.Delete(ctx, &observed) + if err != nil { + return false, err + } + return true, nil } - if !reflect.DeepEqual(desired.Spec, observed.Spec) { - desired.ResourceVersion = observed.ResourceVersion - return r.Client.Update(ctx, desired) + if reflect.DeepEqual(desired.Spec, observed.Spec) { + return false, nil } - return nil + desired.ResourceVersion = observed.ResourceVersion + err = r.Client.Update(ctx, desired) + if err != nil { + return false, err + } + return true, nil } func isAutoscaleEnabled(a *v1alpha1.RpaasInstanceAutoscaleSpec) bool { @@ -778,39 +829,51 @@ func newKEDAScaledObject(instance *v1alpha1.RpaasInstance, nginx *nginxv1alpha1. }, nil } -func (r *RpaasInstanceReconciler) reconcilePDB(ctx context.Context, instance *v1alpha1.RpaasInstance, nginx *nginxv1alpha1.Nginx) error { +func (r *RpaasInstanceReconciler) reconcilePDB(ctx context.Context, instance *v1alpha1.RpaasInstance, nginx *nginxv1alpha1.Nginx) (changed bool, err error) { if nginx.Status.PodSelector == "" { - return nil + return false, nil } pdb, err := newPDB(instance, nginx) if err != nil { - return err + return false, err } var existingPDB policyv1.PodDisruptionBudget err = r.Get(ctx, client.ObjectKey{Name: pdb.Name, Namespace: pdb.Namespace}, &existingPDB) if err != nil { if !k8sErrors.IsNotFound(err) { - return err + return false, err } if instance.Spec.EnablePodDisruptionBudget != nil && *instance.Spec.EnablePodDisruptionBudget { - return r.Create(ctx, pdb) + err = r.Create(ctx, pdb) + if err != nil { + return false, err + } + return true, nil } - return nil + return false, nil } if instance.Spec.EnablePodDisruptionBudget == nil || (instance.Spec.EnablePodDisruptionBudget != nil && !*instance.Spec.EnablePodDisruptionBudget) { - return r.Delete(ctx, &existingPDB) + err = r.Delete(ctx, &existingPDB) + if err != nil { + return false, err + } + return true, nil } - if equality.Semantic.DeepDerivative(existingPDB.Spec, pdb.Spec) { - return nil + if equality.Semantic.DeepDerivative(existingPDB.Spec, pdb.Spec) && reflect.DeepEqual(existingPDB.Labels, pdb.Labels) { + return false, nil } pdb.ResourceVersion = existingPDB.ResourceVersion - return r.Update(ctx, pdb) + err = r.Update(ctx, pdb) + if err != nil { + return false, err + } + return true, nil } func newPDB(instance *v1alpha1.RpaasInstance, nginx *nginxv1alpha1.Nginx) (*policyv1.PodDisruptionBudget, error) { @@ -849,28 +912,34 @@ func newPDB(instance *v1alpha1.RpaasInstance, nginx *nginxv1alpha1.Nginx) (*poli }, nil } -func (r *RpaasInstanceReconciler) reconcileConfigMap(ctx context.Context, configMap *corev1.ConfigMap) error { +func (r *RpaasInstanceReconciler) reconcileConfigMap(ctx context.Context, configMap *corev1.ConfigMap) (changed bool, err error) { found := &corev1.ConfigMap{} - err := r.Client.Get(ctx, types.NamespacedName{Name: configMap.ObjectMeta.Name, Namespace: configMap.ObjectMeta.Namespace}, found) + err = r.Client.Get(ctx, types.NamespacedName{Name: configMap.ObjectMeta.Name, Namespace: configMap.ObjectMeta.Namespace}, found) if err != nil { if !k8sErrors.IsNotFound(err) { logrus.Errorf("Failed to get configMap: %v", err) - return err + return false, err } err = r.Client.Create(ctx, configMap) if err != nil { logrus.Errorf("Failed to create configMap: %v", err) - return err + return false, err } - return nil + return true, nil } configMap.ObjectMeta.ResourceVersion = found.ObjectMeta.ResourceVersion + + if reflect.DeepEqual(found.Data, configMap.Data) && reflect.DeepEqual(found.BinaryData, configMap.BinaryData) && reflect.DeepEqual(found.Labels, configMap.Labels) { + return false, nil + } + err = r.Client.Update(ctx, configMap) if err != nil { logrus.Errorf("Failed to update configMap: %v", err) + return false, err } - return err + return true, nil } func (r *RpaasInstanceReconciler) getNginx(ctx context.Context, instance *v1alpha1.RpaasInstance) (*nginxv1alpha1.Nginx, error) { @@ -901,29 +970,33 @@ func externalAddresssesFromNginx(nginx *nginxv1alpha1.Nginx) v1alpha1.RpaasInsta return ingressesStatus } -func (r *RpaasInstanceReconciler) reconcileNginx(ctx context.Context, instance *v1alpha1.RpaasInstance, nginx *nginxv1alpha1.Nginx) error { +func (r *RpaasInstanceReconciler) reconcileNginx(ctx context.Context, instance *v1alpha1.RpaasInstance, nginx *nginxv1alpha1.Nginx) (changed bool, err error) { found, err := r.getNginx(ctx, instance) if err != nil { if !k8sErrors.IsNotFound(err) { logrus.Errorf("Failed to get nginx CR: %v", err) - return err + return false, err } err = r.Client.Create(ctx, nginx) if err != nil { logrus.Errorf("Failed to create nginx CR: %v", err) - return err + return false, err } - return nil + return true, nil } + if equality.Semantic.DeepEqual(nginx.Spec, found.Spec) { + return false, nil + } nginx.ObjectMeta.ResourceVersion = found.ObjectMeta.ResourceVersion err = r.Client.Update(ctx, nginx) if err != nil { logrus.Errorf("Failed to update nginx CR: %v", err) + return false, err } - return err + return true, nil } func (r *RpaasInstanceReconciler) renderTemplate(ctx context.Context, instance *v1alpha1.RpaasInstance, plan *v1alpha1.RpaasPlan) (string, error) { @@ -1216,6 +1289,18 @@ func generateNginxHash(nginx *nginxv1alpha1.Nginx) (string, error) { return strings.ToLower(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(hash[:])), nil } +func generateSpecHash(spec *v1alpha1.RpaasInstanceSpec) (string, error) { + if spec == nil { + return "", nil + } + data, err := json.Marshal(spec) + if err != nil { + return "", err + } + hash := sha256.Sum256(data) + return strings.ToLower(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(hash[:])), nil +} + func newHPA(instance *v1alpha1.RpaasInstance, nginx *nginxv1alpha1.Nginx) *autoscalingv2.HorizontalPodAutoscaler { var metrics []autoscalingv2.MetricSpec diff --git a/controllers/controller_test.go b/controllers/controller_test.go index 153bf81b5..325a7c1f0 100644 --- a/controllers/controller_test.go +++ b/controllers/controller_test.go @@ -18,7 +18,6 @@ import ( corev1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" k8sErrors "k8s.io/apimachinery/pkg/api/errors" - k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -990,7 +989,8 @@ func Test_reconcileHPA(t *testing.T) { expectedScaledObject func(*kedav1alpha1.ScaledObject) *kedav1alpha1.ScaledObject customAssert func(t *testing.T, r *RpaasInstanceReconciler) bool - expectedError func(t *testing.T) + expectedError func(t *testing.T) + expectedChanged bool }{ "(native HPA controller) setting autoscaling params first time": { instance: func(ri *v1alpha1.RpaasInstance) *v1alpha1.RpaasInstance { @@ -1025,6 +1025,7 @@ func Test_reconcileHPA(t *testing.T) { } return hpa }, + expectedChanged: true, }, "(native HPA controller) updating autoscaling params": { @@ -1063,6 +1064,7 @@ func Test_reconcileHPA(t *testing.T) { } return ri }, + expectedChanged: true, expectedHPA: func(hpa *autoscalingv2.HorizontalPodAutoscaler) *autoscalingv2.HorizontalPodAutoscaler { hpa.ResourceVersion = "2" // second change hpa.Spec = autoscalingv2.HorizontalPodAutoscalerSpec{ @@ -1100,6 +1102,69 @@ func Test_reconcileHPA(t *testing.T) { }, }, + "(native HPA controller) there is nothing to update": { + resources: []runtime.Object{ + func(hpa *autoscalingv2.HorizontalPodAutoscaler) runtime.Object { + hpa.Spec = autoscalingv2.HorizontalPodAutoscalerSpec{ + ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "my-instance", + }, + MinReplicas: pointer.Int32(1), + MaxReplicas: 10, + Metrics: []autoscalingv2.MetricSpec{ + { + Type: autoscalingv2.ResourceMetricSourceType, + Resource: &autoscalingv2.ResourceMetricSource{ + Name: "cpu", + Target: autoscalingv2.MetricTarget{ + Type: autoscalingv2.UtilizationMetricType, + AverageUtilization: pointer.Int32(50), + }, + }, + }, + }, + } + return hpa + }(baseExpectedHPA.DeepCopy()), + }, + instance: func(ri *v1alpha1.RpaasInstance) *v1alpha1.RpaasInstance { + ri.Spec.Autoscale = &v1alpha1.RpaasInstanceAutoscaleSpec{ + MinReplicas: pointer.Int32(1), + MaxReplicas: 10, + TargetCPUUtilizationPercentage: pointer.Int32(50), + } + return ri + }, + expectedChanged: false, + expectedHPA: func(hpa *autoscalingv2.HorizontalPodAutoscaler) *autoscalingv2.HorizontalPodAutoscaler { + hpa.ResourceVersion = "1" // second change + hpa.Spec = autoscalingv2.HorizontalPodAutoscalerSpec{ + ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "my-instance", + }, + MinReplicas: pointer.Int32(1), + MaxReplicas: 10, + Metrics: []autoscalingv2.MetricSpec{ + { + Type: autoscalingv2.ResourceMetricSourceType, + Resource: &autoscalingv2.ResourceMetricSource{ + Name: "cpu", + Target: autoscalingv2.MetricTarget{ + Type: autoscalingv2.UtilizationMetricType, + AverageUtilization: pointer.Int32(50), + }, + }, + }, + }, + } + return hpa + }, + }, + "(native HPA controller) removing autoscale params": { resources: []runtime.Object{ baseExpectedHPA.DeepCopy(), @@ -1109,6 +1174,7 @@ func Test_reconcileHPA(t *testing.T) { err := r.Client.Get(context.TODO(), types.NamespacedName{Name: "my-instance", Namespace: "default"}, &hpa) return assert.True(t, k8sErrors.IsNotFound(err)) }, + expectedChanged: true, }, "(native HPA controller) with RPS enabled": { @@ -1125,6 +1191,7 @@ func Test_reconcileHPA(t *testing.T) { require.True(t, ok, "event recorder must be FakeRecorder") return assert.Equal(t, "Warning RpaasInstanceAutoscaleFailed native HPA controller doesn't support RPS metric target yet", <-rec.Events) }, + expectedChanged: true, }, "(native HPA controller) with scheduled windows": { @@ -1143,6 +1210,7 @@ func Test_reconcileHPA(t *testing.T) { require.True(t, ok, "event recorder must be FakeRecorder") return assert.Equal(t, "Warning RpaasInstanceAutoscaleFailed native HPA controller doesn't support scheduled windows", <-rec.Events) }, + expectedChanged: true, }, "(KEDA controller) with RPS enabled": { @@ -1174,6 +1242,7 @@ func Test_reconcileHPA(t *testing.T) { } return so }, + expectedChanged: true, }, "(KEDA controller) updating autoscaling params": { @@ -1201,6 +1270,7 @@ func Test_reconcileHPA(t *testing.T) { return so }(baseExpectedScaledObject.DeepCopy()), }, + expectedChanged: true, instance: func(ri *v1alpha1.RpaasInstance) *v1alpha1.RpaasInstance { ri.Spec.Autoscale = &v1alpha1.RpaasInstanceAutoscaleSpec{ MinReplicas: func(n int32) *int32 { return &n }(5), @@ -1250,6 +1320,82 @@ func Test_reconcileHPA(t *testing.T) { }, }, + "(KEDA controller) there is nothing to update": { + resources: []runtime.Object{ + func(so *kedav1alpha1.ScaledObject) runtime.Object { + so.Spec = kedav1alpha1.ScaledObjectSpec{ + ScaleTargetRef: &kedav1alpha1.ScaleTarget{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "my-instance", + }, + MinReplicaCount: pointer.Int32(2), + MaxReplicaCount: pointer.Int32(500), + PollingInterval: pointer.Int32(5), + Advanced: &kedav1alpha1.AdvancedConfig{ + HorizontalPodAutoscalerConfig: &kedav1alpha1.HorizontalPodAutoscalerConfig{ + Name: "my-instance", + }, + }, + Triggers: []kedav1alpha1.ScaleTriggers{ + { + Type: "prometheus", + Metadata: map[string]string{ + "serverAddress": "https://prometheus.example.com", + "query": `sum(rate(nginx_vts_requests_total{instance="my-instance", namespace="default"}[5m]))`, + "threshold": "50", + }, + AuthenticationRef: &kedav1alpha1.ScaledObjectAuthRef{ + Name: "prometheus-auth", + Kind: "ClusterTriggerAuthentication", + }, + }, + }, + } + return so + }(baseExpectedScaledObject.DeepCopy()), + }, + expectedChanged: false, + instance: func(ri *v1alpha1.RpaasInstance) *v1alpha1.RpaasInstance { + ri.Spec.Autoscale = &v1alpha1.RpaasInstanceAutoscaleSpec{ + MinReplicas: pointer.Int32(2), + MaxReplicas: 500, + TargetRequestsPerSecond: pointer.Int32(50), + KEDAOptions: &v1alpha1.AutoscaleKEDAOptions{ + Enabled: true, + PrometheusServerAddress: "https://prometheus.example.com", + RPSQueryTemplate: `sum(rate(nginx_vts_requests_total{instance="{{ .Name }}", namespace="{{ .Namespace }}"}[5m]))`, + RPSAuthenticationRef: &kedav1alpha1.ScaledObjectAuthRef{ + Kind: "ClusterTriggerAuthentication", + Name: "prometheus-auth", + }, + PollingInterval: pointer.Int32(5), + }, + } + return ri + }, + expectedScaledObject: func(so *kedav1alpha1.ScaledObject) *kedav1alpha1.ScaledObject { + so.Spec.MinReplicaCount = pointer.Int32(2) + so.Spec.MaxReplicaCount = pointer.Int32(500) + so.Spec.PollingInterval = pointer.Int32(5) + so.Spec.Triggers = []kedav1alpha1.ScaleTriggers{ + { + Type: "prometheus", + Metadata: map[string]string{ + "serverAddress": "https://prometheus.example.com", + "query": `sum(rate(nginx_vts_requests_total{instance="my-instance", namespace="default"}[5m]))`, + "threshold": "50", + }, + AuthenticationRef: &kedav1alpha1.ScaledObjectAuthRef{ + Kind: "ClusterTriggerAuthentication", + Name: "prometheus-auth", + }, + }, + } + return so + }, + }, + "(KEDA controller) removing autoscaling params": { resources: []runtime.Object{ baseExpectedScaledObject.DeepCopy(), @@ -1269,6 +1415,7 @@ func Test_reconcileHPA(t *testing.T) { err := r.Client.Get(context.TODO(), types.NamespacedName{Name: baseExpectedScaledObject.Name, Namespace: baseExpectedScaledObject.Namespace}, &so) return assert.True(t, k8sErrors.IsNotFound(err), "ScaledObject resource should not exist") }, + expectedChanged: true, }, "(KEDA controller) KEDA controller enabled, but instance does not have RPS trigger": { @@ -1315,6 +1462,8 @@ func Test_reconcileHPA(t *testing.T) { err := r.Client.Get(context.TODO(), types.NamespacedName{Name: baseExpectedScaledObject.Name, Namespace: baseExpectedScaledObject.Namespace}, &so) return assert.True(t, k8sErrors.IsNotFound(err), "ScaledObject resource should not exist") }, + + expectedChanged: true, }, "(KEDA controller) with scheduled windows": { @@ -1333,6 +1482,7 @@ func Test_reconcileHPA(t *testing.T) { } return ri }, + expectedChanged: true, expectedScaledObject: func(so *kedav1alpha1.ScaledObject) *kedav1alpha1.ScaledObject { so.Spec.MinReplicaCount = func(n int32) *int32 { return &n }(0) so.Spec.MaxReplicaCount = func(n int32) *int32 { return &n }(50) @@ -1395,7 +1545,7 @@ func Test_reconcileHPA(t *testing.T) { r := newRpaasInstanceReconciler(resources...) - err := r.reconcileHPA(context.TODO(), instance, nginx) + changed, err := r.reconcileHPA(context.TODO(), instance, nginx) require.NoError(t, err) if tt.expectedHPA == nil && tt.expectedScaledObject == nil && tt.customAssert == nil { @@ -1419,11 +1569,14 @@ func Test_reconcileHPA(t *testing.T) { require.NoError(t, err) assert.Equal(t, tt.expectedScaledObject(baseExpectedScaledObject.DeepCopy()), got.DeepCopy()) } + + assert.Equal(t, tt.expectedChanged, changed) }) } } func Test_reconcilePDB(t *testing.T) { + defaultMaxAvailable := intstr.FromString("10%") resources := []runtime.Object{ &v1alpha1.RpaasInstance{ ObjectMeta: metav1.ObjectMeta{ @@ -1465,12 +1618,48 @@ func Test_reconcilePDB(t *testing.T) { }, }, }, + + &policyv1.PodDisruptionBudget{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "policy/v1", + Kind: "PodDisruptionBudget", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "unchanged-instance", + Namespace: "rpaasv2", + Labels: map[string]string{ + "rpaas.extensions.tsuru.io/instance-name": "unchanged-instance", + "rpaas.extensions.tsuru.io/plan-name": "", + "rpaas.extensions.tsuru.io/service-name": "", + "rpaas.extensions.tsuru.io/team-owner": "", + "rpaas_instance": "unchanged-instance", + "rpaas_service": "", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "extensions.tsuru.io/v1alpha1", + Kind: "RpaasInstance", + Name: "unchanged-instance", + Controller: func(b bool) *bool { return &b }(true), + BlockOwnerDeletion: func(b bool) *bool { return &b }(true), + }, + }, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + MaxUnavailable: &defaultMaxAvailable, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"nginx.tsuru.io/resource-name": "unchanged-instance"}, + }, + }, + }, } tests := map[string]struct { instance *v1alpha1.RpaasInstance nginx *nginxv1alpha1.Nginx assert func(t *testing.T, c client.Client) + + expectedChanged bool }{ "creating PDB, instance with 1 replicas": { instance: &v1alpha1.RpaasInstance{ @@ -1479,10 +1668,11 @@ func Test_reconcilePDB(t *testing.T) { Namespace: "rpaasv2", }, Spec: v1alpha1.RpaasInstanceSpec{ - EnablePodDisruptionBudget: func(b bool) *bool { return &b }(true), - Replicas: func(n int32) *int32 { return &n }(1), + EnablePodDisruptionBudget: pointer.Bool(true), + Replicas: pointer.Int32(1), }, }, + expectedChanged: true, nginx: &nginxv1alpha1.Nginx{ ObjectMeta: metav1.ObjectMeta{ Name: "my-instance", @@ -1532,7 +1722,7 @@ func Test_reconcilePDB(t *testing.T) { }, }, Spec: policyv1.PodDisruptionBudgetSpec{ - MaxUnavailable: func(n intstr.IntOrString) *intstr.IntOrString { return &n }(intstr.FromString("10%")), + MaxUnavailable: &defaultMaxAvailable, Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{"nginx.tsuru.io/resource-name": "my-instance"}, }, @@ -1552,6 +1742,7 @@ func Test_reconcilePDB(t *testing.T) { Replicas: func(n int32) *int32 { return &n }(10), }, }, + expectedChanged: true, nginx: &nginxv1alpha1.Nginx{ ObjectMeta: metav1.ObjectMeta{ Name: "my-instance", @@ -1601,7 +1792,7 @@ func Test_reconcilePDB(t *testing.T) { }, }, Spec: policyv1.PodDisruptionBudgetSpec{ - MaxUnavailable: func(n intstr.IntOrString) *intstr.IntOrString { return &n }(intstr.FromString("10%")), + MaxUnavailable: &defaultMaxAvailable, Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{"nginx.tsuru.io/resource-name": "my-instance"}, }, @@ -1642,6 +1833,7 @@ func Test_reconcilePDB(t *testing.T) { PodSelector: "nginx.tsuru.io/resource-name=another-instance", }, }, + expectedChanged: true, assert: func(t *testing.T, c client.Client) { var pdb policyv1.PodDisruptionBudget err := c.Get(context.TODO(), client.ObjectKey{Name: "another-instance", Namespace: "rpaasv2"}, &pdb) @@ -1674,7 +1866,7 @@ func Test_reconcilePDB(t *testing.T) { }, }, Spec: policyv1.PodDisruptionBudgetSpec{ - MaxUnavailable: func(n intstr.IntOrString) *intstr.IntOrString { return &n }(intstr.FromString("10%")), + MaxUnavailable: &defaultMaxAvailable, Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{"nginx.tsuru.io/resource-name": "another-instance"}, }, @@ -1683,6 +1875,80 @@ func Test_reconcilePDB(t *testing.T) { }, }, + "ignore updating PDB, cause there is nothing to change": { + instance: &v1alpha1.RpaasInstance{ + ObjectMeta: metav1.ObjectMeta{ + Name: "unchanged-instance", + Namespace: "rpaasv2", + }, + Spec: v1alpha1.RpaasInstanceSpec{ + EnablePodDisruptionBudget: func(b bool) *bool { return &b }(true), + Replicas: func(n int32) *int32 { return &n }(10), + Autoscale: &v1alpha1.RpaasInstanceAutoscaleSpec{ + MaxReplicas: int32(100), + MinReplicas: func(n int32) *int32 { return &n }(int32(50)), + }, + }, + }, + nginx: &nginxv1alpha1.Nginx{ + ObjectMeta: metav1.ObjectMeta{ + Name: "unchanged-instance", + Namespace: "rpaasv2", + Labels: map[string]string{ + "rpaas_instance": "unchanged-instance", + "rpaas_service": "", + "rpaas.extensions.tsuru.io/instance-name": "unchanged-instance", + "rpaas.extensions.tsuru.io/service-name": "", + "rpaas.extensions.tsuru.io/plan-name": "", + "rpaas.extensions.tsuru.io/team-owner": "", + }, + }, + Status: nginxv1alpha1.NginxStatus{ + PodSelector: "nginx.tsuru.io/resource-name=unchanged-instance", + }, + }, + expectedChanged: false, + assert: func(t *testing.T, c client.Client) { + var pdb policyv1.PodDisruptionBudget + err := c.Get(context.TODO(), client.ObjectKey{Name: "unchanged-instance", Namespace: "rpaasv2"}, &pdb) + require.NoError(t, err) + assert.Equal(t, policyv1.PodDisruptionBudget{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "policy/v1", + Kind: "PodDisruptionBudget", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "unchanged-instance", + Namespace: "rpaasv2", + Labels: map[string]string{ + "rpaas.extensions.tsuru.io/instance-name": "unchanged-instance", + "rpaas.extensions.tsuru.io/plan-name": "", + "rpaas.extensions.tsuru.io/service-name": "", + "rpaas.extensions.tsuru.io/team-owner": "", + "rpaas_instance": "unchanged-instance", + "rpaas_service": "", + }, + ResourceVersion: "999", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "extensions.tsuru.io/v1alpha1", + Kind: "RpaasInstance", + Name: "unchanged-instance", + Controller: func(b bool) *bool { return &b }(true), + BlockOwnerDeletion: func(b bool) *bool { return &b }(true), + }, + }, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + MaxUnavailable: &defaultMaxAvailable, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"nginx.tsuru.io/resource-name": "unchanged-instance"}, + }, + }, + }, pdb) + }, + }, + "removing PDB": { instance: &v1alpha1.RpaasInstance{ ObjectMeta: metav1.ObjectMeta{ @@ -1697,6 +1963,7 @@ func Test_reconcilePDB(t *testing.T) { }, }, }, + expectedChanged: true, nginx: &nginxv1alpha1.Nginx{ ObjectMeta: metav1.ObjectMeta{ Name: "another-instance", @@ -1718,7 +1985,7 @@ func Test_reconcilePDB(t *testing.T) { var pdb policyv1.PodDisruptionBudget err := c.Get(context.TODO(), client.ObjectKey{Name: "another-instance", Namespace: "rpaasv2"}, &pdb) require.Error(t, err) - assert.True(t, k8serrors.IsNotFound(err)) + assert.True(t, k8sErrors.IsNotFound(err)) }, }, @@ -1750,6 +2017,7 @@ func Test_reconcilePDB(t *testing.T) { PodSelector: "nginx.tsuru.io/resource-name=my-instance", }, }, + expectedChanged: true, assert: func(t *testing.T, c client.Client) { var pdb policyv1.PodDisruptionBudget err := c.Get(context.TODO(), client.ObjectKey{Name: "my-instance", Namespace: "rpaasv2"}, &pdb) @@ -1782,7 +2050,7 @@ func Test_reconcilePDB(t *testing.T) { }, }, Spec: policyv1.PodDisruptionBudgetSpec{ - MaxUnavailable: func(n intstr.IntOrString) *intstr.IntOrString { return &n }(intstr.FromString("10%")), + MaxUnavailable: &defaultMaxAvailable, Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{"nginx.tsuru.io/resource-name": "my-instance"}, }, @@ -1819,11 +2087,12 @@ func Test_reconcilePDB(t *testing.T) { PodSelector: "nginx.tsuru.io/resource-name=my-instance", }, }, + expectedChanged: false, assert: func(t *testing.T, c client.Client) { var pdb policyv1.PodDisruptionBudget err := c.Get(context.TODO(), client.ObjectKey{Name: "my-instance", Namespace: "rpaasv2"}, &pdb) require.Error(t, err) - assert.True(t, k8serrors.IsNotFound(err)) + assert.True(t, k8sErrors.IsNotFound(err)) }, }, @@ -1838,6 +2107,7 @@ func Test_reconcilePDB(t *testing.T) { Replicas: func(n int32) *int32 { return &n }(10), }, }, + expectedChanged: false, nginx: &nginxv1alpha1.Nginx{ ObjectMeta: metav1.ObjectMeta{ Name: "my-instance", @@ -1856,7 +2126,7 @@ func Test_reconcilePDB(t *testing.T) { var pdb policyv1.PodDisruptionBudget err := c.Get(context.TODO(), client.ObjectKey{Name: "my-instance", Namespace: "rpaasv2"}, &pdb) require.Error(t, err) - assert.True(t, k8serrors.IsNotFound(err)) + assert.True(t, k8sErrors.IsNotFound(err)) }, }, } @@ -1866,9 +2136,10 @@ func Test_reconcilePDB(t *testing.T) { require.NotNil(t, tt.assert) r := newRpaasInstanceReconciler(resources...) - err := r.reconcilePDB(context.TODO(), tt.instance, tt.nginx) + changed, err := r.reconcilePDB(context.TODO(), tt.instance, tt.nginx) require.NoError(t, err) tt.assert(t, r.Client) + assert.Equal(t, tt.expectedChanged, changed) }) } } @@ -1980,6 +2251,44 @@ func TestReconcilePoolNamespaced(t *testing.T) { assert.Equal(t, "foobar", nginx.Spec.Service.Labels["tsuru.io/custom-flavor-label"]) } +func TestReconcilePopulateHash(t *testing.T) { + rpaas := &v1alpha1.RpaasInstance{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-instance", + Namespace: "rpaasv2-my-pool", + }, + Spec: v1alpha1.RpaasInstanceSpec{ + PlanName: "my-plan", + PlanNamespace: "default", + }, + } + plan := &v1alpha1.RpaasPlan{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-plan", + Namespace: "default", + }, + Spec: v1alpha1.RpaasPlanSpec{ + Image: "tsuru:pool-namespaces-image:test", + }, + } + + reconciler := newRpaasInstanceReconciler(rpaas, plan) + result, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "rpaasv2-my-pool", Name: "my-instance"}}) + require.NoError(t, err) + + assert.Equal(t, result, reconcile.Result{}) + + nginx := &nginxv1alpha1.Nginx{} + err = reconciler.Client.Get(context.TODO(), types.NamespacedName{Name: rpaas.Name, Namespace: rpaas.Namespace}, nginx) + require.NoError(t, err) + + foundRpaas := &v1alpha1.RpaasInstance{} + err = reconciler.Client.Get(context.TODO(), types.NamespacedName{Name: rpaas.Name, Namespace: rpaas.Namespace}, foundRpaas) + require.NoError(t, err) + + assert.Equal(t, "y3ildbchrsps4icpwoer62wky2a65c33cz72sa4bp35fuibegwqa", foundRpaas.Status.RevisionHash) +} + func resourceMustParsePtr(fmt string) *resource.Quantity { qty := resource.MustParse(fmt) return &qty @@ -2012,11 +2321,118 @@ func TestMinutesIntervalToSchedule(t *testing.T) { } func TestReconcileRpaasInstance_reconcileTLSSessionResumption(t *testing.T) { + cronjob1 := &batchv1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-instance" + sessionTicketsCronJobSuffix, + Namespace: "default", + }, + Spec: batchv1.CronJobSpec{ + Schedule: "*/60 * * * *", + SuccessfulJobsHistoryLimit: pointer.Int32(1), + FailedJobsHistoryLimit: pointer.Int32(1), + JobTemplate: batchv1.JobTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "rpaas.extensions.tsuru.io/instance-name": "my-instance", + "rpaas.extensions.tsuru.io/plan-name": "", + "rpaas.extensions.tsuru.io/service-name": "", + "rpaas.extensions.tsuru.io/team-owner": "", + "rpaas_instance": "my-instance", + "rpaas_service": "", + }, + }, + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + rotateTLSSessionTicketsScriptFilename: rotateTLSSessionTicketsScript, + }, + Labels: map[string]string{ + "rpaas.extensions.tsuru.io/component": "session-tickets", + }, + }, + Spec: corev1.PodSpec{ + ServiceAccountName: "rpaas-session-tickets-rotator", + RestartPolicy: "Never", + Containers: []corev1.Container{ + { + Name: "session-ticket-rotator", + Image: defaultRotateTLSSessionTicketsImage, + Command: []string{"/bin/bash"}, + Args: []string{rotateTLSSessionTicketsScriptPath}, + Env: []corev1.EnvVar{ + { + Name: "SECRET_NAME", + Value: "my-instance-session-tickets", + }, + { + Name: "SECRET_NAMESPACE", + Value: "default", + }, + { + Name: "SESSION_TICKET_KEY_LENGTH", + Value: "48", + }, + { + Name: "SESSION_TICKET_KEYS", + Value: "2", + }, + { + Name: "NGINX_LABEL_SELECTOR", + Value: "nginx.tsuru.io/app=nginx,nginx.tsuru.io/resource-name=my-instance", + }, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: rotateTLSSessionTicketsVolumeName, + MountPath: rotateTLSSessionTicketsScriptPath, + SubPath: rotateTLSSessionTicketsScriptFilename, + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: rotateTLSSessionTicketsVolumeName, + VolumeSource: corev1.VolumeSource{ + DownwardAPI: &corev1.DownwardAPIVolumeSource{ + Items: []corev1.DownwardAPIVolumeFile{ + { + Path: rotateTLSSessionTicketsScriptFilename, + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: fmt.Sprintf("metadata.annotations['%s']", rotateTLSSessionTicketsScriptFilename), + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + secret1 := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-instance-session-tickets", + Namespace: "default", + }, + Data: map[string][]byte{ + "ticket.0.key": {'h', 'e', 'l', 'l', 'o'}, + "ticket.1.key": {'w', 'o', 'r', 'd', '!'}, + }, + } + tests := []struct { name string instance *v1alpha1.RpaasInstance objects []runtime.Object assert func(t *testing.T, err error, gotSecret *corev1.Secret, gotCronJob *batchv1.CronJob) + + expectedChanged bool }{ { name: "when no TLS session resumption is enabled", @@ -2122,6 +2538,7 @@ func TestReconcileRpaasInstance_reconcileTLSSessionResumption(t *testing.T) { }, }, gotCronJob.Spec.JobTemplate.Spec.Template) }, + expectedChanged: true, }, { name: "Session Ticket: update key length and rotatation interval", @@ -2172,6 +2589,7 @@ func TestReconcileRpaasInstance_reconcileTLSSessionResumption(t *testing.T) { assert.Contains(t, gotCronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{Name: "SESSION_TICKET_KEYS", Value: "4"}) assert.Contains(t, gotCronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{Name: "NGINX_LABEL_SELECTOR", Value: "nginx.tsuru.io/app=nginx,nginx.tsuru.io/resource-name=my-instance"}) }, + expectedChanged: true, }, { name: "when session ticket is disabled, should remove Secret and CronJob objects", @@ -2203,6 +2621,7 @@ func TestReconcileRpaasInstance_reconcileTLSSessionResumption(t *testing.T) { assert.Empty(t, gotSecret.Name) assert.Empty(t, gotCronJob.Name) }, + expectedChanged: true, }, { name: "when decreasing the number of keys", @@ -2241,6 +2660,35 @@ func TestReconcileRpaasInstance_reconcileTLSSessionResumption(t *testing.T) { assert.Equal(t, gotSecret.Data["ticket.0.key"], []byte{'h', 'e', 'l', 'l', 'o'}) assert.Equal(t, gotSecret.Data["ticket.1.key"], []byte{'w', 'o', 'r', 'd', '!'}) }, + expectedChanged: true, + }, + + { + name: "when there is nothing to update", + instance: &v1alpha1.RpaasInstance{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-instance", + Namespace: "default", + }, + Spec: v1alpha1.RpaasInstanceSpec{ + TLSSessionResumption: &v1alpha1.TLSSessionResumption{ + SessionTicket: &v1alpha1.TLSSessionTicket{ + KeepLastKeys: uint32(1), + }, + }, + }, + }, + objects: []runtime.Object{ + cronjob1, + secret1, + }, + assert: func(t *testing.T, err error, gotSecret *corev1.Secret, gotCronJob *batchv1.CronJob) { + require.NoError(t, err) + + assert.Equal(t, cronjob1.Spec, gotCronJob.Spec) + assert.Equal(t, secret1.Data, gotSecret.Data) + }, + expectedChanged: false, }, } @@ -2253,7 +2701,7 @@ func TestReconcileRpaasInstance_reconcileTLSSessionResumption(t *testing.T) { r := newRpaasInstanceReconciler(resources...) - err := r.reconcileTLSSessionResumption(context.TODO(), tt.instance) + changed, err := r.reconcileTLSSessionResumption(context.TODO(), tt.instance) if tt.assert == nil { require.NoError(t, err) return @@ -2274,6 +2722,8 @@ func TestReconcileRpaasInstance_reconcileTLSSessionResumption(t *testing.T) { r.Client.Get(context.TODO(), cronJobName, &cronJob) tt.assert(t, err, &secret, &cronJob) + + assert.Equal(t, tt.expectedChanged, changed) }) } } diff --git a/controllers/rpaasinstance_controller.go b/controllers/rpaasinstance_controller.go index ee0eb785a..d7b340e9e 100644 --- a/controllers/rpaasinstance_controller.go +++ b/controllers/rpaasinstance_controller.go @@ -8,6 +8,7 @@ import ( "context" "fmt" "reflect" + "time" cmv1 "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1" "github.com/go-logr/logr" @@ -29,8 +30,9 @@ import ( // RpaasInstanceReconciler reconciles a RpaasInstance object type RpaasInstanceReconciler struct { client.Client - Log logr.Logger - EventRecorder record.EventRecorder + Log logr.Logger + SystemRateLimiter SystemRolloutRateLimiter + EventRecorder record.EventRecorder } // +kubebuilder:rbac:groups="",resources=configmaps;secrets;services,verbs=get;list;watch;create;update;delete @@ -53,15 +55,46 @@ type RpaasInstanceReconciler struct { // +kubebuilder:rbac:groups=keda.sh,resources=scaledobjects,verbs=get;list;watch;create;update;delete func (r *RpaasInstanceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + instance, err := r.getRpaasInstance(ctx, req.NamespacedName) if k8sErrors.IsNotFound(err) { return reconcile.Result{}, nil } + logger := r.Log.WithName("Reconcile"). + WithValues("RpaasInstance", types.NamespacedName{Name: instance.Name, Namespace: instance.Namespace}) + if err != nil { return reconcile.Result{}, err } + instanceHash, err := generateSpecHash(&instance.Spec) + if err != nil { + return reconcile.Result{}, err + } + + systemRollout := isSystemRollout(instanceHash, instance) + + var rolloutAllowed bool + var reservation SystemRolloutReservation + if systemRollout { + rolloutAllowed, reservation = r.SystemRateLimiter.Reserve() + + if !rolloutAllowed { + logger.Info("modifications of rpaas instance is delayed") + r.EventRecorder.Eventf(instance, corev1.EventTypeWarning, "RpaasInstanceRolloutDelayed", "modifications of rpaas instance is delayed") + + if err = r.setStatusDelayed(ctx, instance); err != nil { + return ctrl.Result{ + Requeue: true, + RequeueAfter: time.Minute * 10, + }, err + } + } + } else { + reservation = NoopReservation() + } + if s := instance.Spec.Suspend; s != nil && *s { r.EventRecorder.Eventf(instance, corev1.EventTypeWarning, "RpaasInstanceSuspended", "no modifications will be done by RPaaS controller") return reconcile.Result{Requeue: true}, nil @@ -125,7 +158,7 @@ func (r *RpaasInstanceReconciler) Reconcile(ctx context.Context, req ctrl.Reques } configMap := newConfigMap(instanceMergedWithFlavors, rendered) - err = r.reconcileConfigMap(ctx, configMap) + configMapChanged, err := r.reconcileConfigMap(ctx, configMap) if err != nil { return reconcile.Result{}, err } @@ -141,31 +174,44 @@ func (r *RpaasInstanceReconciler) Reconcile(ctx context.Context, req ctrl.Reques } } + // Nginx CRD nginx := newNginx(instanceMergedWithFlavors, plan, configMap) - if err = r.reconcileNginx(ctx, instanceMergedWithFlavors, nginx); err != nil { + nginxChanged, err := r.reconcileNginx(ctx, instanceMergedWithFlavors, nginx) + if err != nil { return ctrl.Result{}, err } - if err = r.reconcileTLSSessionResumption(ctx, instanceMergedWithFlavors); err != nil { + // Session Resumption + sessionResumptionChanged, err := r.reconcileTLSSessionResumption(ctx, instanceMergedWithFlavors) + if err != nil { return ctrl.Result{}, err } - if err = r.reconcileHPA(ctx, instanceMergedWithFlavors, nginx); err != nil { + // HPA + hpaChanged, err := r.reconcileHPA(ctx, instanceMergedWithFlavors, nginx) + if err != nil { return ctrl.Result{}, err } - if err = r.reconcilePDB(ctx, instanceMergedWithFlavors, nginx); err != nil { + // PDB + pdbChanged, err := r.reconcilePDB(ctx, instanceMergedWithFlavors, nginx) + if err != nil { return ctrl.Result{}, err } - if err = r.refreshStatus(ctx, instance, nginx); err != nil { + if !configMapChanged && !nginxChanged && !sessionResumptionChanged && !hpaChanged && !pdbChanged { + logger.Info("no changes") + reservation.Cancel() + } + + if err = r.refreshStatus(ctx, instance, instanceHash, nginx); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, nil } -func (r *RpaasInstanceReconciler) refreshStatus(ctx context.Context, instance *v1alpha1.RpaasInstance, newNginx *nginxv1alpha1.Nginx) error { +func (r *RpaasInstanceReconciler) refreshStatus(ctx context.Context, instance *v1alpha1.RpaasInstance, instanceHash string, newNginx *nginxv1alpha1.Nginx) error { existingNginx, err := r.getNginx(ctx, instance) if err != nil && !k8sErrors.IsNotFound(err) { return err @@ -181,6 +227,8 @@ func (r *RpaasInstanceReconciler) refreshStatus(ctx context.Context, instance *v } newStatus := v1alpha1.RpaasInstanceStatus{ + ReconcileDelayed: false, + RevisionHash: instanceHash, ObservedGeneration: instance.Generation, WantedNginxRevisionHash: newHash, ObservedNginxRevisionHash: existingHash, @@ -206,6 +254,20 @@ func (r *RpaasInstanceReconciler) refreshStatus(ctx context.Context, instance *v return nil } +func (r *RpaasInstanceReconciler) setStatusDelayed(ctx context.Context, instance *v1alpha1.RpaasInstance) error { + instance.Status.ReconcileDelayed = true + err := r.Client.Status().Update(ctx, instance) + if err != nil { + return fmt.Errorf("failed to update rpaas instance status: %v", err) + } + + return nil +} + +func isSystemRollout(currentHash string, instance *v1alpha1.RpaasInstance) bool { + return instance.Status.RevisionHash != "" && currentHash == instance.Status.RevisionHash +} + func (r *RpaasInstanceReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.RpaasInstance{}). diff --git a/controllers/system_rate_limit.go b/controllers/system_rate_limit.go new file mode 100644 index 000000000..4c8665615 --- /dev/null +++ b/controllers/system_rate_limit.go @@ -0,0 +1,50 @@ +package controllers + +import ( + "time" + + "golang.org/x/time/rate" +) + +type SystemRolloutReservation interface { + Cancel() +} + +type SystemRolloutRateLimiter interface { + Reserve() (allowed bool, reservation SystemRolloutReservation) +} + +var _ SystemRolloutRateLimiter = &systemRolloutRateLimiter{} + +type systemRolloutRateLimiter struct { + rateLimit *rate.Limiter +} + +func NewSystemRolloutRateLimiter(operations int, interval time.Duration) SystemRolloutRateLimiter { + return &systemRolloutRateLimiter{ + rateLimit: rate.NewLimiter(rate.Every(interval), operations), + } +} + +func (r *systemRolloutRateLimiter) Reserve() (allowed bool, reservation SystemRolloutReservation) { + rateLimitReservation := r.rateLimit.Reserve() + + if !rateLimitReservation.OK() { + return false, nil + } + + if rateLimitReservation.Delay() > time.Second { + rateLimitReservation.Cancel() + return false, nil + } + + return true, rateLimitReservation +} + +type noopReservation struct{} + +func (*noopReservation) Cancel() {} + +func NoopReservation() SystemRolloutReservation { + return &noopReservation{} +} diff --git a/main.go b/main.go index acd23fd84..fafd91caf 100644 --- a/main.go +++ b/main.go @@ -30,6 +30,9 @@ type configOpts struct { leaderElectionResourceName string namespace string syncPeriod time.Duration + + systemRateLimitInterval time.Duration + systemRateLimitOperations int } func (o *configOpts) bindFlags(fs *flag.FlagSet) { @@ -45,6 +48,9 @@ func (o *configOpts) bindFlags(fs *flag.FlagSet) { fs.DurationVar(&o.syncPeriod, "sync-period", 10*time.Hour, "The resync period for reconciling manager resources.") fs.StringVar(&o.namespace, "namespace", "", "Limit the observed RpaasInstance resources from specific namespace (empty means all namespaces)") + + fs.DurationVar(&o.systemRateLimitInterval, "system-rate-limit-interval", time.Minute, "interval of rate limit for periodic system reconciles, it is useful to apply new settings on the cluster gradual") + fs.IntVar(&o.systemRateLimitOperations, "system-rate-limit-operations", 1, "number of operations during a interval to perform a rate limit for system reconciles, it is useful to apply new settings on the cluster gradual") } func main() { @@ -73,9 +79,10 @@ func main() { } if err = (&controllers.RpaasInstanceReconciler{ - Client: mgr.GetClient(), - Log: mgr.GetLogger().WithName("controllers").WithName("RpaasInstance"), - EventRecorder: mgr.GetEventRecorderFor("rpaas-operator"), + Client: mgr.GetClient(), + SystemRateLimiter: controllers.NewSystemRolloutRateLimiter(opts.systemRateLimitOperations, opts.systemRateLimitInterval), + Log: mgr.GetLogger().WithName("controllers").WithName("RpaasInstance"), + EventRecorder: mgr.GetEventRecorderFor("rpaas-operator"), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "RpaasInstance") os.Exit(1)