Skip to content

Commit

Permalink
HostVolumePlugin interface and two implementations (#24497)
Browse files Browse the repository at this point in the history
* mkdir: HostVolumePluginMkdir: just creates a directory
* example-host-volume: HostVolumePluginExternal:
  plugin script that does mkfs and mount loopback

Co-authored-by: Tim Gross <[email protected]>
  • Loading branch information
gulducat and tgross authored Nov 20, 2024
1 parent 23c19e5 commit a3784ca
Show file tree
Hide file tree
Showing 14 changed files with 526 additions and 25 deletions.
1 change: 1 addition & 0 deletions ci/test-core.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"client/dynamicplugins/...",
"client/fingerprint/...",
"client/hoststats/...",
"client/hostvolumemanager/...",
"client/interfaces/...",
"client/lib/...",
"client/logmon/...",
Expand Down
5 changes: 5 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/hoststats"
"github.com/hashicorp/nomad/client/hostvolumemanager"
cinterfaces "github.com/hashicorp/nomad/client/interfaces"
"github.com/hashicorp/nomad/client/lib/cgroupslib"
"github.com/hashicorp/nomad/client/lib/numalib"
Expand Down Expand Up @@ -289,6 +290,8 @@ type Client struct {
// drivermanager is responsible for managing driver plugins
drivermanager drivermanager.Manager

hostVolumeManager *hostvolumemanager.HostVolumeManager

// baseLabels are used when emitting tagged metrics. All client metrics will
// have these tags, and optionally more.
baseLabels []metrics.Label
Expand Down Expand Up @@ -532,6 +535,8 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
c.devicemanager = devManager
c.pluginManagers.RegisterAndRun(devManager)

c.hostVolumeManager = hostvolumemanager.NewHostVolumeManager(cfg.AllocMountsDir, logger)

// Set up the service registration wrapper using the Consul and Nomad
// implementations. The Nomad implementation is only ever used on the
// client, so we do that here rather than within the agent.
Expand Down
40 changes: 26 additions & 14 deletions client/host_volume_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package client

import (
"context"
"path/filepath"
"time"

metrics "github.com/armon/go-metrics"
Expand All @@ -23,31 +22,44 @@ func newHostVolumesEndpoint(c *Client) *HostVolume {

var hostVolumeRequestTimeout = time.Minute

func (v *HostVolume) requestContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), hostVolumeRequestTimeout)
}
func (v *HostVolume) Create(
req *cstructs.ClientHostVolumeCreateRequest,
resp *cstructs.ClientHostVolumeCreateResponse) error {

func (v *HostVolume) Create(req *cstructs.ClientHostVolumeCreateRequest, resp *cstructs.ClientHostVolumeCreateResponse) error {
defer metrics.MeasureSince([]string{"client", "host_volume", "create"}, time.Now())
_, cancelFn := v.requestContext()
ctx, cancelFn := v.requestContext()
defer cancelFn()

// TODO(1.10.0): call into Client's host volume manager to create the work here
cresp, err := v.c.hostVolumeManager.Create(ctx, req)
if err != nil {
v.c.logger.Error("failed to create host volume", "name", req.Name, "error", err)
return err
}

resp.CapacityBytes = req.RequestedCapacityMinBytes
resp.HostPath = filepath.Join(v.c.config.AllocMountsDir, req.ID)
resp.CapacityBytes = cresp.CapacityBytes
resp.HostPath = cresp.HostPath

v.c.logger.Debug("created host volume", "id", req.ID, "path", resp.HostPath)
v.c.logger.Info("created host volume", "id", req.ID, "path", resp.HostPath)
return nil
}

func (v *HostVolume) Delete(req *cstructs.ClientHostVolumeDeleteRequest, resp *cstructs.ClientHostVolumeDeleteResponse) error {
func (v *HostVolume) Delete(
req *cstructs.ClientHostVolumeDeleteRequest,
resp *cstructs.ClientHostVolumeDeleteResponse) error {
// Record delete latency under its own key; the "create" key belongs to the
// Create handler and would otherwise absorb delete timings.
defer metrics.MeasureSince([]string{"client", "host_volume", "delete"}, time.Now())
_, cancelFn := v.requestContext()
ctx, cancelFn := v.requestContext()
defer cancelFn()

// TODO(1.10.0): call into Client's host volume manager to delete the volume here
_, err := v.c.hostVolumeManager.Delete(ctx, req) // db TODO(1.10.0): cresp is empty... why return it?
if err != nil {
v.c.logger.Error("failed to delete host volume", "ID", req.ID, "error", err)
return err
}

v.c.logger.Debug("deleted host volume", "id", req.ID, "path", req.HostPath)
v.c.logger.Info("deleted host volume", "id", req.ID, "path", req.HostPath)
return nil
}

func (v *HostVolume) requestContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), hostVolumeRequestTimeout)
}
194 changes: 194 additions & 0 deletions client/hostvolumemanager/host_volume_plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package hostvolumemanager

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
)

