diff --git a/pkg/resmgr/policy/metrics.go b/pkg/resmgr/policy/metrics.go index d288ea7c9..6ad9ca234 100644 --- a/pkg/resmgr/policy/metrics.go +++ b/pkg/resmgr/policy/metrics.go @@ -29,11 +29,28 @@ type Collector struct { var _ prometheus.Collector = &Collector{} -func (p *policy) registerCollector() error { - collector := &Collector{policy: p} - p.collector = collector +func (p *policy) registerMetrics() (*Collector, *SystemMetrics, error) { + pc := p.NewCollector() + sm := p.NewSystemMetrics() - return metrics.Register(p.ActivePolicy(), collector, metrics.WithGroup("policy")) + if err := pc.Register(); err != nil { + return nil, nil, err + } + if err := sm.Register(); err != nil { + return nil, nil, err + } + + return pc, sm, nil +} + +func (p *policy) NewCollector() *Collector { + return &Collector{ + policy: p, + } +} + +func (c *Collector) Register() error { + return metrics.Register(c.policy.ActivePolicy(), c, metrics.WithGroup("policy")) } func (c *Collector) Describe(ch chan<- *prometheus.Desc) { diff --git a/pkg/resmgr/policy/policy.go b/pkg/resmgr/policy/policy.go index 1fadd5166..853d1cbac 100644 --- a/pkg/resmgr/policy/policy.go +++ b/pkg/resmgr/policy/policy.go @@ -198,12 +198,13 @@ type ZoneAttribute struct { // Policy instance/state. type policy struct { - options Options // policy options - cache cache.Cache // system state cache - active Backend // our active backend - system system.System // system/HW/topology info - sendEvent SendEventFn // function to send event up to the resource manager - collector *Collector // policy metrics collector + options Options // policy options + cache cache.Cache // system state cache + active Backend // our active backend + system system.System // system/HW/topology info + sendEvent SendEventFn // function to send event up to the resource manager + pcollect *Collector // policy metrics collector + smetrics *SystemMetrics // system metrics collector } // backend is a registered Backend. @@ -226,10 +227,20 @@ func NewPolicy(backend Backend, cache cache.Cache, o *Options) (Policy, error) { active: backend, } - if err := p.registerCollector(); err != nil { + sys, err := system.DiscoverSystem() + if err != nil { + return nil, policyError("failed to discover system topology: %v", err) + } + p.system = sys + + pc, sm, err := p.registerMetrics() + if err != nil { return nil, err } + p.pcollect = pc + p.smetrics = sm + return p, nil } @@ -242,12 +253,6 @@ func (p *policy) ActivePolicy() string { // Start starts up policy, preparing it for serving requests. func (p *policy) Start(cfg interface{}) error { - sys, err := system.DiscoverSystem() - if err != nil { - return policyError("failed to discover system topology: %v", err) - } - p.system = sys - log.Info("activating '%s' policy...", p.active.Name()) if err := p.active.Setup(&BackendOptions{ @@ -273,29 +278,29 @@ func (p *policy) Reconfigure(cfg interface{}) error { // Sync synchronizes the active policy state. func (p *policy) Sync(add []cache.Container, del []cache.Container) error { - p.collector.Block() - defer p.collector.Unblock() + p.pcollect.Block() + defer p.pcollect.Unblock() return p.active.Sync(add, del) } // AllocateResources allocates resources for a container. func (p *policy) AllocateResources(c cache.Container) error { - p.collector.Block() - defer p.collector.Unblock() + p.pcollect.Block() + defer p.pcollect.Unblock() return p.active.AllocateResources(c) } // ReleaseResources release resources of a container. func (p *policy) ReleaseResources(c cache.Container) error { - p.collector.Block() - defer p.collector.Unblock() + p.pcollect.Block() + defer p.pcollect.Unblock() return p.active.ReleaseResources(c) } // UpdateResources updates resource allocations of a container. func (p *policy) UpdateResources(c cache.Container) error { - p.collector.Block() - defer p.collector.Unblock() + p.pcollect.Block() + defer p.pcollect.Unblock() return p.active.UpdateResources(c) } diff --git a/pkg/resmgr/policy/system-metrics.go b/pkg/resmgr/policy/system-metrics.go new file mode 100644 index 000000000..c8d267802 --- /dev/null +++ b/pkg/resmgr/policy/system-metrics.go @@ -0,0 +1,244 @@ +// Copyright The NRI Plugins Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package policy + +import ( + "strconv" + + "github.com/containers/nri-plugins/pkg/metrics" + "github.com/containers/nri-plugins/pkg/resmgr/cache" + system "github.com/containers/nri-plugins/pkg/sysfs" + "github.com/containers/nri-plugins/pkg/utils/cpuset" + "github.com/prometheus/client_golang/prometheus" + v1 "k8s.io/api/core/v1" +) + +const ( + nodeCapacity = iota + nodeUsage + nodeContainers + cpuAllocation + cpuContainers + metricsCount +) + +type ( + SystemMetrics struct { + cache cache.Cache + system system.System + Nodes map[int]*NodeMetric + Cpus map[int]*CpuMetric + Metrics []*prometheus.GaugeVec + } + NodeMetric struct { + Id int + IdLabel string + Type string + Capacity int64 + Usage int64 + ContainerCount int + } + CpuMetric struct { + Id int + IdLabel string + Allocation int + ContainerCount int + } +) + +func (p *policy) NewSystemMetrics() *SystemMetrics { + s := &SystemMetrics{ + cache: p.cache, + system: p.system, + Nodes: map[int]*NodeMetric{}, + Cpus: map[int]*CpuMetric{}, + Metrics: make([]*prometheus.GaugeVec, metricsCount, metricsCount), + } + + s.Metrics[nodeCapacity] = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "mem_node_capacity", + Help: "Capacity of the memory node.", + }, + []string{ + "node_id", + }, + ) + s.Metrics[nodeUsage] = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "mem_node_usage", + Help: "Usage of the memory node", + }, + []string{ + "node_id", + }, + ) + s.Metrics[nodeContainers] = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "mem_node_container_count", + Help: "Number of containers assigned to the memory node.", + }, + []string{ + "node_id", + }, + ) + s.Metrics[cpuAllocation] = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "cpu_allocation", + Help: "Total allocation of the CPU.", + }, + []string{ + "cpu_id", + }, + ) + s.Metrics[cpuContainers] = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "cpu_container_count", + Help: "Number of containers assigned to the CPU.", + }, + []string{ + "cpu_id", + }, + ) + + for _, id := range s.system.NodeIDs() { + var ( + sys = s.system.Node(id) + capa, used = s.getMemInfo(sys) + node = &NodeMetric{ + Id: sys.ID(), + IdLabel: strconv.Itoa(sys.ID()), + Type: sys.GetMemoryType().String(), + Capacity: capa, + Usage: used, + } + ) + s.Nodes[id] = node + } + + for _, id := range s.system.CPUIDs() { + cpu := &CpuMetric{ + Id: id, + IdLabel: strconv.Itoa(id), + } + s.Cpus[id] = cpu + } + + return s +} + +func (s *SystemMetrics) Describe(ch chan<- *prometheus.Desc) { + s.Metrics[nodeCapacity].Describe(ch) + s.Metrics[nodeUsage].Describe(ch) + s.Metrics[nodeContainers].Describe(ch) + s.Metrics[cpuAllocation].Describe(ch) + s.Metrics[cpuContainers].Describe(ch) +} + +func (s *SystemMetrics) Collect(ch chan<- prometheus.Metric) { + s.Update() + s.Metrics[nodeCapacity].Collect(ch) + s.Metrics[nodeUsage].Collect(ch) + s.Metrics[nodeContainers].Collect(ch) + s.Metrics[cpuAllocation].Collect(ch) + s.Metrics[cpuContainers].Collect(ch) +} + +func (s *SystemMetrics) Register() error { + return metrics.Register("system", s, metrics.WithGroup("policy")) +} + +func (s *SystemMetrics) Update() { + for _, n := range s.Nodes { + sys := s.system.Node(n.Id) + capa, used := s.getMemInfo(sys) + + if n.Capacity == 0 { + n.Capacity = capa + s.Metrics[nodeCapacity].WithLabelValues(n.IdLabel).Set(float64(n.Capacity)) + } + + n.Usage = used + n.ContainerCount = 0 + } + + for _, c := range s.Cpus { + c.ContainerCount = 0 + } + + for _, ctr := range s.cache.GetContainers() { + switch ctr.GetState() { + case cache.ContainerStateCreated: + case cache.ContainerStateRunning: + default: + continue + } + + var ( + cpu, mem = s.getCpuAndMemset(ctr) + req, _ = s.getCpuResources(ctr) + ) + + for _, id := range mem.List() { + if n, ok := s.Nodes[id]; ok { + n.ContainerCount++ + } + } + + for _, id := range cpu.List() { + if c, ok := s.Cpus[id]; ok { + c.ContainerCount++ + if cpu.Size() > 0 { + c.Allocation += req / cpu.Size() + } + } + } + } + + for _, n := range s.Nodes { + s.Metrics[nodeUsage].WithLabelValues(n.IdLabel).Set(float64(n.Usage)) + } + for _, c := range s.Cpus { + s.Metrics[cpuAllocation].WithLabelValues(c.IdLabel).Set(float64(c.Allocation)) + s.Metrics[cpuContainers].WithLabelValues(c.IdLabel).Set(float64(c.ContainerCount)) + } +} + +func (s *SystemMetrics) getMemInfo(n system.Node) (capacity, used int64) { + if n != nil { + if i, _ := n.MemoryInfo(); i != nil { + return int64(i.MemTotal), int64(i.MemUsed) + } + } + return 0, 0 +} + +func (s *SystemMetrics) getCpuAndMemset(ctr cache.Container) (cpu, mem cpuset.CPUSet) { + cset, _ := cpuset.Parse(ctr.GetCpusetCpus()) + mset, _ := cpuset.Parse(ctr.GetCpusetMems()) + return cset, mset +} + +func (s *SystemMetrics) getCpuResources(ctr cache.Container) (request, limit int) { + res := ctr.GetResourceRequirements() + if qty, ok := res.Requests[v1.ResourceCPU]; ok { + request = int(qty.MilliValue()) + } + if qty, ok := res.Limits[v1.ResourceCPU]; ok { + limit = int(qty.MilliValue()) + } + + return request, limit +}