Skip to content

Commit

Permalink
[RC] Optimize retrieval of raw target files in ClientGetConfigs
Browse files Browse the repository at this point in the history
ClientGetConfigs suffered from the same problem that verification of the
TUF repo had: it retrieved raw target files one
by one instead of in a batched operation. This manifests when an agent has
a set of clients that causes it to receive a large number (50+) of configs for a
specific product. The problem is made worse if there are multiple
clients requesting configs for that product, as it causes the
ClientGetConfigs calls to back up because the operation is gated
by a mutex in the core service.

This mutex also blocks the main refresh loop from running, which can
result in clients timing out in the remote config backend and giving the
appearance that the clients are not running remote config properly.

The fix is to apply the same performance improvement that Verify
received: figure out in advance the minimal set of target files we need
to pull, and then issue a single batch download operation to pull them all at
once.
  • Loading branch information
ameske committed Dec 10, 2024
1 parent 07187c1 commit b9737fa
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 85 deletions.
2 changes: 1 addition & 1 deletion pkg/config/remote/api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (c *HTTPClient) Fetch(ctx context.Context, request *pbgo.LatestConfigsReque
}

url := c.baseURL + pollEndpoint
log.Debugf("fetching configurations at %s", url)
log.Infof("fetching configurations at %s", url)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(body))
if err != nil {
return nil, fmt.Errorf("failed to create org data request: %w", err)
Expand Down
160 changes: 85 additions & 75 deletions pkg/config/remote/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package service

import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
Expand Down Expand Up @@ -111,50 +112,29 @@ func (s *Service) getNewDirectorRoots(uptane uptaneClient, currentVersion uint64
return roots, nil
}

func (s *Service) getTargetFiles(uptane uptaneClient, products []rdata.Product, cachedTargetFiles []*pbgo.TargetFileMeta) ([]*pbgo.File, error) {
productSet := make(map[rdata.Product]struct{})
for _, product := range products {
productSet[product] = struct{}{}
}
targets, err := uptane.Targets()
// bufferDestination is an in-memory buffer: it embeds bytes.Buffer and adds a
// Delete method on top of it.
// NOTE(review): presumably this exists so a plain buffer can be handed to the TUF
// client as a download destination — confirm against that interface's definition.
type bufferDestination struct {
bytes.Buffer
}

// Delete is a no-op that always returns nil; the buffered contents are left untouched.
func (b *bufferDestination) Delete() error {
return nil
}

func (s *Service) getTargetFiles(uptaneClient uptaneClient, targetFilePaths []string) ([]*pbgo.File, error) {
files, err := uptaneClient.TargetFiles(targetFilePaths)
if err != nil {
return nil, err
}
cachedTargets := make(map[string]data.FileMeta)
for _, cachedTarget := range cachedTargetFiles {
hashes := make(data.Hashes)
for _, hash := range cachedTarget.Hashes {
h, err := hex.DecodeString(hash.Hash)
if err != nil {
return nil, err
}
hashes[hash.Algorithm] = h
}
cachedTargets[cachedTarget.Path] = data.FileMeta{
Hashes: hashes,
Length: cachedTarget.Length,
}
}

var configFiles []*pbgo.File
for targetPath, targetMeta := range targets {
configPathMeta, err := rdata.ParseConfigPath(targetPath)
if err != nil {
return nil, err
}
if _, inClientProducts := productSet[rdata.Product(configPathMeta.Product)]; inClientProducts {
if notEqualErr := tufutil.FileMetaEqual(cachedTargets[targetPath], targetMeta.FileMeta); notEqualErr == nil {
continue
}
fileContents, err := uptane.TargetFile(targetPath)
if err != nil {
return nil, err
}
configFiles = append(configFiles, &pbgo.File{
Path: targetPath,
Raw: fileContents,
})
}
for path, contents := range files {
// Note: This unconditionally succeeds as long as we don't change bufferDestination earlier
configFiles = append(configFiles, &pbgo.File{
Path: path,
Raw: contents,
})
}

return configFiles, nil
}

Expand Down Expand Up @@ -211,6 +191,7 @@ type uptaneClient interface {
StoredOrgUUID() (string, error)
Targets() (data.TargetFiles, error)
TargetFile(path string) ([]byte, error)
TargetFiles(files []string) (map[string][]byte, error)
TargetsMeta() ([]byte, error)
TargetsCustom() ([]byte, error)
TUFVersionState() (uptane.TUFVersions, error)
Expand Down Expand Up @@ -828,36 +809,30 @@ func (s *CoreAgentService) ClientGetConfigs(_ context.Context, request *pbgo.Cli
if err != nil {
return nil, err
}
targetsRaw, err := s.uptane.TargetsMeta()

directorTargets, err := s.uptane.Targets()
if err != nil {
return nil, err
}
targetFiles, err := s.getTargetFiles(s.uptane, rdata.StringListToProduct(request.Client.Products), request.CachedTargetFiles)
matchedClientConfigs, err := executeTracerPredicates(request.Client, directorTargets)
if err != nil {
return nil, err
}

directorTargets, err := s.uptane.Targets()
neededFiles, err := filterNeededTargetFiles(matchedClientConfigs, request.CachedTargetFiles, directorTargets)
if err != nil {
return nil, err
}
matchedClientConfigs, err := executeTracerPredicates(request.Client, directorTargets)

targetFiles, err := s.getTargetFiles(s.uptane, neededFiles)
if err != nil {
return nil, err
}

// filter files to only return the ones that predicates marked for this client
matchedConfigsMap := make(map[string]interface{})
for _, configPointer := range matchedClientConfigs {
matchedConfigsMap[configPointer] = struct{}{}
}
filteredFiles := make([]*pbgo.File, 0, len(matchedClientConfigs))
for _, targetFile := range targetFiles {
if _, ok := matchedConfigsMap[targetFile.Path]; ok {
filteredFiles = append(filteredFiles, targetFile)
}
targetsRaw, err := s.uptane.TargetsMeta()
if err != nil {
return nil, err
}

canonicalTargets, err := enforceCanonicalJSON(targetsRaw)
if err != nil {
return nil, err
Expand All @@ -866,11 +841,42 @@ func (s *CoreAgentService) ClientGetConfigs(_ context.Context, request *pbgo.Cli
return &pbgo.ClientGetConfigsResponse{
Roots: roots,
Targets: canonicalTargets,
TargetFiles: filteredFiles,
TargetFiles: targetFiles,
ClientConfigs: matchedClientConfigs,
}, nil
}

// filterNeededTargetFiles narrows neededConfigs down to the paths whose raw contents
// still have to be fetched for the client.
//
// A path is dropped when the metadata the client reported for it in cachedTargetFiles
// compares equal (per tufutil.FileMetaEqual) to the entry for that path in tufTargets.
// Any path for which the comparison reports a difference is kept, in its original
// order. An error is returned if a cached hash fails to decode as hex.
func filterNeededTargetFiles(neededConfigs []string, cachedTargetFiles []*pbgo.TargetFileMeta, tufTargets data.TargetFiles) ([]string, error) {
	// Index the client's cached metadata by path so each comparison below is a map lookup.
	cachedByPath := make(map[string]data.FileMeta)
	for _, cached := range cachedTargetFiles {
		decoded := make(data.Hashes)
		for _, entry := range cached.Hashes {
			raw, err := hex.DecodeString(entry.Hash)
			if err != nil {
				return nil, err
			}
			decoded[entry.Algorithm] = raw
		}
		cachedByPath[cached.Path] = data.FileMeta{
			Length: cached.Length,
			Hashes: decoded,
		}
	}

	// Keep only the configs whose cached metadata does not match the current target.
	toFetch := make([]string, 0, len(neededConfigs))
	for _, configPath := range neededConfigs {
		if tufutil.FileMetaEqual(cachedByPath[configPath], tufTargets[configPath].FileMeta) != nil {
			toFetch = append(toFetch, configPath)
		}
	}

	return toFetch, nil
}

// ConfigGetState returns the state of the configuration and the director repos in the local store
func (s *CoreAgentService) ConfigGetState() (*pbgo.GetStateConfigResponse, error) {
state, err := s.uptane.State()
Expand Down Expand Up @@ -1117,29 +1123,14 @@ func (c *HTTPClient) getUpdate(
if tufVersions.DirectorTargets == currentTargetsVersion {
return nil, nil
}
roots, err := c.getNewDirectorRoots(c.uptane, currentRootVersion, tufVersions.DirectorRoot)
if err != nil {
return nil, err
}
targetsRaw, err := c.uptane.TargetsMeta()
if err != nil {
return nil, err
}
targetFiles, err := c.getTargetFiles(c.uptane, rdata.StringListToProduct(products), cachedTargetFiles)
if err != nil {
return nil, err
}

canonicalTargets, err := enforceCanonicalJSON(targetsRaw)
if err != nil {
return nil, err
}

// Filter out files that either:
// - don't correspond to the product list the client is requesting
// - have expired
directorTargets, err := c.uptane.Targets()
if err != nil {
return nil, err
}

productsMap := make(map[string]struct{})
for _, product := range products {
productsMap[product] = struct{}{}
Expand All @@ -1165,14 +1156,33 @@ func (c *HTTPClient) getUpdate(

configs = append(configs, path)
}
span.SetTag("configs.returned", configs)
span.SetTag("configs.expired", expiredConfigs)

// Gather the files and map-ify them for the state data structure
targetFiles, err := c.getTargetFiles(c.uptane, configs)
if err != nil {
return nil, err
}
fileMap := make(map[string][]byte, len(targetFiles))
for _, f := range targetFiles {
fileMap[f.Path] = f.Raw
}

span.SetTag("configs.returned", configs)
span.SetTag("configs.expired", expiredConfigs)
// Gather some TUF metadata files we need to send down
roots, err := c.getNewDirectorRoots(c.uptane, currentRootVersion, tufVersions.DirectorRoot)
if err != nil {
return nil, err
}
targetsRaw, err := c.uptane.TargetsMeta()
if err != nil {
return nil, err
}
canonicalTargets, err := enforceCanonicalJSON(targetsRaw)
if err != nil {
return nil, err
}

return &state.Update{
TUFRoots: roots,
TUFTargets: canonicalTargets,
Expand Down
21 changes: 12 additions & 9 deletions pkg/config/remote/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ func (m *mockUptane) TargetFile(path string) ([]byte, error) {
return args.Get(0).([]byte), args.Error(1)
}

// TargetFiles forwards the call to the mock via m.Called and returns its first
// configured result as the path-to-contents map and its second as the error.
func (m *mockUptane) TargetFiles(files []string) (map[string][]byte, error) {
	ret := m.Called(files)
	contents := ret.Get(0).(map[string][]byte)
	return contents, ret.Error(1)
}

func (m *mockUptane) TargetsMeta() ([]byte, error) {
args := m.Called()
return args.Get(0).([]byte), args.Error(1)
Expand Down Expand Up @@ -486,8 +491,8 @@ func TestService(t *testing.T) {
}, nil)
uptaneClient.On("DirectorRoot", uint64(3)).Return(root3, nil)
uptaneClient.On("DirectorRoot", uint64(4)).Return(root4, nil)
uptaneClient.On("TargetFile", "datadog/2/APM_SAMPLING/id/1").Return(fileAPM1, nil)
uptaneClient.On("TargetFile", "datadog/2/APM_SAMPLING/id/2").Return(fileAPM2, nil)

uptaneClient.On("TargetFiles", []string{"datadog/2/APM_SAMPLING/id/1", "datadog/2/APM_SAMPLING/id/2"}).Return(map[string][]byte{"datadog/2/APM_SAMPLING/id/1": fileAPM1, "datadog/2/APM_SAMPLING/id/2": fileAPM2}, nil)
uptaneClient.On("Update", lastConfigResponse).Return(nil)
api.On("Fetch", mock.Anything, &pbgo.LatestConfigsRequest{
Hostname: service.hostname,
Expand All @@ -513,7 +518,7 @@ func TestService(t *testing.T) {
configResponse, err := service.ClientGetConfigs(context.Background(), &pbgo.ClientGetConfigsRequest{Client: client})
assert.NoError(t, err)
assert.ElementsMatch(t, [][]byte{canonicalRoot3, canonicalRoot4}, configResponse.Roots)
assert.ElementsMatch(t, []*pbgo.File{{Path: "datadog/2/APM_SAMPLING/id/1", Raw: fileAPM1}, {Path: "datadog/2/APM_SAMPLING/id/2", Raw: fileAPM2}}, configResponse.TargetFiles)
assert.ElementsMatch(t, []*pbgo.File{{Path: "datadog/2/APM_SAMPLING/id/1", Raw: fileAPM1}, {Path: "datadog/2/APM_SAMPLING/id/2", Raw: fileAPM2}}, configResponse.TargetFiles, nil)
assert.Equal(t, canonicalTargets, configResponse.Targets)
assert.ElementsMatch(t,
configResponse.ClientConfigs,
Expand Down Expand Up @@ -597,8 +602,7 @@ func TestServiceClientPredicates(t *testing.T) {
DirectorRoot: 1,
DirectorTargets: 5,
}, nil)
uptaneClient.On("TargetFile", "datadog/2/APM_SAMPLING/id/1").Return([]byte(``), nil)
uptaneClient.On("TargetFile", "datadog/2/APM_SAMPLING/id/2").Return([]byte(``), nil)
uptaneClient.On("TargetFiles", []string{"datadog/2/APM_SAMPLING/id/1", "datadog/2/APM_SAMPLING/id/2"}).Return(map[string][]byte{"datadog/2/APM_SAMPLING/id/1": []byte(``), "datadog/2/APM_SAMPLING/id/2": []byte(``)}, nil)
uptaneClient.On("Update", lastConfigResponse).Return(nil)
api.On("Fetch", mock.Anything, &pbgo.LatestConfigsRequest{
Hostname: service.hostname,
Expand Down Expand Up @@ -887,8 +891,7 @@ func TestConfigExpiration(t *testing.T) {
DirectorRoot: 1,
DirectorTargets: 5,
}, nil)
uptaneClient.On("TargetFile", "datadog/2/APM_SAMPLING/id/1").Return([]byte(``), nil)
uptaneClient.On("TargetFile", "datadog/2/APM_SAMPLING/id/2").Return([]byte(``), nil)
uptaneClient.On("TargetFiles", []string{"datadog/2/APM_SAMPLING/id/1"}).Return(map[string][]byte{"datadog/2/APM_SAMPLING/id/1": []byte(``), "datadog/2/APM_SAMPLING/id/2": []byte(``)}, nil)
uptaneClient.On("Update", lastConfigResponse).Return(nil)
api.On("Fetch", mock.Anything, &pbgo.LatestConfigsRequest{
Hostname: service.hostname,
Expand Down Expand Up @@ -1190,7 +1193,7 @@ func TestHTTPClientRecentUpdate(t *testing.T) {
},
nil,
)
uptaneClient.On("TargetFile", "datadog/2/TESTING1/id/1").Return([]byte(`testing_1`), nil)
uptaneClient.On("TargetFiles", []string{"datadog/2/TESTING1/id/1"}).Return(map[string][]byte{"datadog/2/TESTING1/id/1": []byte(`testing_1`)}, nil)

client := setupCDNClient(t, uptaneClient)
defer client.Close()
Expand Down Expand Up @@ -1236,7 +1239,7 @@ func TestHTTPClientUpdateSuccess(t *testing.T) {
},
nil,
)
uptaneClient.On("TargetFile", "datadog/2/TESTING1/id/1").Return([]byte(`testing_1`), nil)
uptaneClient.On("TargetFiles", []string{"datadog/2/TESTING1/id/1"}).Return(map[string][]byte{"datadog/2/TESTING1/id/1": []byte(`testing_1`)}, nil)

updateErr := fmt.Errorf("uh oh")
if tt.updateSucceeds {
Expand Down
31 changes: 31 additions & 0 deletions pkg/config/remote/uptane/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,37 @@ func (c *Client) TargetFile(path string) ([]byte, error) {
return c.unsafeTargetFile(path)
}

// TargetFiles returns the raw contents of the requested target files, keyed by
// path, if the repository is in a verified state.
//
// The client lock is held for the whole call. An error from verification or from
// the batch download is returned as-is, with a nil map.
func (c *Client) TargetFiles(targetFiles []string) (map[string][]byte, error) {
	c.Lock()
	defer c.Unlock()

	if err := c.verify(); err != nil {
		return nil, err
	}

	// Give every requested path its own in-memory buffer to download into.
	destinations := make(map[string]client.Destination, len(targetFiles))
	for _, targetPath := range targetFiles {
		destinations[targetPath] = &bufferDestination{}
	}

	// Fetch everything in a single batched operation rather than one file at a time.
	if err := c.directorTUFClient.DownloadBatch(destinations); err != nil {
		return nil, err
	}

	// Unwrap the buffers into plain byte slices for the caller.
	// Every destination was created above as a *bufferDestination, so the assertion holds.
	contentsByPath := make(map[string][]byte, len(destinations))
	for targetPath, dest := range destinations {
		contentsByPath[targetPath] = dest.(*bufferDestination).Bytes()
	}

	return contentsByPath, nil
}

// TargetsMeta returns the current raw targets.json meta of this uptane client
func (c *Client) TargetsMeta() ([]byte, error) {
c.Lock()
Expand Down

0 comments on commit b9737fa

Please sign in to comment.