Skip to content

Commit

Permalink
feat: add the servicevipstatus annotation for agent to check the vip …
Browse files Browse the repository at this point in the history
…offline status

Signed-off-by: wangxye <[email protected]>
  • Loading branch information
wangxye committed May 21, 2024
1 parent 4bcbfa9 commit 9e138a0
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 46 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ replace (
k8s.io/controller-manager => k8s.io/controller-manager v0.28.9
k8s.io/cri-api => k8s.io/cri-api v0.28.9
k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.28.9
k8s.io/dynamic-resource-allocation => k8s.io/dynamic-resource-allocation v0.28.9
k8s.io/endpointslice => k8s.io/endpointslice v0.28.9
k8s.io/klog/v2 => k8s.io/klog/v2 v2.100.1
k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.28.9
k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.28.9
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,12 @@ const (
VRIDEVICTED = -1
)

type IPRange struct {
// IPRange: [startIP, endIP)
startIP net.IP
endIP net.IP
}

type IPVRID struct {
IPs []string
VRID int
}

type IPManager struct {
ranges []IPRange
// ipPools indicates if ip is assign
ipPools map[string]int
// ipVRIDs indicates which IPs are assigned to vrid
Expand All @@ -59,15 +52,17 @@ func NewIPManager(ipRanges string) (*IPManager, error) {
ipVRIDs: make(map[int][]string),
}

iprs := parseIP(ipRanges)
iprs := ParseIP(ipRanges)
for _, ipr := range iprs {
manager.ipPools[ipr] = VRIDEVICTED
}

return manager, nil
}

func parseIP(ipr string) []string {
// ParseIP support ipv4 / ipv6 parse, like "192.168.0.1-192.168.0.3", "192.168.0.1, 10.0.1.1", "2001:db8::2-2001:db8::4"
// Output is a list of ip strings, which is left closed and right open, like [192.168.0.1, 192.168.0.2], [192.168.0.1, 10.0.1.1],[2001:db8::2, 2001:db8::3]
func ParseIP(ipr string) []string {
var ips []string
ipRanges := strings.Split(ipr, ",")

Expand Down Expand Up @@ -103,8 +98,8 @@ func incrementIP(ip net.IP) net.IP {
return ip
}

// Get return a IPVRID with a available IP and VRID combination
func (m *IPManager) Get() (IPVRID, error) {

for vrid := 0; vrid < VRIDMAXVALUE; vrid++ {
if ips, ok := m.ipVRIDs[vrid]; !ok || len(ips) == 0 {
for ip, used := range m.ipPools {
Expand All @@ -120,8 +115,8 @@ func (m *IPManager) Get() (IPVRID, error) {
return IPVRID{}, errors.New("no available IP and VRID combination")
}

// Assign assign ips to a vrid, if no available ip, get a new ipvrid from Get()
func (m *IPManager) Assign(ips []string) (IPVRID, error) {

var noConflictIPs []string
for _, ip := range ips {
// if conflict, just use no conflict
Expand Down Expand Up @@ -151,29 +146,27 @@ func (m *IPManager) Assign(ips []string) (IPVRID, error) {
return IPVRID{}, errors.New("no available IPs and VRID combination")

Check warning on line 146 in pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/ip_utils.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/ip_utils.go#L146

Added line #L146 was not covered by tests
}

// Release release ips from vrid, if vrid is not assigned, return error
func (m *IPManager) Release(ipVRID IPVRID) error {

if err := m.IsValid(ipVRID); err != nil {
return err

Check warning on line 152 in pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/ip_utils.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/ip_utils.go#L152

Added line #L152 was not covered by tests
}
ips := ipVRID.IPs
vrid := ipVRID.VRID

if _, ok := m.ipVRIDs[vrid]; !ok {
return fmt.Errorf("VRID %d does not assign ips", vrid)
if _, ok := m.ipVRIDs[ipVRID.VRID]; !ok {
return fmt.Errorf("VRID %d does not assign ips", ipVRID.VRID)

Check warning on line 156 in pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/ip_utils.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/ip_utils.go#L156

Added line #L156 was not covered by tests
}
remain := make([]string, len(m.ipVRIDs[vrid])-len(ips))
remain := make([]string, len(m.ipVRIDs[ipVRID.VRID])-len(ipVRID.IPs))

for _, ip := range m.ipVRIDs[vrid] {
if m.isIPPresent(ip, ips) {
for _, ip := range m.ipVRIDs[ipVRID.VRID] {
if m.isIPPresent(ip, ipVRID.IPs) {
continue
}

remain = append(remain, ip)
}

if len(remain) == len(m.ipVRIDs[vrid]) {
return fmt.Errorf("IP %v is not assigned", ips)
if len(remain) == len(m.ipVRIDs[ipVRID.VRID]) {
return fmt.Errorf("IP %v is not assigned", ipVRID.IPs)
}

for _, ip := range remain {
Expand All @@ -183,27 +176,26 @@ func (m *IPManager) Release(ipVRID IPVRID) error {
return nil
}

// check if ip and vrid is valid in this ip-pools, if not return error
func (m *IPManager) IsValid(ipvrid IPVRID) error {
ips := ipvrid.IPs
vrid := ipvrid.VRID
if len(ips) == 0 {
if len(ipvrid.IPs) == 0 {
return fmt.Errorf("IPs is empty")
}

for _, ip := range ips {
for _, ip := range ipvrid.IPs {
if _, ok := m.ipPools[ip]; !ok {
return fmt.Errorf("IP: %s is not found in IP-Pools", ip)
}
}

if vrid < 0 || vrid >= VRIDMAXVALUE {
return fmt.Errorf("VIRD: %d out of range", vrid)
if ipvrid.VRID < 0 || ipvrid.VRID >= VRIDMAXVALUE {
return fmt.Errorf("VIRD: %d out of range", ipvrid.VRID)
}
return nil
}

// Sync sync ips from vrid, if vrid is not assigned, return error
func (m *IPManager) Sync(ipVRIDs []IPVRID) error {

for _, ipVRID := range ipVRIDs {
if err := m.IsValid(ipVRID); err != nil {
return err
Expand All @@ -227,6 +219,7 @@ func (m *IPManager) Sync(ipVRIDs []IPVRID) error {
return nil
}

// findDiffIPs find the difference between des and cur, return the difference between des and cur
func (m *IPManager) findDiffIPs(des, cur []string) (app, del []string) {
for _, dip := range des {
if exsit := m.isIPPresent(dip, cur); !exsit {
Expand All @@ -243,6 +236,7 @@ func (m *IPManager) findDiffIPs(des, cur []string) (app, del []string) {
return
}

// isIPPresent check if ip is present in ips, if present return true, else return false
func (m *IPManager) isIPPresent(tip string, ips []string) bool {
for _, ip := range ips {
if ip == tip {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@ package viploadbalancer
import (
"context"
"fmt"
"k8s.io/klog/v2"
"strconv"

"k8s.io/apimachinery/pkg/types"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand Down Expand Up @@ -57,6 +56,10 @@ const (
AnnotationServiceTopologyValueNodePool = "openyurt.io/nodepool"
AnnotationNodePoolAddressPools = "openyurt.io/address-pools"
AnnotationServiceVIPAddress = "service.openyurt.io/vip"
AnnotationServiceVIPStatus = "service.openyurt.io/service-vip-ready"
AnnotationServiceVIPStatusReady = "true"
AnnotationServiceVIPStatusUnReady = "false"

VipLoadBalancerFinalizer = "viploadbalancer.openyurt.io/resources"
poolServiceVRIDExhaustedEventMsgFormat = "PoolService %s/%s in NodePool %s has exhausted all VRIDs"
)
Expand Down Expand Up @@ -115,7 +118,7 @@ func add(mgr manager.Manager, cfg *appconfig.CompletedConfig, r reconcile.Reconc
}

// Watch for changes to PoolService
err = c.Watch(&source.Kind{Type: &netv1alpha1.PoolService{}}, &handler.EnqueueRequestForObject{}, NewPoolServicePredicated())
err = c.Watch(source.Kind(mgr.GetCache(), &netv1alpha1.PoolService{}), &handler.EnqueueRequestForObject{}, NewPoolServicePredicated())
if err != nil {
return err

Check warning on line 123 in pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go#L121-L123

Added lines #L121 - L123 were not covered by tests
}
Expand Down Expand Up @@ -346,15 +349,16 @@ func (r *ReconcileVipLoadBalancer) assignVRID(ctx context.Context, poolService *
}

var vips []string
// if specify ip for poolservice
// if specify ip for poolservice annotation, use it as vip
if svc.Annotations != nil {
if vipAddress, ok := svc.Annotations[AnnotationServiceVIPAddress]; ok {
vips = parseIP(vipAddress)
vips = ParseIP(vipAddress)
}
}

ipvrid, err := r.IPManagers[poolName].Assign(vips)
if err != nil {
// if no available ipvrid, return nil, and wait for next reconcile
klog.Errorf(Format("Failed to get a new VRID: %v", err))
r.recorder.Eventf(poolService, corev1.EventTypeWarning, "VRIDExhausted", poolServiceVRIDExhaustedEventMsgFormat,
poolService.Namespace, poolService.Name, poolName)
Expand All @@ -381,6 +385,7 @@ func (r *ReconcileVipLoadBalancer) assignVRID(ctx context.Context, poolService *
}

func (r *ReconcileVipLoadBalancer) getReferenceService(ctx context.Context, ps *netv1alpha1.PoolService) (*corev1.Service, error) {
// get the reference service from poolservice
service := &corev1.Service{}
svcName := ps.Labels[network.LabelServiceName]
if err := r.Get(ctx, types.NamespacedName{Name: svcName, Namespace: ps.Namespace}, service); err != nil {
Expand All @@ -390,24 +395,42 @@ func (r *ReconcileVipLoadBalancer) getReferenceService(ctx context.Context, ps *
return service, nil
}

func canRemoveFinalizer(poolService *netv1alpha1.PoolService) bool {
if poolService.Annotations == nil {
return false

Check warning on line 400 in pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go#L400

Added line #L400 was not covered by tests
}

if _, ok := poolService.Annotations[AnnotationServiceVIPStatus]; !ok {
return false
}

if poolService.Annotations[AnnotationServiceVIPStatus] == AnnotationServiceVIPStatusReady {
return false
}

return true
}

func (r *ReconcileVipLoadBalancer) reconcileDelete(ctx context.Context, poolService *netv1alpha1.PoolService) (reconcile.Result, error) {
klog.V(4).Infof(Format("ReconcilDelete VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name))
poolName := poolService.Labels[network.LabelNodePoolName]

// Check if the PoolService has a VRID
// Check if the PoolService has a valid IP-VRID
ipvrid, err := r.checkIPVRIDs(*poolService)

if err != nil {
return reconcile.Result{}, nil
}

// Release the VRID
// Release the IP-VRID
r.IPManagers[poolName].Release(*ipvrid)

// remove the finalizer in the PoolService
if err := r.removeFinalizer(ctx, poolService); err != nil {
klog.Errorf(Format("Failed to remove finalizer from PoolService %s/%s: %v", poolService.Namespace, poolService.Name, err))
return reconcile.Result{}, err
if canRemoveFinalizer(poolService) {
// remove the finalizer in the PoolService
if err := r.removeFinalizer(ctx, poolService); err != nil {
klog.Errorf(Format("Failed to remove finalizer from PoolService %s/%s: %v", poolService.Namespace, poolService.Name, err))
return reconcile.Result{}, err

Check warning on line 432 in pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/loadbalancerset/viploadbalancer/viploadbalancer_controller.go#L431-L432

Added lines #L431 - L432 were not covered by tests
}
}

return reconcile.Result{}, nil
Expand Down
Loading

0 comments on commit 9e138a0

Please sign in to comment.