From 9e138a0d7a7c73ad76ba3f89086eed802710d263 Mon Sep 17 00:00:00 2001 From: wangxye Date: Tue, 21 May 2024 11:24:46 +0800 Subject: [PATCH] feat: add the servicevipstatus annotation for agent to check the vip offline status Signed-off-by: wangxye --- go.mod | 2 + .../viploadbalancer/ip_utils.go | 50 +++--- .../viploadbalancer_controller.go | 47 ++++-- .../viploadbalancer_controller_test.go | 146 +++++++++++++++++- 4 files changed, 199 insertions(+), 46 deletions(-) diff --git a/go.mod b/go.mod index e91b29723a7..89eeeec9826 100644 --- a/go.mod +++ b/go.mod @@ -193,6 +193,8 @@ replace ( k8s.io/controller-manager => k8s.io/controller-manager v0.28.9 k8s.io/cri-api => k8s.io/cri-api v0.28.9 k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.28.9 + k8s.io/dynamic-resource-allocation => k8s.io/dynamic-resource-allocation v0.28.9 + k8s.io/endpointslice => k8s.io/endpointslice v0.28.9 k8s.io/klog/v2 => k8s.io/klog/v2 v2.100.1 k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.28.9 k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.28.9 diff --git a/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/ip_utils.go b/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/ip_utils.go index 15743cc9d13..090742ac3d2 100644 --- a/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/ip_utils.go +++ b/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/ip_utils.go @@ -27,19 +27,12 @@ const ( VRIDEVICTED = -1 ) -type IPRange struct { - // IPRange: [startIP, endIP) - startIP net.IP - endIP net.IP -} - type IPVRID struct { IPs []string VRID int } type IPManager struct { - ranges []IPRange // ipPools indicates if ip is assign ipPools map[string]int // ipVRIDs indicates which IPs are assigned to vrid @@ -59,7 +52,7 @@ func NewIPManager(ipRanges string) (*IPManager, error) { ipVRIDs: make(map[int][]string), } - iprs := parseIP(ipRanges) + iprs := ParseIP(ipRanges) for _, ipr := range iprs { manager.ipPools[ipr] = VRIDEVICTED } @@ -67,7 +60,9 @@ func NewIPManager(ipRanges string) (*IPManager, error) { return manager, nil } -func parseIP(ipr string) []string { +// ParseIP support ipv4 / ipv6 parse, like "192.168.0.1-192.168.0.3", "192.168.0.1, 10.0.1.1", "2001:db8::2-2001:db8::4" +// Output is a list of ip strings, which is left closed and right open, like [192.168.0.1, 192.168.0.2], [192.168.0.1, 10.0.1.1],[2001:db8::2, 2001:db8::3] +func ParseIP(ipr string) []string { var ips []string ipRanges := strings.Split(ipr, ",") @@ -103,8 +98,8 @@ func incrementIP(ip net.IP) net.IP { return ip } +// Get return a IPVRID with a available IP and VRID combination func (m *IPManager) Get() (IPVRID, error) { - for vrid := 0; vrid < VRIDMAXVALUE; vrid++ { if ips, ok := m.ipVRIDs[vrid]; !ok || len(ips) == 0 { for ip, used := range m.ipPools { @@ -120,8 +115,8 @@ func (m *IPManager) Get() (IPVRID, error) { return IPVRID{}, errors.New("no available IP and VRID combination") } +// Assign assign ips to a vrid, if no available ip, get a new ipvrid from Get() func (m *IPManager) Assign(ips []string) (IPVRID, error) { - var noConflictIPs []string for _, ip := range ips { // if conflict, just use no conflict @@ -151,29 +146,27 @@ func (m *IPManager) Assign(ips []string) (IPVRID, error) { return IPVRID{}, errors.New("no available IPs and VRID combination") } +// Release release ips from vrid, if vrid is not assigned, return error func (m *IPManager) Release(ipVRID IPVRID) error { - if err := m.IsValid(ipVRID); err != nil { return err } - ips := ipVRID.IPs - vrid := ipVRID.VRID - if _, ok := m.ipVRIDs[vrid]; !ok { - return fmt.Errorf("VRID %d does not assign ips", vrid) + if _, ok := m.ipVRIDs[ipVRID.VRID]; !ok { + return fmt.Errorf("VRID %d does not assign ips", ipVRID.VRID) } - remain := make([]string, len(m.ipVRIDs[vrid])-len(ips)) + remain := make([]string, len(m.ipVRIDs[ipVRID.VRID])-len(ipVRID.IPs)) - for _, ip := range m.ipVRIDs[vrid] { - if m.isIPPresent(ip, ips) { + for _, ip := range m.ipVRIDs[ipVRID.VRID] { + if m.isIPPresent(ip, ipVRID.IPs) { continue } remain = append(remain, ip) } - if len(remain) == len(m.ipVRIDs[vrid]) { - return fmt.Errorf("IP %v is not assigned", ips) + if len(remain) == len(m.ipVRIDs[ipVRID.VRID]) { + return fmt.Errorf("IP %v is not assigned", ipVRID.IPs) } for _, ip := range remain { @@ -183,27 +176,26 @@ func (m *IPManager) Release(ipVRID IPVRID) error { return nil } +// check if ip and vrid is valid in this ip-pools, if not return error func (m *IPManager) IsValid(ipvrid IPVRID) error { - ips := ipvrid.IPs - vrid := ipvrid.VRID - if len(ips) == 0 { + if len(ipvrid.IPs) == 0 { return fmt.Errorf("IPs is empty") } - for _, ip := range ips { + for _, ip := range ipvrid.IPs { if _, ok := m.ipPools[ip]; !ok { return fmt.Errorf("IP: %s is not found in IP-Pools", ip) } } - if vrid < 0 || vrid >= VRIDMAXVALUE { - return fmt.Errorf("VIRD: %d out of range", vrid) + if ipvrid.VRID < 0 || ipvrid.VRID >= VRIDMAXVALUE { + return fmt.Errorf("VIRD: %d out of range", ipvrid.VRID) } return nil } +// Sync sync ips from vrid, if vrid is not assigned, return error func (m *IPManager) Sync(ipVRIDs []IPVRID) error { - for _, ipVRID := range ipVRIDs { if err := m.IsValid(ipVRID); err != nil { return err @@ -227,6 +219,7 @@ func (m *IPManager) Sync(ipVRIDs []IPVRID) error { return nil } +// findDiffIPs find the difference between des and cur, return the difference between des and cur func (m *IPManager) findDiffIPs(des, cur []string) (app, del []string) { for _, dip := range des { if exsit := m.isIPPresent(dip, cur); !exsit { @@ -243,6 +236,7 @@ func (m *IPManager) findDiffIPs(des, cur []string) (app, del []string) { return } +// isIPPresent check if ip is present in ips, if present return true, else return false func (m *IPManager) isIPPresent(tip string, ips []string) bool { for _, ip := range ips { if ip == tip { diff --git a/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go b/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go index 2dc2b0f9f4d..408c0eaf8af 100644 --- a/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go +++ b/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go @@ -19,17 +19,16 @@ package viploadbalancer import ( "context" "fmt" + "k8s.io/klog/v2" "strconv" - "k8s.io/apimachinery/pkg/types" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" - "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -57,6 +56,10 @@ const ( AnnotationServiceTopologyValueNodePool = "openyurt.io/nodepool" AnnotationNodePoolAddressPools = "openyurt.io/address-pools" AnnotationServiceVIPAddress = "service.openyurt.io/vip" + AnnotationServiceVIPStatus = "service.openyurt.io/service-vip-ready" + AnnotationServiceVIPStatusReady = "true" + AnnotationServiceVIPStatusUnReady = "false" + VipLoadBalancerFinalizer = "viploadbalancer.openyurt.io/resources" poolServiceVRIDExhaustedEventMsgFormat = "PoolService %s/%s in NodePool %s has exhausted all VRIDs" ) @@ -115,7 +118,7 @@ func add(mgr manager.Manager, cfg *appconfig.CompletedConfig, r reconcile.Reconc } // Watch for changes to PoolService - err = c.Watch(&source.Kind{Type: &netv1alpha1.PoolService{}}, &handler.EnqueueRequestForObject{}, NewPoolServicePredicated()) + err = c.Watch(source.Kind(mgr.GetCache(), &netv1alpha1.PoolService{}), &handler.EnqueueRequestForObject{}, NewPoolServicePredicated()) if err != nil { return err } @@ -346,15 +349,16 @@ func (r *ReconcileVipLoadBalancer) assignVRID(ctx context.Context, poolService * } var vips []string - // if specify ip for poolservice + // if specify ip for poolservice annotation, use it as vip if svc.Annotations != nil { if vipAddress, ok := svc.Annotations[AnnotationServiceVIPAddress]; ok { - vips = parseIP(vipAddress) + vips = ParseIP(vipAddress) } } ipvrid, err := r.IPManagers[poolName].Assign(vips) if err != nil { + // if no available ipvrid, return nil, and wait for next reconcile klog.Errorf(Format("Failed to get a new VRID: %v", err)) r.recorder.Eventf(poolService, corev1.EventTypeWarning, "VRIDExhausted", poolServiceVRIDExhaustedEventMsgFormat, poolService.Namespace, poolService.Name, poolName) @@ -381,6 +385,7 @@ func (r *ReconcileVipLoadBalancer) assignVRID(ctx context.Context, poolService * } func (r *ReconcileVipLoadBalancer) getReferenceService(ctx context.Context, ps *netv1alpha1.PoolService) (*corev1.Service, error) { + // get the reference service from poolservice service := &corev1.Service{} svcName := ps.Labels[network.LabelServiceName] if err := r.Get(ctx, types.NamespacedName{Name: svcName, Namespace: ps.Namespace}, service); err != nil { @@ -390,24 +395,42 @@ func (r *ReconcileVipLoadBalancer) getReferenceService(ctx context.Context, ps * return service, nil } +func canRemoveFinalizer(poolService *netv1alpha1.PoolService) bool { + if poolService.Annotations == nil { + return false + } + + if _, ok := poolService.Annotations[AnnotationServiceVIPStatus]; !ok { + return false + } + + if poolService.Annotations[AnnotationServiceVIPStatus] == AnnotationServiceVIPStatusReady { + return false + } + + return true +} + func (r *ReconcileVipLoadBalancer) reconcileDelete(ctx context.Context, poolService *netv1alpha1.PoolService) (reconcile.Result, error) { klog.V(4).Infof(Format("ReconcilDelete VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name)) poolName := poolService.Labels[network.LabelNodePoolName] - // Check if the PoolService has a VRID + // Check if the PoolService has a valid IP-VRID ipvrid, err := r.checkIPVRIDs(*poolService) if err != nil { return reconcile.Result{}, nil } - // Release the VRID + // Release the IP-VRID r.IPManagers[poolName].Release(*ipvrid) - // remove the finalizer in the PoolService - if err := r.removeFinalizer(ctx, poolService); err != nil { - klog.Errorf(Format("Failed to remove finalizer from PoolService %s/%s: %v", poolService.Namespace, poolService.Name, err)) - return reconcile.Result{}, err + if canRemoveFinalizer(poolService) { + // remove the finalizer in the PoolService + if err := r.removeFinalizer(ctx, poolService); err != nil { + klog.Errorf(Format("Failed to remove finalizer from PoolService %s/%s: %v", poolService.Namespace, poolService.Name, err)) + return reconcile.Result{}, err + } } return reconcile.Result{}, nil diff --git a/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller_test.go b/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller_test.go index 70ac91ca247..75ca8cd8182 100644 --- a/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller_test.go +++ b/pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller_test.go @@ -18,6 +18,7 @@ package viploadbalancer_test import ( "context" + "fmt" "reflect" "sort" "strconv" @@ -30,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/utils/strings/slices" fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -119,7 +121,7 @@ func TestReconcileVipLoadBalancer_Reconcile(t *testing.T) { assertPoolServiceLabels(t, psl, svc.Name) assertPoolServiceFinalizer(t, psl) assertPoolServiceVRIDLabels(t, psl, "0") - assertPoolServiceIPAddress(t, psl, "") + assertPoolServiceIPAddress(t, psl, nil) }) t.Run("test update pool services with invalid vrid", func(t *testing.T) { @@ -171,13 +173,66 @@ func TestReconcileVipLoadBalancer_Reconcile(t *testing.T) { assertErrNil(t, err) assertPoolServiceVRIDLabels(t, psl, "0") - assertPoolServiceIPAddress(t, psl, "192.168.1.1") + assertPoolServiceIPAddress(t, psl, []string{"192.168.1.1"}) + }) + + t.Run("test update pool services with specify ip range", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + svcIPRange := "192.168.1.1-192.168.1.3" + expectIPs := viploadbalancer.ParseIP(svcIPRange) + svc.Annotations = map[string]string{viploadbalancer.AnnotationServiceVIPAddress: svcIPRange} + poolsvc := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + np1 := newNodepool("np123", "name=np123,app=deploy") + adressPool := "192.168.0.1-192.168.2.2" + np1.Annotations = map[string]string{viploadbalancer.AnnotationNodePoolAddressPools: adressPool} + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(poolsvc).Build() + + rc := viploadbalancer.ReconcileVipLoadBalancer{ + Client: c, + IPManagers: map[string]*viploadbalancer.IPManager{}, + } + _, err := rc.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockPoolServiceName)) + + psl := &v1alpha1.PoolServiceList{} + c.List(context.Background(), psl) + + assertErrNil(t, err) + assertPoolServiceVRIDLabels(t, psl, "0") + assertPoolServiceIPAddress(t, psl, expectIPs) + }) + + t.Run("test update pool services with specify ip mix", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + svcIPRange := "192.168.1.1-192.168.1.3, 10.1.0.1" + exceptIPs := viploadbalancer.ParseIP(svcIPRange) + svc.Annotations = map[string]string{viploadbalancer.AnnotationServiceVIPAddress: svcIPRange} + poolsvc := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + np1 := newNodepool("np123", "name=np123,app=deploy") + adressPool := "192.168.0.1-192.168.2.2, 10.1.0.1-10.1.0.2" + np1.Annotations = map[string]string{viploadbalancer.AnnotationNodePoolAddressPools: adressPool} + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(poolsvc).Build() + + rc := viploadbalancer.ReconcileVipLoadBalancer{ + Client: c, + IPManagers: map[string]*viploadbalancer.IPManager{}, + } + _, err := rc.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockPoolServiceName)) + + psl := &v1alpha1.PoolServiceList{} + c.List(context.Background(), psl) + + assertErrNil(t, err) + assertPoolServiceVRIDLabels(t, psl, "0") + assertPoolServiceIPAddress(t, psl, exceptIPs) }) t.Run("test delete pool services with invalid vrid", func(t *testing.T) { svc := newService(v1.NamespaceDefault, mockServiceName) np1 := newNodepool("np123", "name=np123,app=deploy") ps1 := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + ps1.Finalizers = []string{viploadbalancer.VipLoadBalancerFinalizer} ipManage, err := viploadbalancer.NewIPManager("192.168.0.1-192.168.1.1") if err != nil { @@ -210,6 +265,7 @@ func TestReconcileVipLoadBalancer_Reconcile(t *testing.T) { svc := newService(v1.NamespaceDefault, mockServiceName) np1 := newNodepool("np123", "name=np123,app=deploy") ps1 := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + ps1.Finalizers = []string{viploadbalancer.VipLoadBalancerFinalizer} ipManage, err := viploadbalancer.NewIPManager("192.168.0.1-192.168.1.1") if err != nil { @@ -237,10 +293,12 @@ func TestReconcileVipLoadBalancer_Reconcile(t *testing.T) { assertPoolServiceFinalizer(t, psl) }) - t.Run("test delete pool services with delete time not zero", func(t *testing.T) { + t.Run("test delete pool services with delete time not zero and vip empty", func(t *testing.T) { svc := newService(v1.NamespaceDefault, mockServiceName) np1 := newNodepool("np123", "name=np123,app=deploy") ps1 := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + ps1.Finalizers = []string{viploadbalancer.VipLoadBalancerFinalizer} + ps1.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{{IP: "192.168.0.1"}} ipManage, err := viploadbalancer.NewIPManager("192.168.0.1-192.168.1.1") if err != nil { @@ -268,6 +326,80 @@ func TestReconcileVipLoadBalancer_Reconcile(t *testing.T) { psl := &v1alpha1.PoolServiceList{} c.List(context.Background(), psl) + assertErrNil(t, err) + assertPoolServiceFinalizer(t, psl) + }) + + t.Run("test delete pool services with delete time not zero and vip ready", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + np1 := newNodepool("np123", "name=np123,app=deploy") + ps1 := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + ps1.Finalizers = []string{viploadbalancer.VipLoadBalancerFinalizer} + ps1.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{{IP: "192.168.0.1"}} + + ipManage, err := viploadbalancer.NewIPManager("192.168.0.1-192.168.1.1") + if err != nil { + t.Fatalf("Failed to create IPManager: %v", err) + } + ipvrid, err := ipManage.Get() + if err != nil { + t.Fatalf("Failed to get IPVRID: %v", err) + } + + ps1.Annotations = map[string]string{viploadbalancer.AnnotationVipLoadBalancerVRID: strconv.Itoa(ipvrid.VRID), viploadbalancer.AnnotationServiceVIPStatus: viploadbalancer.AnnotationServiceVIPStatusReady} + ps1.DeletionTimestamp = &v1.Time{Time: time.Now()} + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(ps1).Build() + + rc := viploadbalancer.ReconcileVipLoadBalancer{ + Client: c, + IPManagers: map[string]*viploadbalancer.IPManager{ + "np123": ipManage, + }, + } + + _, err = rc.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockPoolServiceName)) + + psl := &v1alpha1.PoolServiceList{} + c.List(context.Background(), psl) + + assertErrNil(t, err) + assertPoolServiceFinalizer(t, psl) + }) + + t.Run("test delete pool services with delete time not zero and vip unready", func(t *testing.T) { + svc := newService(v1.NamespaceDefault, mockServiceName) + np1 := newNodepool("np123", "name=np123,app=deploy") + ps1 := newPoolService(v1.NamespaceDefault, "np123", nil, nil) + ps1.Finalizers = []string{viploadbalancer.VipLoadBalancerFinalizer} + ps1.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{{IP: "192.168.0.1"}} + + ipManage, err := viploadbalancer.NewIPManager("192.168.0.1-192.168.1.1") + if err != nil { + t.Fatalf("Failed to create IPManager: %v", err) + } + ipvrid, err := ipManage.Get() + if err != nil { + t.Fatalf("Failed to get IPVRID: %v", err) + } + + ps1.Annotations = map[string]string{viploadbalancer.AnnotationVipLoadBalancerVRID: strconv.Itoa(ipvrid.VRID), viploadbalancer.AnnotationServiceVIPStatus: viploadbalancer.AnnotationServiceVIPStatusUnReady} + ps1.DeletionTimestamp = &v1.Time{Time: time.Now()} + + c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(ps1).Build() + + rc := viploadbalancer.ReconcileVipLoadBalancer{ + Client: c, + IPManagers: map[string]*viploadbalancer.IPManager{ + "np123": ipManage, + }, + } + + _, err = rc.Reconcile(context.Background(), newReconcileRequest(v1.NamespaceDefault, mockPoolServiceName)) + + psl := &v1alpha1.PoolServiceList{} + c.List(context.Background(), psl) + assertErrNil(t, err) assertPoolServiceFinalizerNil(t, psl) }) @@ -346,7 +478,7 @@ func newNodepool(name string, labelStr string) *v1beta1.NodePool { } } -func assertPoolServiceIPAddress(t testing.TB, psl *v1alpha1.PoolServiceList, ip string) { +func assertPoolServiceIPAddress(t testing.TB, psl *v1alpha1.PoolServiceList, ips []string) { t.Helper() for _, ps := range psl.Items { @@ -360,9 +492,11 @@ func assertPoolServiceIPAddress(t testing.TB, psl *v1alpha1.PoolServiceList, ip t.Errorf("expected loadbalancer ingress IP is not empty, but got %s", lbIngress.IP) } - if ip != "" && lbIngress.IP != ip { - t.Errorf("expected loadbalancer ingress IP is %s, but got %s", ip, lbIngress.IP) + if len(ips) != 0 && !slices.Contains(ips, lbIngress.IP) { + t.Errorf("excepted IPs are %v, but got %s", ips, lbIngress.IP) } + + fmt.Printf("IP: %s\n", lbIngress.IP) } }