From 98de316436503f88204bb8e3eb49e685973d7cbe Mon Sep 17 00:00:00 2001 From: Abdullah Gharaibeh Date: Fri, 10 May 2019 09:05:59 -0400 Subject: [PATCH] Implement the permit extension point in scheduler. --- pkg/scheduler/framework/v1alpha1/BUILD | 2 + pkg/scheduler/framework/v1alpha1/framework.go | 86 +++++++ pkg/scheduler/framework/v1alpha1/interface.go | 46 +++- .../framework/v1alpha1/waiting_pods_map.go | 109 ++++++++ pkg/scheduler/scheduler.go | 19 ++ test/integration/scheduler/framework_test.go | 242 +++++++++++++++++- 6 files changed, 496 insertions(+), 8 deletions(-) create mode 100644 pkg/scheduler/framework/v1alpha1/waiting_pods_map.go diff --git a/pkg/scheduler/framework/v1alpha1/BUILD b/pkg/scheduler/framework/v1alpha1/BUILD index a258da876f9dd..98cdc3565f393 100644 --- a/pkg/scheduler/framework/v1alpha1/BUILD +++ b/pkg/scheduler/framework/v1alpha1/BUILD @@ -7,6 +7,7 @@ go_library( "framework.go", "interface.go", "registry.go", + "waiting_pods_map.go", ], importpath = "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1", visibility = ["//visibility:public"], @@ -14,6 +15,7 @@ go_library( "//pkg/scheduler/internal/cache:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index d4be81c9c27c1..752c98cd631d7 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -18,9 +18,11 @@ package v1alpha1 import ( "fmt" + "time" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/klog" "k8s.io/kubernetes/pkg/scheduler/internal/cache" ) @@ -30,12 +32,19 @@ import ( type framework struct { registry Registry nodeInfoSnapshot *cache.NodeInfoSnapshot + waitingPods *waitingPodsMap plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance. reservePlugins []ReservePlugin prebindPlugins []PrebindPlugin unreservePlugins []UnreservePlugin + permitPlugins []PermitPlugin } +const ( + // Specifies the maximum timeout a permit plugin can return. + maxTimeout time.Duration = 15 * time.Minute +) + var _ = Framework(&framework{}) // NewFramework initializes plugins given the configuration and the registry. @@ -44,6 +53,7 @@ func NewFramework(r Registry, _ *runtime.Unknown) (Framework, error) { registry: r, nodeInfoSnapshot: cache.NewNodeInfoSnapshot(), plugins: make(map[string]Plugin), + waitingPods: newWaitingPodsMap(), } // TODO: The framework needs to read the scheduler config and initialize only @@ -68,6 +78,9 @@ func NewFramework(r Registry, _ *runtime.Unknown) (Framework, error) { if up, ok := p.(UnreservePlugin); ok { f.unreservePlugins = append(f.unreservePlugins, up) } + if pr, ok := p.(PermitPlugin); ok { + f.permitPlugins = append(f.permitPlugins, pr) + } } return f, nil } @@ -117,6 +130,69 @@ func (f *framework) RunUnreservePlugins( } } +// RunPermitPlugins runs the set of configured permit plugins. If any of these +// plugins returns a status other than "Success" or "Wait", it does not continue +// running the remaining plugins and returns an error. Otherwise, if any of the +// plugins returns "Wait", then this function will block for the timeout period +// returned by the plugin, if the time expires, then it will return an error. +// Note that if multiple plugins asked to wait, then we wait for the minimum +// timeout duration. +func (f *framework) RunPermitPlugins( + pc *PluginContext, pod *v1.Pod, nodeName string) *Status { + timeout := maxTimeout + statusCode := Success + for _, pl := range f.permitPlugins { + status, d := pl.Permit(pc, pod, nodeName) + if !status.IsSuccess() { + if status.Code() == Unschedulable { + msg := fmt.Sprintf("rejected by %v at permit: %v", pl.Name(), status.Message()) + klog.V(4).Infof(msg) + return NewStatus(status.Code(), msg) + } + if status.Code() == Wait { + // Use the minimum timeout duration. + if timeout > d { + timeout = d + } + statusCode = Wait + } else { + msg := fmt.Sprintf("error while running %v permit plugin for pod %v: %v", pl.Name(), pod.Name, status.Message()) + klog.Error(msg) + return NewStatus(Error, msg) + } + } + } + + // We now wait for the minimum duration if at least one plugin asked to + // wait (and no plugin rejected the pod) + if statusCode == Wait { + w := newWaitingPod(pod) + f.waitingPods.add(w) + defer f.waitingPods.remove(pod.UID) + timer := time.NewTimer(timeout) + klog.V(4).Infof("waiting for %v for pod %v at permit", timeout, pod.Name) + select { + case <-timer.C: + msg := fmt.Sprintf("pod %v rejected due to timeout after waiting %v at permit", pod.Name, timeout) + klog.V(4).Infof(msg) + return NewStatus(Unschedulable, msg) + case s := <-w.s: + if !s.IsSuccess() { + if s.Code() == Unschedulable { + msg := fmt.Sprintf("rejected while waiting at permit: %v", s.Message()) + klog.V(4).Infof(msg) + return NewStatus(s.Code(), msg) + } + msg := fmt.Sprintf("error received while waiting at permit for pod %v: %v", pod.Name, s.Message()) + klog.Error(msg) + return NewStatus(Error, msg) + } + } + } + + return nil +} + // NodeInfoSnapshot returns the latest NodeInfo snapshot. The snapshot // is taken at the beginning of a scheduling cycle and remains unchanged until a // pod finishes "Reserve". There is no guarantee that the information remains @@ -124,3 +200,13 @@ func (f *framework) RunUnreservePlugins( func (f *framework) NodeInfoSnapshot() *cache.NodeInfoSnapshot { return f.nodeInfoSnapshot } + +// IterateOverWaitingPods acquires a read lock and iterates over the WaitingPods map. +func (f *framework) IterateOverWaitingPods(callback func(WaitingPod)) { + f.waitingPods.iterate(callback) +} + +// GetWaitingPod returns a reference to a WaitingPod given its UID. +func (f *framework) GetWaitingPod(uid types.UID) WaitingPod { + return f.waitingPods.get(uid) +} diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go index a3764adf616fd..e8ce959b51242 100644 --- a/pkg/scheduler/framework/v1alpha1/interface.go +++ b/pkg/scheduler/framework/v1alpha1/interface.go @@ -20,8 +20,10 @@ package v1alpha1 import ( "errors" + "time" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" ) @@ -38,6 +40,8 @@ const ( // Unschedulable is used when a plugin finds a pod unschedulable. // The accompanying status message should explain why the pod is unschedulable. Unschedulable Code = 2 + // Wait is used when a permit plugin finds a pod scheduling should wait. + Wait Code = 3 ) // Status indicates the result of running a plugin. It consists of a code and a @@ -86,6 +90,18 @@ func NewStatus(code Code, msg string) *Status { } } +// WaitingPod represents a pod currently waiting in the permit phase. +type WaitingPod interface { + // GetPod returns a reference to the waiting pod. + GetPod() *v1.Pod + // Allow the waiting pod to be scheduled. Returns true if the allow signal was + // successfully delivered, false otherwise. + Allow() bool + // Reject declares the waiting pod unschedulable. Returns true if the allow signal + // was successfully delivered, false otherwise. + Reject(msg string) bool +} + // Plugin is the parent type for all the scheduling framework plugins. type Plugin interface { Name() string @@ -105,7 +121,7 @@ type ReservePlugin interface { } // PrebindPlugin is an interface that must be implemented by "prebind" plugins. -// These plugins are called before a pod being scheduled +// These plugins are called before a pod being scheduled. type PrebindPlugin interface { Plugin // Prebind is called before binding a pod. All prebind plugins must return @@ -124,6 +140,19 @@ type UnreservePlugin interface { Unreserve(pc *PluginContext, p *v1.Pod, nodeName string) } +// PermitPlugin is an interface that must be implemented by "permit" plugins. +// These plugins are called before a pod is bound to a node. +type PermitPlugin interface { + Plugin + // Permit is called before binding a pod (and before prebind plugins). Permit + // plugins are used to prevent or delay the binding of a Pod. A permit plugin + // must return success or wait with timeout duration, or the pod will be rejected. + // The pod will also be rejected if the wait timeout or the pod is rejected while + // waiting. Note that if the plugin returns "wait", the framework will wait only + // after running the remaining plugins given that no other plugin rejects the pod. + Permit(pc *PluginContext, p *v1.Pod, nodeName string) (*Status, time.Duration) +} + // Framework manages the set of plugins in use by the scheduling framework. // Configured plugins are called at specified points in a scheduling context. type Framework interface { @@ -142,6 +171,15 @@ type Framework interface { // RunUnreservePlugins runs the set of configured unreserve plugins. RunUnreservePlugins(pc *PluginContext, pod *v1.Pod, nodeName string) + + // RunPermitPlugins runs the set of configured permit plugins. If any of these + // plugins returns a status other than "Success" or "Wait", it does not continue + // running the remaining plugins and returns an error. Otherwise, if any of the + // plugins returns "Wait", then this function will block for the timeout period + // returned by the plugin, if the time expires, then it will return an error. + // Note that if multiple plugins asked to wait, then we wait for the minimum + // timeout duration. + RunPermitPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status } // FrameworkHandle provides data and some tools that plugins can use. It is @@ -153,4 +191,10 @@ type FrameworkHandle interface { // a pod finishes "Reserve" point. There is no guarantee that the information // remains unchanged in the binding phase of scheduling. NodeInfoSnapshot() *internalcache.NodeInfoSnapshot + + // IterateOverWaitingPods acquires a read lock and iterates over the WaitingPods map. + IterateOverWaitingPods(callback func(WaitingPod)) + + // GetWaitingPod returns a waiting pod given its UID. + GetWaitingPod(uid types.UID) WaitingPod } diff --git a/pkg/scheduler/framework/v1alpha1/waiting_pods_map.go b/pkg/scheduler/framework/v1alpha1/waiting_pods_map.go new file mode 100644 index 0000000000000..842eff5e538de --- /dev/null +++ b/pkg/scheduler/framework/v1alpha1/waiting_pods_map.go @@ -0,0 +1,109 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "sync" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" +) + +// waitingPodsMap a thread-safe map used to maintain pods waiting in the permit phase. +type waitingPodsMap struct { + pods map[types.UID]WaitingPod + mu sync.RWMutex +} + +// newWaitingPodsMap returns a new waitingPodsMap. +func newWaitingPodsMap() *waitingPodsMap { + return &waitingPodsMap{ + pods: make(map[types.UID]WaitingPod), + } +} + +// add a new WaitingPod to the map. +func (m *waitingPodsMap) add(wp WaitingPod) { + m.mu.Lock() + defer m.mu.Unlock() + m.pods[wp.GetPod().UID] = wp +} + +// remove a WaitingPod from the map. +func (m *waitingPodsMap) remove(uid types.UID) { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.pods, uid) +} + +// get a WaitingPod from the map. +func (m *waitingPodsMap) get(uid types.UID) WaitingPod { + m.mu.RLock() + defer m.mu.RUnlock() + return m.pods[uid] + +} + +// iterate acquires a read lock and iterates over the WaitingPods map. +func (m *waitingPodsMap) iterate(callback func(WaitingPod)) { + m.mu.RLock() + defer m.mu.RUnlock() + for _, v := range m.pods { + callback(v) + } +} + +// waitingPod represents a pod waiting in the permit phase. +type waitingPod struct { + pod *v1.Pod + s chan *Status +} + +// newWaitingPod returns a new waitingPod instance. +func newWaitingPod(pod *v1.Pod) *waitingPod { + return &waitingPod{ + pod: pod, + s: make(chan *Status), + } +} + +// GetPod returns a reference to the waiting pod. +func (w *waitingPod) GetPod() *v1.Pod { + return w.pod +} + +// Allow the waiting pod to be scheduled. Returns true if the allow signal was +// successfully delivered, false otherwise. +func (w *waitingPod) Allow() bool { + select { + case w.s <- NewStatus(Success, ""): + return true + default: + return false + } +} + +// Reject declares the waiting pod unschedulable. Returns true if the allow signal +// was successfully delivered, false otherwise. +func (w *waitingPod) Reject(msg string) bool { + select { + case w.s <- NewStatus(Unschedulable, msg): + return true + default: + return false + } +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 770866cb2ef8b..cf411188a137f 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -533,6 +533,25 @@ func (sched *Scheduler) scheduleOne() { } } + // Run "permit" plugins. + permitStatus := fwk.RunPermitPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost) + if !permitStatus.IsSuccess() { + var reason string + if permitStatus.Code() == framework.Unschedulable { + reason = v1.PodReasonUnschedulable + } else { + metrics.PodScheduleErrors.Inc() + reason = SchedulerError + } + if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil { + klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) + } + sched.recordSchedulingFailure(assumedPod, permitStatus.AsError(), reason, permitStatus.Message()) + // trigger un-reserve plugins to clean up state associated with the reserved Pod + fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost) + return + } + // Run "prebind" plugins. prebindStatus := fwk.RunPrebindPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost) if !prebindStatus.IsSuccess() { diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go index 4b55e36a3b1df..81e07cb4ac60d 100644 --- a/test/integration/scheduler/framework_test.go +++ b/test/integration/scheduler/framework_test.go @@ -30,12 +30,18 @@ import ( // TesterPlugin is common ancestor for a test plugin that allows injection of // failures and some other test functionalities. type TesterPlugin struct { - numReserveCalled int - numPrebindCalled int - numUnreserveCalled int - failReserve bool - failPrebind bool - rejectPrebind bool + numReserveCalled int + numPrebindCalled int + numUnreserveCalled int + failReserve bool + failPrebind bool + rejectPrebind bool + numPermitCalled int + failPermit bool + rejectPermit bool + timeoutPermit bool + waitAndRejectPermit bool + waitAndAllowPermit bool } type ReservePlugin struct { @@ -50,15 +56,22 @@ type UnreservePlugin struct { TesterPlugin } +type PermitPlugin struct { + TesterPlugin + fh framework.FrameworkHandle +} + const ( reservePluginName = "reserve-plugin" prebindPluginName = "prebind-plugin" unreservePluginName = "unreserve-plugin" + permitPluginName = "permit-plugin" ) var _ = framework.ReservePlugin(&ReservePlugin{}) var _ = framework.PrebindPlugin(&PrebindPlugin{}) var _ = framework.UnreservePlugin(&UnreservePlugin{}) +var _ = framework.PermitPlugin(&PermitPlugin{}) // Name returns name of the plugin. func (rp *ReservePlugin) Name() string { @@ -134,6 +147,55 @@ func NewUnreservePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framew return unresPlugin, nil } +var perPlugin = &PermitPlugin{} + +// Name returns name of the plugin. +func (pp *PermitPlugin) Name() string { + return permitPluginName +} + +// Permit implements the permit test plugin. +func (pp *PermitPlugin) Permit(pc *framework.PluginContext, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) { + pp.numPermitCalled++ + if pp.failPermit { + return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)), 0 + } + if pp.rejectPermit { + return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)), 0 + } + if pp.timeoutPermit { + return framework.NewStatus(framework.Wait, ""), 3 * time.Second + } + if pp.waitAndRejectPermit || pp.waitAndAllowPermit { + if pod.Name == "waiting-pod" { + return framework.NewStatus(framework.Wait, ""), 30 * time.Second + } + // This is the signalling pod, wait until the waiting-pod is actually waiting and then either reject or allow it. + wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) { + w := false + pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { w = true }) + return w, nil + }) + if pp.waitAndRejectPermit { + pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { + wp.Reject(fmt.Sprintf("reject pod %v", wp.GetPod().Name)) + }) + return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)), 0 + } + if pp.waitAndAllowPermit { + pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Allow() }) + return nil, 0 + } + } + return nil, 0 +} + +// NewPermitPlugin is the factory for permit plugin. +func NewPermitPlugin(_ *runtime.Unknown, fh framework.FrameworkHandle) (framework.Plugin, error) { + perPlugin.fh = fh + return perPlugin, nil +} + // TestReservePlugin tests invocation of reserve plugins. func TestReservePlugin(t *testing.T) { // Create a plugin registry for testing. Register only a reserve plugin. @@ -181,7 +243,7 @@ func TestReservePlugin(t *testing.T) { // TestPrebindPlugin tests invocation of prebind plugins. func TestPrebindPlugin(t *testing.T) { - // Create a plugin registry for testing. Register only a reserve plugin. + // Create a plugin registry for testing. Register only a prebind plugin. registry := framework.Registry{prebindPluginName: NewPrebindPlugin} // Create the master and the scheduler with the test plugin set. @@ -336,3 +398,169 @@ func TestUnreservePlugin(t *testing.T) { cleanupPods(cs, t, []*v1.Pod{pod}) } } + +// TestPermitPlugin tests invocation of permit plugins. +func TestPermitPlugin(t *testing.T) { + // Create a plugin registry for testing. Register only a permit plugin. + registry := framework.Registry{permitPluginName: NewPermitPlugin} + + // Create the master and the scheduler with the test plugin set. + context := initTestSchedulerWithOptions(t, + initTestMaster(t, "permit-plugin", nil), + false, nil, registry, false, time.Second) + defer cleanupTest(t, context) + + cs := context.clientSet + // Add a few nodes. + _, err := createNodes(cs, "test-node", nil, 2) + if err != nil { + t.Fatalf("Cannot create nodes: %v", err) + } + + tests := []struct { + fail bool + reject bool + timeout bool + }{ + { + fail: false, + reject: false, + timeout: false, + }, + { + fail: true, + reject: false, + timeout: false, + }, + { + fail: false, + reject: true, + timeout: false, + }, + { + fail: true, + reject: true, + timeout: false, + }, + { + fail: false, + reject: false, + timeout: true, + }, + { + fail: false, + reject: false, + timeout: true, + }, + } + + for i, test := range tests { + perPlugin.failPermit = test.fail + perPlugin.rejectPermit = test.reject + perPlugin.timeoutPermit = test.timeout + perPlugin.waitAndRejectPermit = false + perPlugin.waitAndAllowPermit = false + // Create a best effort pod. + pod, err := createPausePod(cs, + initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name})) + if err != nil { + t.Errorf("Error while creating a test pod: %v", err) + } + if test.fail { + if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil { + t.Errorf("test #%v: Expected a scheduling error, but didn't get it. error: %v", i, err) + } + } else { + if test.reject || test.timeout { + if err = waitForPodUnschedulable(cs, pod); err != nil { + t.Errorf("test #%v: Didn't expect the pod to be scheduled. error: %v", i, err) + } + } else { + if err = waitForPodToSchedule(cs, pod); err != nil { + t.Errorf("test #%v: Expected the pod to be scheduled. error: %v", i, err) + } + } + } + + if perPlugin.numPermitCalled == 0 { + t.Errorf("Expected the permit plugin to be called.") + } + + cleanupPods(cs, t, []*v1.Pod{pod}) + } +} + +// TestCoSchedulingWithPermitPlugin tests invocation of permit plugins. +func TestCoSchedulingWithPermitPlugin(t *testing.T) { + // Create a plugin registry for testing. Register only a permit plugin. + registry := framework.Registry{permitPluginName: NewPermitPlugin} + + // Create the master and the scheduler with the test plugin set. + context := initTestSchedulerWithOptions(t, + initTestMaster(t, "permit-plugin", nil), + false, nil, registry, false, time.Second) + defer cleanupTest(t, context) + + cs := context.clientSet + // Add a few nodes. + _, err := createNodes(cs, "test-node", nil, 2) + if err != nil { + t.Fatalf("Cannot create nodes: %v", err) + } + + tests := []struct { + waitReject bool + waitAllow bool + }{ + { + waitReject: true, + waitAllow: false, + }, + { + waitReject: false, + waitAllow: true, + }, + } + + for i, test := range tests { + perPlugin.failPermit = false + perPlugin.rejectPermit = false + perPlugin.timeoutPermit = false + perPlugin.waitAndRejectPermit = test.waitReject + perPlugin.waitAndAllowPermit = test.waitAllow + + // Create two pods. + waitingPod, err := createPausePod(cs, + initPausePod(cs, &pausePodConfig{Name: "waiting-pod", Namespace: context.ns.Name})) + if err != nil { + t.Errorf("Error while creating the waiting pod: %v", err) + } + signallingPod, err := createPausePod(cs, + initPausePod(cs, &pausePodConfig{Name: "signalling-pod", Namespace: context.ns.Name})) + if err != nil { + t.Errorf("Error while creating the signalling pod: %v", err) + } + + if test.waitReject { + if err = waitForPodUnschedulable(cs, waitingPod); err != nil { + t.Errorf("test #%v: Didn't expect the waiting pod to be scheduled. error: %v", i, err) + } + if err = waitForPodUnschedulable(cs, signallingPod); err != nil { + t.Errorf("test #%v: Didn't expect the signalling pod to be scheduled. error: %v", i, err) + } + } else { + if err = waitForPodToSchedule(cs, waitingPod); err != nil { + t.Errorf("test #%v: Expected the waiting pod to be scheduled. error: %v", i, err) + } + if err = waitForPodToSchedule(cs, signallingPod); err != nil { + t.Errorf("test #%v: Expected the signalling pod to be scheduled. error: %v", i, err) + } + } + + if perPlugin.numPermitCalled == 0 { + t.Errorf("Expected the permit plugin to be called.") + } + + cleanupPods(cs, t, []*v1.Pod{waitingPod, signallingPod}) + } +}