Skip to content

Commit

Permalink
Implement the permit extension point in scheduler.
Browse files Browse the repository at this point in the history
  • Loading branch information
ahg-g committed May 10, 2019
1 parent 21bec91 commit 98de316
Show file tree
Hide file tree
Showing 6 changed files with 496 additions and 8 deletions.
2 changes: 2 additions & 0 deletions pkg/scheduler/framework/v1alpha1/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ go_library(
"framework.go",
"interface.go",
"registry.go",
"waiting_pods_map.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/internal/cache:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
Expand Down
86 changes: 86 additions & 0 deletions pkg/scheduler/framework/v1alpha1/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package v1alpha1

import (
"fmt"
"time"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
)
Expand All @@ -30,12 +32,19 @@ import (
type framework struct {
// registry is the plugin Registry handed to NewFramework.
registry Registry
// nodeInfoSnapshot is the snapshot exposed to plugins via NodeInfoSnapshot().
nodeInfoSnapshot *cache.NodeInfoSnapshot
// waitingPods tracks the pods currently blocked in RunPermitPlugins so that
// plugins can look them up and allow or reject them.
waitingPods *waitingPodsMap
plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance.
// The slices below group the initialized plugins by extension point; a
// plugin appears in every slice whose interface it implements.
reservePlugins []ReservePlugin
prebindPlugins []PrebindPlugin
unreservePlugins []UnreservePlugin
permitPlugins []PermitPlugin
}

const (
// maxTimeout is the upper bound on how long a pod may wait at the permit
// phase. RunPermitPlugins starts from this value and only ever lowers it to
// a shorter timeout returned by a permit plugin.
maxTimeout time.Duration = 15 * time.Minute
)

// Compile-time check that *framework satisfies the Framework interface.
var _ = Framework(&framework{})

// NewFramework initializes plugins given the configuration and the registry.
Expand All @@ -44,6 +53,7 @@ func NewFramework(r Registry, _ *runtime.Unknown) (Framework, error) {
registry: r,
nodeInfoSnapshot: cache.NewNodeInfoSnapshot(),
plugins: make(map[string]Plugin),
waitingPods: newWaitingPodsMap(),
}

// TODO: The framework needs to read the scheduler config and initialize only
Expand All @@ -68,6 +78,9 @@ func NewFramework(r Registry, _ *runtime.Unknown) (Framework, error) {
if up, ok := p.(UnreservePlugin); ok {
f.unreservePlugins = append(f.unreservePlugins, up)
}
if pr, ok := p.(PermitPlugin); ok {
f.permitPlugins = append(f.permitPlugins, pr)
}
}
return f, nil
}
Expand Down Expand Up @@ -117,10 +130,83 @@ func (f *framework) RunUnreservePlugins(
}
}

// RunPermitPlugins runs the set of configured permit plugins in order.
//
// If a plugin returns "Unschedulable", or any status other than "Success" or
// "Wait", the remaining plugins are not run and a non-success status is
// returned immediately (Unschedulable is passed through; anything else is
// reported as Error).
//
// If no plugin rejected the pod and at least one plugin returned "Wait", the
// pod is registered as a waiting pod and this function blocks until either a
// status is delivered to it (via WaitingPod.Allow / WaitingPod.Reject) or the
// timeout expires. The timeout is the minimum of the durations returned by the
// waiting plugins, capped at maxTimeout. On expiry an Unschedulable status is
// returned.
//
// A nil status is returned when the pod is permitted.
func (f *framework) RunPermitPlugins(
	pc *PluginContext, pod *v1.Pod, nodeName string) *Status {
	timeout := maxTimeout
	statusCode := Success
	for _, pl := range f.permitPlugins {
		status, d := pl.Permit(pc, pod, nodeName)
		if status.IsSuccess() {
			continue
		}
		switch status.Code() {
		case Unschedulable:
			msg := fmt.Sprintf("rejected by %v at permit: %v", pl.Name(), status.Message())
			// msg embeds plugin-provided text, so it must not be used as a
			// format string.
			klog.V(4).Info(msg)
			return NewStatus(status.Code(), msg)
		case Wait:
			// Use the minimum timeout duration.
			if timeout > d {
				timeout = d
			}
			statusCode = Wait
		default:
			msg := fmt.Sprintf("error while running %v permit plugin for pod %v: %v", pl.Name(), pod.Name, status.Message())
			klog.Error(msg)
			return NewStatus(Error, msg)
		}
	}

	// Nothing to wait for: every plugin returned success.
	if statusCode != Wait {
		return nil
	}

	// We now wait for the minimum duration since at least one plugin asked to
	// wait (and no plugin rejected the pod).
	w := newWaitingPod(pod)
	f.waitingPods.add(w)
	defer f.waitingPods.remove(pod.UID)

	timer := time.NewTimer(timeout)
	// Release the timer promptly when the pod is allowed/rejected before the
	// timeout fires; otherwise it lingers until it expires.
	defer timer.Stop()

	klog.V(4).Infof("waiting for %v for pod %v at permit", timeout, pod.Name)
	select {
	case <-timer.C:
		msg := fmt.Sprintf("pod %v rejected due to timeout after waiting %v at permit", pod.Name, timeout)
		klog.V(4).Info(msg)
		return NewStatus(Unschedulable, msg)
	case s := <-w.s:
		if s.IsSuccess() {
			return nil
		}
		if s.Code() == Unschedulable {
			msg := fmt.Sprintf("rejected while waiting at permit: %v", s.Message())
			klog.V(4).Info(msg)
			return NewStatus(s.Code(), msg)
		}
		msg := fmt.Sprintf("error received while waiting at permit for pod %v: %v", pod.Name, s.Message())
		klog.Error(msg)
		return NewStatus(Error, msg)
	}
}

