Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support a loadbalancer controller framework #1938

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pkg/yurtmanager/controller/loadbalancer/constant.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package loadbalancer

const (
VIPClass = "openyurt.io/vip"
MockClass = "openyurt.io/mock"
UnknownClass = "openyurt.io/unknown"
)

const (
name = "loadblancer"
LBFinalizer = "service.openyurt.io/loadbalancer"
FailedAddFinalizer = "FailedAddFinalizer"
FailedRemoveFinalizer = "FailedRemoveFinalizer"
)
105 changes: 105 additions & 0 deletions pkg/yurtmanager/controller/loadbalancer/driver/driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package driver

Check failure on line 1 in pkg/yurtmanager/controller/loadbalancer/driver/driver.go

View workflow job for this annotation

GitHub Actions / golangci-lint

: import cycle not allowed: import stack: [github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver/mock github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver] (typecheck)

import (
"context"
"fmt"

v1 "k8s.io/api/core/v1"

"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"

appconfig "github.com/openyurtio/openyurt/cmd/yurt-manager/app/config"
"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver/mock"
)

type ActionType string

const (
Create = ActionType("Create")
Update = ActionType("Update")
Delete = ActionType("Delete")
)

type AccessPoint struct {
Name string
Address string
}

type Annotations interface {
Get(key string) (value string)
Has(key string) (exists bool)
}

type Set map[string]string

// Has returns whether the provided label exists in the map.
func (ls Set) Has(label string) bool {
_, exists := ls[label]
return exists
}

// Get returns the value in the map for the provided label.
func (ls Set) Get(label string) string {
return ls[label]
}

type Configuration struct {
PoolName string
Action ActionType
AccessPoints []AccessPoint
ErrorInfo error
Config map[string]string
}

type LoadBalancer interface {
ID() string
IsMatch(svc *v1.Service) bool
Backends() []*v1.Node
AccessPoints() []AccessPoint
Extra() map[string]string
}

// Service and EndpointSlice
type Predication interface {
Service() predicate.Predicate
EndpointSlice() predicate.Predicate
Node() predicate.Predicate
NodePool() predicate.Predicate
}

type Driver interface {
Predication
Init() error
List(ctx context.Context, svc *v1.Service) ([]LoadBalancer, error)
Filter(ctx context.Context, svc *v1.Service, nodes []*v1.Node) ([]*v1.Node, error)
Get(ctx context.Context, svc *v1.Service, nodes []*v1.Node) (LoadBalancer, error)
Create(ctx context.Context, svc *v1.Service, nodes []*v1.Node) (LoadBalancer, error)
Update(ctx context.Context, svc *v1.Service, nodes []*v1.Node, lb LoadBalancer) (LoadBalancer, error)
Delete(ctx context.Context, svc *v1.Service, lb LoadBalancer) (LoadBalancer, error)
}

type Drivers map[string]Driver

func NewDrivers(cfg *appconfig.CompletedConfig, mgr manager.Manager) (Drivers, error) {
drivers := make(map[string]Driver, 0)
err := Register(drivers, cfg, mgr, mock.MockClass, mock.NewMockLoadBalancer)
if err != nil {
return nil, err
}
return drivers, nil
}

type NewDriverFunc func(cfg *appconfig.CompletedConfig, mgr manager.Manager) (Driver, error)

func Register(drivers Drivers, cfg *appconfig.CompletedConfig, mgr manager.Manager, name string, fn NewDriverFunc) error {
if cfg == nil {
return fmt.Errorf("config is empty")
}
drv, err := fn(cfg, mgr)
if err != nil {
return fmt.Errorf("can not new driver %s, error %s", name, err.Error())
}
drivers[name] = drv
return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package mock
109 changes: 109 additions & 0 deletions pkg/yurtmanager/controller/loadbalancer/driver/mock/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package mock

import (
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"

appconfig "github.com/openyurtio/openyurt/cmd/yurt-manager/app/config"
"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver"

Check failure on line 11 in pkg/yurtmanager/controller/loadbalancer/driver/mock/mock.go

View workflow job for this annotation

GitHub Actions / golangci-lint

could not import github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver (-: import cycle not allowed: import stack: [github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver/mock github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver]) (typecheck)
reqCtx "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/sharedcontext"
)

const MockClass = "openyurt.io/mock"

type provider interface{}

func NewProvider(path string) provider {
return nil
}

type LBManager struct{}
type ListenerManager struct{}
type ServerGroupManager struct{}

type Builder struct {
lbMgr *LBManager
listenerMgr *ListenerManager
sgMgr *ServerGroupManager
}

type Applier struct {
lbMgr *LBManager
listenerMgr *ListenerManager
sgMgr *ServerGroupManager
}

var _ driver.Driver = (*MockLB)(nil)

type MockLB struct {
prvd provider
builder *Builder
applier *Applier
client client.Client
}

func (e *MockLB) Apply(reqCtx *reqCtx.RequestContext) (map[string]*driver.Configuration, error) {
return nil, nil
}

func (e *MockLB) Cleanup(reqCtx *reqCtx.RequestContext) (map[string]*driver.Configuration, error) {
return nil, nil
}

func NewMockLoadBalancer(c *appconfig.CompletedConfig, mgr manager.Manager) (driver.Driver, error) {
return &MockLB{prvd: NewProvider(""), client: mgr.GetClient()}, nil
}

func (e *MockLB) Service() predicate.Predicate {
return predicate.NewPredicateFuncs(
func(object client.Object) bool {
svc, ok := object.(*v1.Service)
if !ok {
return false
}
if svc.Spec.Type != v1.ServiceTypeLoadBalancer {
return false
}
if *svc.Spec.LoadBalancerClass != MockClass {
return false
}
return true
},
)
}

func (e *MockLB) EndpointSlice() predicate.Predicate {
return predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
// TODO
return false
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
// TODO
return false
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return false
},
}
}

