Remove ReplicaSet informer (#1204)
* Remove ReplicaSet informer

* Make vale happy

* Fix network data

* Fix documentation
mariomac authored Sep 30, 2024
1 parent 0130316 commit 5006cbd
Showing 12 changed files with 91 additions and 286 deletions.
25 changes: 22 additions & 3 deletions docs/sources/configure/options.md
@@ -293,7 +293,7 @@ Each `services` entry is a map where the properties can be grouped according to
| `name` | -- | string | (see description) |

Defines a name for the matching instrumented service. It will be used to populate the `service.name`
OTEL property and/or the `service_name` prometheus property in the exported metrics/traces.
OTEL property and the `service_name` Prometheus property in the exported metrics/traces.

If the property is not set, it will default to any of the following properties, in order of
precedence:
@@ -302,7 +302,7 @@ precedence:
1. The name of the Deployment that runs the instrumented process, if any.
2. The name of the ReplicaSet/DaemonSet/StatefulSet that runs the instrumented process, if any.
3. The name of the Pod that runs the instrumented process.
- If kubernetes is not enabled:
- If Kubernetes is not enabled:
1. The name of the process executable file.
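
For illustration only, a minimal `services` entry that sets an explicit name could look like
the following sketch. The `k8s_deployment_name` selector is just one example of the selection
criteria described below; adapt it to your own matching criteria:

```yaml
discovery:
  services:
    # Hypothetical entry: report every process of the "checkout" Deployment
    # under an explicit service name instead of the defaults listed above.
    - name: checkout-service
      k8s_deployment_name: checkout
```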

If multiple processes match the service selection criteria described below,
@@ -511,7 +511,7 @@ To disable the automatic HTTP request timeout feature, set this option to zero,
| `high_request_volume` | `BEYLA_BPF_HIGH_REQUEST_VOLUME` | boolean | (false) |

Configures the HTTP tracer heuristic to send telemetry events as soon as a response is detected.
Setting this option reduces the acuracy of timings for requests with large responses, however,
Setting this option reduces the accuracy of timings for requests with large responses, however,
in high request volume scenarios this option will reduce the number of dropped trace events.
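
As a minimal sketch, assuming this option sits in the `ebpf` YAML section (as the
`BEYLA_BPF_` prefix of its environment variable suggests), enabling it would look like:

```yaml
ebpf:
  # Trade timing accuracy for large responses in exchange for fewer
  # dropped trace events under heavy load.
  high_request_volume: true
```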

## Configuration of metrics and traces attributes
@@ -662,6 +662,25 @@ establish communication with the Kubernetes Cluster.

Usually you won't need to change this value.

| YAML | Environment variable | Type | Default |
|---------------------|--------------------------------|--------|---------|
| `disable_informers` | `BEYLA_KUBE_DISABLE_INFORMERS` | string | (empty) |

The accepted value is a list that can contain the `node` and `service` values.

This option allows you to selectively disable some Kubernetes informers, which continuously
listen to the Kubernetes API to obtain the metadata that is required to decorate
network metrics or application metrics and traces.

When Beyla is deployed as a DaemonSet in very large clusters, the informers created by
every Beyla instance might end up overloading the Kubernetes API.

Disabling some informers causes the reported metadata to be incomplete, but it
reduces the load on the Kubernetes API.

The Pods informer can't be disabled. To achieve that, disable the whole
Kubernetes metadata decoration instead.
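
As a minimal sketch, assuming the option sits alongside the other Kubernetes decorator
settings in the `attributes.kubernetes` section, the following configuration keeps only the
Pods informer running:

```yaml
attributes:
  kubernetes:
    enable: true
    # Disable the node and service informers to reduce the load on the
    # Kubernetes API; node- and service-related metadata will be missing.
    disable_informers:
      - node
      - service
```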

## Routes decorator

YAML section `routes`.
102 changes: 4 additions & 98 deletions pkg/internal/discover/watcher_kube.go
@@ -24,10 +24,8 @@ var (

// kubeMetadata is implemented by kube.Metadata
type kubeMetadata interface {
FetchPodOwnerInfo(pod *kube.PodInfo)
GetContainerPod(containerID string) (*kube.PodInfo, bool)
AddPodEventHandler(handler cache.ResourceEventHandler) error
AddReplicaSetEventHandler(handler cache.ResourceEventHandler) error
}

// watcherKubeEnricher keeps an updated relational snapshot of the in-host process-pods-deployments,
@@ -47,7 +45,6 @@ type watcherKubeEnricher struct {
podsByOwner maps.Map2[nsName, string, *kube.PodInfo]

podsInfoCh chan Event[*kube.PodInfo]
rsInfoCh chan Event[*kube.ReplicaSetInfo]
}

type nsName struct {
@@ -112,27 +109,6 @@ func (wk *watcherKubeEnricher) init() error {
return fmt.Errorf("can't register watcherKubeEnricher as Pod event handler in the K8s informer: %w", err)
}

// the rsInfoCh channel will receive any update about replicasets being created or deleted
wk.rsInfoCh = make(chan Event[*kube.ReplicaSetInfo], 10)
if err := wk.informer.AddReplicaSetEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
rs := obj.(*kube.ReplicaSetInfo)
d := time.Since(rs.CreationTimestamp.Time)
wk.rsInfoCh <- Event[*kube.ReplicaSetInfo]{Type: EventCreated, Obj: obj.(*kube.ReplicaSetInfo)}
wk.m.InformerAddDuration("replicaset", d)
},
UpdateFunc: func(_, newObj interface{}) {
rs := newObj.(*kube.ReplicaSetInfo)
d := time.Since(rs.CreationTimestamp.Time)
wk.rsInfoCh <- Event[*kube.ReplicaSetInfo]{Type: EventCreated, Obj: newObj.(*kube.ReplicaSetInfo)}
wk.m.InformerUpdateDuration("replicaset", d)
},
DeleteFunc: func(obj interface{}) {
wk.rsInfoCh <- Event[*kube.ReplicaSetInfo]{Type: EventDeleted, Obj: obj.(*kube.ReplicaSetInfo)}
},
}); err != nil {
return fmt.Errorf("can't register watcherKubeEnricher as ReplicaSet event handler in the K8s informer: %w", err)
}
return nil
}

@@ -147,8 +123,6 @@ func (wk *watcherKubeEnricher) enrich(in <-chan []Event[processAttrs], out chan<
select {
case podEvent := <-wk.podsInfoCh:
wk.enrichPodEvent(podEvent, out)
case rsEvent := <-wk.rsInfoCh:
wk.enrichReplicaSetEvent(rsEvent, out)
case processEvents, ok := <-in:
if !ok {
wk.log.Debug("input channel closed. Stopping")
@@ -175,20 +149,6 @@ func (wk *watcherKubeEnricher) enrichPodEvent(podEvent Event[*kube.PodInfo], out
}
}

func (wk *watcherKubeEnricher) enrichReplicaSetEvent(rsEvent Event[*kube.ReplicaSetInfo], out chan<- []Event[processAttrs]) {
switch rsEvent.Type {
case EventCreated:
wk.log.Debug("ReplicaSet added", "namespace",
rsEvent.Obj.Namespace, "name", rsEvent.Obj.Name, "owner", rsEvent.Obj.Owner)
out <- wk.onNewReplicaSet(rsEvent.Obj)
case EventDeleted:
wk.log.Debug("ReplicaSet deleted", "namespace", rsEvent.Obj.Namespace, "name", rsEvent.Obj.Name)
wk.onDeletedReplicaSet(rsEvent.Obj)
// we don't forward replicaset deletion, as it will be eventually done
// when the process is removed
}
}

// enrichProcessEvent creates a copy of the process information in the input slice, but decorated with
// K8s attributes, if any. It also handles deletion of processes
func (wk *watcherKubeEnricher) enrichProcessEvent(processEvents []Event[processAttrs], out chan<- []Event[processAttrs]) {
@@ -225,7 +185,7 @@ func (wk *watcherKubeEnricher) onNewProcess(procInfo processAttrs) (processAttrs

wk.processByContainer[containerInfo.ContainerID] = procInfo

if pod, ok := wk.getPodInfo(containerInfo.ContainerID); ok {
if pod, ok := wk.informer.GetContainerPod(containerInfo.ContainerID); ok {
procInfo = withMetadata(procInfo, pod)
}
return procInfo, true
@@ -234,12 +194,6 @@ func (wk *watcherKubeEnricher) onNewProcess(procInfo processAttrs) (processAttrs
func (wk *watcherKubeEnricher) onNewPod(pod *kube.PodInfo) []Event[processAttrs] {
wk.updateNewPodsByOwnerIndex(pod)

// update PodInfo with its owner's info, if any
// for each container in the Pod
// - get matching process, if available
// - forward enriched processAttrs data
wk.informer.FetchPodOwnerInfo(pod)

var events []Event[processAttrs]
for _, containerID := range pod.ContainerIDs {
if procInfo, ok := wk.processByContainer[containerID]; ok {
@@ -259,35 +213,6 @@ func (wk *watcherKubeEnricher) onDeletedPod(pod *kube.PodInfo) {
}
}

func (wk *watcherKubeEnricher) onNewReplicaSet(rsInfo *kube.ReplicaSetInfo) []Event[processAttrs] {
// for each Pod in the ReplicaSet
// for each container in the Pod
// - get matching process, if any
// - enrich and forward it
podInfos := wk.getReplicaSetPods(rsInfo.Namespace, rsInfo.Name)
var allProcesses []Event[processAttrs]
for _, pod := range podInfos {
for _, containerID := range pod.ContainerIDs {
if procInfo, ok := wk.processByContainer[containerID]; ok {
pod.Owner = &kube.Owner{
LabelName: kube.OwnerReplicaSet,
Name: rsInfo.Name,
Owner: rsInfo.Owner,
}
allProcesses = append(allProcesses, Event[processAttrs]{
Type: EventCreated,
Obj: withMetadata(procInfo, pod),
})
}
}
}
return allProcesses
}

func (wk *watcherKubeEnricher) onDeletedReplicaSet(rsInfo *kube.ReplicaSetInfo) {
wk.podsByOwner.DeleteAll(nsName{namespace: rsInfo.Namespace, name: rsInfo.Name})
}

func (wk *watcherKubeEnricher) getContainerInfo(pid PID) (container.Info, error) {
if cntInfo, ok := wk.containerByPID[pid]; ok {
return cntInfo, nil
@@ -300,25 +225,6 @@ func (wk *watcherKubeEnricher) getContainerInfo(pid PID) (container.Info, error)
return cntInfo, nil
}

func (wk *watcherKubeEnricher) getPodInfo(containerID string) (*kube.PodInfo, bool) {
if pod, ok := wk.informer.GetContainerPod(containerID); ok {
wk.informer.FetchPodOwnerInfo(pod)
return pod, true
}
return nil, false
}

func (wk *watcherKubeEnricher) getReplicaSetPods(namespace, name string) []*kube.PodInfo {
var podInfos []*kube.PodInfo
if pods, ok := wk.podsByOwner[nsName{namespace: namespace, name: name}]; ok {
podInfos = make([]*kube.PodInfo, 0, len(pods))
for _, pod := range pods {
podInfos = append(podInfos, pod)
}
}
return podInfos
}

func (wk *watcherKubeEnricher) updateNewPodsByOwnerIndex(pod *kube.PodInfo) {
if pod.Owner != nil {
wk.podsByOwner.Put(nsName{namespace: pod.Namespace, name: pod.Owner.Name}, pod.Name, pod)
@@ -342,9 +248,9 @@ func withMetadata(pp processAttrs, info *kube.PodInfo) processAttrs {

if info.Owner != nil {
ret.metadata[attr.Name(info.Owner.LabelName).Prom()] = info.Owner.Name
topName, topLabel := info.Owner.TopOwnerNameLabel()
ret.metadata[attr.Name(topLabel).Prom()] = topName
ret.metadata[services.AttrOwnerName] = topName
topOwner := info.Owner.TopOwner()
ret.metadata[attr.Name(topOwner.LabelName).Prom()] = topOwner.Name
ret.metadata[services.AttrOwnerName] = topOwner.Name
}
return ret
}
8 changes: 8 additions & 0 deletions pkg/internal/helpers/maps/bits_test.go
@@ -26,6 +26,14 @@ func TestBits_Empty(t *testing.T) {
assert.False(t, bits.Has(0b1000))
}

func TestBits_IgnoreUnknownEnums(t *testing.T) {
bits := MappedBits([]key{1, 2, 3, 40}, mapper)
assert.True(t, bits.Has(0b0001))
assert.True(t, bits.Has(0b0010))
assert.True(t, bits.Has(0b0100))
assert.False(t, bits.Has(0b1000))
}

func TestBits_Transform(t *testing.T) {
bits := MappedBits([]key{10, 30, 8910}, mapper,
WithTransform(func(k key) key { return k / 10 }))