Skip to content

Commit

Permalink
support a loadbalancer controller framework
Browse files Browse the repository at this point in the history
  • Loading branch information
珩轩 committed Jan 30, 2024
1 parent 612864a commit 0fe650a
Show file tree
Hide file tree
Showing 9 changed files with 863 additions and 0 deletions.
14 changes: 14 additions & 0 deletions pkg/yurtmanager/controller/loadbalancer/constant.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package loadbalancer

const (
VIPClass = "openyurt.io/vip"
MockClass = "openyurt.io/mock"
UnknownClass = "openyurt.io/unknown"
)

const (
name = "loadblancer"
LBFinalizer = "service.openyurt.io/loadbalancer"
FailedAddFinalizer = "FailedAddFinalizer"
FailedRemoveFinalizer = "FailedRemoveFinalizer"
)
74 changes: 74 additions & 0 deletions pkg/yurtmanager/controller/loadbalancer/driver/driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package driver

Check failure on line 1 in pkg/yurtmanager/controller/loadbalancer/driver/driver.go

View workflow job for this annotation

GitHub Actions / golangci-lint

: import cycle not allowed: import stack: [github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver/mock github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver] (typecheck)

import (
"fmt"

"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver/mock"

appconfig "github.com/openyurtio/openyurt/cmd/yurt-manager/app/config"
"sigs.k8s.io/controller-runtime/pkg/predicate"

reqCtx "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/sharedcontext"
)

type ActionType string

const (
Create = ActionType("Create")
Update = ActionType("Update")
Delete = ActionType("Delete")
)

type AccessPoint struct {
Name string
Address string
}

type Configuration struct {
PoolName string
Action ActionType
AccessPoints []AccessPoint
ErrorInfo error
Config map[string]string
}

type Predication interface {
Service() predicate.Predicate
EndpointSlice() predicate.Predicate
Node() predicate.Predicate
NodePool() predicate.Predicate
}

type Driver interface {
Predication
Init() error
Apply(reqCtx *reqCtx.RequestContext) (map[string]*Configuration, error)
Cleanup(reqCtx *reqCtx.RequestContext) (map[string]*Configuration, error)
}

type Drivers map[string]Driver

func NewDrivers(cfg *appconfig.CompletedConfig, mgr manager.Manager) (Drivers, error) {
drivers := make(map[string]Driver, 0)
err := Register(drivers, cfg, mgr, mock.MockClass, mock.NewMockLoadBalancer)
if err != nil {
return nil, err
}
return drivers, nil
}

type NewDriverFunc func(cfg *appconfig.CompletedConfig, mgr manager.Manager) (Driver, error)

func Register(drivers Drivers, cfg *appconfig.CompletedConfig, mgr manager.Manager, name string, fn NewDriverFunc) error {
if cfg == nil {
return fmt.Errorf("config is empty")
}
drv, err := fn(cfg, mgr)
if err != nil {
return fmt.Errorf("can not new driver %s, error %s", name, err.Error())
}
drivers[name] = drv
return nil
}
109 changes: 109 additions & 0 deletions pkg/yurtmanager/controller/loadbalancer/driver/mock/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package mock

import (
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"

appconfig "github.com/openyurtio/openyurt/cmd/yurt-manager/app/config"
"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver"

Check failure on line 11 in pkg/yurtmanager/controller/loadbalancer/driver/mock/mock.go

View workflow job for this annotation

GitHub Actions / golangci-lint

could not import github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver (-: import cycle not allowed: import stack: [github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver/mock github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver]) (typecheck)
reqCtx "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/sharedcontext"
)

const MockClass = "openyurt.io/mock"

type provider interface{}

func NewProvider(path string) provider {
return nil
}

type LBManager struct{}
type ListenerManager struct{}
type ServerGroupManager struct{}

type Builder struct {
lbMgr *LBManager
listenerMgr *ListenerManager
sgMgr *ServerGroupManager
}

type Applier struct {
lbMgr *LBManager
listenerMgr *ListenerManager
sgMgr *ServerGroupManager
}

var _ driver.Driver = (*MockLB)(nil)

type MockLB struct {
prvd provider
builder *Builder
applier *Applier
client client.Client
}

func (e *MockLB) Apply(reqCtx *reqCtx.RequestContext) (map[string]*driver.Configuration, error) {
return nil, nil
}

func (e *MockLB) Cleanup(reqCtx *reqCtx.RequestContext) (map[string]*driver.Configuration, error) {
return nil, nil
}

func NewMockLoadBalancer(c *appconfig.CompletedConfig, mgr manager.Manager) (driver.Driver, error) {
return &MockLB{prvd: NewProvider(""), client: mgr.GetClient()}, nil
}

func (e *MockLB) Service() predicate.Predicate {
return predicate.NewPredicateFuncs(
func(object client.Object) bool {
svc, ok := object.(*v1.Service)
if !ok {
return false
}
if svc.Spec.Type != v1.ServiceTypeLoadBalancer {
return false
}
if *svc.Spec.LoadBalancerClass != MockClass {
return false
}
return true
},
)
}

