Skip to content

Commit

Permalink
Retry environment metadata updates (#366)
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson authored Jul 7, 2023
1 parent 00c0ca5 commit 1db7517
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 22 deletions.
64 changes: 55 additions & 9 deletions fly/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/url"
"os"
"path"
"sync/atomic"
"time"

"golang.org/x/exp/slog"
Expand All @@ -18,13 +19,14 @@ import (
const DefaultTimeout = 2 * time.Second

type Environment struct {
HTTPClient *http.Client
setPrimaryStatusCancel atomic.Value

Timeout time.Duration
HTTPClient *http.Client
Timeout time.Duration
}

func NewEnvironment() *Environment {
return &Environment{
e := &Environment{
HTTPClient: &http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
Expand All @@ -34,23 +36,67 @@ func NewEnvironment() *Environment {
},
Timeout: DefaultTimeout,
}
e.setPrimaryStatusCancel.Store(context.CancelCauseFunc(func(error) {}))

return e
}

func (e *Environment) Type() string { return "fly.io" }

func (e *Environment) SetPrimaryStatus(ctx context.Context, isPrimary bool) error {
func (e *Environment) SetPrimaryStatus(ctx context.Context, isPrimary bool) {
const retryN = 5

appName := AppName()
if appName == "" {
slog.Info("cannot set primary status on host environment", slog.String("reason", "app name unavailable"))
return nil
slog.Debug("cannot set environment metadata", slog.String("reason", "app name unavailable"))
return
}

machineID := MachineID()
if machineID == "" {
slog.Info("cannot set primary status on host environment", slog.String("reason", "machine id unavailable"))
return nil
slog.Debug("cannot set environment metadata", slog.String("reason", "machine id unavailable"))
return
}

// Ensure we only have a single in-flight command at a time as the primary
// status can change while we are retrying. This status is only for
// informational purposes so it is not critical to correct functioning.
ctx, cancel := context.WithCancelCause(ctx)
oldCancel := e.setPrimaryStatusCancel.Swap(cancel).(context.CancelCauseFunc)
oldCancel(fmt.Errorf("interrupted by new status update"))

// Continuously retry status update in case the unix socket is unavailable
// or in case we have exceeded the rate limit. We run this in a goroutine
// so that we are not blocking the main lease loop.
go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

var err error
for i := 0; ; i++ {
if err = e.setPrimaryStatus(ctx, isPrimary); err == nil {
return
} else if i >= retryN {
break
}

select {
case <-ctx.Done():
slog.Debug("cannot set environment metadata",
slog.String("reason", "context canceled"),
slog.Any("err", context.Cause(ctx)))
return
case <-ticker.C:
}
}

slog.Info("cannot set environment metadata",
slog.String("reason", "retries exceeded"),
slog.Any("err", err))
}()
}

func (e *Environment) setPrimaryStatus(ctx context.Context, isPrimary bool) error {
role := "replica"
if isPrimary {
role = "primary"
Expand All @@ -66,7 +112,7 @@ func (e *Environment) SetPrimaryStatus(ctx context.Context, isPrimary bool) erro
u := url.URL{
Scheme: "http",
Host: "localhost",
Path: path.Join("/v1", "apps", appName, "machines", machineID, "metadata", "role"),
Path: path.Join("/v1", "apps", AppName(), "machines", MachineID(), "metadata", "role"),
}
req, err := http.NewRequest("POST", u.String(), bytes.NewReader(reqBody))
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions litefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,14 @@ type Environment interface {
Type() string

// SetPrimaryStatus sets marks the current node as the primary or not.
SetPrimaryStatus(ctx context.Context, isPrimary bool) error
SetPrimaryStatus(ctx context.Context, isPrimary bool)
}

type nopEnvironment struct{}

func (*nopEnvironment) Type() string { return "" }

func (*nopEnvironment) SetPrimaryStatus(ctx context.Context, v bool) error { return nil }
func (*nopEnvironment) SetPrimaryStatus(ctx context.Context, v bool) {}

// NativeEndian is always set to little endian as that is the only endianness
// used by supported platforms for LiteFS. This may be expanded in the future.
Expand Down
14 changes: 3 additions & 11 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,9 +714,7 @@ func (s *Store) markDirty(name string) {
// monitorLease continuously handles either the leader lease or replicates from the primary.
func (s *Store) monitorLease(ctx context.Context) (err error) {
// Initialize environment to indicate this node is not a primary.
if err := s.Environment.SetPrimaryStatus(ctx, false); err != nil {
slog.Info("cannot init primary status on host environment", slog.Any("err", err))
}
s.Environment.SetPrimaryStatus(ctx, false)

var handoffLeaseID string
for {
Expand Down Expand Up @@ -918,14 +916,8 @@ func (s *Store) monitorLeaseAsPrimary(ctx context.Context, lease Lease) error {
}()

// Notify host environment that we are primary.
if err := s.Environment.SetPrimaryStatus(ctx, true); err != nil {
slog.Info("cannot set primary status on host environment", slog.Any("err", err))
}
defer func() {
if err := s.Environment.SetPrimaryStatus(ctx, false); err != nil {
slog.Info("cannot unset primary status on host environment", slog.Any("err", err))
}
}()
s.Environment.SetPrimaryStatus(ctx, true)
defer func() { s.Environment.SetPrimaryStatus(ctx, false) }()

waitDur := lease.TTL() / 2

Expand Down

0 comments on commit 1db7517

Please sign in to comment.