dynamic host volumes: node selection via constraints #24518

Merged
merged 1 commit on Nov 21, 2024

2 changes: 2 additions & 0 deletions command/agent/host_volume_endpoint_test.go
@@ -21,6 +21,8 @@ func TestHostVolumeEndpoint_CRUD(t *testing.T) {
// Create a volume on the test node

vol := mock.HostVolumeRequest(structs.DefaultNamespace)
vol.NodePool = ""
vol.Constraints = nil
reqBody := struct {
Volumes []*structs.HostVolume
}{Volumes: []*structs.HostVolume{vol}}
10 changes: 4 additions & 6 deletions command/volume_create_host_test.go
@@ -11,13 +11,16 @@ import (
"github.com/hashicorp/hcl"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/command/agent"
"github.com/mitchellh/cli"
"github.com/shoenig/test/must"
)

func TestHostVolumeCreateCommand_Run(t *testing.T) {
ci.Parallel(t)
srv, client, url := testServer(t, true, nil)
srv, client, url := testServer(t, true, func(c *agent.Config) {
c.Client.Meta = map[string]string{"rack": "foo"}
})
t.Cleanup(srv.Shutdown)

waitForNodes(t, client)
@@ -38,11 +41,6 @@ node_pool = "default"
capacity_min = "10GiB"
capacity_max = "20G"

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

constraint {
attribute = "${meta.rack}"
value = "foo"
104 changes: 82 additions & 22 deletions nomad/host_volume_endpoint.go
@@ -6,6 +6,7 @@ package nomad
import (
"fmt"
"net/http"
"regexp"
"strings"
"time"

@@ -19,6 +20,7 @@ import (
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/state/paginator"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
)

// HostVolume is the server RPC endpoint for host volumes
@@ -425,28 +427,12 @@ func (v *HostVolume) validateVolumeForState(vol *structs.HostVolume, snap *state

func (v *HostVolume) createVolume(vol *structs.HostVolume) error {

// TODO(1.10.0): proper node selection based on constraints and node
// pool. Also, should we move this into the validator step?
if vol.NodeID == "" {
var iter memdb.ResultIterator
var err error
var raw any
if vol.NodePool != "" {
iter, err = v.srv.State().NodesByNodePool(nil, vol.NodePool)
} else {
iter, err = v.srv.State().Nodes(nil)
}
if err != nil {
return err
}
raw = iter.Next()
if raw == nil {
return fmt.Errorf("no node meets constraints for volume")
}

node := raw.(*structs.Node)
vol.NodeID = node.ID
node, err := v.placeHostVolume(vol)
if err != nil {
return fmt.Errorf("could not place volume %q: %w", vol.Name, err)
}
vol.NodeID = node.ID
vol.NodePool = node.NodePool

method := "ClientHostVolume.Create"
cReq := &cstructs.ClientHostVolumeCreateRequest{
@@ -459,7 +445,7 @@ func (v *HostVolume) createVolume(vol *structs.HostVolume) error {
Parameters: vol.Parameters,
}
cResp := &cstructs.ClientHostVolumeCreateResponse{}
err := v.srv.RPC(method, cReq, cResp)
err = v.srv.RPC(method, cReq, cResp)
if err != nil {
return err
}
Expand All @@ -474,6 +460,80 @@ func (v *HostVolume) createVolume(vol *structs.HostVolume) error {
return nil
}

// placeHostVolume finds a node that matches the node pool and constraints,
// which doesn't already have a volume by that name. It returns a non-nil Node
// or an error indicating placement failed.
func (v *HostVolume) placeHostVolume(vol *structs.HostVolume) (*structs.Node, error) {

var iter memdb.ResultIterator
var err error
if vol.NodePool != "" {
iter, err = v.srv.State().NodesByNodePool(nil, vol.NodePool)
} else {
iter, err = v.srv.State().Nodes(nil)
}
if err != nil {
return nil, err
}

var checker *scheduler.ConstraintChecker

if len(vol.Constraints) > 0 {
ctx := &placementContext{
regexpCache: make(map[string]*regexp.Regexp),
versionCache: make(map[string]scheduler.VerConstraints),
semverCache: make(map[string]scheduler.VerConstraints),
}
checker = scheduler.NewConstraintChecker(ctx, vol.Constraints)
}

for {
raw := iter.Next()
Member

I don't recall how our binpacking algorithm works for allocs. Is it like this, where it's just whatever order comes out of state? I suspect, based on no real evidence, that folks won't want to binpack volumes the same way, unless they are the kind of volume that has a disk-space limit and we place them based on available disk space.

Basically, if I'm reading this right, this feels like a recipe for a full-disk alert waking someone up.

I suppose their main mechanisms to avoid this would be to:

  • use careful explicit constraints, which seems a little IaC-unfriendly if they'd need a lot of specs?
  • reuse the same volume name a lot, so each instance lands on a distinct host

Any other considerations I'm missing?

Member Author

For allocs in the general scheduler (batch/service), we:

  • find all the nodes in the node pool and DC
  • shuffle them
  • iterate over them until we find 2 that are feasible (or a lot more than 2 for jobs with spread)
  • pick the best of 2

When using spread, we iterate over enough nodes to guarantee we're not putting allocs for the same job on the same host, which is effectively what we're doing here. Operators are going to want to spread volumes with the same "purpose" out because of failure domains. If the node is full, then the plugin will tell us that and we'll get an error back.
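
For reference, here's a rough, self-contained sketch of that "shuffle, check feasibility, pick the better of two" flow; the types, scoring, and function names below are hypothetical stand-ins for illustration, not the real scheduler code:

package main

import (
	"fmt"
	"math/rand"
)

// candidate is a stand-in for a node plus whatever score a ranker assigns it.
type candidate struct {
	ID       string
	Feasible bool    // result of constraint/feasibility checks
	Score    float64 // higher is better
}

// place shuffles the candidates, walks them until it has seen two feasible
// ones (a real scheduler considers more when spread is in play), and returns
// the higher-scoring of those two.
func place(nodes []candidate) (*candidate, error) {
	rand.Shuffle(len(nodes), func(i, j int) { nodes[i], nodes[j] = nodes[j], nodes[i] })

	var best *candidate
	feasibleSeen := 0
	for i := range nodes {
		n := &nodes[i]
		if !n.Feasible {
			continue
		}
		if best == nil || n.Score > best.Score {
			best = n
		}
		feasibleSeen++
		if feasibleSeen == 2 {
			break
		}
	}
	if best == nil {
		return nil, fmt.Errorf("no feasible node")
	}
	return best, nil
}

func main() {
	nodes := []candidate{
		{ID: "node-a", Feasible: true, Score: 0.4},
		{ID: "node-b", Feasible: false},
		{ID: "node-c", Feasible: true, Score: 0.9},
	}
	n, err := place(nodes)
	if err != nil {
		fmt.Println("placement failed:", err)
		return
	}
	fmt.Println("placed on", n.ID) // with these inputs, always node-c
}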

if raw == nil {
break
}
candidate := raw.(*structs.Node)

// note: this is a race if multiple users create volumes of the same
// name concurrently, but we can't solve it on the server because we
// haven't yet written to state. The client will reject requests to
// create/register a volume with the same name with a different ID.
if _, hasVol := candidate.HostVolumes[vol.Name]; hasVol {
continue
}

if checker != nil {
if ok := checker.Feasible(candidate); !ok {
continue
}
}

return candidate, nil
}

return nil, fmt.Errorf("no node meets constraints")
}

// placementContext implements the scheduler.ConstraintContext interface, a
// minimal subset of the scheduler.Context interface that we need to create a
// feasibility checker for constraints
type placementContext struct {
regexpCache map[string]*regexp.Regexp
versionCache map[string]scheduler.VerConstraints
semverCache map[string]scheduler.VerConstraints
}

func (ctx *placementContext) Metrics() *structs.AllocMetric { return &structs.AllocMetric{} }
func (ctx *placementContext) RegexpCache() map[string]*regexp.Regexp { return ctx.regexpCache }

func (ctx *placementContext) VersionConstraintCache() map[string]scheduler.VerConstraints {
return ctx.versionCache
}

func (ctx *placementContext) SemverConstraintCache() map[string]scheduler.VerConstraints {
return ctx.semverCache
}

func (v *HostVolume) Delete(args *structs.HostVolumeDeleteRequest, reply *structs.HostVolumeDeleteResponse) error {

authErr := v.srv.Authenticate(v.ctx, args)
118 changes: 118 additions & 0 deletions nomad/host_volume_endpoint_test.go
@@ -17,6 +17,7 @@ import (
"github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
@@ -156,6 +157,25 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) {
must.EqError(t, err, "Permission denied")
})

t.Run("invalid node constraints", func(t *testing.T) {
req.Volumes[0].Constraints[0].RTarget = "r2"
req.Volumes[1].Constraints[0].RTarget = "r2"

defer func() {
req.Volumes[0].Constraints[0].RTarget = "r1"
req.Volumes[1].Constraints[0].RTarget = "r1"
}()

var resp structs.HostVolumeCreateResponse
req.AuthToken = token
err := msgpackrpc.CallWithCodec(codec, "HostVolume.Create", req, &resp)
must.EqError(t, err, `2 errors occurred:
* could not place volume "example1": no node meets constraints
* could not place volume "example2": no node meets constraints

`)
})

t.Run("valid create", func(t *testing.T) {
var resp structs.HostVolumeCreateResponse
req.AuthToken = token
@@ -611,6 +631,103 @@ func TestHostVolumeEndpoint_List(t *testing.T) {
})
}

func TestHostVolumeEndpoint_placeVolume(t *testing.T) {
srv, _, cleanupSrv := TestACLServer(t, func(c *Config) {
c.NumSchedulers = 0
})
t.Cleanup(cleanupSrv)
testutil.WaitForLeader(t, srv.RPC)
store := srv.fsm.State()

endpoint := &HostVolume{
srv: srv,
logger: testlog.HCLogger(t),
}

node0, node1, node2, node3 := mock.Node(), mock.Node(), mock.Node(), mock.Node()
node0.NodePool = structs.NodePoolDefault
node1.NodePool = "dev"
node1.Meta["rack"] = "r2"
node2.NodePool = "prod"
node3.NodePool = "prod"
node3.Meta["rack"] = "r3"
node3.HostVolumes = map[string]*structs.ClientHostVolumeConfig{"example": {
Name: "example",
Path: "/srv",
}}

must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node0))
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node1))
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node2))
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node3))

testCases := []struct {
name string
vol *structs.HostVolume
expect *structs.Node
expectErr string
}{
{
name: "only one in node pool",
vol: &structs.HostVolume{NodePool: "default"},
expect: node0,
},
{
name: "only one that matches constraints",
vol: &structs.HostVolume{Constraints: []*structs.Constraint{
{
LTarget: "${meta.rack}",
RTarget: "r2",
Operand: "=",
},
}},
expect: node1,
},
{
name: "only one available in pool",
vol: &structs.HostVolume{NodePool: "prod", Name: "example"},
expect: node2,
},
{
name: "no match",
vol: &structs.HostVolume{Constraints: []*structs.Constraint{
{
LTarget: "${meta.rack}",
RTarget: "r6",
Operand: "=",
},
}},
expectErr: "no node meets constraints",
},
{
name: "match already has a volume with the same name",
vol: &structs.HostVolume{
Name: "example",
Constraints: []*structs.Constraint{
{
LTarget: "${meta.rack}",
RTarget: "r3",
Operand: "=",
},
}},
expectErr: "no node meets constraints",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
node, err := endpoint.placeHostVolume(tc.vol)
if tc.expectErr == "" {
must.NoError(t, err)
must.Eq(t, tc.expect, node)
} else {
must.EqError(t, err, tc.expectErr)
must.Nil(t, node)
}
})
}
}

// mockHostVolumeClient models client RPCs that have side-effects on the
// client host
type mockHostVolumeClient struct {
@@ -631,6 +748,7 @@ func newMockHostVolumeClient(t *testing.T, srv *Server, pool string) (*mockHostV
c.Node.NodePool = pool
// TODO(1.10.0): we'll want to have a version gate for this feature
c.Node.Attributes["nomad.version"] = version.Version
c.Node.Meta["rack"] = "r1"
}, srv.config.RPCAddr, map[string]any{"HostVolume": mockClientEndpoint})
t.Cleanup(cleanup)

2 changes: 2 additions & 0 deletions nomad/state/state_store_host_volumes.go
@@ -87,6 +87,8 @@ func (s *StateStore) UpsertHostVolumes(index uint64, volumes []*structs.HostVolu
if _, ok := node.HostVolumes[v.Name]; ok {
v.State = structs.HostVolumeStateReady
}
// Register RPCs for new volumes may not have the node pool set
v.NodePool = node.NodePool

// Allocations are denormalized on read, so we don't want these to be
// written to the state store.
6 changes: 6 additions & 0 deletions nomad/structs/host_volumes.go
@@ -159,6 +159,12 @@ func (hv *HostVolume) Validate() error {
if err := constraint.Validate(); err != nil {
mErr = multierror.Append(mErr, fmt.Errorf("invalid constraint: %v", err))
}
switch constraint.Operand {
case ConstraintDistinctHosts, ConstraintDistinctProperty:
mErr = multierror.Append(mErr, fmt.Errorf(
"invalid constraint %s: host volumes of the same name are always on distinct hosts", constraint.Operand))
default:
}
}

return mErr.ErrorOrNil()
7 changes: 7 additions & 0 deletions scheduler/context.go
@@ -51,6 +51,13 @@ type Context interface {
SendEvent(event interface{})
}

// ConstraintContext is a minimal subset of the Context interface, enough to
// run constraint feasibility checks.
type ConstraintContext interface {
Metrics() *structs.AllocMetric
RegexpCache() map[string]*regexp.Regexp
VersionConstraintCache() map[string]VerConstraints
SemverConstraintCache() map[string]VerConstraints
}

// EvalCache is used to cache certain things during an evaluation
type EvalCache struct {
reCache map[string]*regexp.Regexp
Expand Down