Skip to content

Commit

Permalink
resmgr: handle UpdateContainer requests better.
Browse files Browse the repository at this point in the history
Try doing a better job of detecting and handling redundant
UpdateContainer requests which do not really change any of
a container's parameters. Such requests are typically sent
by an active CPU Manager with the static policy enabled in
kubelet.

Before evaluating an update request for redundancy fill in
any missing values from the container's current resources.
Then if the request looks redundant short-circuit handling
it by sending a reply with the current resources. Otherwise
pass it on to the policy.

Signed-off-by: Krisztian Litkey <[email protected]>
  • Loading branch information
klihub authored and askervin committed Feb 6, 2024
1 parent 1924adc commit df606a9
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 8 deletions.
19 changes: 18 additions & 1 deletion pkg/resmgr/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,25 @@ type Container interface {
SetCpusetCpus(string)
// SetCpusetMems sets the cgroup cpuset.mems of the container.
SetCpusetMems(string)
// SetmemoryLimit sets the memory limit in bytes for the container.
// SetMemoryLimit sets the memory limit in bytes for the container.
SetMemoryLimit(int64)
// SetMemorySwap sets the swap limit in bytes for the container.
SetMemorySwap(int64)

// GetCPUShares gets the CFS CPU shares of the container.
GetCPUShares() int64
// GetCPUQuota gets the CFS CPU quota of the container.
GetCPUQuota() int64
// GetCPUPeriod gets the CFS CPU period of the container.
GetCPUPeriod() int64
// GetCpusetCpu gets the cgroup cpuset.cpus of the container.
GetCpusetCpus() string
// GetCpusetMems gets the cgroup cpuset.mems of the container.
GetCpusetMems() string
// GetMemoryLimit gets the memory limit in bytes for the container.
GetMemoryLimit() int64
// GetMemorySwap gets the swap limit in bytes for the container.
GetMemorySwap() int64

// GetPendingAdjusmentn clears and returns any pending adjustment for the container.
GetPendingAdjustment() *nri.ContainerAdjustment
Expand Down
129 changes: 129 additions & 0 deletions pkg/resmgr/cache/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,8 @@ func (c *container) GetResourceRequirements() v1.ResourceRequirements {
}

func (c *container) SetResourceUpdates(r *nri.LinuxResources) bool {
r = mergeNRIResources(r, c.Ctr.GetLinux().GetResources())

updated := estimateResourceRequirements(r, c.GetQOSClass())

same := true
Expand Down Expand Up @@ -473,6 +475,47 @@ func (c *container) SetResourceUpdates(r *nri.LinuxResources) bool {
return !same
}

func mergeNRIResources(u *nri.LinuxResources, orig *nri.LinuxResources) *nri.LinuxResources {
log.Debug("merging resource update %+v with fallback/orig %+v", u, orig)

if u.Cpu == nil {
u.Cpu = &nri.LinuxCPU{}
}
if orig.Cpu != nil {
if u.Cpu.GetShares().GetValue() == 0 {
u.Cpu.Shares = nri.UInt64(orig.Cpu.Shares)
}
if u.Cpu.GetQuota().GetValue() == 0 {
u.Cpu.Quota = nri.Int64(orig.Cpu.Quota)
}
if u.Cpu.GetPeriod().GetValue() == 0 {
u.Cpu.Period = nri.UInt64(orig.Cpu.Period)
}
if u.Cpu.Cpus == "" {
u.Cpu.Cpus = orig.Cpu.Cpus
}
if u.Cpu.Mems == "" {
u.Cpu.Mems = orig.Cpu.Mems
}
}

if u.Memory == nil {
u.Memory = &nri.LinuxMemory{}
}
if orig.Memory != nil {
if u.Memory.GetLimit().GetValue() == 0 {
u.Memory.Limit = nri.Int64(orig.Memory.Limit)
}
if u.Memory.GetSwap().GetValue() == 0 {
u.Memory.Swap = nri.Int64(orig.Memory.Swap)
}
}

log.Debug("merged resource update: %+v", u)

return u
}

func (c *container) GetResourceUpdates() (v1.ResourceRequirements, bool) {
if c.ResourceUpdates == nil {
return v1.ResourceRequirements{}, false
Expand Down Expand Up @@ -548,6 +591,29 @@ func (c *container) InsertMount(m *Mount) {
c.markPending(NRI)
}

func (c *container) ensureLinuxResources() {
if c.Ctr.Linux == nil {
c.Ctr.Linux = &nri.LinuxContainer{}
}
if c.Ctr.Linux.Resources == nil {
c.Ctr.Linux.Resources = &nri.LinuxResources{}
}
}

func (c *container) ensureLinuxResourcesCPU() {
c.ensureLinuxResources()
if c.Ctr.Linux.Resources.Cpu == nil {
c.Ctr.Linux.Resources.Cpu = &nri.LinuxCPU{}
}
}

func (c *container) ensureLinuxResourcesMemory() {
c.ensureLinuxResources()
if c.Ctr.Linux.Resources.Memory == nil {
c.Ctr.Linux.Resources.Memory = &nri.LinuxMemory{}
}
}

func (c *container) SetCPUShares(value int64) {
switch req := c.getPendingRequest().(type) {
case *nri.ContainerAdjustment:
Expand All @@ -560,6 +626,9 @@ func (c *container) SetCPUShares(value int64) {
return
}
c.markPending(NRI)

c.ensureLinuxResourcesCPU()
c.Ctr.Linux.Resources.Cpu.Shares = nri.UInt64(value)
}

func (c *container) SetCPUQuota(value int64) {
Expand All @@ -574,6 +643,9 @@ func (c *container) SetCPUQuota(value int64) {
return
}
c.markPending(NRI)

c.ensureLinuxResourcesCPU()
c.Ctr.Linux.Resources.Cpu.Quota = nri.Int64(value)
}

func (c *container) SetCPUPeriod(value int64) {
Expand All @@ -588,6 +660,9 @@ func (c *container) SetCPUPeriod(value int64) {
return
}
c.markPending(NRI)

c.ensureLinuxResourcesCPU()
c.Ctr.Linux.Resources.Cpu.Period = nri.UInt64(uint64(value))
}

func (c *container) SetCpusetCpus(value string) {
Expand All @@ -602,6 +677,9 @@ func (c *container) SetCpusetCpus(value string) {
return
}
c.markPending(NRI)

c.ensureLinuxResourcesCPU()
c.Ctr.Linux.Resources.Cpu.Cpus = value
}

func (c *container) SetCpusetMems(value string) {
Expand All @@ -616,6 +694,9 @@ func (c *container) SetCpusetMems(value string) {
return
}
c.markPending(NRI)

c.ensureLinuxResourcesCPU()
c.Ctr.Linux.Resources.Cpu.Mems = value
}

func (c *container) SetMemoryLimit(value int64) {
Expand All @@ -630,6 +711,54 @@ func (c *container) SetMemoryLimit(value int64) {
return
}
c.markPending(NRI)

c.ensureLinuxResourcesMemory()
c.Ctr.Linux.Resources.Memory.Limit = nri.Int64(value)
}

func (c *container) SetMemorySwap(value int64) {
switch req := c.getPendingRequest().(type) {
case *nri.ContainerAdjustment:
req.SetLinuxMemorySwap(value)
case *nri.ContainerUpdate:
req.SetLinuxMemorySwap(value)
default:
log.Error("%s: can't set memory swap (%d): incorrect pending request type %T",
c.PrettyName(), value, c.request)
return
}
c.markPending(NRI)

c.ensureLinuxResourcesMemory()
c.Ctr.Linux.Resources.Memory.Swap = nri.Int64(value)
}

func (c *container) GetCPUShares() int64 {
return int64(c.Ctr.GetLinux().GetResources().GetCpu().GetShares().GetValue())
}

func (c *container) GetCPUQuota() int64 {
return c.Ctr.GetLinux().GetResources().GetCpu().GetQuota().GetValue()
}

func (c *container) GetCPUPeriod() int64 {
return int64(c.Ctr.GetLinux().GetResources().GetCpu().GetPeriod().GetValue())
}

func (c *container) GetCpusetCpus() string {
return c.Ctr.GetLinux().GetResources().GetCpu().GetCpus()
}

func (c *container) GetCpusetMems() string {
return c.Ctr.GetLinux().GetResources().GetCpu().GetMems()
}

func (c *container) GetMemoryLimit() int64 {
return c.Ctr.GetLinux().GetResources().GetMemory().GetLimit().GetValue()
}

func (c *container) GetMemorySwap() int64 {
return c.Ctr.GetLinux().GetResources().GetMemory().GetSwap().GetValue()
}

var (
Expand Down
37 changes: 30 additions & 7 deletions pkg/resmgr/nri.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,13 +435,36 @@ func (p *nriPlugin) UpdateContainer(ctx context.Context, pod *api.PodSandbox, co
}

if realUpdates := c.SetResourceUpdates(res); !realUpdates {
p.Warn("UpdateContainer with identical resources, ignoring it...")
return nil, nil
}
//r := cache.EstimateResourceRequirements(res, c.GetQOSClass())

if err := m.policy.UpdateResources(c); err != nil {
return nil, fmt.Errorf("failed to update resources: %w", err)
p.Warn("UpdateContainer with identical resources, short-circuiting it...")
if v := c.GetCPUShares(); v != 0 {
c.SetCPUShares(v)
}
if v := c.GetCPUQuota(); v != 0 {
c.SetCPUQuota(v)
}
if v := c.GetCPUPeriod(); v != 0 {
c.SetCPUPeriod(v)
}
if v := c.GetCpusetCpus(); v != "" {
c.SetCpusetCpus(v)
}
if v := c.GetCpusetMems(); v != "" {
c.SetCpusetMems(v)
}
if v := c.GetMemoryLimit(); v != 0 {
c.SetMemoryLimit(v)
}
if v := c.GetMemorySwap(); v != 0 {
c.SetMemorySwap(v)
}
} else {
old := c.GetResourceRequirements()
upd, _ := c.GetResourceUpdates()
p.Warn("UpdateContainer with real resource changes: %s -> %s",
old.String(), upd.String())
if err := m.policy.UpdateResources(c); err != nil {
return nil, fmt.Errorf("failed to update resources: %w", err)
}
}

return p.getPendingUpdates(nil), nil
Expand Down

0 comments on commit df606a9

Please sign in to comment.