-
Notifications
You must be signed in to change notification settings - Fork 18
/
scheduler.go
142 lines (114 loc) · 4.23 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package scheduler
import (
"context"
"fmt"
"github.com/sanposhiho/mini-kube-scheduler/minisched"
"golang.org/x/xerrors"
v1 "k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
v1beta2config "k8s.io/kube-scheduler/config/v1beta2"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
"k8s.io/kubernetes/pkg/scheduler/apis/config/v1beta2"
"github.com/sanposhiho/mini-kube-scheduler/scheduler/defaultconfig"
"github.com/sanposhiho/mini-kube-scheduler/scheduler/plugin"
)
// Service owns the lifecycle of the scheduler run by this package and keeps
// the state needed to start, stop, and inspect it.
type Service struct {
	// clientset is the Kubernetes client used to build the scheduler, its
	// informers, and its event sink.
	clientset clientset.Interface
	// restclientCfg is the REST client configuration supplied at construction.
	restclientCfg *restclient.Config

	// currentSchedulerCfg is a copy of the configuration recorded by
	// StartScheduler.
	currentSchedulerCfg *v1beta2config.KubeSchedulerConfiguration
	// shutdownfn stops the scheduler; it is nil until StartScheduler has
	// succeeded at least once.
	shutdownfn func()
}
// NewSchedulerService builds a Service around the given client and REST
// client configuration. It does not start a scheduler; call StartScheduler
// on the returned Service for that.
func NewSchedulerService(client clientset.Interface, restclientCfg *restclient.Config) *Service {
	svc := new(Service)
	svc.clientset = client
	svc.restclientCfg = restclientCfg
	return svc
}
// RestartScheduler shuts down the scheduler started earlier, if any, and then
// starts a new one with cfg. A failure reported by StartScheduler is returned
// wrapped with additional context.
func (s *Service) RestartScheduler(cfg *v1beta2config.KubeSchedulerConfiguration) error {
	s.ShutdownScheduler()

	err := s.StartScheduler(cfg)
	if err == nil {
		return nil
	}
	return xerrors.Errorf("start scheduler: %w", err)
}
// StartScheduler creates a minisched scheduler and runs it in a background
// goroutine until the Service's shutdown function is invoked (see
// ShutdownScheduler).
//
// versionedcfg is not passed to minisched.New; a deep copy of it is only
// recorded as the current configuration (see GetSchedulerConfig). The copy is
// stored after the scheduler has been created successfully, so a failed start
// leaves the previously recorded configuration untouched.
//
// If a scheduler started by an earlier call has not been shut down, it is
// stopped right before the new one is launched; otherwise its cancel function
// would be overwritten and the old scheduler could never be stopped.
//
// The call waits for the informer caches to sync before returning. It returns
// an error when the scheduler cannot be created.
func (s *Service) StartScheduler(versionedcfg *v1beta2config.KubeSchedulerConfiguration) error {
	clientSet := s.clientset
	ctx, cancel := context.WithCancel(context.Background())

	informerFactory := scheduler.NewInformerFactory(clientSet, 0)
	evtBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
		Interface: clientSet.EventsV1(),
	})
	// Recording is tied to ctx, so cancelling ctx also stops the broadcaster.
	evtBroadcaster.StartRecordingToSink(ctx.Done())

	sched, err := minisched.New(
		clientSet,
		informerFactory,
	)
	if err != nil {
		// Release the context (and with it the event recording) on failure.
		cancel()
		return fmt.Errorf("create minisched: %w", err)
	}

	informerFactory.Start(ctx.Done())
	informerFactory.WaitForCacheSync(ctx.Done())

	// Stop a scheduler left running by a previous StartScheduler call. Calling
	// an already-invoked cancel function again is a no-op, so this is harmless
	// when ShutdownScheduler has been called beforehand.
	if s.shutdownfn != nil {
		s.shutdownfn()
	}

	go sched.Run(ctx)

	s.currentSchedulerCfg = versionedcfg.DeepCopy()
	s.shutdownfn = cancel

	return nil
}
// ShutdownScheduler stops the scheduler by invoking the stored shutdown
// function. It does nothing when no shutdown function has been set.
func (s *Service) ShutdownScheduler() {
	stop := s.shutdownfn
	if stop == nil {
		return
	}

	klog.Info("shutdown scheduler...")
	stop()
}
// GetSchedulerConfig returns the scheduler configuration currently recorded on
// the Service, or nil when none has been recorded. The stored pointer itself
// is returned, not a copy.
func (s *Service) GetSchedulerConfig() *v1beta2config.KubeSchedulerConfiguration {
	current := s.currentSchedulerCfg
	return current
}
// convertConfigurationForSimulator converts a versioned KubeSchedulerConfiguration
// into the internal configuration applied to the scheduler on the simulator.
//
//  1. It drops non-allowed changes: only the Profiles of versioned are carried
//     over; every other field comes from defaultconfig.DefaultSchedulerConfig.
//     When versioned has no profiles, a single profile named
//     v1.DefaultSchedulerName with empty Plugins is used.
//  2. For each profile it passes Plugins through plugin.ConvertForSimulator and
//     PluginConfig through plugin.NewPluginConfig, which substitute the
//     simulator's plugins and plugin configs.
//  3. It applies the v1beta2 defaults and converts the result from
//     v1beta2config.KubeSchedulerConfiguration to config.KubeSchedulerConfiguration.
//
// versioned is never modified; the function works on a deep copy of it. It
// returns an error when versioned is nil or when any of the steps above fails.
func convertConfigurationForSimulator(versioned *v1beta2config.KubeSchedulerConfiguration) (*config.KubeSchedulerConfiguration, error) {
	if versioned == nil {
		return nil, xerrors.New("scheduler configuration is nil")
	}

	// Work on a private copy so the caller's configuration is left untouched
	// by the in-place rewrites of Plugins and PluginConfig below.
	profiles := versioned.DeepCopy().Profiles
	if len(profiles) == 0 {
		defaultSchedulerName := v1.DefaultSchedulerName
		profiles = []v1beta2config.KubeSchedulerProfile{
			{
				SchedulerName: &defaultSchedulerName,
				Plugins:       &v1beta2config.Plugins{},
			},
		}
	}

	for i := range profiles {
		profile := &profiles[i]
		if profile.Plugins == nil {
			profile.Plugins = &v1beta2config.Plugins{}
		}

		plugins, err := plugin.ConvertForSimulator(profile.Plugins)
		if err != nil {
			return nil, xerrors.Errorf("convert plugins for simulator: %w", err)
		}
		profile.Plugins = plugins

		pluginConfig, err := plugin.NewPluginConfig(profile.PluginConfig)
		if err != nil {
			return nil, xerrors.Errorf("get plugin configs: %w", err)
		}
		profile.PluginConfig = pluginConfig
	}

	defaultCfg, err := defaultconfig.DefaultSchedulerConfig()
	if err != nil {
		return nil, xerrors.Errorf("get default scheduler config: %w", err)
	}

	// Every field other than Profiles keeps its default value.
	defaultCfg.Profiles = profiles

	v1beta2.SetDefaults_KubeSchedulerConfiguration(defaultCfg)

	cfg := config.KubeSchedulerConfiguration{}
	if err := scheme.Scheme.Convert(defaultCfg, &cfg, nil); err != nil {
		return nil, xerrors.Errorf("convert configuration: %w", err)
	}

	return &cfg, nil
}