Skip to content

Commit

Permalink
add multiplexer proxy.
Browse files Browse the repository at this point in the history
  • Loading branch information
zyjhtangtang committed Dec 4, 2024
1 parent f2e07f0 commit a3d603c
Show file tree
Hide file tree
Showing 32 changed files with 2,048 additions and 109 deletions.
35 changes: 35 additions & 0 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
apiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
Expand All @@ -54,11 +55,26 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/filter/manager"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/multiplexer"
"github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/network"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

var DefaultMultiplexerResources = []schema.GroupVersionResource{
{
Group: "",
Version: "v1",
Resource: "services",
},
{
Group: "discovery.k8s.io",
Version: "v1",
Resource: "endpointslices",
},
}

// YurtHubConfiguration represents configuration of yurthub
type YurtHubConfiguration struct {
LBMode string
Expand Down Expand Up @@ -101,6 +117,9 @@ type YurtHubConfiguration struct {
CoordinatorClient kubernetes.Interface
LeaderElection componentbaseconfig.LeaderElectionConfiguration
HostControlPlaneAddr string // ip:port
PostStartHooks map[string]func() error
MultiplexerCacheManager multiplexer.MultiplexerManager
MultiplexerResources []schema.GroupVersionResource
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand Down Expand Up @@ -176,6 +195,8 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
CoordinatorStorageAddr: options.CoordinatorStorageAddr,
LeaderElection: options.LeaderElection,
HostControlPlaneAddr: options.HostControlPlaneAddr,
MultiplexerResources: DefaultMultiplexerResources,
MultiplexerCacheManager: newMultiplexerCacheManager(options),
}

// if yurthub is in local mode, certMgr and networkMgr are no need to start
Expand Down Expand Up @@ -403,3 +424,17 @@ func prepareServerServing(options *options.YurtHubOptions, certMgr certificate.Y

return nil
}

func newMultiplexerCacheManager(options *options.YurtHubOptions) multiplexer.MultiplexerManager {
config := newRestConfig(options.NodeName, options.YurtHubProxyHost, options.YurtHubProxyPort)
rsm := storage.NewStorageManager(config)

return multiplexer.NewRequestsMultiplexerManager(rsm)
}

func newRestConfig(nodeName string, host string, port int) *rest.Config {
return &rest.Config{
Host: fmt.Sprintf("http://%s:%d", host, port),
UserAgent: util.MultiplexerProxyClientUserAgentPrefix + nodeName,
}
}
2 changes: 1 addition & 1 deletion cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
var cacheMgr cachemanager.CacheManager
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace)
cacheMgr = cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory)
cacheMgr = cachemanager.NewCacheManager(cfg.NodeName, cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory)

Check warning on line 139 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L139

