Skip to content

Commit

Permalink
Errgroup server boot and consistency changes
Browse files Browse the repository at this point in the history
  • Loading branch information
angelini committed May 8, 2024
1 parent 8c93d32 commit b655081
Show file tree
Hide file tree
Showing 9 changed files with 300 additions and 308 deletions.
231 changes: 213 additions & 18 deletions pkg/api/cached.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import (
"encoding/base64"
"errors"
"fmt"
"math"
"os"
"path"
"path/filepath"
"syscall"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
Expand All @@ -17,11 +20,17 @@ import (
"github.com/gadget-inc/dateilager/internal/logger"
"github.com/gadget-inc/dateilager/internal/pb"
"github.com/gadget-inc/dateilager/pkg/client"
"github.com/gadget-inc/dateilager/pkg/version"
"github.com/golang/protobuf/ptypes/wrappers"
"golang.org/x/sys/unix"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
DriverName = "com.gadget.dateilager.cached"
)

type Cached struct {
pb.UnimplementedCachedServer
csi.UnimplementedIdentityServer
Expand Down Expand Up @@ -50,17 +59,202 @@ func (c *Cached) PopulateDiskCache(ctx context.Context, req *pb.PopulateDiskCach

destination := req.Path

version, err := c.WriteCache(destination)
version, err := c.writeCache(destination)
if err != nil {
return nil, err
}

return &pb.PopulateDiskCacheResponse{Version: version}, nil
}

// Fetch the cache into a spot in the staging dir
func (c *Cached) Prepare(ctx context.Context) error {
start := time.Now()
folderName, err := randomString()
if err != nil {
return err
}
newDir := path.Join(c.StagingPath, folderName)
version, count, err := c.Client.GetCache(ctx, newDir)
if err != nil {
return err
}

c.currentDir = newDir
c.currentVersion = version

logger.Info(ctx, "downloaded golden copy", key.Directory.Field(newDir), key.DurationMS.Field(time.Since(start)), key.Version.Field(version), key.Count.Field(int64(count)))
return nil
}

// GetPluginInfo returns metadata of the plugin
func (c *Cached) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
resp := &csi.GetPluginInfoResponse{
Name: DriverName,
VendorVersion: version.Version,
}

return resp, nil
}

// GetPluginCapabilities returns available capabilities of the plugin
func (c *Cached) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
resp := &csi.GetPluginCapabilitiesResponse{
Capabilities: []*csi.PluginCapability{},
}

return resp, nil
}

// Probe returns the health and readiness of the plugin
func (c *Cached) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
ready := true
if c.currentDir == "" {
ready = false
logger.Warn(ctx, "csi probe failed as daemon hasn't prepared cache yet", key.Version.Field(c.currentVersion))
}

return &csi.ProbeResponse{
Ready: &wrappers.BoolValue{
Value: ready,
},
}, nil
}

// NodeGetCapabilities returns the supported capabilities of the node server
// this driver has no capabilities like expansion or staging, because we only use it for node local volumes
func (c *Cached) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
nscaps := []*csi.NodeServiceCapability{}

return &csi.NodeGetCapabilitiesResponse{
Capabilities: nscaps,
}, nil
}

// NodeGetInfo returns the supported capabilities of the node server. This
// Usually, a CSI driver would return some interesting stuff about the node here for the controller to use to place volumes, but because we're only supporting node local volumes, we return something very basic
func (c *Cached) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
return &csi.NodeGetInfoResponse{
NodeId: first(os.Getenv("NODE_NAME"), "dev"),
MaxVolumesPerNode: 110,
}, nil
}

func (c *Cached) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
if req.VolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "NodePublishVolume Volume ID must be provided")
}

if req.TargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "NodePublishVolume Target Path must be provided")
}

if req.VolumeCapability == nil {
return nil, status.Error(codes.InvalidArgument, "NodePublishVolume Volume Capability must be provided")
}

targetPath := req.GetTargetPath()
volumeID := req.GetVolumeId()
volumeAttributes := req.GetVolumeContext()

var cachePath string
var targetPermissions os.FileMode