func (e *MockLB) Node() predicate.Predicate {
return predicate.NewPredicateFuncs(func(object client.Object) bool { return false })
}

func (e *MockLB) NodePool() predicate.Predicate {
return predicate.NewPredicateFuncs(func(object client.Object) bool { return false })
}

func (e *MockLB) Init() error {
lbMgr := &LBManager{}
sgMgr := &ServerGroupManager{}
lsMgr := &ListenerManager{}
e.builder = &Builder{lbMgr: lbMgr, sgMgr: sgMgr, listenerMgr: lsMgr}
e.applier = &Applier{lbMgr: lbMgr, sgMgr: sgMgr, listenerMgr: lsMgr}
return nil
}
123 changes: 123 additions & 0 deletions pkg/yurtmanager/controller/loadbalancer/event_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package loadbalancer

import (
"context"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver"

Check failure on line 16 in pkg/yurtmanager/controller/loadbalancer/event_handler.go

View workflow job for this annotation

GitHub Actions / golangci-lint

could not import github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver (-: import cycle not allowed: import stack: [github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver/mock github.com/openyurtio/openyurt/pkg/yurtmanager/controller/loadbalancer/driver]) (typecheck)
)

type enqueueRequestForEndpointSliceEvent struct{}

var _ handler.EventHandler = (*enqueueRequestForEndpointSliceEvent)(nil)

func NewEnqueueRequestForEndpointSliceEvent() *enqueueRequestForEndpointSliceEvent {
return &enqueueRequestForEndpointSliceEvent{}
}

func (e *enqueueRequestForEndpointSliceEvent) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
enqueueServiceForEndpointSlice(evt.Object, q)

}

func (e *enqueueRequestForEndpointSliceEvent) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
enqueueServiceForEndpointSlice(evt.ObjectNew, q)
}

func (e *enqueueRequestForEndpointSliceEvent) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
enqueueServiceForEndpointSlice(evt.Object, q)
}

func (e *enqueueRequestForEndpointSliceEvent) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
enqueueServiceForEndpointSlice(evt.Object, q)
}

func enqueueServiceForEndpointSlice(obj client.Object, q workqueue.RateLimitingInterface) {
es, ok := obj.(*discovery.EndpointSlice)
if ok {
if svcName, ok := es.Labels[discovery.LabelServiceName]; ok {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Namespace: es.Namespace, Name: svcName}})
}
}
}

type enqueueRequestForNode struct {
client client.Client
drivers driver.Drivers
}

var _ handler.EventHandler = (*enqueueRequestForNode)(nil)

func NewEnqueueRequestForNodeEvent(client client.Client, drivers driver.Drivers) *enqueueRequestForNode {
return &enqueueRequestForNode{client: client, drivers: drivers}
}

func (e *enqueueRequestForNode) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
enqueueLoadBalancerService(e.client, e.drivers, q, Key(evt.Object), Type(evt.Object))
}

func (e *enqueueRequestForNode) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
enqueueLoadBalancerService(e.client, e.drivers, q, Key(evt.ObjectNew), Type(evt.ObjectNew))
}

func (e *enqueueRequestForNode) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
enqueueLoadBalancerService(e.client, e.drivers, q, Key(evt.Object), Type(evt.Object))
}

func (e *enqueueRequestForNode) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
enqueueLoadBalancerService(e.client, e.drivers, q, Key(evt.Object), Type(evt.Object))
}

type enqueueRequestForNodePool struct {
client client.Client
drivers driver.Drivers
}

var _ handler.EventHandler = (*enqueueRequestForNodePool)(nil)

func NewEnqueueRequestForNodePoolEvent(client client.Client, drivers driver.Drivers) *enqueueRequestForNodePool {
return &enqueueRequestForNodePool{client: client, drivers: drivers}
}

func (e *enqueueRequestForNodePool) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
enqueueLoadBalancerService(e.client, e.drivers, q, Key(evt.Object), Type(evt.Object))
}

func (e *enqueueRequestForNodePool) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
enqueueLoadBalancerService(e.client, e.drivers, q, Key(evt.ObjectNew), Type(evt.ObjectNew))
}

func (e *enqueueRequestForNodePool) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
enqueueLoadBalancerService(e.client, e.drivers, q, Key(evt.Object), Type(evt.Object))
}

func (e *enqueueRequestForNodePool) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
enqueueLoadBalancerService(e.client, e.drivers, q, Key(evt.Object), Type(evt.Object))
}

func enqueueLoadBalancerService(client client.Client, drivers driver.Drivers, q workqueue.RateLimitingInterface, objectKey string, objectType string) {
var svcList v1.ServiceList
err := client.List(context.TODO(), &svcList)
if err != nil {
klog.Error(err, "fail to list services for object", "object type: ", objectKey, "object key: ", objectType)
return
}
for _, svc := range svcList.Items {
if svc.Spec.Type != v1.ServiceTypeLoadBalancer {
continue
}
if _, ok := drivers[GetLoadBalancerClass(&svc)]; !ok {
continue
}
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}})
}
}
Loading
Loading