diff --git a/ci/test-core.json b/ci/test-core.json index 95f354fbe99..5ec461809ed 100644 --- a/ci/test-core.json +++ b/ci/test-core.json @@ -17,6 +17,7 @@ "client/dynamicplugins/...", "client/fingerprint/...", "client/hoststats/...", + "client/hostvolumemanager/...", "client/interfaces/...", "client/lib/...", "client/logmon/...", diff --git a/client/client.go b/client/client.go index 8d3a0805312..b10aa54752e 100644 --- a/client/client.go +++ b/client/client.go @@ -34,6 +34,7 @@ import ( "github.com/hashicorp/nomad/client/dynamicplugins" "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/client/hoststats" + "github.com/hashicorp/nomad/client/hostvolumemanager" cinterfaces "github.com/hashicorp/nomad/client/interfaces" "github.com/hashicorp/nomad/client/lib/cgroupslib" "github.com/hashicorp/nomad/client/lib/numalib" @@ -289,6 +290,8 @@ type Client struct { // drivermanager is responsible for managing driver plugins drivermanager drivermanager.Manager + hostVolumeManager *hostvolumemanager.HostVolumeManager + // baseLabels are used when emitting tagged metrics. All client metrics will // have these tags, and optionally more. baseLabels []metrics.Label @@ -532,6 +535,8 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie c.devicemanager = devManager c.pluginManagers.RegisterAndRun(devManager) + c.hostVolumeManager = hostvolumemanager.NewHostVolumeManager(cfg.AllocMountsDir, logger) + // Set up the service registration wrapper using the Consul and Nomad // implementations. The Nomad implementation is only ever used on the // client, so we do that here rather than within the agent. diff --git a/client/host_volume_endpoint.go b/client/host_volume_endpoint.go index c22d22efde0..622fff3c492 100644 --- a/client/host_volume_endpoint.go +++ b/client/host_volume_endpoint.go @@ -5,7 +5,6 @@ package client import ( "context" - "path/filepath" "time" metrics "github.com/armon/go-metrics" @@ -23,31 +22,44 @@ func newHostVolumesEndpoint(c *Client) *HostVolume { var hostVolumeRequestTimeout = time.Minute -func (v *HostVolume) requestContext() (context.Context, context.CancelFunc) { - return context.WithTimeout(context.Background(), hostVolumeRequestTimeout) -} +func (v *HostVolume) Create( + req *cstructs.ClientHostVolumeCreateRequest, + resp *cstructs.ClientHostVolumeCreateResponse) error { -func (v *HostVolume) Create(req *cstructs.ClientHostVolumeCreateRequest, resp *cstructs.ClientHostVolumeCreateResponse) error { defer metrics.MeasureSince([]string{"client", "host_volume", "create"}, time.Now()) - _, cancelFn := v.requestContext() + ctx, cancelFn := v.requestContext() defer cancelFn() - // TODO(1.10.0): call into Client's host volume manager to create the work here + cresp, err := v.c.hostVolumeManager.Create(ctx, req) + if err != nil { + v.c.logger.Error("failed to create host volume", "name", req.Name, "error", err) + return err + } - resp.CapacityBytes = req.RequestedCapacityMinBytes - resp.HostPath = filepath.Join(v.c.config.AllocMountsDir, req.ID) + resp.CapacityBytes = cresp.CapacityBytes + resp.HostPath = cresp.HostPath - v.c.logger.Debug("created host volume", "id", req.ID, "path", resp.HostPath) + v.c.logger.Info("created host volume", "id", req.ID, "path", resp.HostPath) return nil } -func (v *HostVolume) Delete(req *cstructs.ClientHostVolumeDeleteRequest, resp *cstructs.ClientHostVolumeDeleteResponse) error { +func (v *HostVolume) Delete( + req *cstructs.ClientHostVolumeDeleteRequest, + resp *cstructs.ClientHostVolumeDeleteResponse) error { defer metrics.MeasureSince([]string{"client", "host_volume", "create"}, time.Now()) - _, cancelFn := v.requestContext() + ctx, cancelFn := v.requestContext() defer cancelFn() - // TODO(1.10.0): call into Client's host volume manager to delete the volume here + _, err := v.c.hostVolumeManager.Delete(ctx, req) // db TODO(1.10.0): cresp is empty... why return it? + if err != nil { + v.c.logger.Error("failed to delete host volume", "ID", req.ID, "error", err) + return err + } - v.c.logger.Debug("deleted host volume", "id", req.ID, "path", req.HostPath) + v.c.logger.Info("deleted host volume", "id", req.ID, "path", req.HostPath) return nil } + +func (v *HostVolume) requestContext() (context.Context, context.CancelFunc) { + return context.WithTimeout(context.Background(), hostVolumeRequestTimeout) +} diff --git a/client/hostvolumemanager/host_volume_plugin.go b/client/hostvolumemanager/host_volume_plugin.go new file mode 100644 index 00000000000..357eb2ef61e --- /dev/null +++ b/client/hostvolumemanager/host_volume_plugin.go @@ -0,0 +1,194 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package hostvolumemanager + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-multierror" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/helper" +) + +type HostVolumePlugin interface { + Version(ctx context.Context) (string, error) + Create(ctx context.Context, req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) + Delete(ctx context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error + // db TODO(1.10.0): update? resize? ?? +} + +type HostVolumePluginCreateResponse struct { + Path string `json:"path"` + SizeBytes int64 `json:"bytes"` + Context map[string]string `json:"context"` // metadata +} + +var _ HostVolumePlugin = &HostVolumePluginMkdir{} + +type HostVolumePluginMkdir struct { + ID string + TargetPath string + + log hclog.Logger +} + +func (p *HostVolumePluginMkdir) Version(_ context.Context) (string, error) { + return "0.0.1", nil +} + +func (p *HostVolumePluginMkdir) Create(_ context.Context, + req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) { + + path := filepath.Join(p.TargetPath, req.ID) + log := p.log.With( + "operation", "create", + "volume_id", req.ID, + "path", path) + log.Debug("running plugin") + + err := os.Mkdir(path, 0o700) + if err != nil { + log.Debug("error with plugin", "error", err) + return nil, err + } + + log.Debug("plugin ran successfully") + return &HostVolumePluginCreateResponse{ + Path: path, + SizeBytes: 0, + Context: map[string]string{}, + }, nil +} + +func (p *HostVolumePluginMkdir) Delete(_ context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error { + path := filepath.Join(p.TargetPath, req.ID) + log := p.log.With( + "operation", "delete", + "volume_id", req.ID, + "path", path) + log.Debug("running plugin") + + err := os.RemoveAll(path) + if err != nil { + log.Debug("error with plugin", "error", err) + return err + } + + log.Debug("plugin ran successfully") + return nil +} + +var _ HostVolumePlugin = &HostVolumePluginExternal{} + +type HostVolumePluginExternal struct { + ID string + Executable string + TargetPath string + + log hclog.Logger +} + +func (p *HostVolumePluginExternal) Version(_ context.Context) (string, error) { + return "0.0.1", nil // db TODO(1.10.0): call the plugin, use in fingerprint +} + +func (p *HostVolumePluginExternal) Create(ctx context.Context, + req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) { + + params, err := json.Marshal(req.Parameters) // db TODO(1.10.0): if this is nil, then PARAMETERS env will be "null" + if err != nil { + return nil, fmt.Errorf("error marshaling volume pramaters: %w", err) + } + envVars := []string{ + "NODE_ID=" + req.NodeID, + "VOLUME_NAME=" + req.Name, + fmt.Sprintf("CAPACITY_MIN_BYTES=%d", req.RequestedCapacityMinBytes), + fmt.Sprintf("CAPACITY_MAX_BYTES=%d", req.RequestedCapacityMaxBytes), + "PARAMETERS=" + string(params), + } + + stdout, _, err := p.runPlugin(ctx, "create", req.ID, envVars) + if err != nil { + return nil, fmt.Errorf("error creating volume %q with plugin %q: %w", req.ID, req.PluginID, err) + } + + var pluginResp HostVolumePluginCreateResponse + err = json.Unmarshal(stdout, &pluginResp) + if err != nil { + return nil, err + } + return &pluginResp, nil +} + +func (p *HostVolumePluginExternal) Delete(ctx context.Context, + req *cstructs.ClientHostVolumeDeleteRequest) error { + + params, err := json.Marshal(req.Parameters) + if err != nil { + return fmt.Errorf("error marshaling volume pramaters: %w", err) + } + envVars := []string{ + "NODE_ID=" + req.NodeID, + "PARAMETERS=" + string(params), + } + + _, _, err = p.runPlugin(ctx, "delete", req.ID, envVars) + if err != nil { + return fmt.Errorf("error deleting volume %q with plugin %q: %w", req.ID, req.PluginID, err) + } + return nil +} + +func (p *HostVolumePluginExternal) runPlugin(ctx context.Context, + op, volID string, env []string) (stdout, stderr []byte, err error) { + + path := filepath.Join(p.TargetPath, volID) + log := p.log.With( + "operation", op, + "volume_id", volID, + "path", path) + log.Debug("running plugin") + + // set up plugin execution + cmd := exec.CommandContext(ctx, p.Executable, op, path) + + cmd.Env = append([]string{ + "OPERATION=" + op, + "HOST_PATH=" + path, + }, env...) + + var errBuf bytes.Buffer + cmd.Stderr = io.Writer(&errBuf) + + // run the command and capture output + mErr := &multierror.Error{} + stdout, err = cmd.Output() + if err != nil { + mErr = multierror.Append(mErr, err) + } + stderr, err = io.ReadAll(&errBuf) + if err != nil { + mErr = multierror.Append(mErr, err) + } + + log = log.With( + "stdout", string(stdout), + "stderr", string(stderr), + ) + if mErr.ErrorOrNil() != nil { + err = helper.FlattenMultierror(mErr) + log.Debug("error with plugin", "error", err) + return stdout, stderr, err + } + log.Debug("plugin ran successfully") + return stdout, stderr, nil +} diff --git a/client/hostvolumemanager/host_volumes.go b/client/hostvolumemanager/host_volumes.go new file mode 100644 index 00000000000..4d7da7d1ea4 --- /dev/null +++ b/client/hostvolumemanager/host_volumes.go @@ -0,0 +1,98 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package hostvolumemanager + +import ( + "context" + "fmt" + "sync" + + "github.com/hashicorp/go-hclog" + cstructs "github.com/hashicorp/nomad/client/structs" +) + +type HostVolumeManager struct { + log hclog.Logger + plugins *sync.Map +} + +func NewHostVolumeManager(sharedMountDir string, logger hclog.Logger) *HostVolumeManager { + log := logger.Named("host_volumes") + + mgr := &HostVolumeManager{ + log: log, + plugins: &sync.Map{}, + } + // db TODO(1.10.0): discover plugins on disk, need a new plugin dir + // TODO: how do we define the external mounter plugins? plugin configs? + mgr.setPlugin("mkdir", &HostVolumePluginMkdir{ + ID: "mkdir", + TargetPath: sharedMountDir, + log: log.With("plugin_id", "mkdir"), + }) + mgr.setPlugin("example-host-volume", &HostVolumePluginExternal{ + ID: "example-host-volume", + Executable: "/opt/nomad/hostvolumeplugins/example-host-volume", + TargetPath: sharedMountDir, + log: log.With("plugin_id", "example-host-volume"), + }) + return mgr +} + +// db TODO(1.10.0): fingerprint elsewhere / on sighup, and SetPlugin from afar? +func (hvm *HostVolumeManager) setPlugin(id string, plug HostVolumePlugin) { + hvm.plugins.Store(id, plug) +} + +func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, bool) { + obj, ok := hvm.plugins.Load(id) + if !ok { + return nil, false + } + return obj.(HostVolumePlugin), true +} + +func (hvm *HostVolumeManager) Create(ctx context.Context, + req *cstructs.ClientHostVolumeCreateRequest) (*cstructs.ClientHostVolumeCreateResponse, error) { + + plug, ok := hvm.getPlugin(req.PluginID) + if !ok { + return nil, fmt.Errorf("no such plugin %q", req.PluginID) + } + + pluginResp, err := plug.Create(ctx, req) + if err != nil { + return nil, err + } + + resp := &cstructs.ClientHostVolumeCreateResponse{ + HostPath: pluginResp.Path, + CapacityBytes: pluginResp.SizeBytes, + } + + // db TODO(1.10.0): now we need to add it to the node fingerprint! + // db TODO(1.10.0): and save it in client state! + + return resp, nil +} + +func (hvm *HostVolumeManager) Delete(ctx context.Context, + req *cstructs.ClientHostVolumeDeleteRequest) (*cstructs.ClientHostVolumeDeleteResponse, error) { + + plug, ok := hvm.getPlugin(req.PluginID) + if !ok { + return nil, fmt.Errorf("no such plugin %q", req.PluginID) + } + + err := plug.Delete(ctx, req) + if err != nil { + return nil, err + } + + resp := &cstructs.ClientHostVolumeDeleteResponse{} + + // db TODO(1.10.0): save the client state! + + return resp, nil +} diff --git a/client/structs/host_volumes.go b/client/structs/host_volumes.go index ba6806051aa..38d3cb2d770 100644 --- a/client/structs/host_volumes.go +++ b/client/structs/host_volumes.go @@ -45,6 +45,11 @@ type ClientHostVolumeDeleteRequest struct { // ID is a UUID-like string generated by the server. ID string + // PluginID is the name of the host volume plugin on the client that will be + // used for deleting the volume. If omitted, the client will use its default + // built-in plugin. + PluginID string + // NodeID is the node where the volume is placed. It's included in the // client RPC request so that the server can route the request to the // correct node. diff --git a/command/volume_create_host_test.go b/command/volume_create_host_test.go index f7f1cd5a57f..bd4fff6f46a 100644 --- a/command/volume_create_host_test.go +++ b/command/volume_create_host_test.go @@ -32,7 +32,7 @@ func TestHostVolumeCreateCommand_Run(t *testing.T) { namespace = "prod" name = "database" type = "host" -plugin_id = "plugin_id" +plugin_id = "mkdir" node_pool = "default" capacity_min = "10GiB" @@ -99,7 +99,7 @@ func TestHostVolume_HCLDecode(t *testing.T) { namespace = "prod" name = "database" type = "host" -plugin_id = "plugin_id" +plugin_id = "mkdir" node_pool = "default" capacity_min = "10GiB" @@ -132,7 +132,7 @@ parameters { expected: &api.HostVolume{ Namespace: "prod", Name: "database", - PluginID: "plugin_id", + PluginID: "mkdir", NodePool: "default", Constraints: []*api.Constraint{{ LTarget: "${attr.kernel.name}", @@ -165,13 +165,13 @@ parameters { namespace = "prod" name = "database" type = "host" -plugin_id = "plugin_id" +plugin_id = "mkdir" node_pool = "default" `, expected: &api.HostVolume{ Namespace: "prod", Name: "database", - PluginID: "plugin_id", + PluginID: "mkdir", NodePool: "default", }, }, @@ -182,7 +182,7 @@ node_pool = "default" namespace = "prod" name = "database" type = "host" -plugin_id = "plugin_id" +plugin_id = "mkdir" node_pool = "default" capacity_min = "a" @@ -197,7 +197,7 @@ capacity_min = "a" namespace = "prod" name = "database" type = "host" -plugin_id = "plugin_id" +plugin_id = "mkdir" node_pool = "default" constraint { diff --git a/command/volume_delete_host_test.go b/command/volume_delete_host_test.go index 0cc24645376..fde8994df33 100644 --- a/command/volume_delete_host_test.go +++ b/command/volume_delete_host_test.go @@ -36,7 +36,7 @@ func TestHostVolumeDeleteCommand(t *testing.T) { namespace = "prod" name = "example" type = "host" -plugin_id = "plugin_id" +plugin_id = "mkdir" node_id = "%s" node_pool = "default" capability { diff --git a/command/volume_status_host_test.go b/command/volume_status_host_test.go index d4784029555..f150b1f9985 100644 --- a/command/volume_status_host_test.go +++ b/command/volume_status_host_test.go @@ -60,7 +60,7 @@ func TestHostVolumeStatusCommand_List(t *testing.T) { namespace = "%s" name = "%s" type = "host" -plugin_id = "plugin_id" +plugin_id = "mkdir" node_id = "%s" node_pool = "default" capability { @@ -115,7 +115,7 @@ func TestHostVolumeStatusCommand_Get(t *testing.T) { namespace = "prod" name = "example" type = "host" -plugin_id = "plugin_id" +plugin_id = "mkdir" node_id = "%s" node_pool = "default" capability { diff --git a/demo/hostvolume/_test-plugin.sh b/demo/hostvolume/_test-plugin.sh new file mode 100755 index 00000000000..5ccd1f28a72 --- /dev/null +++ b/demo/hostvolume/_test-plugin.sh @@ -0,0 +1,62 @@ +#!/usr/bin/env bash +# Copyright (c) HashiCorp, Inc. +# SPDX-License-Identifier: BUSL-1.1 + +set -euo pipefail + +if [[ $# -eq 0 || "$*" =~ -h ]]; then + cat < + +Operations: + create, delete, version + any other operation will be passed to the plugin + +Environment variables: + PLUGIN: executable to run (default ./example-host-volume) + TARGET_DIR: path to place the mount dir (default /tmp, + usually {nomad data dir}/alloc_mounts) +EOF + exit +fi + +op="$1" +shift + +plugin="${PLUGIN:-./example-host-volume}" +alloc_mounts="${TARGET_DIR:-/tmp}" +uuid='74564d17-ce50-0bc1-48e5-6feaa41ede48' + +case $op in + version) + args='version' + ;; + + create) + args="create $alloc_mounts/$uuid" + export HOST_PATH="$alloc_mounts/$uuid" + export VOLUME_NAME=test + export NODE_ID=0b62d807-6101-a80f-374d-e1c430abbf47 + export CAPACITY_MAX_BYTES=50000000 # 50mb + export CAPACITY_MIN_BYTES=50000000 # 50mb + export PARAMETERS='{"a": "ayy"}' + # db TODO(1.10.0): check stdout + ;; + + delete) + args="delete $alloc_mounts/$uuid" + export HOST_PATH="$alloc_mounts/$uuid" + export PARAMETERS='{"a": "ayy"}' + ;; + + *) + args="$*" + ;; +esac + +export OPERATION="$op" +set -x +eval "$plugin $* $args" diff --git a/demo/hostvolume/example-host-volume b/demo/hostvolume/example-host-volume new file mode 100755 index 00000000000..ae0f7711326 --- /dev/null +++ b/demo/hostvolume/example-host-volume @@ -0,0 +1,104 @@ +#!/usr/bin/env bash +# Copyright (c) HashiCorp, Inc. +# SPDX-License-Identifier: BUSL-1.1 + +# db TODO(1.10.0): where does PATH come from here? somewhere implicit? /sbin/ and /bin/ and ...? + +set -euo pipefail + +help() { + cat < [path] + +Options: + -v|--verbose: Show shell commands (set -x) + -h|--help: Print this help text and exit + +Operations: + create: Creates and mounts the device at path (required) + required environment: + CAPACITY_MIN_BYTES + delete: Unmounts and deletes the device at path (required) + version: Outputs this plugin's version + +EOF +} + +version() { + echo "0.0.1" +} + +# parse args +[ $# -eq 0 ] && { help; exit 1; } +for arg in "$@"; do + case $arg in + -h|-help|--help) help; exit 0 ;; + version|--version) version; exit 0 ;; + -v|--verbose) set -x; shift; ;; + esac +done + +# path is required for everything else +[ $# -lt 2 ] && { echo 'path required; seek --help' 1>&2; exit 1; } +host_path="$2" + +validate_path() { + local path="$1" + if [[ ! "$path" =~ [0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12} ]]; then + 1>&2 echo "expected uuid-lookin ID in the HOST_PATH; got: '$path'" + return 1 + fi +} + +is_mounted() { + awk '{print $2}' /proc/mounts | grep -q "^$1$" +} + +create_volume() { + local path="$1" + validate_path "$path" + local bytes="$2" + + # translate to mb for dd block size + local megs=$((bytes / 1024 / 1024)) # lazy, approximate + + # the extra conditionals are for idempotency + if [ ! -f "$path.ext4" ]; then + dd if=/dev/zero of="$path.ext4" bs=1M count="$megs" + # mkfs is noisy on stdout, so we send it to stderr + # to avoid breaking the JSON parsing on the client + mkfs.ext4 "$path.ext4" 1>&2 + fi + if ! is_mounted "$path"; then + mkdir -p "$path" + mount "$path.ext4" "$path" + fi +} + +delete_volume() { + local path="$1" + validate_path "$path" + is_mounted "$path" && umount "$path" + rm -rf "$path" + rm -f "$path.ext4" +} + +case "$1" in + "create") + create_volume "$host_path" "$CAPACITY_MIN_BYTES" + # output what Nomad expects + bytes="$(stat --format='%s' "$host_path.ext4")" + printf '{"path": "%s", "bytes": %s}' "$host_path", "$bytes" + ;; + "delete") + delete_volume "$host_path" ;; + *) + echo "unknown operation: $1" 1>&2 + exit 1 ;; +esac diff --git a/demo/hostvolume/host.volume.hcl b/demo/hostvolume/host.volume.hcl new file mode 100644 index 00000000000..cb0774b94e7 --- /dev/null +++ b/demo/hostvolume/host.volume.hcl @@ -0,0 +1,19 @@ +# Copyright (c) HashiCorp, Inc. +# SPDX-License-Identifier: BUSL-1.1 +name = "test" +type = "host" +plugin_id = "example-host-volume" +capacity_min = "50mb" +capacity_max = "50mb" + +capability { + access_mode = "single-node-writer" + attachment_mode = "file-system" +} + +parameters { + a = "ayy" +} + +# TODO(1.10.0): don't require node_pool +node_pool = "default" diff --git a/nomad/host_volume_endpoint.go b/nomad/host_volume_endpoint.go index 3998dabc3f2..4b4d09deac3 100644 --- a/nomad/host_volume_endpoint.go +++ b/nomad/host_volume_endpoint.go @@ -554,6 +554,7 @@ func (v *HostVolume) deleteVolume(vol *structs.HostVolume) error { method := "ClientHostVolume.Delete" cReq := &cstructs.ClientHostVolumeDeleteRequest{ ID: vol.ID, + PluginID: vol.PluginID, NodeID: vol.NodeID, HostPath: vol.HostPath, Parameters: vol.Parameters, diff --git a/nomad/mock/host_volumes.go b/nomad/mock/host_volumes.go index 7cec8e2884a..a87b084dad3 100644 --- a/nomad/mock/host_volumes.go +++ b/nomad/mock/host_volumes.go @@ -12,7 +12,7 @@ func HostVolumeRequest(ns string) *structs.HostVolume { vol := &structs.HostVolume{ Namespace: ns, Name: "example", - PluginID: "example-plugin", + PluginID: "mkdir", NodePool: structs.NodePoolDefault, Constraints: []*structs.Constraint{ {