// NodeInfoSnapshot hands out the framework's current NodeInfo snapshot. The
// snapshot is taken at the start of a scheduling cycle and stays the same
// until the pod has finished "Reserve"; after "Reserve" no guarantee is made
// that its contents remain unchanged.
func (f *framework) NodeInfoSnapshot() *cache.NodeInfoSnapshot {
	snapshot := f.nodeInfoSnapshot
	return snapshot
}

// IterateOverWaitingPods invokes callback once for every pod currently waiting
// at the permit phase. The waiting-pods map is read-locked for the duration of
// the iteration.
func (f *framework) IterateOverWaitingPods(callback func(WaitingPod)) {
	waiting := f.waitingPods
	waiting.iterate(callback)
}

// GetWaitingPod looks up the pod with the given UID among the pods waiting at
// the permit phase and returns a reference to it.
func (f *framework) GetWaitingPod(uid types.UID) WaitingPod {
	wp := f.waitingPods.get(uid)
	return wp
}
46 changes: 45 additions & 1 deletion pkg/scheduler/framework/v1alpha1/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ package v1alpha1

import (
"errors"
"time"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
)

Expand All @@ -38,6 +40,8 @@ const (
// Unschedulable is used when a plugin finds a pod unschedulable.
// The accompanying status message should explain why the pod is unschedulable.
Unschedulable Code = 2
// Wait is used when a permit plugin finds a pod scheduling should wait.
Wait Code = 3
)

// Status indicates the result of running a plugin. It consists of a code and a
Expand Down Expand Up @@ -86,6 +90,18 @@ func NewStatus(code Code, msg string) *Status {
}
}

// WaitingPod represents a pod currently waiting in the permit phase.
type WaitingPod interface {
// GetPod returns a reference to the waiting pod.
GetPod() *v1.Pod
// Allow the waiting pod to be scheduled. Returns true if the allow signal was
// successfully delivered, false otherwise.
Allow() bool
// Reject declares the waiting pod unschedulable; msg explains why. Returns
// true if the reject signal was successfully delivered, false otherwise.
Reject(msg string) bool
}