// HostVolumePlugin is the behavior the host volume manager needs from a
// plugin. Both HostVolumePluginMkdir and HostVolumePluginExternal satisfy it.
type HostVolumePlugin interface {
	// Version reports the plugin's version string.
	Version(ctx context.Context) (string, error)

	// Create provisions the volume described by req and reports where it
	// lives on the host and how large it is.
	Create(ctx context.Context, req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error)

	// Delete removes the volume described by req.
	Delete(ctx context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error

	// db TODO(1.10.0): update? resize? ??
}

// HostVolumePluginCreateResponse is what a plugin reports after creating a
// volume. External plugins emit it as JSON on stdout, so the field tags below
// define the wire format.
type HostVolumePluginCreateResponse struct {
	// Path is the location of the volume on the host.
	Path string `json:"path"`

	// SizeBytes is the size of the volume in bytes; note the JSON key is
	// "bytes", not "size_bytes".
	SizeBytes int64 `json:"bytes"`

	// Context carries plugin-defined metadata.
	Context map[string]string `json:"context"`
}

// Compile-time proof that *HostVolumePluginMkdir implements HostVolumePlugin.
var _ HostVolumePlugin = (*HostVolumePluginMkdir)(nil)

// HostVolumePluginMkdir is the built-in plugin that represents a volume as a
// plain directory named after the volume ID, directly beneath TargetPath.
type HostVolumePluginMkdir struct {
	// ID is the plugin's identifier.
	ID string

	// TargetPath is the parent directory in which volume directories are
	// created and removed.
	TargetPath string

	// log is the logger used for per-operation debug output.
	log hclog.Logger
}

// Version returns the fixed version string of the built-in mkdir plugin. The
// context is unused and the error is always nil.
func (p *HostVolumePluginMkdir) Version(_ context.Context) (string, error) {
	const version = "0.0.1"
	return version, nil
}

// Create makes the directory <TargetPath>/<req.ID> with mode 0700 and reports
// it as the volume's host path. The context is unused.
//
// The directory is made with os.Mkdir, so the parent must already exist and
// the call fails if the directory is already present. The reported size is
// always zero and the context map is empty but non-nil.
func (p *HostVolumePluginMkdir) Create(_ context.Context,
	req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) {

	volDir := filepath.Join(p.TargetPath, req.ID)
	logger := p.log.With("operation", "create", "volume_id", req.ID, "path", volDir)
	logger.Debug("running plugin")

	if err := os.Mkdir(volDir, 0o700); err != nil {
		logger.Debug("error with plugin", "error", err)
		return nil, err
	}
	logger.Debug("plugin ran successfully")

	resp := &HostVolumePluginCreateResponse{
		Path:      volDir,
		SizeBytes: 0,
		Context:   map[string]string{},
	}
	return resp, nil
}

// Delete removes the volume directory <TargetPath>/<req.ID> and everything
// beneath it. The context is unused. A directory that is already absent is
// not an error, because os.RemoveAll returns nil in that case.
//
// A request whose ID resolves to TargetPath itself (for example an empty ID
// or ".") is rejected. Without this guard os.RemoveAll would wipe the
// directory shared by every volume rather than a single volume.
func (p *HostVolumePluginMkdir) Delete(_ context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error {
	path := filepath.Join(p.TargetPath, req.ID)
	log := p.log.With(
		"operation", "delete",
		"volume_id", req.ID,
		"path", path)
	log.Debug("running plugin")

	// filepath.Join drops empty elements and cleans the result, so an empty
	// or "." ID collapses to the cleaned TargetPath.
	if path == filepath.Clean(p.TargetPath) {
		err := fmt.Errorf("volume ID %q does not name a directory inside %q", req.ID, p.TargetPath)
		log.Debug("error with plugin", "error", err)
		return err
	}

	err := os.RemoveAll(path)
	if err != nil {
		log.Debug("error with plugin", "error", err)
		return err
	}

	log.Debug("plugin ran successfully")
	return nil
}

// Compile-time proof that *HostVolumePluginExternal implements
// HostVolumePlugin.
var _ HostVolumePlugin = (*HostVolumePluginExternal)(nil)

// HostVolumePluginExternal is a plugin backed by an executable on the host:
// every operation is carried out by running that program.
type HostVolumePluginExternal struct {
	// ID is the plugin's identifier.
	ID string

	// Executable is the program that is run for each operation.
	Executable string

	// TargetPath is the parent directory; it is joined with the volume ID to
	// form the host path handed to the executable.
	TargetPath string

	// log is the logger used for per-operation debug output.
	log hclog.Logger
}

// Version currently returns a hard-coded version string without consulting
// the executable. The context is unused and the error is always nil.
func (p *HostVolumePluginExternal) Version(_ context.Context) (string, error) {
	// db TODO(1.10.0): call the plugin, use in fingerprint
	const placeholder = "0.0.1"
	return placeholder, nil
}

// Create runs the plugin executable with the "create" operation and decodes
// the JSON it prints on stdout into a HostVolumePluginCreateResponse.
//
// The request is passed to the plugin through environment variables:
// NODE_ID, VOLUME_NAME, CAPACITY_MIN_BYTES, CAPACITY_MAX_BYTES and PARAMETERS
// (the request parameters encoded as JSON). runPlugin adds OPERATION and
// HOST_PATH.
//
// An error is returned if the parameters cannot be encoded, if the plugin
// fails, or if its stdout is not valid JSON for the response type. All three
// errors wrap the underlying cause.
func (p *HostVolumePluginExternal) Create(ctx context.Context,
	req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) {

	params, err := json.Marshal(req.Parameters) // db TODO(1.10.0): if this is nil, then PARAMETERS env will be "null"
	if err != nil {
		return nil, fmt.Errorf("error marshaling volume parameters: %w", err)
	}
	envVars := []string{
		"NODE_ID=" + req.NodeID,
		"VOLUME_NAME=" + req.Name,
		fmt.Sprintf("CAPACITY_MIN_BYTES=%d", req.RequestedCapacityMinBytes),
		fmt.Sprintf("CAPACITY_MAX_BYTES=%d", req.RequestedCapacityMaxBytes),
		"PARAMETERS=" + string(params),
	}

	stdout, _, err := p.runPlugin(ctx, "create", req.ID, envVars)
	if err != nil {
		return nil, fmt.Errorf("error creating volume %q with plugin %q: %w", req.ID, req.PluginID, err)
	}

	var pluginResp HostVolumePluginCreateResponse
	if err := json.Unmarshal(stdout, &pluginResp); err != nil {
		// Name the plugin and volume so a malformed plugin response is not
		// reported as a bare JSON syntax error.
		return nil, fmt.Errorf("error decoding create response for volume %q from plugin %q: %w",
			req.ID, req.PluginID, err)
	}
	return &pluginResp, nil
}

// Delete runs the plugin executable with the "delete" operation for the
// volume named by req.ID.
//
// The request is passed to the plugin through the NODE_ID and PARAMETERS
// environment variables (PARAMETERS is the request parameters encoded as
// JSON). runPlugin adds OPERATION and HOST_PATH. The plugin's output is
// discarded; only success or failure is reported.
func (p *HostVolumePluginExternal) Delete(ctx context.Context,
	req *cstructs.ClientHostVolumeDeleteRequest) error {

	params, err := json.Marshal(req.Parameters)
	if err != nil {
		return fmt.Errorf("error marshaling volume parameters: %w", err)
	}
	envVars := []string{
		"NODE_ID=" + req.NodeID,
		"PARAMETERS=" + string(params),
	}

	if _, _, err := p.runPlugin(ctx, "delete", req.ID, envVars); err != nil {
		return fmt.Errorf("error deleting volume %q with plugin %q: %w", req.ID, req.PluginID, err)
	}
	return nil
}

// runPlugin executes the plugin binary for one operation and returns whatever
// it wrote to stdout and stderr.
//
// The executable is invoked as `<Executable> <op> <TargetPath>/<volID>`. Its
// environment consists only of OPERATION, HOST_PATH and the caller-supplied
// env entries; because cmd.Env is set explicitly, the child does not inherit
// this process's environment. The command is bound to ctx.
//
// Both output streams are returned even when the run fails, and they are
// attached to the debug log line for the run.
func (p *HostVolumePluginExternal) runPlugin(ctx context.Context,
	op, volID string, env []string) (stdout, stderr []byte, err error) {

	hostPath := filepath.Join(p.TargetPath, volID)
	logger := p.log.With("operation", op, "volume_id", volID, "path", hostPath)
	logger.Debug("running plugin")

	// Assemble the command: operation and path are passed both as arguments
	// and as environment variables.
	cmd := exec.CommandContext(ctx, p.Executable, op, hostPath)
	baseEnv := []string{
		"OPERATION=" + op,
		"HOST_PATH=" + hostPath,
	}
	cmd.Env = append(baseEnv, env...)

	// Output() only collects stdout, so stderr is captured separately.
	var captured bytes.Buffer
	cmd.Stderr = io.Writer(&captured)

	failures := &multierror.Error{}

	var runErr error
	if stdout, runErr = cmd.Output(); runErr != nil {
		failures = multierror.Append(failures, runErr)
	}

	var readErr error
	if stderr, readErr = io.ReadAll(&captured); readErr != nil {
		failures = multierror.Append(failures, readErr)
	}

	logger = logger.With(
		"stdout", string(stdout),
		"stderr", string(stderr),
	)

	if failures.ErrorOrNil() == nil {
		logger.Debug("plugin ran successfully")
		return stdout, stderr, nil
	}

	err = helper.FlattenMultierror(failures)
	logger.Debug("error with plugin", "error", err)
	return stdout, stderr, err
}
98 changes: 98 additions & 0 deletions client/hostvolumemanager/host_volumes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package hostvolumemanager

import (
"context"
"fmt"
"sync"

"github.com/hashicorp/go-hclog"
cstructs "github.com/hashicorp/nomad/client/structs"
)

// HostVolumeManager routes host volume create and delete requests to the
// plugin named in each request.
type HostVolumeManager struct {
	// log is the manager's logger.
	log hclog.Logger

	// plugins maps a plugin ID (string) to its HostVolumePlugin.
	plugins *sync.Map
}

// NewHostVolumeManager builds a manager whose logger is the "host_volumes"
// sub-logger of logger, and registers the two plugins that are currently
// hard-coded: the built-in "mkdir" plugin and the external
// "example-host-volume" plugin. Both place volumes under sharedMountDir.
func NewHostVolumeManager(sharedMountDir string, logger hclog.Logger) *HostVolumeManager {
	const (
		mkdirID   = "mkdir"
		exampleID = "example-host-volume"
	)

	named := logger.Named("host_volumes")
	hvm := &HostVolumeManager{
		log:     named,
		plugins: &sync.Map{},
	}

	// db TODO(1.10.0): discover plugins on disk, need a new plugin dir
	// TODO: how do we define the external mounter plugins? plugin configs?
	hvm.setPlugin(mkdirID, &HostVolumePluginMkdir{
		ID:         mkdirID,
		TargetPath: sharedMountDir,
		log:        named.With("plugin_id", mkdirID),
	})
	hvm.setPlugin(exampleID, &HostVolumePluginExternal{
		ID:         exampleID,
		Executable: "/opt/nomad/hostvolumeplugins/example-host-volume",
		TargetPath: sharedMountDir,
		log:        named.With("plugin_id", exampleID),
	})

	return hvm
}

// setPlugin registers plug under id, replacing any plugin previously stored
// with the same id.
//
// db TODO(1.10.0): fingerprint elsewhere / on sighup, and SetPlugin from afar?
func (hvm *HostVolumeManager) setPlugin(id string, plug HostVolumePlugin) {
	registry := hvm.plugins
	registry.Store(id, plug)
}

// getPlugin looks up the plugin registered under id. The boolean is false,
// and the plugin nil, when nothing is registered under that id.
func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, bool) {
	if stored, found := hvm.plugins.Load(id); found {
		// Only setPlugin writes to the map, so every value is a
		// HostVolumePlugin.
		return stored.(HostVolumePlugin), true
	}
	return nil, false
}

// Create asks the plugin named by req.PluginID to create the volume and
// translates the plugin's answer into a client response: the plugin's Path
// becomes HostPath and its SizeBytes becomes CapacityBytes.
//
// It returns an error if no plugin is registered under req.PluginID, and
// passes through any error from the plugin unchanged.
func (hvm *HostVolumeManager) Create(ctx context.Context,
	req *cstructs.ClientHostVolumeCreateRequest) (*cstructs.ClientHostVolumeCreateResponse, error) {

	plugin, found := hvm.getPlugin(req.PluginID)
	if !found {
		return nil, fmt.Errorf("no such plugin %q", req.PluginID)
	}

	created, err := plugin.Create(ctx, req)
	if err != nil {
		return nil, err
	}

	// db TODO(1.10.0): now we need to add it to the node fingerprint!
	// db TODO(1.10.0): and save it in client state!

	return &cstructs.ClientHostVolumeCreateResponse{
		HostPath:      created.Path,
		CapacityBytes: created.SizeBytes,
	}, nil
}

// Delete asks the plugin named by req.PluginID to delete the volume. On
// success it returns an empty response.
//
// It returns an error if no plugin is registered under req.PluginID, and
// passes through any error from the plugin unchanged.
func (hvm *HostVolumeManager) Delete(ctx context.Context,
	req *cstructs.ClientHostVolumeDeleteRequest) (*cstructs.ClientHostVolumeDeleteResponse, error) {

	plugin, found := hvm.getPlugin(req.PluginID)
	if !found {
		return nil, fmt.Errorf("no such plugin %q", req.PluginID)
	}

	if err := plugin.Delete(ctx, req); err != nil {
		return nil, err
	}

	// db TODO(1.10.0): save the client state!

	return &cstructs.ClientHostVolumeDeleteResponse{}, nil
}
Loading

0 comments on commit a3784ca

Please sign in to comment.