Skip to content

Commit

Permalink
feat: support lb status sync with agent status
Browse files Browse the repository at this point in the history
Signed-off-by: wangxye <[email protected]>
  • Loading branch information
wangxye committed Jun 16, 2024
1 parent ed10e74 commit aae406d
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,20 @@ func (m *IPManager) Assign(ips []string) (IPVRID, error) {
if len(noConflictIPs) == 0 {
return m.Get()
}

for vrid := 0; vrid < VRIDMAXVALUE; vrid++ {
var vrid int
for ; vrid < VRIDMAXVALUE; vrid++ {
if _, ok := m.ipVRIDs[vrid]; !ok {
m.ipVRIDs[vrid] = noConflictIPs
m.ipVRIDs[vrid] = append(m.ipVRIDs[vrid], noConflictIPs...)

for _, ip := range noConflictIPs {
m.ipPools[ip] = vrid
}

return IPVRID{IPs: noConflictIPs, VRID: vrid}, nil
break
}
}

return IPVRID{}, errors.New("no available IPs and VRID combination")
// Get fully vrid-ips pair
return IPVRID{VRID: vrid, IPs: m.ipVRIDs[vrid]}, nil
}

// Release release ips from vrid, if vrid is not assigned, return error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package viploadbalancer
import (
"context"
"fmt"
"sort"
"strconv"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -51,19 +53,26 @@ var (

const (
AnnotationVipLoadBalancerVRID = "service.openyurt.io/vrid"
AnnotationVipLoadBalancerIPS = "service.openyurt.io/desired-vips"
VipLoadBalancerClass = "service.openyurt.io/viplb"
AnnotationServiceTopologyKey = "openyurt.io/topologyKeys"
AnnotationServiceTopologyValueNodePool = "openyurt.io/nodepool"
AnnotationNodePoolAddressPools = "openyurt.io/address-pools"
AnnotationServiceVIPAddress = "service.openyurt.io/vip"
AnnotationServiceVIPStatus = "service.openyurt.io/service-vip-ready"
AnnotationServiceVIPStatusReady = "true"
AnnotationServiceVIPStatusUnReady = "false"
AnnotationServiceVIPStatus = "service.openyurt.io/vip-status"
AnnotationServiceVIPStatusOnline = "online"
AnnotationServiceVIPStatusOffline = "offline"

VipLoadBalancerFinalizer = "viploadbalancer.openyurt.io/resources"
poolServiceVRIDExhaustedEventMsgFormat = "PoolService %s/%s in NodePool %s has exhausted all VRIDs"
)

const (
ServiceVIPUnknown int = iota
ServiceVIPOnline
ServiceVIPOffline
)

func Format(format string, args ...interface{}) string {
s := fmt.Sprintf(format, args...)
return fmt.Sprintf("%s: %s", names.VipLoadBalancerController, s)
Expand Down Expand Up @@ -187,6 +196,11 @@ func (r *ReconcileVipLoadBalancer) syncPoolService(ctx context.Context, poolServ
return err

Check warning on line 196 in pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go#L196

Added line #L196 was not covered by tests
}

if err := r.syncPoolServiceStatus(ctx, poolService); err != nil {
klog.Errorf(Format("Failed to sync PoolService %s/%s status: %v", poolService.Namespace, poolService.Name, err))
return err

Check warning on line 201 in pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go#L200-L201

Added lines #L200 - L201 were not covered by tests
}

return nil
}

Expand Down Expand Up @@ -233,6 +247,50 @@ func (r *ReconcileVipLoadBalancer) getCurrentPoolAddress(ctx context.Context, po
return np.Annotations[AnnotationNodePoolAddressPools], nil
}

func (r *ReconcileVipLoadBalancer) syncPoolServiceStatus(ctx context.Context, poolService *netv1alpha1.PoolService) error {
klog.V(4).Infof(Format("SyncPoolServiceStatus VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name))

if !r.checkIfVipServiceOnline(ctx, poolService) {
klog.Infof(Format("SyncPoolServiceStatus VipLoadBalancer %s/%s is not online in the nodepool agent", poolService.Namespace, poolService.Name))
return nil
}

desiredLbStatus, err := r.desiredLbStatus(poolService)
if err != nil {
return fmt.Errorf("failed to calculate desire lb stattus for poolservice %s/%s: %v", poolService.Namespace, poolService.Name, err)

Check warning on line 260 in pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go#L260

Added line #L260 was not covered by tests
}

poolService.Status.LoadBalancer = desiredLbStatus
if err := r.Update(ctx, poolService); err != nil {
klog.Errorf(Format("Failed to update PoolService %s/%s status: %v", poolService.Namespace, poolService.Name, err))
return err

Check warning on line 266 in pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go#L265-L266

Added lines #L265 - L266 were not covered by tests
}

return nil
}

func (r *ReconcileVipLoadBalancer) desiredLbStatus(poolService *netv1alpha1.PoolService) (corev1.LoadBalancerStatus, error) {
ips := strings.Split(poolService.Annotations[AnnotationVipLoadBalancerIPS], ",")
if len(ips) == 0 {
// not ready in assign, wait to have next reconclie
klog.Infof(Format("PoolService: %s/%s has no ips, please check vrid maybe out of limit", poolService.Namespace, poolService.Name))
return corev1.LoadBalancerStatus{}, fmt.Errorf("PoolService: %s/%s has no ips, please check vrid maybe out of limit", poolService.Namespace, poolService.Name)

Check warning on line 277 in pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go#L276-L277

Added lines #L276 - L277 were not covered by tests
}

var lbIngress []corev1.LoadBalancerIngress
for _, ip := range ips {
lbIngress = append(lbIngress, corev1.LoadBalancerIngress{IP: ip})
}

sort.Slice(lbIngress, func(i, j int) bool {
return lbIngress[i].IP < lbIngress[j].IP
})

return corev1.LoadBalancerStatus{
Ingress: lbIngress,
}, nil
}

func (r *ReconcileVipLoadBalancer) getCurrentIPVRIDs(ctx context.Context, poolService *netv1alpha1.PoolService) ([]IPVRID, error) {
// Get the poolservice list
listSelector := &client.ListOptions{
Expand Down Expand Up @@ -311,8 +369,10 @@ func (r *ReconcileVipLoadBalancer) isValidIPVRID(poolService *netv1alpha1.PoolSe
}

ips := []string{}
for _, ip := range poolService.Status.LoadBalancer.Ingress {
ips = append(ips, ip.IP)
if poolService.Status.LoadBalancer.Ingress != nil {
for _, ip := range poolService.Status.LoadBalancer.Ingress {
ips = append(ips, ip.IP)
}
}

ipvrid := NewIPVRID(ips, vrid)
Expand Down Expand Up @@ -370,10 +430,8 @@ func (r *ReconcileVipLoadBalancer) assignVRID(ctx context.Context, poolService *
poolService.Annotations = make(map[string]string)
}

for _, ip := range ipvrid.IPs {
poolService.Status.LoadBalancer.Ingress = append(poolService.Status.LoadBalancer.Ingress, corev1.LoadBalancerIngress{IP: ip})
}

// add ips and vrid in annotions for status sync
poolService.Annotations[AnnotationVipLoadBalancerIPS] = strings.Join(ipvrid.IPs, ",")
poolService.Annotations[AnnotationVipLoadBalancerVRID] = strconv.Itoa(ipvrid.VRID)

// Update the PoolService
Expand All @@ -395,22 +453,6 @@ func (r *ReconcileVipLoadBalancer) getReferenceService(ctx context.Context, ps *
return service, nil
}

func canRemoveFinalizer(poolService *netv1alpha1.PoolService) bool {
if poolService.Annotations == nil {
return false
}

if _, ok := poolService.Annotations[AnnotationServiceVIPStatus]; !ok {
return false
}

if poolService.Annotations[AnnotationServiceVIPStatus] == AnnotationServiceVIPStatusReady {
return false
}

return true
}

func (r *ReconcileVipLoadBalancer) reconcileDelete(ctx context.Context, poolService *netv1alpha1.PoolService) (reconcile.Result, error) {
klog.V(4).Infof(Format("ReconcilDelete VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name))
poolName := poolService.Labels[network.LabelNodePoolName]
Expand Down Expand Up @@ -443,7 +485,17 @@ func (r *ReconcileVipLoadBalancer) reconcileDelete(ctx context.Context, poolServ
}

func (r *ReconcileVipLoadBalancer) checkIfVipServiceOffline(ctx context.Context, poolService *netv1alpha1.PoolService) bool {
klog.V(4).Infof(Format("CheckIfVipServiceOffline VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name))
klog.V(4).Infof(Format("checkIfVipServiceOffline VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name))
return r.getVipServiceStatus(ctx, poolService) == ServiceVIPOffline
}

func (r *ReconcileVipLoadBalancer) checkIfVipServiceOnline(ctx context.Context, poolService *netv1alpha1.PoolService) bool {
klog.V(4).Infof(Format("checkIfVipServiceOnline VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name))
return r.getVipServiceStatus(ctx, poolService) == ServiceVIPOnline
}

func (r *ReconcileVipLoadBalancer) getVipServiceStatus(ctx context.Context, poolService *netv1alpha1.PoolService) int {
klog.V(4).Infof(Format("getVipServiceStatus VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name))
// Get the reference endpoint
listSelector := &client.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{
Expand All @@ -455,11 +507,16 @@ func (r *ReconcileVipLoadBalancer) checkIfVipServiceOffline(ctx context.Context,
endpointSlice := &corev1.EndpointsList{}
if err := r.List(ctx, endpointSlice, listSelector); err != nil {
klog.Errorf(Format("Failed to get Endpoints from PoolService %s/%s: %v", poolService.Namespace, poolService.Name, err))
return false
return ServiceVIPUnknown

Check warning on line 510 in pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go#L509-L510

Added lines #L509 - L510 were not covered by tests
}

if len(endpointSlice.Items) == 0 {
klog.Errorf(Format("get Endpoints from PoolService %s/%s is empty", poolService.Namespace, poolService.Name))
return ServiceVIPUnknown
}

ready := 0
target := len(endpointSlice.Items) / 2
target := len(endpointSlice.Items)/2 + 1
for _, ep := range endpointSlice.Items {
if ep.Annotations == nil {
continue

Check warning on line 522 in pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go#L522

Added line #L522 was not covered by tests
Expand All @@ -469,12 +526,16 @@ func (r *ReconcileVipLoadBalancer) checkIfVipServiceOffline(ctx context.Context,
continue

Check warning on line 526 in pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go#L526

Added line #L526 was not covered by tests
}

if ep.Annotations[AnnotationServiceVIPStatus] == AnnotationServiceVIPStatusReady {
if ep.Annotations[AnnotationServiceVIPStatus] == AnnotationServiceVIPStatusOnline {
ready++
}
}

return ready >= target
if ready < target {
return ServiceVIPOffline
}

return ServiceVIPOnline
}

func (r *ReconcileVipLoadBalancer) addFinalizer(ctx context.Context, poolService *netv1alpha1.PoolService) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ import (
const (
mockServiceName = "test"
mockPoolServiceName = "test-np123"
mockEndpointName = "test-ep"
mockNodePoolLabel = "app=deploy"
mockServiceUid = "c0af506a-7096-4ef9-b39a-eac2feb5c07g"
mockNodePoolUid = "f47dd9db-d3bc-40f3-8d03-7409930b6289"
mockEndpointUid = "f7a5a351-3e33-47a9-bb00-685b357a62e5"
vridLable = "service.openyurt.io/vrid"
)

Expand Down Expand Up @@ -159,8 +161,12 @@ func TestReconcileVipLoadBalancer_Reconcile(t *testing.T) {
np1 := newNodepool("np123", "name=np123,app=deploy")
adressPool := "192.168.0.1-192.168.2.2"
np1.Annotations = map[string]string{viploadbalancer.AnnotationNodePoolAddressPools: adressPool}
endpoint := newEndpoint(v1.NamespaceDefault, mockEndpointName)
endpoint.Annotations = map[string]string{
viploadbalancer.AnnotationServiceVIPStatus: viploadbalancer.AnnotationServiceVIPStatusOnline,
}

c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(poolsvc).Build()
c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(poolsvc).WithObjects(endpoint).Build()

rc := viploadbalancer.ReconcileVipLoadBalancer{
Client: c,
Expand All @@ -185,8 +191,12 @@ func TestReconcileVipLoadBalancer_Reconcile(t *testing.T) {
np1 := newNodepool("np123", "name=np123,app=deploy")
adressPool := "192.168.0.1-192.168.2.2"
np1.Annotations = map[string]string{viploadbalancer.AnnotationNodePoolAddressPools: adressPool}
endpoint := newEndpoint(v1.NamespaceDefault, mockEndpointName)
endpoint.Annotations = map[string]string{
viploadbalancer.AnnotationServiceVIPStatus: viploadbalancer.AnnotationServiceVIPStatusOnline,
}

c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(poolsvc).Build()
c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(poolsvc).WithObjects(endpoint).Build()

rc := viploadbalancer.ReconcileVipLoadBalancer{
Client: c,
Expand All @@ -211,8 +221,12 @@ func TestReconcileVipLoadBalancer_Reconcile(t *testing.T) {
np1 := newNodepool("np123", "name=np123,app=deploy")
adressPool := "192.168.0.1-192.168.2.2, 10.1.0.1-10.1.0.2"
np1.Annotations = map[string]string{viploadbalancer.AnnotationNodePoolAddressPools: adressPool}
endpoint := newEndpoint(v1.NamespaceDefault, mockEndpointName)
endpoint.Annotations = map[string]string{
viploadbalancer.AnnotationServiceVIPStatus: viploadbalancer.AnnotationServiceVIPStatusOnline,
}

c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(poolsvc).Build()
c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(poolsvc).WithObjects(endpoint).Build()

rc := viploadbalancer.ReconcileVipLoadBalancer{
Client: c,
Expand Down Expand Up @@ -346,7 +360,7 @@ func TestReconcileVipLoadBalancer_Reconcile(t *testing.T) {
t.Fatalf("Failed to get IPVRID: %v", err)
}

ps1.Annotations = map[string]string{viploadbalancer.AnnotationVipLoadBalancerVRID: strconv.Itoa(ipvrid.VRID), viploadbalancer.AnnotationServiceVIPStatus: viploadbalancer.AnnotationServiceVIPStatusReady}
ps1.Annotations = map[string]string{viploadbalancer.AnnotationVipLoadBalancerVRID: strconv.Itoa(ipvrid.VRID), viploadbalancer.AnnotationServiceVIPStatus: viploadbalancer.AnnotationServiceVIPStatusOnline}
ps1.DeletionTimestamp = &v1.Time{Time: time.Now()}

c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(ps1).Build()
Expand Down Expand Up @@ -383,10 +397,15 @@ func TestReconcileVipLoadBalancer_Reconcile(t *testing.T) {
t.Fatalf("Failed to get IPVRID: %v", err)
}

ps1.Annotations = map[string]string{viploadbalancer.AnnotationVipLoadBalancerVRID: strconv.Itoa(ipvrid.VRID), viploadbalancer.AnnotationServiceVIPStatus: viploadbalancer.AnnotationServiceVIPStatusUnReady}
ps1.Annotations = map[string]string{viploadbalancer.AnnotationVipLoadBalancerVRID: strconv.Itoa(ipvrid.VRID), viploadbalancer.AnnotationServiceVIPStatus: viploadbalancer.AnnotationServiceVIPStatusOffline}
ps1.DeletionTimestamp = &v1.Time{Time: time.Now()}

c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(ps1).Build()
endpoint := newEndpoint(v1.NamespaceDefault, mockEndpointName)
endpoint.Annotations = map[string]string{
viploadbalancer.AnnotationServiceVIPStatus: viploadbalancer.AnnotationServiceVIPStatusOffline,
}

c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(svc).WithObjects(np1).WithObjects(ps1).WithObjects(endpoint).Build()

rc := viploadbalancer.ReconcileVipLoadBalancer{
Client: c,
Expand Down Expand Up @@ -432,6 +451,23 @@ func assertErrNil(t testing.TB, err error) {
}
}

func newEndpoint(namespace string, name string) *corev1.Endpoints {
return &corev1.Endpoints{
TypeMeta: v1.TypeMeta{
Kind: "Endpoints",
APIVersion: "v1",
},
ObjectMeta: v1.ObjectMeta{
Namespace: namespace,
Name: name,
Labels: map[string]string{
network.LabelServiceName: mockServiceName,
},
UID: mockEndpointUid,
},
}
}

func newService(namespace string, name string) *corev1.Service {
return &corev1.Service{
TypeMeta: v1.TypeMeta{
Expand Down Expand Up @@ -482,7 +518,7 @@ func assertPoolServiceIPAddress(t testing.TB, psl *v1alpha1.PoolServiceList, ips
t.Helper()

for _, ps := range psl.Items {
if ps.Status.LoadBalancer.Ingress == nil {
if ips != nil && ps.Status.LoadBalancer.Ingress == nil {
t.Errorf("expected loadbalancer ingress is not nil, but got %v", ps.Status.LoadBalancer.Ingress)
return
}
Expand Down

0 comments on commit aae406d

Please sign in to comment.