Skip to content

Commit

Permalink
Add Un-reserve extension point for the scheduling framework
Browse files Browse the repository at this point in the history
  • Loading branch information
danielqsj committed May 10, 2019
1 parent 086a86b commit 997648a
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 11 deletions.
12 changes: 12 additions & 0 deletions pkg/scheduler/framework/v1alpha1/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type framework struct {
plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance.
reservePlugins []ReservePlugin
prebindPlugins []PrebindPlugin
unreservePlugins []UnreservePlugin
}

var _ = Framework(&framework{})
Expand Down Expand Up @@ -64,6 +65,9 @@ func NewFramework(r Registry, _ *runtime.Unknown) (Framework, error) {
if pp, ok := p.(PrebindPlugin); ok {
f.prebindPlugins = append(f.prebindPlugins, pp)
}
if up, ok := p.(UnreservePlugin); ok {
f.unreservePlugins = append(f.unreservePlugins, up)
}
}
return f, nil
}
Expand Down Expand Up @@ -105,6 +109,14 @@ func (f *framework) RunReservePlugins(
return nil
}

// RunUnreservePlugins runs the set of configured unreserve plugins.
func (f *framework) RunUnreservePlugins(
pc *PluginContext, pod *v1.Pod, nodeName string) {
for _, pl := range f.unreservePlugins {
pl.Unreserve(pc, pod, nodeName)
}
}

// NodeInfoSnapshot returns the latest NodeInfo snapshot. The snapshot
// is taken at the beginning of a scheduling cycle and remains unchanged until a
// pod finishes "Reserve". There is no guarantee that the information remains
Expand Down
14 changes: 14 additions & 0 deletions pkg/scheduler/framework/v1alpha1/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,17 @@ type PrebindPlugin interface {
Prebind(pc *PluginContext, p *v1.Pod, nodeName string) *Status
}

// UnreservePlugin is an interface for Unreserve plugins. This is an informational
// extension point. If a pod was reserved and then rejected in a later phase, then
// un-reserve plugins will be notified. Un-reserve plugins should clean up state
// associated with the reserved Pod.
type UnreservePlugin interface {
Plugin
// Unreserve is called by the scheduling framework when a reserved pod was
// rejected in a later phase.
Unreserve(pc *PluginContext, p *v1.Pod, nodeName string)
}

