Skip to content

Commit

Permalink
consumer: clean up shard working directory if NewStore() fails
Browse files Browse the repository at this point in the history
Previously we failed to clean these up if NewStore() errored out.
  • Loading branch information
jgraettinger committed Aug 24, 2024
1 parent 3d7972e commit ff3183e
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions consumer/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"strings"
"time"

Expand Down Expand Up @@ -219,7 +219,7 @@ func beginRecovery(s *shard) error {

// Create local temporary directory into which we recover.
var dir string
if dir, err = ioutil.TempDir("", strings.ReplaceAll(spec.Id.String(), "/", "_")+"-"); err != nil {
if dir, err = os.MkdirTemp("", strings.ReplaceAll(spec.Id.String(), "/", "_")+"-"); err != nil {
return errors.WithMessage(err, "creating shard working directory")
}

Expand Down Expand Up @@ -278,6 +278,12 @@ func completeRecovery(s *shard) (_ pc.Checkpoint, err error) {
}

if s.store, err = s.svc.App.NewStore(s, s.recovery.recorder); err != nil {
if s.store == nil {
// s.store is nil, ergo its Destroy() will not be invoked by
// waitAndTearDown(), and we are responsible for best-effort
// cleanup of the playback directory now.
_ = os.RemoveAll(s.recovery.recorder.Dir())
}
return cp, errors.WithMessage(err, "app.NewStore")
} else if cp, err = s.store.RestoreCheckpoint(s); err != nil {
return cp, errors.WithMessage(err, "store.RestoreCheckpoint")
Expand Down

0 comments on commit ff3183e

Please sign in to comment.