diff --git a/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/ip_utils.go b/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/ip_utils.go index 090742ac3d2..0e7dd87bce2 100644 --- a/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/ip_utils.go +++ b/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/ip_utils.go @@ -130,20 +130,20 @@ func (m *IPManager) Assign(ips []string) (IPVRID, error) { if len(noConflictIPs) == 0 { return m.Get() } - - for vrid := 0; vrid < VRIDMAXVALUE; vrid++ { + var vrid int + for ; vrid < VRIDMAXVALUE; vrid++ { if _, ok := m.ipVRIDs[vrid]; !ok { - m.ipVRIDs[vrid] = noConflictIPs + m.ipVRIDs[vrid] = append(m.ipVRIDs[vrid], noConflictIPs...) for _, ip := range noConflictIPs { m.ipPools[ip] = vrid } - - return IPVRID{IPs: noConflictIPs, VRID: vrid}, nil + break } } - return IPVRID{}, errors.New("no available IPs and VRID combination") + // Get fully vrid-ips pair + return IPVRID{VRID: vrid, IPs: m.ipVRIDs[vrid]}, nil } // Release release ips from vrid, if vrid is not assigned, return error diff --git a/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go b/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go index a5b78523c6f..467610ceeb8 100644 --- a/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go +++ b/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go @@ -19,7 +19,9 @@ package viploadbalancer import ( "context" "fmt" + "sort" "strconv" + "strings" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -51,19 +53,26 @@ var ( const ( AnnotationVipLoadBalancerVRID = "service.openyurt.io/vrid" + AnnotationVipLoadBalancerIPS = "service.openyurt.io/desired-vips" VipLoadBalancerClass = "service.openyurt.io/viplb" AnnotationServiceTopologyKey = "openyurt.io/topologyKeys" AnnotationServiceTopologyValueNodePool = "openyurt.io/nodepool" AnnotationNodePoolAddressPools = "openyurt.io/address-pools" AnnotationServiceVIPAddress = "service.openyurt.io/vip" - AnnotationServiceVIPStatus = "service.openyurt.io/service-vip-ready" - AnnotationServiceVIPStatusReady = "true" - AnnotationServiceVIPStatusUnReady = "false" + AnnotationServiceVIPStatus = "service.openyurt.io/vip-status" + AnnotationServiceVIPStatusOnline = "online" + AnnotationServiceVIPStatusOffline = "offline" VipLoadBalancerFinalizer = "viploadbalancer.openyurt.io/resources" poolServiceVRIDExhaustedEventMsgFormat = "PoolService %s/%s in NodePool %s has exhausted all VRIDs" ) +const ( + ServiceVIPUnknown int = iota + ServiceVIPOnline + ServiceVIPOffline +) + func Format(format string, args ...interface{}) string { s := fmt.Sprintf(format, args...) return fmt.Sprintf("%s: %s", names.VipLoadBalancerController, s) @@ -187,6 +196,11 @@ func (r *ReconcileVipLoadBalancer) syncPoolService(ctx context.Context, poolServ return err } + if err := r.syncPoolServiceStatus(ctx, poolService); err != nil { + klog.Errorf(Format("Failed to sync PoolService %s/%s status: %v", poolService.Namespace, poolService.Name, err)) + return err + } + return nil } @@ -233,6 +247,50 @@ func (r *ReconcileVipLoadBalancer) getCurrentPoolAddress(ctx context.Context, po return np.Annotations[AnnotationNodePoolAddressPools], nil } +func (r *ReconcileVipLoadBalancer) syncPoolServiceStatus(ctx context.Context, poolService *netv1alpha1.PoolService) error { + klog.V(4).Infof(Format("SyncPoolServiceStatus VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name)) + + if !r.checkIfVipServiceOnline(ctx, poolService) { + klog.Infof(Format("SyncPoolServiceStatus VipLoadBalancer %s/%s is not online in the nodepool agent", poolService.Namespace, poolService.Name)) + return nil + } + + desiredLbStatus, err := r.desiredLbStatus(poolService) + if err != nil { + return fmt.Errorf("failed to calculate desire lb stattus for poolservice %s/%s: %v", poolService.Namespace, poolService.Name, err) + } + + poolService.Status.LoadBalancer = desiredLbStatus + if err := r.Update(ctx, poolService); err != nil { + klog.Errorf(Format("Failed to update PoolService %s/%s status: %v", poolService.Namespace, poolService.Name, err)) + return err + } + + return nil +} + +func (r *ReconcileVipLoadBalancer) desiredLbStatus(poolService *netv1alpha1.PoolService) (corev1.LoadBalancerStatus, error) { + ips := strings.Split(poolService.Annotations[AnnotationVipLoadBalancerIPS], ",") + if len(ips) == 0 { + // not ready in assign, wait to have next reconclie + klog.Infof(Format("PoolService: %s/%s has no ips, please check vrid maybe out of limit", poolService.Namespace, poolService.Name)) + return corev1.LoadBalancerStatus{}, fmt.Errorf("PoolService: %s/%s has no ips, please check vrid maybe out of limit", poolService.Namespace, poolService.Name) + } + + var lbIngress []corev1.LoadBalancerIngress + for _, ip := range ips { + lbIngress = append(lbIngress, corev1.LoadBalancerIngress{IP: ip}) + } + + sort.Slice(lbIngress, func(i, j int) bool { + return lbIngress[i].IP < lbIngress[j].IP + }) + + return corev1.LoadBalancerStatus{ + Ingress: lbIngress, + }, nil +} + func (r *ReconcileVipLoadBalancer) getCurrentIPVRIDs(ctx context.Context, poolService *netv1alpha1.PoolService) ([]IPVRID, error) { // Get the poolservice list listSelector := &client.ListOptions{ @@ -311,8 +369,10 @@ func (r *ReconcileVipLoadBalancer) isValidIPVRID(poolService *netv1alpha1.PoolSe } ips := []string{} - for _, ip := range poolService.Status.LoadBalancer.Ingress { - ips = append(ips, ip.IP) + if poolService.Status.LoadBalancer.Ingress != nil { + for _, ip := range poolService.Status.LoadBalancer.Ingress { + ips = append(ips, ip.IP) + } } ipvrid := NewIPVRID(ips, vrid) @@ -370,10 +430,8 @@ func (r *ReconcileVipLoadBalancer) assignVRID(ctx context.Context, poolService * poolService.Annotations = make(map[string]string) } - for _, ip := range ipvrid.IPs { - poolService.Status.LoadBalancer.Ingress = append(poolService.Status.LoadBalancer.Ingress, corev1.LoadBalancerIngress{IP: ip}) - } - + // add ips and vrid in annotions for status sync + poolService.Annotations[AnnotationVipLoadBalancerIPS] = strings.Join(ipvrid.IPs, ",") poolService.Annotations[AnnotationVipLoadBalancerVRID] = strconv.Itoa(ipvrid.VRID) // Update the PoolService @@ -395,22 +453,6 @@ func (r *ReconcileVipLoadBalancer) getReferenceService(ctx context.Context, ps * return service, nil } -func canRemoveFinalizer(poolService *netv1alpha1.PoolService) bool { - if poolService.Annotations == nil { - return false - } - - if _, ok := poolService.Annotations[AnnotationServiceVIPStatus]; !ok { - return false - } - - if poolService.Annotations[AnnotationServiceVIPStatus] == AnnotationServiceVIPStatusReady { - return false - } - - return true -} - func (r *ReconcileVipLoadBalancer) reconcileDelete(ctx context.Context, poolService *netv1alpha1.PoolService) (reconcile.Result, error) { klog.V(4).Infof(Format("ReconcilDelete VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name)) poolName := poolService.Labels[network.LabelNodePoolName] @@ -443,7 +485,17 @@ func (r *ReconcileVipLoadBalancer) reconcileDelete(ctx context.Context, poolServ } func (r *ReconcileVipLoadBalancer) checkIfVipServiceOffline(ctx context.Context, poolService *netv1alpha1.PoolService) bool { - klog.V(4).Infof(Format("CheckIfVipServiceOffline VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name)) + klog.V(4).Infof(Format("checkIfVipServiceOffline VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name)) + return r.getVipServiceStatus(ctx, poolService) == ServiceVIPOffline +} + +func (r *ReconcileVipLoadBalancer) checkIfVipServiceOnline(ctx context.Context, poolService *netv1alpha1.PoolService) bool { + klog.V(4).Infof(Format("checkIfVipServiceOnline VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name)) + return r.getVipServiceStatus(ctx, poolService) == ServiceVIPOnline +} + +func (r *ReconcileVipLoadBalancer) getVipServiceStatus(ctx context.Context, poolService *netv1alpha1.PoolService) int { + klog.V(4).Infof(Format("getVipServiceStatus VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name)) // Get the reference endpoint listSelector := &client.ListOptions{ LabelSelector: labels.SelectorFromSet(map[string]string{ @@ -455,11 +507,16 @@ func (r *ReconcileVipLoadBalancer) checkIfVipServiceOffline(ctx context.Context, endpointSlice := &corev1.EndpointsList{} if err := r.List(ctx, endpointSlice, listSelector); err != nil { klog.Errorf(Format("Failed to get Endpoints from PoolService %s/%s: %v", poolService.Namespace, poolService.Name, err)) - return false + return ServiceVIPUnknown + } + + if len(endpointSlice.Items) == 0 { + klog.Errorf(Format("get Endpoints from PoolService %s/%s is empty", poolService.Namespace, poolService.Name)) + return ServiceVIPUnknown } ready := 0 - target := len(endpointSlice.Items) / 2 + target := len(endpointSlice.Items)/2 + 1 for _, ep := range endpointSlice.Items { if ep.Annotations == nil { continue @@ -469,12 +526,16 @@ func (r *ReconcileVipLoadBalancer) checkIfVipServiceOffline(ctx context.Context, continue } - if ep.Annotations[AnnotationServiceVIPStatus] == AnnotationServiceVIPStatusReady { + if ep.Annotations[AnnotationServiceVIPStatus] == AnnotationServiceVIPStatusOnline { ready++ } } - return ready >= target + if ready < target { + return ServiceVIPOffline + } + + return ServiceVIPOnline } func (r *ReconcileVipLoadBalancer) addFinalizer(ctx context.Context, poolService *netv1alpha1.PoolService) error { diff --git a/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller_test.go b/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller_test.go index 75ca8cd8182..438cc47697b 100644 --- a/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller_test.go +++ b/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller_test.go @@ -45,9 +45,11 @@ import ( const ( mockServiceName = "test" mockPoolServiceName = "test-np123" + mockEndpointName = "test-ep" mockNodePoolLabel = "app=deploy" mockServiceUid = "c0af506a-7096-4ef9-b39a-eac2feb5c07g" mockNodePoolUid = "f47dd9db-d3bc-40f3-8d03-7409930b6289" + mockEndpointUid = "f7a5a351-3e33-47a9-bb00-685b357a62e5" vridLable = "service.openyurt.io/vrid" ) @@ -159,8 +161,12 @@ func TestReconcileVipLoadBalancer_Reconcile(t *testing.T) { np1 := newNodepool("np123", "name=np123,app=deploy") adressPool := "192.168.0.1-192.168.2.2" np1.Annotations = map[string]string{viploadbalancer.AnnotationNodePoolAddressPools: adressPool} + endpoint := newEndpoint(v1.NamespaceDefault, mockEndpointName) + endpoint.Annotations = map[string]string{ + viploadbalancer.AnnotationServiceVIPStatus: viploadbalancer.AnnotationServiceVIPStatusOnline, + } - c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(poolsvc).Build() + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(poolsvc).WithObjects(endpoint).Build() rc := viploadbalancer.ReconcileVipLoadBalancer{ Client: c, @@ -185,8 +191,12 @@ func TestReconcileVipLoadBalancer_Reconcile(t *testing.T) { np1 := newNodepool("np123", "name=np123,app=deploy") adressPool := "192.168.0.1-192.168.2.2" np1.Annotations = map[string]string{viploadbalancer.AnnotationNodePoolAddressPools: adressPool} + endpoint := newEndpoint(v1.NamespaceDefault, mockEndpointName) + endpoint.Annotations = map[string]string{ + viploadbalancer.AnnotationServiceVIPStatus: viploadbalancer.AnnotationServiceVIPStatusOnline, + } - c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(poolsvc).Build() + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(poolsvc).WithObjects(endpoint).Build() rc := viploadbalancer.ReconcileVipLoadBalancer{ Client: c, @@ -211,8 +221,12 @@ func TestReconcileVipLoadBalancer_Reconcile(t *testing.T) { np1 := newNodepool("np123", "name=np123,app=deploy") adressPool := "192.168.0.1-192.168.2.2, 10.1.0.1-10.1.0.2" np1.Annotations = map[string]string{viploadbalancer.AnnotationNodePoolAddressPools: adressPool} + endpoint := newEndpoint(v1.NamespaceDefault, mockEndpointName) + endpoint.Annotations = map[string]string{ + viploadbalancer.AnnotationServiceVIPStatus: viploadbalancer.AnnotationServiceVIPStatusOnline, + } - c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(poolsvc).Build() + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(poolsvc).WithObjects(endpoint).Build() rc := viploadbalancer.ReconcileVipLoadBalancer{ Client: c, @@ -346,7 +360,7 @@ func TestReconcileVipLoadBalancer_Reconcile(t *testing.T) { t.Fatalf("Failed to get IPVRID: %v", err) } - ps1.Annotations = map[string]string{viploadbalancer.AnnotationVipLoadBalancerVRID: strconv.Itoa(ipvrid.VRID), viploadbalancer.AnnotationServiceVIPStatus: viploadbalancer.AnnotationServiceVIPStatusReady} + ps1.Annotations = map[string]string{viploadbalancer.AnnotationVipLoadBalancerVRID: strconv.Itoa(ipvrid.VRID), viploadbalancer.AnnotationServiceVIPStatus: viploadbalancer.AnnotationServiceVIPStatusOnline} ps1.DeletionTimestamp = &v1.Time{Time: time.Now()} c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(ps1).Build() @@ -383,10 +397,15 @@ func TestReconcileVipLoadBalancer_Reconcile(t *testing.T) { t.Fatalf("Failed to get IPVRID: %v", err) } - ps1.Annotations = map[string]string{viploadbalancer.AnnotationVipLoadBalancerVRID: strconv.Itoa(ipvrid.VRID), viploadbalancer.AnnotationServiceVIPStatus: viploadbalancer.AnnotationServiceVIPStatusUnReady} + ps1.Annotations = map[string]string{viploadbalancer.AnnotationVipLoadBalancerVRID: strconv.Itoa(ipvrid.VRID), viploadbalancer.AnnotationServiceVIPStatus: viploadbalancer.AnnotationServiceVIPStatusOffline} ps1.DeletionTimestamp = &v1.Time{Time: time.Now()} - c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(ps1).Build() + endpoint := newEndpoint(v1.NamespaceDefault, mockEndpointName) + endpoint.Annotations = map[string]string{ + viploadbalancer.AnnotationServiceVIPStatus: viploadbalancer.AnnotationServiceVIPStatusOffline, + } + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(ps1).WithObjects(endpoint).Build() rc := viploadbalancer.ReconcileVipLoadBalancer{ Client: c, @@ -432,6 +451,23 @@ func assertErrNil(t testing.TB, err error) { } } +func newEndpoint(namespace string, name string) *corev1.Endpoints { + return &corev1.Endpoints{ + TypeMeta: v1.TypeMeta{ + Kind: "Endpoints", + APIVersion: "v1", + }, + ObjectMeta: v1.ObjectMeta{ + Namespace: namespace, + Name: name, + Labels: map[string]string{ + network.LabelServiceName: mockServiceName, + }, + UID: mockEndpointUid, + }, + } +} + func newService(namespace string, name string) *corev1.Service { return &corev1.Service{ TypeMeta: v1.TypeMeta{ @@ -482,7 +518,7 @@ func assertPoolServiceIPAddress(t testing.TB, psl *v1alpha1.PoolServiceList, ips t.Helper() for _, ps := range psl.Items { - if ps.Status.LoadBalancer.Ingress == nil { + if ips != nil && ps.Status.LoadBalancer.Ingress == nil { t.Errorf("expected loadbalancer ingress is not nil, but got %v", ps.Status.LoadBalancer.Ingress) return }