// Framework manages the set of plugins in use by the scheduling framework.
// Configured plugins are called at specified points in a scheduling context.
type Framework interface {
Expand All @@ -128,6 +139,9 @@ type Framework interface {
// plugins returns an error, it does not continue running the remaining ones and
// returns the error. In such case, pod will not be scheduled.
RunReservePlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status

// RunUnreservePlugins runs the set of configured unreserve plugins.
RunUnreservePlugins(pc *PluginContext, pod *v1.Pod, nodeName string)
}

// FrameworkHandle provides data and some tools that plugins can use. It is
Expand Down
8 changes: 8 additions & 0 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,8 @@ func (sched *Scheduler) scheduleOne() {
if err != nil {
klog.Errorf("error assuming pod: %v", err)
metrics.PodScheduleErrors.Inc()
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
return
}
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
Expand All @@ -525,6 +527,8 @@ func (sched *Scheduler) scheduleOne() {
if err != nil {
klog.Errorf("error binding volumes: %v", err)
metrics.PodScheduleErrors.Inc()
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
return
}
}
Expand All @@ -543,6 +547,8 @@ func (sched *Scheduler) scheduleOne() {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
sched.recordSchedulingFailure(assumedPod, prebindStatus.AsError(), reason, prebindStatus.Message())
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
return
}

Expand All @@ -558,6 +564,8 @@ func (sched *Scheduler) scheduleOne() {
if err != nil {
klog.Errorf("error binding pod: %v", err)
metrics.PodScheduleErrors.Inc()
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
} else {
klog.V(2).Infof("pod %v/%v is bound successfully on node %v, %d nodes evaluated, %d nodes were found feasible", assumedPod.Namespace, assumedPod.Name, scheduleResult.SuggestedHost, scheduleResult.EvaluatedNodes, scheduleResult.FeasibleNodes)
metrics.PodScheduleSuccesses.Inc()
Expand Down
142 changes: 131 additions & 11 deletions test/integration/scheduler/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,24 @@ package scheduler

import (
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"testing"
"time"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// TesterPlugin is common ancestor for a test plugin that allows injection of
// failures and some other test functionalities.
type TesterPlugin struct {
numReserveCalled int
numPrebindCalled int
failReserve bool
failPrebind bool
rejectPrebind bool
numReserveCalled int
numPrebindCalled int
numUnreserveCalled int
failReserve bool
failPrebind bool
rejectPrebind bool
}

type ReservePlugin struct {
Expand All @@ -45,19 +46,27 @@ type PrebindPlugin struct {
TesterPlugin
}

type UnreservePlugin struct {
TesterPlugin
}

const (
reservePluginName = "reserve-plugin"
prebindPluginName = "prebind-plugin"
reservePluginName = "reserve-plugin"
prebindPluginName = "prebind-plugin"
unreservePluginName = "unreserve-plugin"
)

var _ = framework.ReservePlugin(&ReservePlugin{})
var _ = framework.PrebindPlugin(&PrebindPlugin{})
var _ = framework.UnreservePlugin(&UnreservePlugin{})

// Name returns name of the plugin.
func (rp *ReservePlugin) Name() string {
return reservePluginName
}

var resPlugin = &ReservePlugin{}

// Reserve is a test function that returns an error or nil, depending on the
// value of "failReserve".
func (rp *ReservePlugin) Reserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
Expand All @@ -68,14 +77,13 @@ func (rp *ReservePlugin) Reserve(pc *framework.PluginContext, pod *v1.Pod, nodeN
return nil
}

var resPlugin = &ReservePlugin{}
var pbdPlugin = &PrebindPlugin{}

// NewReservePlugin is the factory for reserve plugin.
func NewReservePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return resPlugin, nil
}

var pbdPlugin = &PrebindPlugin{}

// Name returns name of the plugin.
func (pp *PrebindPlugin) Name() string {
return prebindPluginName
Expand All @@ -93,11 +101,39 @@ func (pp *PrebindPlugin) Prebind(pc *framework.PluginContext, pod *v1.Pod, nodeN
return nil
}

// reset used to reset numPrebindCalled.
func (pp *PrebindPlugin) reset() {
pp.numPrebindCalled = 0
}

// NewPrebindPlugin is the factory for prebind plugin.
func NewPrebindPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return pbdPlugin, nil
}

var unresPlugin = &UnreservePlugin{}

// Name returns name of the plugin.
func (up *UnreservePlugin) Name() string {
return unreservePluginName
}

// Unreserve is a test function that returns an error or nil, depending on the
// value of "failUnreserve".
func (up *UnreservePlugin) Unreserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) {
up.numUnreserveCalled++
}

// reset used to reset numUnreserveCalled.
func (up *UnreservePlugin) reset() {
up.numUnreserveCalled = 0
}

// NewUnreservePlugin is the factory for unreserve plugin.
func NewUnreservePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return unresPlugin, nil
}

// TestReservePlugin tests invocation of reserve plugins.
func TestReservePlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a reserve plugin.
Expand Down Expand Up @@ -216,3 +252,87 @@ func TestPrebindPlugin(t *testing.T) {
cleanupPods(cs, t, []*v1.Pod{pod})
}
}

// TestUnreservePlugin tests invocation of un-reserve plugin
func TestUnreservePlugin(t *testing.T) {
// TODO: register more plugin which would trigger un-reserve plugin
registry := framework.Registry{
unreservePluginName: NewUnreservePlugin,
prebindPluginName: NewPrebindPlugin,
}

// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "unreserve-plugin", nil),
false, nil, registry, false, time.Second)
defer cleanupTest(t, context)

cs := context.clientSet
// Add a few nodes.
_, err := createNodes(cs, "test-node", nil, 2)
if err != nil {
t.Fatalf("Cannot create nodes: %v", err)
}

tests := []struct {
prebindFail bool
prebindReject bool
}{
{
prebindFail: false,
prebindReject: false,
},
{
prebindFail: true,
prebindReject: false,
},
{
prebindFail: false,
prebindReject: true,
},
{
prebindFail: true,
prebindReject: true,
},
}

for i, test := range tests {
pbdPlugin.failPrebind = test.prebindFail
pbdPlugin.rejectPrebind = test.prebindReject

// Create a best effort pod.
pod, err := createPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}

if test.prebindFail {
if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil {
t.Errorf("test #%v: Expected a scheduling error, but didn't get it. error: %v", i, err)
}
if unresPlugin.numUnreserveCalled == 0 || unresPlugin.numUnreserveCalled != pbdPlugin.numPrebindCalled {
t.Errorf("test #%v: Expected the unreserve plugin to be called %d times, was called %d times.", i, pbdPlugin.numPrebindCalled, unresPlugin.numUnreserveCalled)
}
} else {
if test.prebindReject {
if err = waitForPodUnschedulable(cs, pod); err != nil {
t.Errorf("test #%v: Didn't expected the pod to be scheduled. error: %v", i, err)
}
if unresPlugin.numUnreserveCalled == 0 || unresPlugin.numUnreserveCalled != pbdPlugin.numPrebindCalled {
t.Errorf("test #%v: Expected the unreserve plugin to be called %d times, was called %d times.", i, pbdPlugin.numPrebindCalled, unresPlugin.numUnreserveCalled)
}
} else {
if err = waitForPodToSchedule(cs, pod); err != nil {
t.Errorf("test #%v: Expected the pod to be scheduled. error: %v", i, err)
}
if unresPlugin.numUnreserveCalled > 0 {
t.Errorf("test #%v: Didn't expected the unreserve plugin to be called, was called %d times.", i, unresPlugin.numUnreserveCalled)
}
}
}
unresPlugin.reset()
pbdPlugin.reset()
cleanupPods(cs, t, []*v1.Pod{pod})
}
}

0 comments on commit 997648a

Please sign in to comment.