From a904f693b4ef5ec853fbdbb0a4d4d2e241bc9ac0 Mon Sep 17 00:00:00 2001 From: wangxye Date: Sun, 16 Jun 2024 22:54:56 +0800 Subject: [PATCH] feat: support lb status sync with agent status Signed-off-by: wangxye --- .../viploadbalancer/ip_utils.go | 10 +- .../viploadbalancer_controller.go | 116 +++++++++++++----- 2 files changed, 91 insertions(+), 35 deletions(-) diff --git a/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/ip_utils.go b/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/ip_utils.go index 090742ac3d2..cb5e356795c 100644 --- a/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/ip_utils.go +++ b/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/ip_utils.go @@ -130,20 +130,20 @@ func (m *IPManager) Assign(ips []string) (IPVRID, error) { if len(noConflictIPs) == 0 { return m.Get() } - - for vrid := 0; vrid < VRIDMAXVALUE; vrid++ { + var vrid int + for ; vrid < VRIDMAXVALUE; vrid++ { if _, ok := m.ipVRIDs[vrid]; !ok { - m.ipVRIDs[vrid] = noConflictIPs + m.ipVRIDs[vrid] = append(m.ipVRIDs[vrid], noConflictIPs...) for _, ip := range noConflictIPs { m.ipPools[ip] = vrid } - return IPVRID{IPs: noConflictIPs, VRID: vrid}, nil } } - return IPVRID{}, errors.New("no available IPs and VRID combination") + // Get fully vrid-ips pair + return IPVRID{VRID: vrid, IPs: m.ipVRIDs[vrid]}, nil } // Release release ips from vrid, if vrid is not assigned, return error diff --git a/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go b/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go index a5b78523c6f..2aee6dce7d5 100644 --- a/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go +++ b/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go @@ -19,7 +19,9 @@ package viploadbalancer import ( "context" "fmt" + "sort" "strconv" + "strings" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -51,19 +53,26 @@ var ( const ( AnnotationVipLoadBalancerVRID = "service.openyurt.io/vrid" + AnnotationVipLoadBalancerIPS = "service.openyurt.io/desired-vips" VipLoadBalancerClass = "service.openyurt.io/viplb" AnnotationServiceTopologyKey = "openyurt.io/topologyKeys" AnnotationServiceTopologyValueNodePool = "openyurt.io/nodepool" AnnotationNodePoolAddressPools = "openyurt.io/address-pools" AnnotationServiceVIPAddress = "service.openyurt.io/vip" - AnnotationServiceVIPStatus = "service.openyurt.io/service-vip-ready" - AnnotationServiceVIPStatusReady = "true" - AnnotationServiceVIPStatusUnReady = "false" + AnnotationServiceVIPStatus = "service.openyurt.io/vip-status" + AnnotationServiceVIPStatusOnline = "online" + AnnotationServiceVIPStatusOffline = "offline" VipLoadBalancerFinalizer = "viploadbalancer.openyurt.io/resources" poolServiceVRIDExhaustedEventMsgFormat = "PoolService %s/%s in NodePool %s has exhausted all VRIDs" ) +const ( + ServiceVIPUnknown int = iota + ServiceVIPOnline + ServiceVIPOffline +) + func Format(format string, args ...interface{}) string { s := fmt.Sprintf(format, args...) return fmt.Sprintf("%s: %s", names.VipLoadBalancerController, s) @@ -187,6 +196,11 @@ func (r *ReconcileVipLoadBalancer) syncPoolService(ctx context.Context, poolServ return err } + if err := r.syncPoolServiceStatus(ctx, poolService); err != nil { + klog.Errorf(Format("Failed to sync PoolService %s/%s status: %v", poolService.Namespace, poolService.Name, err)) + return err + } + return nil } @@ -233,6 +247,50 @@ func (r *ReconcileVipLoadBalancer) getCurrentPoolAddress(ctx context.Context, po return np.Annotations[AnnotationNodePoolAddressPools], nil } +func (r *ReconcileVipLoadBalancer) syncPoolServiceStatus(ctx context.Context, poolService *netv1alpha1.PoolService) error { + klog.V(4).Infof(Format("SyncPoolServiceStatus VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name)) + + if !r.checkIfVipServiceOnline(ctx, poolService) { + klog.Infof(Format("SyncPoolServiceStatus VipLoadBalancer %s/%s is not online in the nodepool agent", poolService.Namespace, poolService.Name)) + return nil + } + + desiredLbStatus, err := r.desiredLbStatus(poolService) + if err != nil { + return fmt.Errorf("failed to calculate desire lb stattus for poolservice %s/%s: %v", poolService.Namespace, poolService.Name, err) + } + + poolService.Status.LoadBalancer = desiredLbStatus + if err := r.Status().Update(ctx, poolService); err != nil { + klog.Errorf(Format("Failed to update PoolService %s/%s status: %v", poolService.Namespace, poolService.Name, err)) + return err + } + + return nil +} + +func (r *ReconcileVipLoadBalancer) desiredLbStatus(poolService *netv1alpha1.PoolService) (corev1.LoadBalancerStatus, error) { + ips := strings.Split(poolService.Annotations[AnnotationVipLoadBalancerIPS], ",") + if len(ips) == 0 { + // not ready in assign, wait to have next reconclie + klog.Infof(Format("PoolService: %s/%s has no ips, please check vrid maybe out of limit", poolService.Namespace, poolService.Name)) + return corev1.LoadBalancerStatus{}, fmt.Errorf("PoolService: %s/%s has no ips, please check vrid maybe out of limit", poolService.Namespace, poolService.Name) + } + + var lbIngress []corev1.LoadBalancerIngress + for _, ip := range ips { + lbIngress = append(lbIngress, corev1.LoadBalancerIngress{IP: ip}) + } + + sort.Slice(lbIngress, func(i, j int) bool { + return lbIngress[i].IP < lbIngress[j].IP + }) + + return corev1.LoadBalancerStatus{ + Ingress: lbIngress, + }, nil +} + func (r *ReconcileVipLoadBalancer) getCurrentIPVRIDs(ctx context.Context, poolService *netv1alpha1.PoolService) ([]IPVRID, error) { // Get the poolservice list listSelector := &client.ListOptions{ @@ -311,8 +369,10 @@ func (r *ReconcileVipLoadBalancer) isValidIPVRID(poolService *netv1alpha1.PoolSe } ips := []string{} - for _, ip := range poolService.Status.LoadBalancer.Ingress { - ips = append(ips, ip.IP) + if poolService.Status.LoadBalancer.Ingress != nil { + for _, ip := range poolService.Status.LoadBalancer.Ingress { + ips = append(ips, ip.IP) + } } ipvrid := NewIPVRID(ips, vrid) @@ -370,10 +430,8 @@ func (r *ReconcileVipLoadBalancer) assignVRID(ctx context.Context, poolService * poolService.Annotations = make(map[string]string) } - for _, ip := range ipvrid.IPs { - poolService.Status.LoadBalancer.Ingress = append(poolService.Status.LoadBalancer.Ingress, corev1.LoadBalancerIngress{IP: ip}) - } - + // add ips and vrid in annotions for status sync + poolService.Annotations[AnnotationVipLoadBalancerIPS] = strings.Join(ipvrid.IPs, ",") poolService.Annotations[AnnotationVipLoadBalancerVRID] = strconv.Itoa(ipvrid.VRID) // Update the PoolService @@ -395,22 +453,6 @@ func (r *ReconcileVipLoadBalancer) getReferenceService(ctx context.Context, ps * return service, nil } -func canRemoveFinalizer(poolService *netv1alpha1.PoolService) bool { - if poolService.Annotations == nil { - return false - } - - if _, ok := poolService.Annotations[AnnotationServiceVIPStatus]; !ok { - return false - } - - if poolService.Annotations[AnnotationServiceVIPStatus] == AnnotationServiceVIPStatusReady { - return false - } - - return true -} - func (r *ReconcileVipLoadBalancer) reconcileDelete(ctx context.Context, poolService *netv1alpha1.PoolService) (reconcile.Result, error) { klog.V(4).Infof(Format("ReconcilDelete VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name)) poolName := poolService.Labels[network.LabelNodePoolName] @@ -443,7 +485,17 @@ func (r *ReconcileVipLoadBalancer) reconcileDelete(ctx context.Context, poolServ } func (r *ReconcileVipLoadBalancer) checkIfVipServiceOffline(ctx context.Context, poolService *netv1alpha1.PoolService) bool { - klog.V(4).Infof(Format("CheckIfVipServiceOffline VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name)) + klog.V(4).Infof(Format("checkIfVipServiceOffline VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name)) + return r.getVipServiceStatus(ctx, poolService) == ServiceVIPOffline +} + +func (r *ReconcileVipLoadBalancer) checkIfVipServiceOnline(ctx context.Context, poolService *netv1alpha1.PoolService) bool { + klog.V(4).Infof(Format("checkIfVipServiceOnline VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name)) + return r.getVipServiceStatus(ctx, poolService) == ServiceVIPOnline +} + +func (r *ReconcileVipLoadBalancer) getVipServiceStatus(ctx context.Context, poolService *netv1alpha1.PoolService) int { + klog.V(4).Infof(Format("getVipServiceStatus VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name)) // Get the reference endpoint listSelector := &client.ListOptions{ LabelSelector: labels.SelectorFromSet(map[string]string{ @@ -455,11 +507,11 @@ func (r *ReconcileVipLoadBalancer) checkIfVipServiceOffline(ctx context.Context, endpointSlice := &corev1.EndpointsList{} if err := r.List(ctx, endpointSlice, listSelector); err != nil { klog.Errorf(Format("Failed to get Endpoints from PoolService %s/%s: %v", poolService.Namespace, poolService.Name, err)) - return false + return ServiceVIPUnknown } ready := 0 - target := len(endpointSlice.Items) / 2 + target := len(endpointSlice.Items)/2 + 1 for _, ep := range endpointSlice.Items { if ep.Annotations == nil { continue @@ -469,12 +521,16 @@ func (r *ReconcileVipLoadBalancer) checkIfVipServiceOffline(ctx context.Context, continue } - if ep.Annotations[AnnotationServiceVIPStatus] == AnnotationServiceVIPStatusReady { + if ep.Annotations[AnnotationServiceVIPStatus] == AnnotationServiceVIPStatusOnline { ready++ } } - return ready >= target + if ready < target { + return ServiceVIPOffline + } + + return ServiceVIPOnline } func (r *ReconcileVipLoadBalancer) addFinalizer(ctx context.Context, poolService *netv1alpha1.PoolService) error {