// Plugin is the parent type for all the scheduling framework plugins.
type Plugin interface {
Name() string
Expand All @@ -105,7 +121,7 @@ type ReservePlugin interface {
}

// PrebindPlugin is an interface that must be implemented by "prebind" plugins.
// These plugins are called before a pod being scheduled
// These plugins are called before a pod being scheduled.
type PrebindPlugin interface {
Plugin
// Prebind is called before binding a pod. All prebind plugins must return
Expand All @@ -124,6 +140,19 @@ type UnreservePlugin interface {
Unreserve(pc *PluginContext, p *v1.Pod, nodeName string)
}

// PermitPlugin is an interface that must be implemented by "permit" plugins.
// These plugins are called before a pod is bound to a node.
type PermitPlugin interface {
Plugin
// Permit is called before binding a pod (and before prebind plugins). Permit
// plugins are used to prevent or delay the binding of a Pod. A permit plugin
// must return success, or wait together with a timeout duration; any other
// status causes the pod to be rejected. The pod is also rejected if the wait
// times out or if the pod is rejected while waiting. Note that if the plugin
// returns "wait", the framework will wait only after running the remaining
// plugins, and only if no other plugin rejects the pod.
//
// The returned duration is only consulted when the status is "wait".
Permit(pc *PluginContext, p *v1.Pod, nodeName string) (*Status, time.Duration)
}

// Framework manages the set of plugins in use by the scheduling framework.
// Configured plugins are called at specified points in a scheduling context.
type Framework interface {
Expand All @@ -142,6 +171,15 @@ type Framework interface {

// RunUnreservePlugins runs the set of configured unreserve plugins.
RunUnreservePlugins(pc *PluginContext, pod *v1.Pod, nodeName string)

// RunPermitPlugins runs the set of configured permit plugins. If any of these
// plugins returns a status other than "Success" or "Wait", it does not continue
// running the remaining plugins and returns an error. Otherwise, if any of the
// plugins returns "Wait", then this function will block for the timeout period
// returned by the plugin, if the time expires, then it will return an error.
// Note that if multiple plugins asked to wait, then we wait for the minimum
// timeout duration.
RunPermitPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
}

// FrameworkHandle provides data and some tools that plugins can use. It is
Expand All @@ -153,4 +191,10 @@ type FrameworkHandle interface {
// a pod finishes "Reserve" point. There is no guarantee that the information
// remains unchanged in the binding phase of scheduling.
NodeInfoSnapshot() *internalcache.NodeInfoSnapshot

// IterateOverWaitingPods acquires a read lock and iterates over the WaitingPods map.
IterateOverWaitingPods(callback func(WaitingPod))

// GetWaitingPod returns a waiting pod given its UID.
GetWaitingPod(uid types.UID) WaitingPod
}
109 changes: 109 additions & 0 deletions pkg/scheduler/framework/v1alpha1/waiting_pods_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
"sync"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
)

// waitingPodsMap is a thread-safe map used to maintain pods waiting in the
// permit phase.
type waitingPodsMap struct {
// pods holds the waiting pods keyed by pod UID.
pods map[types.UID]WaitingPod
// mu guards pods.
mu sync.RWMutex
}

// newWaitingPodsMap builds an empty, ready-to-use waitingPodsMap.
func newWaitingPodsMap() *waitingPodsMap {
	m := new(waitingPodsMap)
	m.pods = map[types.UID]WaitingPod{}
	return m
}

// add registers wp in the map under its pod's UID, replacing any entry that
// was already stored for that UID.
func (m *waitingPodsMap) add(wp WaitingPod) {
	m.mu.Lock()
	defer m.mu.Unlock()

	key := wp.GetPod().UID
	m.pods[key] = wp
}

// remove drops the entry for uid from the map; it is a no-op when no such
// entry exists.
func (m *waitingPodsMap) remove(uid types.UID) {
	m.mu.Lock()
	delete(m.pods, uid)
	m.mu.Unlock()
}

// get looks up the WaitingPod stored for uid. The zero WaitingPod (nil) is
// returned when the map has no entry for uid.
func (m *waitingPodsMap) get(uid types.UID) WaitingPod {
	m.mu.RLock()
	wp := m.pods[uid]
	m.mu.RUnlock()
	return wp
}

// iterate calls callback for each WaitingPod in the map while holding the
// read lock. The lock is released when iteration finishes, including when
// callback panics.
func (m *waitingPodsMap) iterate(callback func(WaitingPod)) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	for _, wp := range m.pods {
		callback(wp)
	}
}

// waitingPod represents a pod waiting in the permit phase.
type waitingPod struct {
// pod is the pod that is waiting.
pod *v1.Pod
// s carries the status sent by Allow or Reject to the goroutine that is
// waiting on this pod.
s chan *Status
}

// newWaitingPod wraps pod in a waitingPod with a fresh, unbuffered status
// channel.
func newWaitingPod(pod *v1.Pod) *waitingPod {
	wp := new(waitingPod)
	wp.pod = pod
	wp.s = make(chan *Status)
	return wp
}

// GetPod gives back a reference to the pod that is waiting.
func (w *waitingPod) GetPod() *v1.Pod {
	pod := w.pod
	return pod
}

// Allow signals that the waiting pod may be scheduled. The send is
// non-blocking: it reports true only if the allow signal was handed over to a
// receiver, and false otherwise.
func (w *waitingPod) Allow() bool {
	status := NewStatus(Success, "")
	delivered := false
	select {
	case w.s <- status:
		delivered = true
	default:
		// No receiver is ready; the signal is dropped.
	}
	return delivered
}

// Reject marks the waiting pod as unschedulable, with msg as the reason. The
// send is non-blocking: it reports true only if the reject signal was handed
// over to a receiver, and false otherwise.
func (w *waitingPod) Reject(msg string) bool {
	status := NewStatus(Unschedulable, msg)
	delivered := false
	select {
	case w.s <- status:
		delivered = true
	default:
		// No receiver is ready; the signal is dropped.
	}
	return delivered
}
19 changes: 19 additions & 0 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,25 @@ func (sched *Scheduler) scheduleOne() {
}
}

// Run "permit" plugins.
permitStatus := fwk.RunPermitPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
if !permitStatus.IsSuccess() {
var reason string
if permitStatus.Code() == framework.Unschedulable {
reason = v1.PodReasonUnschedulable
} else {
metrics.PodScheduleErrors.Inc()
reason = SchedulerError
}
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
sched.recordSchedulingFailure(assumedPod, permitStatus.AsError(), reason, permitStatus.Message())
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
return
}

// Run "prebind" plugins.
prebindStatus := fwk.RunPrebindPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
if !prebindStatus.IsSuccess() {
Expand Down
Loading

0 comments on commit 98de316

Please sign in to comment.