if suffix, exists := volumeAttributes["placeCacheAtPath"]; exists {
// running in suffix mode, desired outcome:
// - the mount point is writable by the pod
// - the cache is mounted at the suffix, and is not writable
cachePath = path.Join(targetPath, suffix)
targetPermissions = 0777
} else {
// running in unsuffixed mode, desired outcome:
// - the mount point *is* the cache, and is not writable by the pod
cachePath = targetPath
targetPermissions = 0755
}

if err := os.MkdirAll(targetPath, targetPermissions); err != nil {
return nil, fmt.Errorf("failed to create target directory %s: %s", targetPath, err)
}

if err := os.Chmod(targetPath, targetPermissions); err != nil {
return nil, fmt.Errorf("failed to change ownership of target directory %s: %s", targetPath, err)
}

version, err := c.writeCache(cachePath)
if err != nil {
return nil, err
}

logger.Info(ctx, "volume published", key.VolumeID.Field(volumeID), key.TargetPath.Field(targetPath), key.Version.Field(version))

return &csi.NodePublishVolumeResponse{}, nil
}

func (s *Cached) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
if req.VolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume Volume ID must be provided")
}

if req.TargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume Target Path must be provided")
}

targetPath := req.GetTargetPath()

// Clean up directory
if err := os.RemoveAll(targetPath); err != nil {
return nil, fmt.Errorf("failed to remove directory %s: %s", targetPath, err)
}

logger.Info(ctx, "volume unpublished and data removed", key.TargetPath.Field(targetPath))
return &csi.NodeUnpublishVolumeResponse{}, nil
}

// NodeGetVolumeStats returns the volume capacity statistics available for the given volume.
func (c *Cached) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
if req.VolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats Volume ID must be provided")
}

volumePath := req.VolumePath
if volumePath == "" {
return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats Volume Path must be provided")
}

usedBytes, err := getFolderSize(volumePath)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to retrieve used size statistics for volume path %s: %v", volumePath, err)
}

var stat syscall.Statfs_t
err = syscall.Statfs(volumePath, &stat)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to retrieve total size statistics for volume path %s: %v", volumePath, err)
}

// Calculate free space in bytes
freeBytes := stat.Bavail * uint64(stat.Bsize)
if freeBytes > math.MaxInt64 {
return nil, status.Errorf(codes.Internal, "total size statistics for volume path too big for int64: %d", freeBytes)
}
signedFreeBytes := int64(freeBytes)

return &csi.NodeGetVolumeStatsResponse{
Usage: []*csi.VolumeUsage{
{
Available: signedFreeBytes,
Total: signedFreeBytes + usedBytes,
Used: usedBytes,
Unit: csi.VolumeUsage_BYTES,
},
},
}, nil
}

// check if the destination exists, and if so, if its writable
// hardlink the golden copy into this downstream's destination, creating it if need be
func (c *Cached) WriteCache(destination string) (int64, error) {
func (c *Cached) writeCache(destination string) (int64, error) {
if c.currentDir == "" {
return -1, errors.New("no cache prepared, currentDir is nil")
}
Expand All @@ -87,24 +281,25 @@ func (c *Cached) WriteCache(destination string) (int64, error) {
return c.currentVersion, nil
}

// Fetch the cache into a spot in the staging dir
func (c *Cached) Prepare(ctx context.Context) error {
start := time.Now()
folderName, err := randomString()
if err != nil {
return err
}
newDir := path.Join(c.StagingPath, folderName)
version, count, err := c.Client.GetCache(ctx, newDir)
if err != nil {
return err
func first(one, two string) string {
if one == "" {
return two
}
return one
}

c.currentDir = newDir
c.currentVersion = version

logger.Info(ctx, "downloaded golden copy", key.Directory.Field(newDir), key.DurationMS.Field(time.Since(start)), key.Version.Field(version), key.Count.Field(int64(count)))
return nil
func getFolderSize(path string) (int64, error) {
var totalSize int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
totalSize += info.Size()
}
return nil
})
return totalSize, err
}

func randomString() (string, error) {
Expand Down
Loading

0 comments on commit b655081

Please sign in to comment.