func (e *MockLB) EndpointSlice() predicate.Predicate {
return predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
// TODO
return false
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
// TODO
return false
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return false
},
}
}

func (e *MockLB) Node() predicate.Predicate {
return predicate.NewPredicateFuncs(func(object client.Object) bool { return false })
}

func (e *MockLB) NodePool() predicate.Predicate {
return predicate.NewPredicateFuncs(func(object client.Object) bool { return false })
}

func (e *MockLB) Init() error {
lbMgr := &LBManager{}
sgMgr := &ServerGroupManager{}
lsMgr := &ListenerManager{}
e.builder = &Builder{lbMgr: lbMgr, sgMgr: sgMgr, listenerMgr: lsMgr}
e.applier = &Applier{lbMgr: lbMgr, sgMgr: sgMgr, listenerMgr: lsMgr}
return nil
}
123 changes: 123 additions & 0 deletions pkg/yurtmanager/controller/loadbalancer/event_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package loadbalancer

import (
"context"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver"

Check failure on line 16 in pkg/yurtmanager/controller/loadbalancer/event_handler.go

View workflow job for this annotation

GitHub Actions / golangci-lint

could not import github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver (-: import cycle not allowed: import stack: [github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver/mock github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver]) (typecheck)
)

type enqueueRequestForEndpointSliceEvent struct{}

var _ handler.EventHandler = (*enqueueRequestForEndpointSliceEvent)(nil)

func NewEnqueueRequestForEndpointSliceEvent() *enqueueRequestForEndpointSliceEvent {
return &enqueueRequestForEndpointSliceEvent{}
}

func (e *enqueueRequestForEndpointSliceEvent) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
enqueueServiceForEndpointSlice(evt.Object, q)

}

func (e *enqueueRequestForEndpointSliceEvent) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
enqueueServiceForEndpointSlice(evt.ObjectNew, q)
}

func (e *enqueueRequestForEndpointSliceEvent) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
enqueueServiceForEndpointSlice(evt.Object, q)
}

func (e *enqueueRequestForEndpointSliceEvent) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
enqueueServiceForEndpointSlice(evt.Object, q)
}

func enqueueServiceForEndpointSlice(obj client.Object, q workqueue.RateLimitingInterface) {
es, ok := obj.(*discovery.EndpointSlice)
if ok {
if svcName, ok := es.Labels[discovery.LabelServiceName]; ok {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Namespace: es.Namespace, Name: svcName}})
}
}
}

type enqueueRequestForNode struct {
client client.Client
drivers driver.Drivers
}

var _ handler.EventHandler = (*enqueueRequestForNode)(nil)

func NewEnqueueRequestForNodeEvent(client client.Client, drivers driver.Drivers) *enqueueRequestForNode {
return &enqueueRequestForNode{client: client, drivers: drivers}
}

func (e *enqueueRequestForNode) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
enqueueLoadBalancerService(e.client, e.drivers, q, Key(evt.Object), Type(evt.Object))
}

func (e *enqueueRequestForNode) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
enqueueLoadBalancerService(e.client, e.drivers, q, Key(evt.ObjectNew), Type(evt.ObjectNew))
}

func (e *enqueueRequestForNode) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
enqueueLoadBalancerService(e.client, e.drivers, q, Key(evt.Object), Type(evt.Object))
}

func (e *enqueueRequestForNode) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
enqueueLoadBalancerService(e.client, e.drivers, q, Key(evt.Object), Type(evt.Object))
}

type enqueueRequestForNodePool struct {
client client.Client
drivers driver.Drivers
}

var _ handler.EventHandler = (*enqueueRequestForNodePool)(nil)

func NewEnqueueRequestForNodePoolEvent(client client.Client, drivers driver.Drivers) *enqueueRequestForNodePool {
return &enqueueRequestForNodePool{client: client, drivers: drivers}
}

func (e *enqueueRequestForNodePool) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
enqueueLoadBalancerService(e.client, e.drivers, q, Key(evt.Object), Type(evt.Object))
}

func (e *enqueueRequestForNodePool) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
enqueueLoadBalancerService(e.client, e.drivers, q, Key(evt.ObjectNew), Type(evt.ObjectNew))
}

func (e *enqueueRequestForNodePool) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
enqueueLoadBalancerService(e.client, e.drivers, q, Key(evt.Object), Type(evt.Object))
}

func (e *enqueueRequestForNodePool) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
enqueueLoadBalancerService(e.client, e.drivers, q, Key(evt.Object), Type(evt.Object))
}

func enqueueLoadBalancerService(client client.Client, drivers driver.Drivers, q workqueue.RateLimitingInterface, objectKey string, objectType string) {
var svcList v1.ServiceList
err := client.List(context.TODO(), &svcList)
if err != nil {
klog.Error(err, "fail to list services for object", "object type: ", objectKey, "object key: ", objectType)
return
}
for _, svc := range svcList.Items {
if svc.Spec.Type != v1.ServiceTypeLoadBalancer {
continue
}
if _, ok := drivers[GetLoadBalancerClass(&svc)]; !ok {
continue
}
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}})
}
}
Loading

0 comments on commit 0fe650a

Please sign in to comment.