Added line #L139 was not covered by tests
} else {
klog.Infof("%d. disable cache manager for node %s because it is a cloud node", trace, cfg.NodeName)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/yurthub/cachemanager/cache_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ type CacheAgent struct {
store StorageWrapper
}

func NewCacheAgents(informerFactory informers.SharedInformerFactory, store StorageWrapper) *CacheAgent {
func NewCacheAgents(nodeName string, informerFactory informers.SharedInformerFactory, store StorageWrapper) *CacheAgent {
ca := &CacheAgent{
agents: sets.New(util.DefaultCacheAgents...),
agents: sets.New(util.DefaultCacheAgents...).Insert(util.MultiplexerProxyClientUserAgentPrefix + nodeName),
store: store,
}
configmapInformer := informerFactory.Core().V1().ConfigMaps().Informer()
Expand Down
3 changes: 2 additions & 1 deletion pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,13 @@ type cacheManager struct {

// NewCacheManager creates a new CacheManager
func NewCacheManager(
nodeName string,
storagewrapper StorageWrapper,
serializerMgr *serializer.SerializerManager,
restMapperMgr *hubmeta.RESTMapperManager,
sharedFactory informers.SharedInformerFactory,
) CacheManager {
cacheAgents := NewCacheAgents(sharedFactory, storagewrapper)
cacheAgents := NewCacheAgents(nodeName, sharedFactory, storagewrapper)
cm := &cacheManager{
storage: storagewrapper,
serializerManager: serializerMgr,
Expand Down
12 changes: 6 additions & 6 deletions pkg/yurthub/cachemanager/cache_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestCacheGetResponse(t *testing.T) {
}
sWrapper := NewStorageWrapper(dStorage)
serializerM := serializer.NewSerializerManager()
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
inputObj runtime.Object
Expand Down Expand Up @@ -607,7 +607,7 @@ func TestCacheWatchResponse(t *testing.T) {
}
sWrapper := NewStorageWrapper(dStorage)
serializerM := serializer.NewSerializerManager()
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
group string
Expand Down Expand Up @@ -992,7 +992,7 @@ func TestCacheListResponse(t *testing.T) {
if err != nil {
t.Errorf("failed to create RESTMapper manager, %v", err)
}
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
inputObj runtime.Object
Expand Down Expand Up @@ -1585,7 +1585,7 @@ func TestQueryCacheForGet(t *testing.T) {
if err != nil {
t.Errorf("failed to create RESTMapper manager, %v", err)
}
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
keyBuildInfo storage.KeyBuildInfo
Expand Down Expand Up @@ -2312,7 +2312,7 @@ func TestQueryCacheForList(t *testing.T) {
if err != nil {
t.Errorf("failed to create RESTMapper manager, %v", err)
}
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
keyBuildInfo *storage.KeyBuildInfo
Expand Down Expand Up @@ -3136,7 +3136,7 @@ func TestCanCacheFor(t *testing.T) {
defer close(stop)
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
m := NewCacheManager(s, nil, nil, informerFactory)
m := NewCacheManager("node1", s, nil, nil, informerFactory)
informerFactory.Start(nil)
cache.WaitForCacheSync(stop, informerFactory.Core().V1().ConfigMaps().Informer().HasSynced)
if tt.preRequest != nil {
Expand Down
34 changes: 34 additions & 0 deletions pkg/yurthub/filter/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ package filter
import (
"io"
"net/http"
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"

yurtutil "github.com/openyurtio/openyurt/pkg/util"
)

type NodesInPoolGetter func(poolName string) ([]string, error)
Expand Down Expand Up @@ -59,4 +62,35 @@ type ObjectFilter interface {
Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object
}

type FilterManager interface {
FindResponseFilter(req *http.Request) (ResponseFilter, bool)
FindObjectFilters(req *http.Request) ObjectFilter
}

type NodeGetter func(name string) (*v1.Node, error)

type UnionObjectFilter []ObjectFilter

func (chain UnionObjectFilter) Name() string {
var names []string
for i := range chain {
names = append(names, chain[i].Name())

Check warning on line 77 in pkg/yurthub/filter/interfaces.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/interfaces.go#L74-L77

Added lines #L74 - L77 were not covered by tests
}
return strings.Join(names, ",")

Check warning on line 79 in pkg/yurthub/filter/interfaces.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/interfaces.go#L79

Added line #L79 was not covered by tests
}

func (chain UnionObjectFilter) SupportedResourceAndVerbs() map[string]sets.Set[string] {

Check warning on line 82 in pkg/yurthub/filter/interfaces.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/interfaces.go#L82

Added line #L82 was not covered by tests
// do nothing
return map[string]sets.Set[string]{}

Check warning on line 84 in pkg/yurthub/filter/interfaces.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/interfaces.go#L84

Added line #L84 was not covered by tests
}

func (chain UnionObjectFilter) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object {
for i := range chain {
obj = chain[i].Filter(obj, stopCh)
if yurtutil.IsNil(obj) {
break

Check warning on line 91 in pkg/yurthub/filter/interfaces.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/interfaces.go#L87-L91

Added lines #L87 - L91 were not covered by tests
}
}

return obj

Check warning on line 95 in pkg/yurthub/filter/interfaces.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/interfaces.go#L95

Added line #L95 was not covered by tests
}
17 changes: 17 additions & 0 deletions pkg/yurthub/filter/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,20 @@ func (m *Manager) FindResponseFilter(req *http.Request) (filter.ResponseFilter,

return nil, false
}

func (m *Manager) FindObjectFilters(req *http.Request) filter.ObjectFilter {
objectFilters := make([]filter.ObjectFilter, 0)
approved, filterNames := m.Approver.Approve(req)
if !approved {
return nil

Check warning on line 119 in pkg/yurthub/filter/manager/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/manager/manager.go#L115-L119

Added lines #L115 - L119 were not covered by tests
}

for i := range filterNames {
if objectFilter, ok := m.nameToObjectFilter[filterNames[i]]; ok {
objectFilters = append(objectFilters, objectFilter)

Check warning on line 124 in pkg/yurthub/filter/manager/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/manager/manager.go#L122-L124

Added lines #L122 - L124 were not covered by tests
}
}

filters := filter.UnionObjectFilter(objectFilters)
return filters

Check warning on line 129 in pkg/yurthub/filter/manager/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/manager/manager.go#L128-L129

Added lines #L128 - L129 were not covered by tests
}
1 change: 0 additions & 1 deletion pkg/yurthub/multiplexer/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

type Interface interface {
Watch(ctx context.Context, key string, opts kstorage.ListOptions) (watch.Interface, error)
Get(ctx context.Context, key string, opts kstorage.GetOptions, objPtr runtime.Object) error
GetList(ctx context.Context, key string, opts kstorage.ListOptions, listObj runtime.Object) error
}

Expand Down
96 changes: 50 additions & 46 deletions pkg/yurthub/multiplexer/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,60 @@ var newServiceListFunc = func() runtime.Object {
}

func TestResourceCache_GetList(t *testing.T) {
cache, _, err := NewResourceCache(
ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}),
storage := ystorage.NewFakeServiceStorage(
[]v1.Service{
*newService(metav1.NamespaceSystem, "coredns"),
*newService(metav1.NamespaceDefault, "nginx"),
})

cache, _, _ := NewResourceCache(
storage,
serviceGVR,
&ResourceCacheConfig{
keyFunc,
KeyFunc,
newServiceFunc,
newServiceListFunc,
attrsFunc,
AttrsFunc,
},
)

assert.Nil(t, err)
assertCacheGetList(t, cache)
for _, tc := range []struct {
name string
key string
expectedServiceList *v1.ServiceList
}{
{
"all namespace",
"",
&v1.ServiceList{
ListMeta: metav1.ListMeta{
ResourceVersion: "100",
},
Items: []v1.Service{
*newService(metav1.NamespaceDefault, "nginx"),
*newService(metav1.NamespaceSystem, "coredns"),
},
},
},
{
"default namespace",
"/default",
&v1.ServiceList{
ListMeta: metav1.ListMeta{
ResourceVersion: "100",
},
Items: []v1.Service{
*newService(metav1.NamespaceDefault, "nginx"),
},
},
},
} {
serviceList := &v1.ServiceList{}
err := cache.GetList(context.Background(), tc.key, mockListOptions(), serviceList)

assert.Nil(t, err)
assert.Equal(t, tc.expectedServiceList.Items, serviceList.Items)
}
}

func mockListOptions() storage.ListOptions {
Expand All @@ -74,27 +115,17 @@ func mockListOptions() storage.ListOptions {
}
}

func assertCacheGetList(t testing.TB, cache Interface) {
t.Helper()

serviceList := &v1.ServiceList{}
err := cache.GetList(context.Background(), "", mockListOptions(), serviceList)

assert.Nil(t, err)
assert.Equal(t, 1, len(serviceList.Items))
}

func TestResourceCache_Watch(t *testing.T) {
fakeStorage := ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")})

cache, _, err := NewResourceCache(
fakeStorage,
serviceGVR,
&ResourceCacheConfig{
keyFunc,
KeyFunc,
newServiceFunc,
newServiceListFunc,
attrsFunc,
AttrsFunc,
},
)

Expand All @@ -117,7 +148,7 @@ func mockWatchOptions() storage.ListOptions {
}

func assertCacheWatch(t testing.TB, cache Interface, fs *ystorage.FakeServiceStorage) {
receive, err := cache.Watch(context.TODO(), "", mockWatchOptions())
receive, err := cache.Watch(context.TODO(), "/kube-system", mockWatchOptions())

go func() {
fs.AddWatchObject(newService(metav1.NamespaceSystem, "coredns2"))
Expand All @@ -127,30 +158,3 @@ func assertCacheWatch(t testing.TB, cache Interface, fs *ystorage.FakeServiceSto
event := <-receive.ResultChan()
assert.Equal(t, watch.Added, event.Type)
}

func TestResourceCache_Get(t *testing.T) {
cache, _, err := NewResourceCache(
ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}),
serviceGVR,
&ResourceCacheConfig{
keyFunc,
newServiceFunc,
newServiceListFunc,
attrsFunc,
},
)
assert.Nil(t, err)
assertCacheGet(t, cache)
}

func assertCacheGet(t testing.TB, cache Interface) {
t.Helper()

service := &v1.Service{}
err := cache.Get(context.Background(), "/kube-system/coredns", storage.GetOptions{
ResourceVersion: "1",
}, service)

assert.Nil(t, err)
assert.Equal(t, "coredns", service.Name)
}
Loading

0 comments on commit a3d603c

Please sign in to comment.