Skip to content

Commit

Permalink
Implement a cache daemon that can move the cache into place quickly o…
Browse files Browse the repository at this point in the history
…n demand

 - a long running background process we'll deploy as a daemonset on k8s nodes
 - moves the cache into place via hardlinking the golden copy from a shared location
 - operates using the host filesystem, and thus has pretty priviledged access
  • Loading branch information
airhorns committed May 5, 2024
1 parent 3979c0a commit 492230e
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 171 deletions.
2 changes: 1 addition & 1 deletion internal/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (
Ignores = StringSliceKey("dl.ignores")
DurationMS = DurationKey("dl.duration_ms")
CloneToProject = Int64Key("dl.clone_to_project")
DownstreamID = StringKey("dl.downstream_id")
CachePath = StringKey("dl.cache_path")
)

var (
Expand Down
39 changes: 19 additions & 20 deletions internal/pb/cache.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/pb/cache.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ service Cache {
returns (PopulateDiskCacheResponse);
}

message PopulateDiskCacheRequest { string downstream_id = 1; }
message PopulateDiskCacheRequest { string path = 1; }

message PopulateDiskCacheResponse { int64 version = 1; };
104 changes: 40 additions & 64 deletions pkg/api/cached.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,96 +8,83 @@ import (
"fmt"
"os"
"path"
"path/filepath"
"regexp"
"time"

"github.com/gadget-inc/dateilager/internal/files"
"github.com/gadget-inc/dateilager/internal/key"
"github.com/gadget-inc/dateilager/internal/logger"
"github.com/gadget-inc/dateilager/internal/pb"
"github.com/gadget-inc/dateilager/pkg/client"
"golang.org/x/sys/unix"
)

/**
* The set of paths configuring where a cache daemon will drop caches for downstream clients
*
* For example, if the paths have these values:
* StagingPath: /var/lib/dateilager/staging
* DownstreamsRootPath: /var/lib/dateilager/root
* DownstreamPathSuffix: /volumes/something
*
* the cache for a downstream with id=1234 will be placed at:
* /var/lib/dateilager/root/1234/volumes/something
*
* and the cache for a downstream with id=abcd will be placed at:
* /var/lib/dateilager/root/abcd/volumes/something
*
* Each will be hard linked from the golden copy stored in /var/lib/dateilager/staging/<some-tmp-dir>
*/
type CachePaths struct {
// the root dir where we're going to stage downloaded caches */
StagingPath string
// the root dir that holds one sub-directory per downstream client of this daemon */
DownstreamsRootPath string
// a path to add to each downstream's dir when placing the cache
DownstreamPathSuffix string
}

type Cached struct {
pb.UnimplementedCacheServer
Client *client.Client
Paths *CachePaths
Client *client.Client
StagingPath string
// the current directory holding a fully formed downloaded cache
currentDir string
// the current version of the cache on disk at currentDir
currentVersion int64
}

var validDownstreamIDRegexp = regexp.MustCompile(`^[a-zA-Z0-9-]+$`)

func (c *Cached) PopulateDiskCache(ctx context.Context, req *pb.PopulateDiskCacheRequest) (*pb.PopulateDiskCacheResponse, error) {

Check failure on line 31 in pkg/api/cached.go

View workflow job for this annotation

GitHub Actions / golangci

unnecessary leading newline (whitespace)
logger.Debug(ctx, "cached.PopulateDiskCache[Init]",
key.DownstreamID.Field(req.DownstreamId),
)

// ensure potentially-hostile downstream id doesn't have `.` or other path traversal characters in it
if !validDownstreamIDRegexp.MatchString(req.DownstreamId) {
return nil, validationError(ctx, req, "invalid downstream id passed", errors.New("-"))
err := requireAdminAuth(ctx)
if err != nil {
return nil, err
}

destination := path.Join(c.Paths.DownstreamsRootPath, req.DownstreamId, c.Paths.DownstreamPathSuffix)
destinationParentStat, err := os.Stat(filepath.Dir(destination))
destination := req.Path
logger.Debug(ctx, "cached.PopulateDiskCache[Init]",
key.CachePath.Field(req.Path),
)

version, err := c.WriteCache(destination)
if err != nil {
return nil, validationError(ctx, req, "error validating destination directory", err)
return nil, err
}

if !destinationParentStat.IsDir() {
return nil, validationError(ctx, req, "refusing to overwrite file with directory when building cache", err)
}
logger.Debug(ctx, "cached.PopulateDiskCache[Written]",
key.CachePath.Field(destination),
)

return &pb.PopulateDiskCacheResponse{Version: version}, nil
}

// check if the destination exists, and if so, if its writable
// hardlink the golden copy into this downstream's destination, creating it if need be
func (c *Cached) WriteCache(destination string) (int64, error) {
if c.currentDir == "" {
return nil, validationError(ctx, req, "no cache prepared, currentDir is nil", err)
return -1, errors.New("no cache prepared, currentDir is nil")
}

stat, err := os.Stat(destination)
if !os.IsNotExist(err) {
if err != nil {
return -1, fmt.Errorf("failed to stat cache destination %s: %v", destination, err)
}

if !stat.IsDir() {
return -1, fmt.Errorf("failed to open cache destination %s for writing -- it is already a file", destination)
}

if unix.Access(destination, unix.W_OK) != nil {
return -1, fmt.Errorf("failed to open cache destination %s for writing -- write permission denied", destination)
}
}

// hardlink the golden copy into this downstream's destination, creating it if need be
err = files.HardlinkDir(c.currentDir, destination)
if err != nil {
return nil, validationError(ctx, req, "failed to hardlink cache", err)
return -1, fmt.Errorf("failed to hardlink cache to destination %s: %v", destination, err)
}

logger.Debug(ctx, "cached.PopulateDiskCache[Written]",
key.DownstreamID.Field(req.DownstreamId),
)

return &pb.PopulateDiskCacheResponse{Version: c.currentVersion}, nil
return c.currentVersion, nil
}

/** Fetch the cache into a spot in the staging dir */
func (c *Cached) Prepare(ctx context.Context) error {
start := time.Now()
newDir := path.Join(c.Paths.StagingPath, randomString())
newDir := path.Join(c.StagingPath, randomString())
version, count, err := c.Client.GetCache(ctx, newDir)
if err != nil {
return err
Expand All @@ -118,14 +105,3 @@ func randomString() string {
}
return base64.URLEncoding.EncodeToString(randBytes)
}

func validationError(ctx context.Context, req *pb.PopulateDiskCacheRequest, msg string, err error) error {
logger.Warn(
ctx,
fmt.Sprintf("cached.PopulateDiskCache[error] %s: %v", msg, err),
key.DownstreamID.Field(req.DownstreamId),
)

// return no details to the caller as they are potentially an attacker from a sandboxed context
return errors.New("request failed")
}
6 changes: 3 additions & 3 deletions pkg/cached/cached.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type CacheServer struct {
Cached *api.Cached
}

func NewServer(ctx context.Context, client *client.Client, cert *tls.Certificate, paths *api.CachePaths) *CacheServer {
func NewServer(ctx context.Context, client *client.Client, cert *tls.Certificate, stagingPath string) *CacheServer {
creds := credentials.NewServerTLSFromCert(cert)

grpcServer := grpc.NewServer(
Expand All @@ -50,8 +50,8 @@ func NewServer(ctx context.Context, client *client.Client, cert *tls.Certificate
healthpb.RegisterHealthServer(grpcServer, healthServer)

cached := &api.Cached{
Client: client,
Paths: paths,
Client: client,
StagingPath: stagingPath,
}
pb.RegisterCacheServer(grpcServer, cached)

Expand Down
37 changes: 14 additions & 23 deletions pkg/cli/cachedaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/gadget-inc/dateilager/internal/key"
"github.com/gadget-inc/dateilager/internal/logger"
"github.com/gadget-inc/dateilager/internal/telemetry"
"github.com/gadget-inc/dateilager/pkg/api"
"github.com/gadget-inc/dateilager/pkg/cached"
"github.com/gadget-inc/dateilager/pkg/client"
"github.com/gadget-inc/dateilager/pkg/version"
Expand All @@ -31,20 +30,19 @@ func NewCacheDaemonCommand() *cobra.Command {
)

var (
level *zapcore.Level
encoding string
tracing bool
profilePath string
upstreamHost string
upstreamPort uint16
certFile string
keyFile string
port int
timeout uint
headlessHost string
stagingPath string
downstreamsPath string
downstreamsPathSuffix string
level *zapcore.Level
encoding string
tracing bool
profilePath string
upstreamHost string
upstreamPort uint16
certFile string
keyFile string
port int
timeout uint
headlessHost string
stagingPath string
downstreamsPath string
)

cmd := &cobra.Command{
Expand Down Expand Up @@ -107,13 +105,7 @@ func NewCacheDaemonCommand() *cobra.Command {
return fmt.Errorf("failed to listen on TCP port %d: %w", port, err)
}

paths := &api.CachePaths{
StagingPath: stagingPath,
DownstreamsRootPath: downstreamsPath,
DownstreamPathSuffix: downstreamsPathSuffix,
}

s := cached.NewServer(ctx, cl, &cert, paths)
s := cached.NewServer(ctx, cl, &cert, stagingPath)

osSignals := make(chan os.Signal, 1)
signal.Notify(osSignals, os.Interrupt, syscall.SIGTERM)
Expand Down Expand Up @@ -159,7 +151,6 @@ func NewCacheDaemonCommand() *cobra.Command {
flags.IntVar(&port, "port", 5053, "cache API port")
flags.StringVar(&stagingPath, "staging-path", "", "path for staging downloaded caches")
flags.StringVar(&downstreamsPath, "downstreams-path", "", "root path of where each downstream's directory sits")
flags.StringVar(&downstreamsPathSuffix, "downstream-path-suffix", "", "path where the cache will be deposited for each downstream consumer")

_ = cmd.MarkPersistentFlagRequired("staging-path")
_ = cmd.MarkPersistentFlagRequired("downstreams-path")
Expand Down
15 changes: 4 additions & 11 deletions pkg/cli/populatediskcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@ import (
)

func NewCmdPopulateDiskCache() *cobra.Command {
var (
downstreamID string
)

cmd := &cobra.Command{
Use: "populate-disk-cache",
RunE: func(cmd *cobra.Command, _ []string) error {
Use: "populate-disk-cache <destination_path>",
Args: cobra.MatchAll(cobra.ExactArgs(1), cobra.OnlyValidArgs),
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
c := client.FromContext(ctx)

version, err := c.PopulateDiskCache(ctx, downstreamID)
version, err := c.PopulateDiskCache(ctx, args[0])
if err != nil {
return err
}
Expand All @@ -29,9 +26,5 @@ func NewCmdPopulateDiskCache() *cobra.Command {
},
}

cmd.Flags().StringVar(&downstreamID, "id", "", "ID of this client for populating the disk cache in the root")

_ = cmd.MarkFlagRequired("id")

return cmd
}
8 changes: 4 additions & 4 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,19 +951,19 @@ func (c *Client) CloneToProject(ctx context.Context, source int64, target int64,
return &response.LatestVersion, nil
}

func (c *Client) PopulateDiskCache(ctx context.Context, downstreamID string) (int64, error) {
func (c *Client) PopulateDiskCache(ctx context.Context, destination string) (int64, error) {
ctx, span := telemetry.Start(ctx, "client.populate-disk-cache", trace.WithAttributes(
key.DownstreamID.Attribute(downstreamID),
key.CachePath.Attribute(destination),
))
defer span.End()

request := &pb.PopulateDiskCacheRequest{
DownstreamId: downstreamID,
Path: destination,
}

response, err := c.cache.PopulateDiskCache(ctx, request)
if err != nil {
return 0, fmt.Errorf("populate disk cache for %s: %w", downstreamID, err)
return 0, fmt.Errorf("populate disk cache for %s: %w", destination, err)
}

return response.Version, nil
Expand Down
Loading

0 comments on commit 492230e

Please sign in to comment.