Skip to content

Commit

Permalink
discovery: fix "too many open files" issue (#31957)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yumasi authored Dec 10, 2024
1 parent 8d89378 commit f0211e1
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 86 deletions.
6 changes: 2 additions & 4 deletions pkg/collector/corechecks/servicediscovery/module/comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"strconv"
"strings"

"github.com/shirou/gopsutil/v3/process"

"github.com/DataDog/datadog-agent/pkg/util/kernel"
ddsync "github.com/DataDog/datadog-agent/pkg/util/sync"
)
Expand All @@ -41,11 +39,11 @@ var (
)

// shouldIgnoreComm returns true if process should be ignored
func (s *discovery) shouldIgnoreComm(proc *process.Process) bool {
func (s *discovery) shouldIgnoreComm(pid int32) bool {
if s.config.ignoreComms == nil {
return false
}
commPath := kernel.HostProc(strconv.Itoa(int(proc.Pid)), "comm")
commPath := kernel.HostProc(strconv.Itoa(int(pid)), "comm")
file, err := os.Open(commPath)
if err != nil {
return true
Expand Down
21 changes: 5 additions & 16 deletions pkg/collector/corechecks/servicediscovery/module/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,10 @@ func TestShouldIgnoreComm(t *testing.T) {
_ = cmd.Process.Kill()
})

var proc *process.Process
require.EventuallyWithT(t, func(collect *assert.CollectT) {
proc, err = customNewProcess(int32(cmd.Process.Pid))
assert.NoError(collect, err)
}, 2*time.Second, 100*time.Millisecond)

require.EventuallyWithT(t, func(collect *assert.CollectT) {
ignore := discovery.shouldIgnoreComm(proc)
ignore := discovery.shouldIgnoreComm(int32(cmd.Process.Pid))
assert.Equal(collect, test.ignore, ignore)
}, 500*time.Millisecond, 100*time.Millisecond)
}, 2*time.Second, 100*time.Millisecond)
})
}
}
Expand Down Expand Up @@ -193,9 +187,8 @@ func BenchmarkProcName(b *testing.B) {

for i := 0; i < b.N; i++ {
// create a new process on each iteration to eliminate name caching from the calculation
proc, err := customNewProcess(int32(cmd.Process.Pid))
if err != nil {
b.Fatal(err)
proc := &process.Process{
Pid: int32(cmd.Process.Pid),
}
comm, err := proc.Name()
if err != nil {
Expand All @@ -216,11 +209,7 @@ func BenchmarkShouldIgnoreComm(b *testing.B) {
b.ReportAllocs()

for i := 0; i < b.N; i++ {
proc, err := customNewProcess(int32(cmd.Process.Pid))
if err != nil {
b.Fatal(err)
}
ok := discovery.shouldIgnoreComm(proc)
ok := discovery.shouldIgnoreComm(int32(cmd.Process.Pid))
if ok {
b.Fatalf("process should not have been ignored")
}
Expand Down
17 changes: 5 additions & 12 deletions pkg/collector/corechecks/servicediscovery/module/envs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,32 +151,25 @@ func TestTargetEnvs(t *testing.T) {

// BenchmarkGetEnvs benchmarks reading of all environment variables from /proc/<pid>/environ.
func BenchmarkGetEnvs(b *testing.B) {
proc, err := customNewProcess(int32(os.Getpid()))
if err != nil {
return
}
proc := &process.Process{Pid: int32(os.Getpid())}

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
_, err = getEnvs(proc)
if err != nil {
if _, err := getEnvs(proc); err != nil {
return
}
}
}

// BenchmarkGetEnvsTarget benchmarks reading of target environment variables only from /proc/<pid>/environ.
func BenchmarkGetEnvsTarget(b *testing.B) {
proc, err := customNewProcess(int32(os.Getpid()))
if err != nil {
return
}
proc := &process.Process{Pid: int32(os.Getpid())}

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
_, err = getTargetEnvs(proc)
if err != nil {
if _, err := getTargetEnvs(proc); err != nil {
return
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,9 @@ func TestShouldIgnorePid(t *testing.T) {
discovery := newDiscovery(nil)
require.NotEmpty(t, discovery)

proc, err := customNewProcess(int32(cmd.Process.Pid))
require.NoError(t, err)

require.EventuallyWithT(t, func(collect *assert.CollectT) {
// wait until the service name becomes available
info, err := discovery.getServiceInfo(proc)
info, err := discovery.getServiceInfo(int32(cmd.Process.Pid))
assert.NoError(collect, err)
assert.Equal(collect, test.service, info.ddServiceName)
}, 3*time.Second, 100*time.Millisecond)
Expand All @@ -92,7 +89,7 @@ func TestShouldIgnorePid(t *testing.T) {
}

// check saved pid to ignore
ignore := discovery.shouldIgnorePid(proc.Pid)
ignore := discovery.shouldIgnorePid(int32(cmd.Process.Pid))
require.Equal(t, test.ignore, ignore)
})
}
Expand Down
40 changes: 11 additions & 29 deletions pkg/collector/corechecks/servicediscovery/module/impl_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,11 @@ func (s *discovery) cleanIgnoredPids(alivePids map[int32]struct{}) {

// getServiceInfo gets the service information for a process using the
// servicedetector module.
func (s *discovery) getServiceInfo(proc *process.Process) (*serviceInfo, error) {
func (s *discovery) getServiceInfo(pid int32) (*serviceInfo, error) {
proc := &process.Process{
Pid: pid,
}

cmdline, err := proc.CmdlineSlice()
if err != nil {
return nil, err
Expand Down Expand Up @@ -432,39 +436,17 @@ func (s *discovery) getServiceInfo(proc *process.Process) (*serviceInfo, error)
}, nil
}

// customNewProcess is the same implementation as process.NewProcess but without calling CreateTimeWithContext, which
// is not needed and costly for the discovery module.
func customNewProcess(pid int32) (*process.Process, error) {
p := &process.Process{
Pid: pid,
}

exists, err := process.PidExists(pid)
if err != nil {
return p, err
}
if !exists {
return p, process.ErrorProcessNotRunning
}
return p, nil
}

// maxNumberOfPorts is the maximum number of listening ports which we report per
// service.
const maxNumberOfPorts = 50

// getService gets information for a single service.
func (s *discovery) getService(context parsingContext, pid int32) *model.Service {
proc, err := customNewProcess(pid)
if err != nil {
return nil
}

if s.shouldIgnorePid(proc.Pid) {
if s.shouldIgnorePid(pid) {
return nil
}
if s.shouldIgnoreComm(proc) {
s.addIgnoredPid(proc.Pid)
if s.shouldIgnoreComm(pid) {
s.addIgnoredPid(pid)
return nil
}

Expand Down Expand Up @@ -523,7 +505,7 @@ func (s *discovery) getService(context parsingContext, pid int32) *model.Service
ports = ports[:maxNumberOfPorts]
}

rss, err := getRSS(proc)
rss, err := getRSS(pid)
if err != nil {
return nil
}
Expand All @@ -535,7 +517,7 @@ func (s *discovery) getService(context parsingContext, pid int32) *model.Service
if ok {
info = cached
} else {
info, err = s.getServiceInfo(proc)
info, err = s.getServiceInfo(pid)
if err != nil {
return nil
}
Expand All @@ -550,7 +532,7 @@ func (s *discovery) getService(context parsingContext, pid int32) *model.Service
name = info.generatedName
}
if s.shouldIgnoreService(name) {
s.addIgnoredPid(proc.Pid)
s.addIgnoredPid(pid)
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1025,22 +1025,6 @@ func TestTagsPriority(t *testing.T) {
}
}

func BenchmarkOldProcess(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
process.NewProcess(int32(os.Getpid()))
}
}

func BenchmarkNewProcess(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
customNewProcess(int32(os.Getpid()))
}
}

func getSocketsOld(p *process.Process) ([]uint64, error) {
FDs, err := p.OpenFiles()
if err != nil {
Expand Down
6 changes: 2 additions & 4 deletions pkg/collector/corechecks/servicediscovery/module/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"strconv"
"strings"

"github.com/shirou/gopsutil/v3/process"

"github.com/DataDog/datadog-agent/pkg/util/kernel"
)

Expand All @@ -28,8 +26,8 @@ var pageSize = uint64(os.Getpagesize())
// getRSS returns the RSS for the process, in bytes. Compare MemoryInfo() in
// gopsutil which does the same thing but which parses several other fields
// which we're not interested in.
func getRSS(proc *process.Process) (uint64, error) {
statmPath := kernel.HostProc(strconv.Itoa(int(proc.Pid)), "statm")
func getRSS(pid int32) (uint64, error) {
statmPath := kernel.HostProc(strconv.Itoa(int(pid)), "statm")

// This file is very small so just read it fully.
contents, err := os.ReadFile(statmPath)
Expand Down

0 comments on commit f0211e1

Please sign in to comment.