[CONTP-535] optimise tagger server chunking function by processing chunks in place #31837

Merged
77 changes: 77 additions & 0 deletions comp/core/tagger/server/README.md
@@ -0,0 +1,77 @@
## Introduction

This package implements a gRPC server that streams tagger entities to remote tagger clients.

## Behaviour

When a client connects to the tagger gRPC server, the server creates a subscription to the local tagger in order to stream tags.

Before streaming new tag events, the server sends an initial burst to the client over the stream. This initial burst contains a snapshot of the tagger content. After the initial burst has been processed, the server will stream new tag events to the client based on the filters provided in the streaming request.

### Cutting Events into chunks

Sending very large messages over the gRPC stream can cause them to be dropped or rejected by the client; by default, gRPC clients reject messages larger than 4 MB.
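
For context, the sketch below shows what a client would otherwise have to do to accept larger responses (the address and the raised limit are illustrative, not taken from the agent); chunking on the server side avoids pushing that burden onto clients:

```go
package main

import (
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// gRPC clients reject received messages larger than 4 MiB by default.
	// A client could raise that limit with a call option, as shown here,
	// but the tagger server instead keeps each response under the default
	// limit by chunking large batches of events.
	const raisedMaxRecvSize = 8 * 1024 * 1024 // illustrative 8 MiB limit

	conn, err := grpc.Dial("localhost:5001", // illustrative address
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(raisedMaxRecvSize)),
	)
	if err != nil {
		log.Fatalf("failed to dial tagger server: %v", err)
	}
	defer conn.Close()
}
```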

To avoid such scenarios, especially when sending the initial burst, the server cuts each message into small chunks that can be easily transmitted over the stream.

This logic is implemented in `util.go`.

We provide two implementations:
- `processChunksWithSplit`: splits an event slice into small chunks, where each chunk contains a contiguous sub-slice of events, and then processes the generated chunks sequentially. This loads all chunks into memory (stored in a slice) before processing them.
- `processChunksInPlace`: provides the same functionality as `processChunksWithSplit`, but is more efficient in terms of memory and CPU because it processes chunks in place, without any extra memory allocation (see the usage sketch after this list).
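
A minimal usage sketch of `processChunksInPlace`, assuming it is called from within this package (the item type, names, sizes, and consumer are made up for illustration; only the function signature comes from `util.go`):

```go
package server

import "fmt"

// exampleItem is a stand-in for a tag event: only its payload size matters
// for chunking. The type, names, and sizes below are illustrative.
type exampleItem struct {
	name string
	size int
}

func exampleProcessChunks() error {
	items := []exampleItem{{"a", 40}, {"b", 50}, {"c", 60}}

	// With maxChunkSize = 100, the consumer is called twice:
	// once with [a b] (40+50 bytes) and once with [c] (60 bytes).
	return processChunksInPlace(items, 100,
		func(it exampleItem) int { return it.size },
		func(chunk []exampleItem) error {
			fmt.Printf("sending chunk of %d item(s)\n", len(chunk))
			return nil
		},
	)
}
```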

We keep both implementations for at least one release candidate, to make sure everything works well and to be able to quickly revert in case of regressions.

#### Benchmark Testing

Benchmark tests show that in-place chunking results in significant memory and CPU improvements:

Go to `util_benchmark_test.go`:

```
// with processChunksFunc = processChunksWithSplit[int]
go test -bench BenchmarkProcessChunks -benchmem -count 6 -benchtime 100x > old.txt

// with processChunksFunc = processChunksInPlace[int]
go test -bench BenchmarkProcessChunks -benchmem -count 6 -benchtime 100x > new.txt

// Compare results
benchstat old.txt new.txt

goos: linux
goarch: arm64
pkg: github.com/DataDog/datadog-agent/comp/core/tagger/server
│ old.txt │ new.txt │
│ sec/op │ sec/op vs base │
ProcessChunks/100-items-10 2399.5n ± 47% 239.8n ± 4% -90.01% (p=0.002 n=6)
ProcessChunks/1000-items-10 35.072µ ± 13% 2.344µ ± 11% -93.32% (p=0.002 n=6)
ProcessChunks/10000-items-10 365.19µ ± 5% 22.56µ ± 18% -93.82% (p=0.002 n=6)
ProcessChunks/100000-items-10 3435.1µ ± 7% 222.5µ ± 16% -93.52% (p=0.002 n=6)
ProcessChunks/1000000-items-10 29.059m ± 9% 2.219m ± 31% -92.36% (p=0.002 n=6)
geomean 314.3µ 22.87µ -92.72%

│ old.txt │ new.txt │
│ B/op │ B/op vs base │
ProcessChunks/100-items-10 2.969Ki ± 0% 0.000Ki ± 0% -100.00% (p=0.002 n=6)
ProcessChunks/1000-items-10 29.02Ki ± 0% 0.00Ki ± 0% -100.00% (p=0.002 n=6)
ProcessChunks/10000-items-10 370.7Ki ± 0% 0.0Ki ± 0% -100.00% (p=0.002 n=6)
ProcessChunks/100000-items-10 4.165Mi ± 0% 0.000Mi ± 0% -100.00% (p=0.002 n=6)
ProcessChunks/1000000-items-10 44.65Mi ± 0% 0.00Mi ± 0% -100.00% (p=0.002 n=6)
geomean 362.1Ki ? ¹ ²
¹ summaries must be >0 to compute geomean
² ratios must be >0 to compute geomean

│ old.txt │ new.txt │
│ allocs/op │ allocs/op vs base │
ProcessChunks/100-items-10 81.00 ± 0% 0.00 ± 0% -100.00% (p=0.002 n=6)
ProcessChunks/1000-items-10 759.0 ± 0% 0.0 ± 0% -100.00% (p=0.002 n=6)
ProcessChunks/10000-items-10 7.514k ± 0% 0.000k ± 0% -100.00% (p=0.002 n=6)
ProcessChunks/100000-items-10 75.02k ± 0% 0.00k ± 0% -100.00% (p=0.002 n=6)
ProcessChunks/1000000-items-10 750.0k ± 0% 0.0k ± 0% -100.00% (p=0.002 n=6)
geomean 7.638k ? ¹ ²
¹ summaries must be >0 to compute geomean
² ratios must be >0 to compute geomean
```



27 changes: 13 additions & 14 deletions comp/core/tagger/server/server.go
@@ -77,6 +77,15 @@ func (s *Server) TaggerStreamEntities(in *pb.StreamTagsRequest, out pb.AgentSecu

ticker := time.NewTicker(streamKeepAliveInterval)
defer ticker.Stop()

sendFunc := func(chunk []*pb.StreamTagsEvent) error {
return grpc.DoWithTimeout(func() error {
return out.Send(&pb.StreamTagsResponse{
Events: chunk,
})
}, taggerStreamSendTimeout)
}

for {
select {
case events, ok := <-subscription.EventsChan():
@@ -98,20 +107,10 @@ func (s *Server) TaggerStreamEntities(in *pb.StreamTagsRequest, out pb.AgentSecu
responseEvents = append(responseEvents, e)
}

// Split events into chunks and send each one
chunks := splitEvents(responseEvents, s.maxEventSize)
for _, chunk := range chunks {
err = grpc.DoWithTimeout(func() error {
return out.Send(&pb.StreamTagsResponse{
Events: chunk,
})
}, taggerStreamSendTimeout)

if err != nil {
log.Warnf("error sending tagger event: %s", err)
s.taggerComponent.GetTaggerTelemetryStore().ServerStreamErrors.Inc()
return err
}
if err := processChunksInPlace(responseEvents, s.maxEventSize, computeTagsEventInBytes, sendFunc); err != nil {
log.Warnf("error sending tagger event: %s", err)
s.taggerComponent.GetTaggerTelemetryStore().ServerStreamErrors.Inc()
return err
}

case <-out.Context().Done():
67 changes: 53 additions & 14 deletions comp/core/tagger/server/util.go
@@ -11,15 +11,44 @@ import (
pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core"
)

// splitBySize splits the given slice into contiguous non-overlapping subslices such that
// the size of each sub-slice is at most maxChunkSize.
// The size of each item is calculated using computeSize
type sizeComputerFunc[T any] func(T) int

type consumeChunkFunc[T any] func(T) error

// computeTagsEventInBytes returns the size of a tags stream event in bytes
func computeTagsEventInBytes(event *pb.StreamTagsEvent) int { return proto.Size(event) }

// processChunksInPlace splits the passed slice into contiguous chunks such that the total size of each chunk is at most maxChunkSize
// and applies the consume function to each of these chunks
//
// This function assumes that the size of each single item of the initial slice is not larger than maxChunkSize
func splitBySize[T any](slice []T, maxChunkSize int, computeSize func(T) int) [][]T {
// The size of an item is computed with computeSize
// If an item has a size larger than maxChunkSize, it is placed in a singleton chunk (chunk with one item)
//
// The consume function is applied to each chunk in place, without any extra memory allocation
func processChunksInPlace[T any](slice []T, maxChunkSize int, computeSize sizeComputerFunc[T], consume consumeChunkFunc[[]T]) error {
idx := 0
for idx < len(slice) {
chunkSize := computeSize(slice[idx])
j := idx + 1

for j < len(slice) {
eventSize := computeSize(slice[j])
if chunkSize+eventSize > maxChunkSize {
break
}
chunkSize += eventSize
j++
}

if err := consume(slice[idx:j]); err != nil {
return err
}
idx = j
}
return nil
}

// TODO: return an iter.Seq[[]T] instead of [][]T once we upgrade to golang v1.23
// returning iter.Seq[[]T] has better performance in terms of memory consumption
func splitBySize[T any](slice []T, maxChunkSize int, computeSize func(T) int) [][]T {
var chunks [][]T
currentChunk := []T{}
currentSize := 0
@@ -40,11 +69,21 @@ func splitBySize[T any](slice []T, maxChunkSize int, computeSize func(T) int) []
return chunks
}

// splitEvents splits the array of events to chunks with at most maxChunkSize each
func splitEvents(events []*pb.StreamTagsEvent, maxChunkSize int) [][]*pb.StreamTagsEvent {
return splitBySize(
events,
maxChunkSize,
func(event *pb.StreamTagsEvent) int { return proto.Size(event) },
)
// processChunksWithSplit splits the passed slice into contiguous chunks such that the total size of each chunk is at most maxChunkSize
// and then applies the consume function to each of these chunks
//
// The size of an item is computed with computeSize
// If an item has a size larger than maxChunkSize, it is placed in a singleton chunk (chunk with one item)
//
// Prefer using processChunksInPlace for better CPU and memory performance. This implementation is only kept for benchmarking purposes.
func processChunksWithSplit[T any](slice []T, maxChunkSize int, computeSize sizeComputerFunc[T], consume consumeChunkFunc[[]T]) error {
chunks := splitBySize(slice, maxChunkSize, computeSize)

for _, chunk := range chunks {
if err := consume(chunk); err != nil {
return err
}
}

return nil
}
48 changes: 48 additions & 0 deletions comp/core/tagger/server/util_benchmark_test.go
@@ -0,0 +1,48 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

package server

import (
"fmt"
"testing"
)

var benchmarkSizes = []int{100, 1000, 10000, 100000, 1000000}

const maxChunkSize = 4
const mockItemSize = 1

var global []int

func createBaseBenchmarkSlice(size int) []int {
var baseBenchmarkSlice = make([]int, size)
for i := range size {
baseBenchmarkSlice[i] = i
}
return baseBenchmarkSlice
}

func mockComputeSize(int) int { return mockItemSize }

func BenchmarkProcessChunks(b *testing.B) {
b.ReportAllocs()
for _, size := range benchmarkSizes {
b.Run(fmt.Sprintf("%d-items", size), func(b *testing.B) {

// Point this to the implementation you want to benchmark
var processChunksFunc = processChunksInPlace[int]

items := createBaseBenchmarkSlice(size)
var local []int

b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = processChunksFunc(items, maxChunkSize, mockComputeSize, func(t []int) error { local = t; return nil })
}
global = local
})
}
}
75 changes: 53 additions & 22 deletions comp/core/tagger/server/util_test.go
@@ -6,6 +6,7 @@
package server

import (
"reflect"
"testing"

"github.com/stretchr/testify/assert"
@@ -16,28 +17,45 @@ type mockStreamTagsEvent struct {
size int
}

func TestSplitEvents(t *testing.T) {
var computeSize = func(e mockStreamTagsEvent) int {
return e.size
}

func getConsumeFunc(slice *[][]int) consumeChunkFunc[[]mockStreamTagsEvent] {
return func(chunk []mockStreamTagsEvent) error {
ids := make([]int, 0, len(chunk))
for _, item := range chunk {
ids = append(ids, item.id)
}

*slice = append(*slice, ids)
return nil
}
}

func Test(t *testing.T) {

testCases := []struct {
name string
events []mockStreamTagsEvent
maxChunkSize int
expected [][]mockStreamTagsEvent // Expecting indices of events in chunks for easier comparison
expected [][]int // Expecting id's of events in chunks for easier comparison
}{
{
name: "Empty input",
events: []mockStreamTagsEvent{},
maxChunkSize: 100,
expected: nil, // No chunks expected
expected: [][]int{},
},
{
name: "Single event within chunk size",
events: []mockStreamTagsEvent{
{id: 1, size: 50}, // Mock event with size 50
},
maxChunkSize: 100,
expected: [][]mockStreamTagsEvent{
expected: [][]int{
{
{id: 1, size: 50}, // One chunk with one event
1, // One chunk with one event
},
},
},
@@ -47,9 +65,9 @@ func TestSplitEvents(t *testing.T) {
{id: 1, size: 20}, {id: 2, size: 30}, {id: 3, size: 40}, // Total size = 90
},
maxChunkSize: 100,
expected: [][]mockStreamTagsEvent{
expected: [][]int{
{
{id: 1, size: 20}, {id: 2, size: 30}, {id: 3, size: 40}, // All events fit in one chunk
1, 2, 3, // All events fit in one chunk
},
},
},
@@ -59,13 +77,12 @@
{id: 1, size: 40}, {id: 2, size: 50}, {id: 3, size: 60}, // Total size = 150
},
maxChunkSize: 100,
expected: [][]mockStreamTagsEvent{
expected: [][]int{
{
{id: 1, size: 40},
{id: 2, size: 50},
1, 2,
},
{
{id: 3, size: 60},
3,
}, // Last event in second chunk
},
},
@@ -75,8 +92,8 @@
{id: 1, size: 50}, {id: 2, size: 50}, // Total size = 100
},
maxChunkSize: 100,
expected: [][]mockStreamTagsEvent{
{{id: 1, size: 50}, {id: 2, size: 50}}, // Both events fit exactly in one chunk
expected: [][]int{
{1, 2}, // Both events fit exactly in one chunk
},
},
{
@@ -85,21 +102,35 @@
{id: 1, size: 100}, {id: 2, size: 101}, // One exactly fits, one exceeds
},
maxChunkSize: 100,
expected: [][]mockStreamTagsEvent{
{
{id: 1, size: 100},
},
{
{id: 2, size: 101},
},
expected: [][]int{
{1}, {2},
},
},
{
name: "Multiple items exceeding max chunk size",
events: []mockStreamTagsEvent{
{id: 1, size: 100}, {id: 2, size: 101}, {id: 3, size: 101},
},
maxChunkSize: 100,
expected: [][]int{
{1}, {2}, {3},
},
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
chunks := splitBySize(testCase.events, testCase.maxChunkSize, func(e mockStreamTagsEvent) int { return e.size })
assert.Equal(t, testCase.expected, chunks)
slice := make([][]int, 0, len(testCase.expected))
processChunksInPlace(testCase.events, testCase.maxChunkSize, computeSize, getConsumeFunc(&slice))
if len(testCase.expected) > 0 || len(slice) > 0 {
assert.Truef(t, reflect.DeepEqual(testCase.expected, slice), "expected %v, found %v", testCase.expected, slice)
}

slice = make([][]int, 0, len(testCase.expected))
processChunksWithSplit(testCase.events, testCase.maxChunkSize, computeSize, getConsumeFunc(&slice))
if len(testCase.expected) > 0 || len(slice) > 0 {
assert.Truef(t, reflect.DeepEqual(testCase.expected, slice), "expected %v, found %v", testCase.expected, slice)
}
})
}
}