diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index fb860d4209a78..d4be81c9c27c1 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -33,6 +33,7 @@ type framework struct { plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance. reservePlugins []ReservePlugin prebindPlugins []PrebindPlugin + unreservePlugins []UnreservePlugin } var _ = Framework(&framework{}) @@ -64,6 +65,9 @@ func NewFramework(r Registry, _ *runtime.Unknown) (Framework, error) { if pp, ok := p.(PrebindPlugin); ok { f.prebindPlugins = append(f.prebindPlugins, pp) } + if up, ok := p.(UnreservePlugin); ok { + f.unreservePlugins = append(f.unreservePlugins, up) + } } return f, nil } @@ -105,6 +109,14 @@ func (f *framework) RunReservePlugins( return nil } +// RunUnreservePlugins runs the set of configured unreserve plugins. +func (f *framework) RunUnreservePlugins( + pc *PluginContext, pod *v1.Pod, nodeName string) { + for _, pl := range f.unreservePlugins { + pl.Unreserve(pc, pod, nodeName) + } +} + // NodeInfoSnapshot returns the latest NodeInfo snapshot. The snapshot // is taken at the beginning of a scheduling cycle and remains unchanged until a // pod finishes "Reserve". There is no guarantee that the information remains diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go index 49b4a2351699f..a3764adf616fd 100644 --- a/pkg/scheduler/framework/v1alpha1/interface.go +++ b/pkg/scheduler/framework/v1alpha1/interface.go @@ -113,6 +113,17 @@ type PrebindPlugin interface { Prebind(pc *PluginContext, p *v1.Pod, nodeName string) *Status } +// UnreservePlugin is an interface for Unreserve plugins. This is an informational +// extension point. If a pod was reserved and then rejected in a later phase, then +// un-reserve plugins will be notified. Un-reserve plugins should clean up state +// associated with the reserved Pod. +type UnreservePlugin interface { + Plugin + // Unreserve is called by the scheduling framework when a reserved pod was + // rejected in a later phase. + Unreserve(pc *PluginContext, p *v1.Pod, nodeName string) +} + // Framework manages the set of plugins in use by the scheduling framework. // Configured plugins are called at specified points in a scheduling context. type Framework interface { @@ -128,6 +139,9 @@ type Framework interface { // plugins returns an error, it does not continue running the remaining ones and // returns the error. In such case, pod will not be scheduled. RunReservePlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status + + // RunUnreservePlugins runs the set of configured unreserve plugins. + RunUnreservePlugins(pc *PluginContext, pod *v1.Pod, nodeName string) } // FrameworkHandle provides data and some tools that plugins can use. It is diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 6fb7151a32d60..770866cb2ef8b 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -515,6 +515,8 @@ func (sched *Scheduler) scheduleOne() { if err != nil { klog.Errorf("error assuming pod: %v", err) metrics.PodScheduleErrors.Inc() + // trigger un-reserve plugins to clean up state associated with the reserved Pod + fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost) return } // bind the pod to its host asynchronously (we can do this b/c of the assumption step above). @@ -525,6 +527,8 @@ func (sched *Scheduler) scheduleOne() { if err != nil { klog.Errorf("error binding volumes: %v", err) metrics.PodScheduleErrors.Inc() + // trigger un-reserve plugins to clean up state associated with the reserved Pod + fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost) return } } @@ -543,6 +547,8 @@ func (sched *Scheduler) scheduleOne() { klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) } sched.recordSchedulingFailure(assumedPod, prebindStatus.AsError(), reason, prebindStatus.Message()) + // trigger un-reserve plugins to clean up state associated with the reserved Pod + fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost) return } @@ -558,6 +564,8 @@ func (sched *Scheduler) scheduleOne() { if err != nil { klog.Errorf("error binding pod: %v", err) metrics.PodScheduleErrors.Inc() + // trigger un-reserve plugins to clean up state associated with the reserved Pod + fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost) } else { klog.V(2).Infof("pod %v/%v is bound successfully on node %v, %d nodes evaluated, %d nodes were found feasible", assumedPod.Namespace, assumedPod.Name, scheduleResult.SuggestedHost, scheduleResult.EvaluatedNodes, scheduleResult.FeasibleNodes) metrics.PodScheduleSuccesses.Inc() diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go index c900bd6a7e956..4b55e36a3b1df 100644 --- a/test/integration/scheduler/framework_test.go +++ b/test/integration/scheduler/framework_test.go @@ -18,11 +18,11 @@ package scheduler import ( "fmt" - "k8s.io/apimachinery/pkg/runtime" "testing" "time" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" ) @@ -30,11 +30,12 @@ import ( // TesterPlugin is common ancestor for a test plugin that allows injection of // failures and some other test functionalities. type TesterPlugin struct { - numReserveCalled int - numPrebindCalled int - failReserve bool - failPrebind bool - rejectPrebind bool + numReserveCalled int + numPrebindCalled int + numUnreserveCalled int + failReserve bool + failPrebind bool + rejectPrebind bool } type ReservePlugin struct { @@ -45,19 +46,27 @@ type PrebindPlugin struct { TesterPlugin } +type UnreservePlugin struct { + TesterPlugin +} + const ( - reservePluginName = "reserve-plugin" - prebindPluginName = "prebind-plugin" + reservePluginName = "reserve-plugin" + prebindPluginName = "prebind-plugin" + unreservePluginName = "unreserve-plugin" ) var _ = framework.ReservePlugin(&ReservePlugin{}) var _ = framework.PrebindPlugin(&PrebindPlugin{}) +var _ = framework.UnreservePlugin(&UnreservePlugin{}) // Name returns name of the plugin. func (rp *ReservePlugin) Name() string { return reservePluginName } +var resPlugin = &ReservePlugin{} + // Reserve is a test function that returns an error or nil, depending on the // value of "failReserve". func (rp *ReservePlugin) Reserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status { @@ -68,14 +77,13 @@ func (rp *ReservePlugin) Reserve(pc *framework.PluginContext, pod *v1.Pod, nodeN return nil } -var resPlugin = &ReservePlugin{} -var pbdPlugin = &PrebindPlugin{} - // NewReservePlugin is the factory for reserve plugin. func NewReservePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { return resPlugin, nil } +var pbdPlugin = &PrebindPlugin{} + // Name returns name of the plugin. func (pp *PrebindPlugin) Name() string { return prebindPluginName @@ -93,11 +101,39 @@ func (pp *PrebindPlugin) Prebind(pc *framework.PluginContext, pod *v1.Pod, nodeN return nil } +// reset used to reset numPrebindCalled. +func (pp *PrebindPlugin) reset() { + pp.numPrebindCalled = 0 +} + // NewPrebindPlugin is the factory for prebind plugin. func NewPrebindPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { return pbdPlugin, nil } +var unresPlugin = &UnreservePlugin{} + +// Name returns name of the plugin. +func (up *UnreservePlugin) Name() string { + return unreservePluginName +} + +// Unreserve is a test function that returns an error or nil, depending on the +// value of "failUnreserve". +func (up *UnreservePlugin) Unreserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) { + up.numUnreserveCalled++ +} + +// reset used to reset numUnreserveCalled. +func (up *UnreservePlugin) reset() { + up.numUnreserveCalled = 0 +} + +// NewUnreservePlugin is the factory for unreserve plugin. +func NewUnreservePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { + return unresPlugin, nil +} + // TestReservePlugin tests invocation of reserve plugins. func TestReservePlugin(t *testing.T) { // Create a plugin registry for testing. Register only a reserve plugin. @@ -216,3 +252,87 @@ func TestPrebindPlugin(t *testing.T) { cleanupPods(cs, t, []*v1.Pod{pod}) } } + +// TestUnreservePlugin tests invocation of un-reserve plugin +func TestUnreservePlugin(t *testing.T) { + // TODO: register more plugin which would trigger un-reserve plugin + registry := framework.Registry{ + unreservePluginName: NewUnreservePlugin, + prebindPluginName: NewPrebindPlugin, + } + + // Create the master and the scheduler with the test plugin set. + context := initTestSchedulerWithOptions(t, + initTestMaster(t, "unreserve-plugin", nil), + false, nil, registry, false, time.Second) + defer cleanupTest(t, context) + + cs := context.clientSet + // Add a few nodes. + _, err := createNodes(cs, "test-node", nil, 2) + if err != nil { + t.Fatalf("Cannot create nodes: %v", err) + } + + tests := []struct { + prebindFail bool + prebindReject bool + }{ + { + prebindFail: false, + prebindReject: false, + }, + { + prebindFail: true, + prebindReject: false, + }, + { + prebindFail: false, + prebindReject: true, + }, + { + prebindFail: true, + prebindReject: true, + }, + } + + for i, test := range tests { + pbdPlugin.failPrebind = test.prebindFail + pbdPlugin.rejectPrebind = test.prebindReject + + // Create a best effort pod. + pod, err := createPausePod(cs, + initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name})) + if err != nil { + t.Errorf("Error while creating a test pod: %v", err) + } + + if test.prebindFail { + if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil { + t.Errorf("test #%v: Expected a scheduling error, but didn't get it. error: %v", i, err) + } + if unresPlugin.numUnreserveCalled == 0 || unresPlugin.numUnreserveCalled != pbdPlugin.numPrebindCalled { + t.Errorf("test #%v: Expected the unreserve plugin to be called %d times, was called %d times.", i, pbdPlugin.numPrebindCalled, unresPlugin.numUnreserveCalled) + } + } else { + if test.prebindReject { + if err = waitForPodUnschedulable(cs, pod); err != nil { + t.Errorf("test #%v: Didn't expected the pod to be scheduled. error: %v", i, err) + } + if unresPlugin.numUnreserveCalled == 0 || unresPlugin.numUnreserveCalled != pbdPlugin.numPrebindCalled { + t.Errorf("test #%v: Expected the unreserve plugin to be called %d times, was called %d times.", i, pbdPlugin.numPrebindCalled, unresPlugin.numUnreserveCalled) + } + } else { + if err = waitForPodToSchedule(cs, pod); err != nil { + t.Errorf("test #%v: Expected the pod to be scheduled. error: %v", i, err) + } + if unresPlugin.numUnreserveCalled > 0 { + t.Errorf("test #%v: Didn't expected the unreserve plugin to be called, was called %d times.", i, unresPlugin.numUnreserveCalled) + } + } + } + unresPlugin.reset() + pbdPlugin.reset() + cleanupPods(cs, t, []*v1.Pod{pod}) + } +}