diff --git a/comp/core/tagger/server/README.md b/comp/core/tagger/server/README.md new file mode 100644 index 0000000000000..60e1c0cd90014 --- /dev/null +++ b/comp/core/tagger/server/README.md @@ -0,0 +1,77 @@ +## Introduction + +This package server implements a gRPC server that streams Tagger entities to remote tagger clients. + +## Behaviour + +When a client connects to the tagger grpc server, the server creates a subscription to the local tagger in order to stream tags. + +Before streaming new tag events, the server sends an initial burst to the client over the stream. This initial burst contains a snapshot of the tagger content. After the initial burst has been processed, the server will stream new tag events to the client based on the filters provided in the streaming request. + +### Cutting Events into chunks + +Sending very large messages over the grpc stream can cause the message to be dropped or rejected by the client. The limit is 4MB by default. + +To avoid such scenarios, especially when sending the initial burst, the server cuts each message into small chunks that can be easily transmitted over the stream. + +This logic is implemented in the `util.go` folder. + +We provide 2 implementations: +- `processChunksWithSplit`: splits an event slice into a small chunks where each chunk contains a contiguous slice of events. It then processes the generated chunks sequentially. This will load all chunks into memory (stored in a slice) and then return them. +- `processChunksInPlace`: this has a similar functionality as `processChunksWithSplit`, but it is more optimized in terms of memory and cpu because it processes chunks in place without any extra memory allocation. + +We keep both implementations for at least release candidate to ensure everything works well and be able to quickly revert in case of regressions. + +#### Benchmark Testing + +Benchmark tests show that using lazy chunking results in significant memory and cpu improvement: + +Go to `util_benchmark_test.go`: + +``` +// with processChunksFunc = processChunksWithSplit[int] +go test -bench BenchmarkProcessChunks -benchmem -count 6 -benchtime 100x > old.txt + +// with processChunksFunc = processChunksInPlace[int] +go test -bench BenchmarkProcessChunks -benchmem -count 6 -benchtime 100x > new.txt + +// Compare results +benchstat old.txt new.txt + +goos: linux +goarch: arm64 +pkg: github.com/DataDog/datadog-agent/comp/core/tagger/server + │ old.txt │ new.txt │ + │ sec/op │ sec/op vs base │ +ProcessChunks/100-items-10 2399.5n ± 47% 239.8n ± 4% -90.01% (p=0.002 n=6) +ProcessChunks/1000-items-10 35.072µ ± 13% 2.344µ ± 11% -93.32% (p=0.002 n=6) +ProcessChunks/10000-items-10 365.19µ ± 5% 22.56µ ± 18% -93.82% (p=0.002 n=6) +ProcessChunks/100000-items-10 3435.1µ ± 7% 222.5µ ± 16% -93.52% (p=0.002 n=6) +ProcessChunks/1000000-items-10 29.059m ± 9% 2.219m ± 31% -92.36% (p=0.002 n=6) +geomean 314.3µ 22.87µ -92.72% + + │ old.txt │ new.txt │ + │ B/op │ B/op vs base │ +ProcessChunks/100-items-10 2.969Ki ± 0% 0.000Ki ± 0% -100.00% (p=0.002 n=6) +ProcessChunks/1000-items-10 29.02Ki ± 0% 0.00Ki ± 0% -100.00% (p=0.002 n=6) +ProcessChunks/10000-items-10 370.7Ki ± 0% 0.0Ki ± 0% -100.00% (p=0.002 n=6) +ProcessChunks/100000-items-10 4.165Mi ± 0% 0.000Mi ± 0% -100.00% (p=0.002 n=6) +ProcessChunks/1000000-items-10 44.65Mi ± 0% 0.00Mi ± 0% -100.00% (p=0.002 n=6) +geomean 362.1Ki ? ¹ ² +¹ summaries must be >0 to compute geomean +² ratios must be >0 to compute geomean + + │ old.txt │ new.txt │ + │ allocs/op │ allocs/op vs base │ +ProcessChunks/100-items-10 81.00 ± 0% 0.00 ± 0% -100.00% (p=0.002 n=6) +ProcessChunks/1000-items-10 759.0 ± 0% 0.0 ± 0% -100.00% (p=0.002 n=6) +ProcessChunks/10000-items-10 7.514k ± 0% 0.000k ± 0% -100.00% (p=0.002 n=6) +ProcessChunks/100000-items-10 75.02k ± 0% 0.00k ± 0% -100.00% (p=0.002 n=6) +ProcessChunks/1000000-items-10 750.0k ± 0% 0.0k ± 0% -100.00% (p=0.002 n=6) +geomean 7.638k ? ¹ ² +¹ summaries must be >0 to compute geomean +² ratios must be >0 to compute geomean +``` + + + diff --git a/comp/core/tagger/server/server.go b/comp/core/tagger/server/server.go index 58c4ef2a0667c..5e323c43c7ee2 100644 --- a/comp/core/tagger/server/server.go +++ b/comp/core/tagger/server/server.go @@ -77,6 +77,15 @@ func (s *Server) TaggerStreamEntities(in *pb.StreamTagsRequest, out pb.AgentSecu ticker := time.NewTicker(streamKeepAliveInterval) defer ticker.Stop() + + sendFunc := func(chunk []*pb.StreamTagsEvent) error { + return grpc.DoWithTimeout(func() error { + return out.Send(&pb.StreamTagsResponse{ + Events: chunk, + }) + }, taggerStreamSendTimeout) + } + for { select { case events, ok := <-subscription.EventsChan(): @@ -98,20 +107,10 @@ func (s *Server) TaggerStreamEntities(in *pb.StreamTagsRequest, out pb.AgentSecu responseEvents = append(responseEvents, e) } - // Split events into chunks and send each one - chunks := splitEvents(responseEvents, s.maxEventSize) - for _, chunk := range chunks { - err = grpc.DoWithTimeout(func() error { - return out.Send(&pb.StreamTagsResponse{ - Events: chunk, - }) - }, taggerStreamSendTimeout) - - if err != nil { - log.Warnf("error sending tagger event: %s", err) - s.taggerComponent.GetTaggerTelemetryStore().ServerStreamErrors.Inc() - return err - } + if err := processChunksInPlace(responseEvents, s.maxEventSize, computeTagsEventInBytes, sendFunc); err != nil { + log.Warnf("error sending tagger event: %s", err) + s.taggerComponent.GetTaggerTelemetryStore().ServerStreamErrors.Inc() + return err } case <-out.Context().Done(): diff --git a/comp/core/tagger/server/util.go b/comp/core/tagger/server/util.go index 28fc2c54a1f3d..538c71b4dc05e 100644 --- a/comp/core/tagger/server/util.go +++ b/comp/core/tagger/server/util.go @@ -11,15 +11,44 @@ import ( pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core" ) -// splitBySize splits the given slice into contiguous non-overlapping subslices such that -// the size of each sub-slice is at most maxChunkSize. -// The size of each item is calculated using computeSize +type sizeComputerFunc[T any] func(T) int + +type consumeChunkFunc[T any] func(T) error + +// computeProtoEventSize returns the size of a tags stream event in bytes +func computeTagsEventInBytes(event *pb.StreamTagsEvent) int { return proto.Size(event) } + +// processChunksInPlace splits the passed slice into contiguous chunks such that the total size of each chunk is at most maxChunkSize +// and applies the consume function to each of these chunks // -// This function assumes that the size of each single item of the initial slice is not larger than maxChunkSize -func splitBySize[T any](slice []T, maxChunkSize int, computeSize func(T) int) [][]T { +// The size of an item is computed with computeSize +// If an item has a size large than maxChunkSize, it is placed in a singleton chunk (chunk with one item) +// +// The consume function is applied to different chunks in-place, without any need extra memory allocation +func processChunksInPlace[T any](slice []T, maxChunkSize int, computeSize sizeComputerFunc[T], consume consumeChunkFunc[[]T]) error { + idx := 0 + for idx < len(slice) { + chunkSize := computeSize(slice[idx]) + j := idx + 1 + + for j < len(slice) { + eventSize := computeSize(slice[j]) + if chunkSize+eventSize > maxChunkSize { + break + } + chunkSize += eventSize + j++ + } + + if err := consume(slice[idx:j]); err != nil { + return err + } + idx = j + } + return nil +} - // TODO: return an iter.Seq[[]T] instead of [][]T once we upgrade to golang v1.23 - // returning iter.Seq[[]T] has better performance in terms of memory consumption +func splitBySize[T any](slice []T, maxChunkSize int, computeSize func(T) int) [][]T { var chunks [][]T currentChunk := []T{} currentSize := 0 @@ -40,11 +69,21 @@ func splitBySize[T any](slice []T, maxChunkSize int, computeSize func(T) int) [] return chunks } -// splitEvents splits the array of events to chunks with at most maxChunkSize each -func splitEvents(events []*pb.StreamTagsEvent, maxChunkSize int) [][]*pb.StreamTagsEvent { - return splitBySize( - events, - maxChunkSize, - func(event *pb.StreamTagsEvent) int { return proto.Size(event) }, - ) +// processChunksWithSplit splits the passed slice into contiguous chunks such that the total size of each chunk is at most maxChunkSize +// and then applies the consume function to each of these chunks +// +// The size of an item is computed with computeSize +// If an item has a size large than maxChunkSize, it is placed in a singleton chunk (chunk with one item) +// +// Prefer using processChunksInPlace for better CPU and memory performance. This implementation is only kept for benchmarking purposes. +func processChunksWithSplit[T any](slice []T, maxChunkSize int, computeSize sizeComputerFunc[T], consume consumeChunkFunc[[]T]) error { + chunks := splitBySize(slice, maxChunkSize, computeSize) + + for _, chunk := range chunks { + if err := consume(chunk); err != nil { + return err + } + } + + return nil } diff --git a/comp/core/tagger/server/util_benchmark_test.go b/comp/core/tagger/server/util_benchmark_test.go new file mode 100644 index 0000000000000..49ccd243d7365 --- /dev/null +++ b/comp/core/tagger/server/util_benchmark_test.go @@ -0,0 +1,48 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. + +package server + +import ( + "fmt" + "testing" +) + +var benchmarkSizes = []int{100, 1000, 10000, 100000, 1000000} + +const maxChunkSize = 4 +const mockItemSize = 1 + +var global []int + +func createBaseBenchmarkSlice(size int) []int { + var baseBenchmarkSlice = make([]int, size) + for i := range size { + baseBenchmarkSlice[i] = i + } + return baseBenchmarkSlice +} + +func mockComputeSize(int) int { return mockItemSize } + +func BenchmarkProcessChunks(b *testing.B) { + b.ReportAllocs() + for _, size := range benchmarkSizes { + b.Run(fmt.Sprintf("%d-items", size), func(b *testing.B) { + + // Point this to the implementation you want to benchmark + var processChunksFunc = processChunksInPlace[int] + + items := createBaseBenchmarkSlice(size) + var local []int + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = processChunksFunc(items, maxChunkSize, mockComputeSize, func(t []int) error { local = t; return nil }) + } + global = local + }) + } +} diff --git a/comp/core/tagger/server/util_test.go b/comp/core/tagger/server/util_test.go index 76f94c4988630..7a8b979b30157 100644 --- a/comp/core/tagger/server/util_test.go +++ b/comp/core/tagger/server/util_test.go @@ -6,6 +6,7 @@ package server import ( + "reflect" "testing" "github.com/stretchr/testify/assert" @@ -16,18 +17,35 @@ type mockStreamTagsEvent struct { size int } -func TestSplitEvents(t *testing.T) { +var computeSize = func(e mockStreamTagsEvent) int { + return e.size +} + +func getConsumeFunc(slice *[][]int) consumeChunkFunc[[]mockStreamTagsEvent] { + return func(chunk []mockStreamTagsEvent) error { + ids := make([]int, 0, len(chunk)) + for _, item := range chunk { + ids = append(ids, item.id) + } + + *slice = append(*slice, ids) + return nil + } +} + +func Test(t *testing.T) { + testCases := []struct { name string events []mockStreamTagsEvent maxChunkSize int - expected [][]mockStreamTagsEvent // Expecting indices of events in chunks for easier comparison + expected [][]int // Expecting id's of events in chunks for easier comparison }{ { name: "Empty input", events: []mockStreamTagsEvent{}, maxChunkSize: 100, - expected: nil, // No chunks expected + expected: [][]int{}, }, { name: "Single event within chunk size", @@ -35,9 +53,9 @@ func TestSplitEvents(t *testing.T) { {id: 1, size: 50}, // Mock event with size 50 }, maxChunkSize: 100, - expected: [][]mockStreamTagsEvent{ + expected: [][]int{ { - {id: 1, size: 50}, // One chunk with one event + 1, // One chunk with one event }, }, }, @@ -47,9 +65,9 @@ func TestSplitEvents(t *testing.T) { {id: 1, size: 20}, {id: 2, size: 30}, {id: 3, size: 40}, // Total size = 90 }, maxChunkSize: 100, - expected: [][]mockStreamTagsEvent{ + expected: [][]int{ { - {id: 1, size: 20}, {id: 2, size: 30}, {id: 3, size: 40}, // All events fit in one chunk + 1, 2, 3, // All events fit in one chunk }, }, }, @@ -59,13 +77,12 @@ func TestSplitEvents(t *testing.T) { {id: 1, size: 40}, {id: 2, size: 50}, {id: 3, size: 60}, // Total size = 150 }, maxChunkSize: 100, - expected: [][]mockStreamTagsEvent{ + expected: [][]int{ { - {id: 1, size: 40}, - {id: 2, size: 50}, + 1, 2, }, { - {id: 3, size: 60}, + 3, }, // Last event in second chunk }, }, @@ -75,8 +92,8 @@ func TestSplitEvents(t *testing.T) { {id: 1, size: 50}, {id: 2, size: 50}, // Total size = 100 }, maxChunkSize: 100, - expected: [][]mockStreamTagsEvent{ - {{id: 1, size: 50}, {id: 2, size: 50}}, // Both events fit exactly in one chunk + expected: [][]int{ + {1, 2}, // Both events fit exactly in one chunk }, }, { @@ -85,21 +102,35 @@ func TestSplitEvents(t *testing.T) { {id: 1, size: 100}, {id: 2, size: 101}, // One exactly fits, one exceeds }, maxChunkSize: 100, - expected: [][]mockStreamTagsEvent{ - { - {id: 1, size: 100}, - }, - { - {id: 2, size: 101}, - }, + expected: [][]int{ + {1}, {2}, + }, + }, + { + name: "Multiple items exceeding max chunk size", + events: []mockStreamTagsEvent{ + {id: 1, size: 100}, {id: 2, size: 101}, {id: 3, size: 101}, + }, + maxChunkSize: 100, + expected: [][]int{ + {1}, {2}, {3}, }, }, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - chunks := splitBySize(testCase.events, testCase.maxChunkSize, func(e mockStreamTagsEvent) int { return e.size }) - assert.Equal(t, testCase.expected, chunks) + slice := make([][]int, 0, len(testCase.expected)) + processChunksInPlace(testCase.events, testCase.maxChunkSize, computeSize, getConsumeFunc(&slice)) + if len(testCase.expected) > 0 || len(slice) > 0 { + assert.Truef(t, reflect.DeepEqual(testCase.expected, slice), "expected %v, found %v", testCase.expected, slice) + } + + slice = make([][]int, 0, len(testCase.expected)) + processChunksWithSplit(testCase.events, testCase.maxChunkSize, computeSize, getConsumeFunc(&slice)) + if len(testCase.expected) > 0 || len(slice) > 0 { + assert.Truef(t, reflect.DeepEqual(testCase.expected, slice), "expected %v, found %v", testCase.expected, slice) + } }) } }