From f0211e154ff254b01a16d09a8f770c2536a8c9c4 Mon Sep 17 00:00:00 2001 From: Guillaume Pagnoux Date: Tue, 10 Dec 2024 14:35:48 +0100 Subject: [PATCH] discovery: fix "too many open files" issue (#31957) --- .../servicediscovery/module/comm.go | 6 +-- .../servicediscovery/module/comm_test.go | 21 +++------- .../servicediscovery/module/envs_test.go | 17 +++----- .../module/ignore_proc_test.go | 7 +--- .../servicediscovery/module/impl_linux.go | 40 +++++-------------- .../module/impl_linux_test.go | 16 -------- .../servicediscovery/module/stat.go | 6 +-- 7 files changed, 27 insertions(+), 86 deletions(-) diff --git a/pkg/collector/corechecks/servicediscovery/module/comm.go b/pkg/collector/corechecks/servicediscovery/module/comm.go index 7b513080aef18..675eac0d2f246 100644 --- a/pkg/collector/corechecks/servicediscovery/module/comm.go +++ b/pkg/collector/corechecks/servicediscovery/module/comm.go @@ -13,8 +13,6 @@ import ( "strconv" "strings" - "github.com/shirou/gopsutil/v3/process" - "github.com/DataDog/datadog-agent/pkg/util/kernel" ddsync "github.com/DataDog/datadog-agent/pkg/util/sync" ) @@ -41,11 +39,11 @@ var ( ) // shouldIgnoreComm returns true if process should be ignored -func (s *discovery) shouldIgnoreComm(proc *process.Process) bool { +func (s *discovery) shouldIgnoreComm(pid int32) bool { if s.config.ignoreComms == nil { return false } - commPath := kernel.HostProc(strconv.Itoa(int(proc.Pid)), "comm") + commPath := kernel.HostProc(strconv.Itoa(int(pid)), "comm") file, err := os.Open(commPath) if err != nil { return true diff --git a/pkg/collector/corechecks/servicediscovery/module/comm_test.go b/pkg/collector/corechecks/servicediscovery/module/comm_test.go index 1bc296913ecf3..23568d19ecd2c 100644 --- a/pkg/collector/corechecks/servicediscovery/module/comm_test.go +++ b/pkg/collector/corechecks/servicediscovery/module/comm_test.go @@ -137,16 +137,10 @@ func TestShouldIgnoreComm(t *testing.T) { _ = cmd.Process.Kill() }) - var proc *process.Process require.EventuallyWithT(t, func(collect *assert.CollectT) { - proc, err = customNewProcess(int32(cmd.Process.Pid)) - assert.NoError(collect, err) - }, 2*time.Second, 100*time.Millisecond) - - require.EventuallyWithT(t, func(collect *assert.CollectT) { - ignore := discovery.shouldIgnoreComm(proc) + ignore := discovery.shouldIgnoreComm(int32(cmd.Process.Pid)) assert.Equal(collect, test.ignore, ignore) - }, 500*time.Millisecond, 100*time.Millisecond) + }, 2*time.Second, 100*time.Millisecond) }) } } @@ -193,9 +187,8 @@ func BenchmarkProcName(b *testing.B) { for i := 0; i < b.N; i++ { // create a new process on each iteration to eliminate name caching from the calculation - proc, err := customNewProcess(int32(cmd.Process.Pid)) - if err != nil { - b.Fatal(err) + proc := &process.Process{ + Pid: int32(cmd.Process.Pid), } comm, err := proc.Name() if err != nil { @@ -216,11 +209,7 @@ func BenchmarkShouldIgnoreComm(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { - proc, err := customNewProcess(int32(cmd.Process.Pid)) - if err != nil { - b.Fatal(err) - } - ok := discovery.shouldIgnoreComm(proc) + ok := discovery.shouldIgnoreComm(int32(cmd.Process.Pid)) if ok { b.Fatalf("process should not have been ignored") } diff --git a/pkg/collector/corechecks/servicediscovery/module/envs_test.go b/pkg/collector/corechecks/servicediscovery/module/envs_test.go index 31cb3c8816fe7..3d2987797ff0a 100644 --- a/pkg/collector/corechecks/servicediscovery/module/envs_test.go +++ b/pkg/collector/corechecks/servicediscovery/module/envs_test.go @@ -151,15 +151,12 @@ func TestTargetEnvs(t *testing.T) { // BenchmarkGetEnvs benchmarks reading of all environment variables from /proc//environ. func BenchmarkGetEnvs(b *testing.B) { - proc, err := customNewProcess(int32(os.Getpid())) - if err != nil { - return - } + proc := &process.Process{Pid: int32(os.Getpid())} + b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - _, err = getEnvs(proc) - if err != nil { + if _, err := getEnvs(proc); err != nil { return } } @@ -167,16 +164,12 @@ func BenchmarkGetEnvs(b *testing.B) { // BenchmarkGetEnvsTarget benchmarks reading of target environment variables only from /proc//environ. func BenchmarkGetEnvsTarget(b *testing.B) { - proc, err := customNewProcess(int32(os.Getpid())) - if err != nil { - return - } + proc := &process.Process{Pid: int32(os.Getpid())} b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - _, err = getTargetEnvs(proc) - if err != nil { + if _, err := getTargetEnvs(proc); err != nil { return } } diff --git a/pkg/collector/corechecks/servicediscovery/module/ignore_proc_test.go b/pkg/collector/corechecks/servicediscovery/module/ignore_proc_test.go index 90d0dc9d8dcb3..317455f84fbbe 100644 --- a/pkg/collector/corechecks/servicediscovery/module/ignore_proc_test.go +++ b/pkg/collector/corechecks/servicediscovery/module/ignore_proc_test.go @@ -69,12 +69,9 @@ func TestShouldIgnorePid(t *testing.T) { discovery := newDiscovery(nil) require.NotEmpty(t, discovery) - proc, err := customNewProcess(int32(cmd.Process.Pid)) - require.NoError(t, err) - require.EventuallyWithT(t, func(collect *assert.CollectT) { // wait until the service name becomes available - info, err := discovery.getServiceInfo(proc) + info, err := discovery.getServiceInfo(int32(cmd.Process.Pid)) assert.NoError(collect, err) assert.Equal(collect, test.service, info.ddServiceName) }, 3*time.Second, 100*time.Millisecond) @@ -92,7 +89,7 @@ func TestShouldIgnorePid(t *testing.T) { } // check saved pid to ignore - ignore := discovery.shouldIgnorePid(proc.Pid) + ignore := discovery.shouldIgnorePid(int32(cmd.Process.Pid)) require.Equal(t, test.ignore, ignore) }) } diff --git a/pkg/collector/corechecks/servicediscovery/module/impl_linux.go b/pkg/collector/corechecks/servicediscovery/module/impl_linux.go index 8c2b276d8f56b..c70f64efeae37 100644 --- a/pkg/collector/corechecks/servicediscovery/module/impl_linux.go +++ b/pkg/collector/corechecks/servicediscovery/module/impl_linux.go @@ -383,7 +383,11 @@ func (s *discovery) cleanIgnoredPids(alivePids map[int32]struct{}) { // getServiceInfo gets the service information for a process using the // servicedetector module. -func (s *discovery) getServiceInfo(proc *process.Process) (*serviceInfo, error) { +func (s *discovery) getServiceInfo(pid int32) (*serviceInfo, error) { + proc := &process.Process{ + Pid: pid, + } + cmdline, err := proc.CmdlineSlice() if err != nil { return nil, err @@ -432,39 +436,17 @@ func (s *discovery) getServiceInfo(proc *process.Process) (*serviceInfo, error) }, nil } -// customNewProcess is the same implementation as process.NewProcess but without calling CreateTimeWithContext, which -// is not needed and costly for the discovery module. -func customNewProcess(pid int32) (*process.Process, error) { - p := &process.Process{ - Pid: pid, - } - - exists, err := process.PidExists(pid) - if err != nil { - return p, err - } - if !exists { - return p, process.ErrorProcessNotRunning - } - return p, nil -} - // maxNumberOfPorts is the maximum number of listening ports which we report per // service. const maxNumberOfPorts = 50 // getService gets information for a single service. func (s *discovery) getService(context parsingContext, pid int32) *model.Service { - proc, err := customNewProcess(pid) - if err != nil { - return nil - } - - if s.shouldIgnorePid(proc.Pid) { + if s.shouldIgnorePid(pid) { return nil } - if s.shouldIgnoreComm(proc) { - s.addIgnoredPid(proc.Pid) + if s.shouldIgnoreComm(pid) { + s.addIgnoredPid(pid) return nil } @@ -523,7 +505,7 @@ func (s *discovery) getService(context parsingContext, pid int32) *model.Service ports = ports[:maxNumberOfPorts] } - rss, err := getRSS(proc) + rss, err := getRSS(pid) if err != nil { return nil } @@ -535,7 +517,7 @@ func (s *discovery) getService(context parsingContext, pid int32) *model.Service if ok { info = cached } else { - info, err = s.getServiceInfo(proc) + info, err = s.getServiceInfo(pid) if err != nil { return nil } @@ -550,7 +532,7 @@ func (s *discovery) getService(context parsingContext, pid int32) *model.Service name = info.generatedName } if s.shouldIgnoreService(name) { - s.addIgnoredPid(proc.Pid) + s.addIgnoredPid(pid) return nil } diff --git a/pkg/collector/corechecks/servicediscovery/module/impl_linux_test.go b/pkg/collector/corechecks/servicediscovery/module/impl_linux_test.go index e584b9816c474..6970f36651c8f 100644 --- a/pkg/collector/corechecks/servicediscovery/module/impl_linux_test.go +++ b/pkg/collector/corechecks/servicediscovery/module/impl_linux_test.go @@ -1025,22 +1025,6 @@ func TestTagsPriority(t *testing.T) { } } -func BenchmarkOldProcess(b *testing.B) { - b.ResetTimer() - b.ReportAllocs() - for i := 0; i < b.N; i++ { - process.NewProcess(int32(os.Getpid())) - } -} - -func BenchmarkNewProcess(b *testing.B) { - b.ResetTimer() - b.ReportAllocs() - for i := 0; i < b.N; i++ { - customNewProcess(int32(os.Getpid())) - } -} - func getSocketsOld(p *process.Process) ([]uint64, error) { FDs, err := p.OpenFiles() if err != nil { diff --git a/pkg/collector/corechecks/servicediscovery/module/stat.go b/pkg/collector/corechecks/servicediscovery/module/stat.go index ca894aaf0e727..9ea526f7f829d 100644 --- a/pkg/collector/corechecks/servicediscovery/module/stat.go +++ b/pkg/collector/corechecks/servicediscovery/module/stat.go @@ -16,8 +16,6 @@ import ( "strconv" "strings" - "github.com/shirou/gopsutil/v3/process" - "github.com/DataDog/datadog-agent/pkg/util/kernel" ) @@ -28,8 +26,8 @@ var pageSize = uint64(os.Getpagesize()) // getRSS returns the RSS for the process, in bytes. Compare MemoryInfo() in // gopsutil which does the same thing but which parses several other fields // which we're not interested in. -func getRSS(proc *process.Process) (uint64, error) { - statmPath := kernel.HostProc(strconv.Itoa(int(proc.Pid)), "statm") +func getRSS(pid int32) (uint64, error) { + statmPath := kernel.HostProc(strconv.Itoa(int(pid)), "statm") // This file is very small so just read it fully. contents, err := os.ReadFile(statmPath)