broker: generalize handling to roll the spool if it's the first known write

It's relatively common that a bucket policy will remove ALL fragments of
an idle journal. Later, the journal may suddenly receive new writes, and
in that case it's strongly desired that a new, dirtying fragment write
occur as quickly as possible.
jgraettinger committed Mar 13, 2024
1 parent 0323445 commit fdfe530
Showing 7 changed files with 84 additions and 41 deletions.
23 changes: 20 additions & 3 deletions broker/append_fsm.go
@@ -412,9 +412,15 @@ func (b *appendFSM) onValidatePreconditions() {
}
}

// It's possible a peer might have a larger end offset which is not
// reflected in our index, if a commit wasn't accepted by all peers.
// Such writes are reported as failed to the client and are retried
// (this failure mode is what makes journals at-least-once).
var indexMin, indexMax = b.resolved.replica.index.OffsetRange()

var maxOffset = b.pln.spool.End
if eo := b.resolved.replica.index.EndOffset(); eo > maxOffset {
maxOffset = eo
if indexMax > maxOffset {
maxOffset = indexMax
}

if b.req.CheckRegisters != nil &&
@@ -434,6 +440,17 @@ func (b *appendFSM) onValidatePreconditions() {
// Re-sync the pipeline at the explicitly requested |maxOffset|.
b.rollToOffset = maxOffset
b.state = stateSendPipelineSync
} else if b.pln.spool.Begin == indexMin {
// The spool holds the journal's first known write and should be rolled.
// This has the effect of "dirtying" the remote fragment index,
// and protects against data loss if N > R consistency is lost (eg, Etcd fails).
// When the remote index is dirty, recovering brokers are clued in that writes
// against this journal have already occurred (and `gazctl reset-head`
// must be run to recover). If the index were instead pristine,
// recovering brokers cannot distinguish this case from a newly-created
// journal, which risks double-writes to journal offsets.
b.rollToOffset = b.pln.spool.End
b.state = stateStreamContent
} else {
b.state = stateStreamContent
}
@@ -457,7 +474,7 @@ func (b *appendFSM) onStreamContent(req *pb.AppendRequest, err error) {
// Potentially roll the Fragment forward ahead of this append. Our
// pipeline is synchronized, so we expect this will always succeed
// and don't ask for an acknowledgement.
var proposal = maybeRollFragment(b.pln.spool, 0, b.resolved.journalSpec.Fragment)
var proposal = maybeRollFragment(b.pln.spool, b.rollToOffset, b.resolved.journalSpec.Fragment)

if b.pln.spool.Fragment.Fragment != proposal {
b.pln.scatter(&pb.ReplicateRequest{
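
As a reading aid for the hunk above, here is a minimal, hypothetical sketch of the new branch in onValidatePreconditions. The function shouldRollFirstWrite and its flattened offset arguments are inventions of this writeup, not broker API; in the broker the decision is made on appendFSM fields and sets rollToOffset before transitioning to stateStreamContent.

package main

import "fmt"

// shouldRollFirstWrite is an illustrative stand-in for the new branch: when
// the local spool begins at the first offset known to the fragment index, the
// spool is rolled forward so that the very next append produces a new,
// "dirtying" fragment write against the remote fragment store.
func shouldRollFirstWrite(spoolBegin, spoolEnd, indexMin int64) (rollToOffset int64, roll bool) {
	if spoolBegin == indexMin {
		return spoolEnd, true
	}
	return 0, false
}

func main() {
	// A journal whose remote fragments were all removed by a bucket policy:
	// the index minimum matches the spool's Begin, so the spool is rolled.
	if off, ok := shouldRollFirstWrite(0, 2054, 0); ok {
		fmt.Printf("roll spool to offset %d before streaming content\n", off)
	}
}
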
16 changes: 16 additions & 0 deletions broker/append_fsm_test.go
@@ -579,6 +579,22 @@ func TestFSMStreamAndReadAcknowledgements(t *testing.T) {
fsm.onStreamContent(&pb.AppendRequest{}, nil) // Intent to commit.
fsm.onStreamContent(nil, io.EOF) // Client EOF.

// We previously completed a first write of this journal,
// which causes the _next_ write to roll the spool forward.
expect = pb.ReplicateRequest{
Proposal: &pb.Fragment{
Journal: "a/journal",
Begin: 2054,
End: 2054,
CompressionCodec: pb.CompressionCodec_GZIP,
},
Registers: boxLabels("after", ""), // Union/subtract applied.
Acknowledge: false, // In-sync pipeline isn't acknowledged again.
}
peerRecv(expect)

// Now the Append validation error causes a rollback.
expect.Acknowledge = true
peerRecv(expect) // Rollback.
peerSend(pb.ReplicateResponse{Status: pb.Status_OK}) // Send & read ACK.
fsm.onReadAcknowledgements()
6 changes: 3 additions & 3 deletions broker/fragment/index.go
@@ -124,12 +124,12 @@ func (fi *Index) Query(ctx context.Context, req *pb.ReadRequest) (*pb.ReadRespon
}
}

// EndOffset returns the last (largest) End offset in the index.
func (fi *Index) EndOffset() int64 {
// OffsetRange returns the [Begin, End) offset range of all Fragments in the index.
func (fi *Index) OffsetRange() (int64, int64) {
defer fi.mu.RUnlock()
fi.mu.RLock()

return fi.set.EndOffset()
return fi.set.BeginOffset(), fi.set.EndOffset()
}

// SpoolCommit adds local Spool Fragment |frag| to the index.
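
A brief, self-contained sketch of the widened accessor in isolation. miniIndex is a stand-in type used only for this example; the real fragment.Index delegates to its fragment set's BeginOffset and EndOffset under the same read lock, as the hunk above shows.

package main

import (
	"fmt"
	"sync"
)

// miniIndex is an illustrative stand-in for fragment.Index, holding just
// enough state to demonstrate the OffsetRange accessor and its locking.
type miniIndex struct {
	mu         sync.RWMutex
	begin, end int64
}

// OffsetRange returns the [Begin, End) offset range covered by the index,
// holding the read lock for the duration of the call.
func (i *miniIndex) OffsetRange() (int64, int64) {
	i.mu.RLock()
	defer i.mu.RUnlock()
	return i.begin, i.end
}

func main() {
	var idx = &miniIndex{begin: 0x0, end: 0x255}
	var bo, eo = idx.OffsetRange()
	fmt.Printf("index covers [%#x, %#x)\n", bo, eo)
}
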
8 changes: 6 additions & 2 deletions broker/fragment/index_test.go
@@ -263,7 +263,9 @@ func (s *IndexSuite) TestWalkStoresAndURLSigning(c *gc.C) {
<-ind.FirstRefreshCh()

c.Check(ind.set, gc.HasLen, 3)
c.Check(ind.EndOffset(), gc.Equals, int64(0x255))
var bo, eo = ind.OffsetRange()
c.Check(bo, gc.Equals, int64(0x0))
c.Check(eo, gc.Equals, int64(0x255))

// Expect root/one provides Fragment 222-255.
var resp, _, _ = ind.Query(context.Background(), &pb.ReadRequest{Offset: 0x223})
@@ -279,7 +281,9 @@ func (s *IndexSuite) TestWalkStoresAndURLSigning(c *gc.C) {
ind.ReplaceRemote(set)

c.Check(ind.set, gc.HasLen, 4) // Combined Fragments are reflected.
c.Check(ind.EndOffset(), gc.Equals, int64(0x555))
bo, eo = ind.OffsetRange()
c.Check(bo, gc.Equals, int64(0x0))
c.Check(eo, gc.Equals, int64(0x555))

// Expect root/two now provides Fragment 222-333.
resp, _, _ = ind.Query(context.Background(), &pb.ReadRequest{Offset: 0x223})
16 changes: 3 additions & 13 deletions broker/replica.go
@@ -7,7 +7,7 @@ import (

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"
"go.gazette.dev/core/allocator"
"go.gazette.dev/core/broker/fragment"
pb "go.gazette.dev/core/broker/protocol"
@@ -228,18 +228,8 @@ func maybeRollFragment(cur fragment.Spool, rollToOffset int64, spec pb.JournalSp
flushFragment = true // Empty fragment is trivially rolled.
} else if cl > spec.Length {
flushFragment = true // Roll if over the target Fragment length.
} else if cur.Begin == 0 {
// We should roll after the journal's very first write. This has the
// effect of "dirtying" the remote fragment index, and protects against
// data loss if N > R consistency is lost (eg, Etcd fails). When the
// remote index is dirty, recovering brokers are clued in that writes
// against this journal have already occurred (and `gazctl reset-head`
// must be run to recover). If the index were instead pristine,
// recovering brokers cannot distinguish this case from a newly-created
// journal, which risks double-writes to journal offsets.
flushFragment = true
} else if rollToOffset != 0 {
flushFragment = true
} else if rollToOffset != 0 && rollToOffset != cur.Begin {
flushFragment = true // Roll to a requested larger offset.
}

// If the flush interval of the fragment differs from current number of
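
To make the revised conditions easier to scan, the following is a hedged restatement of maybeRollFragment's roll decision with flattened arguments. rollReason is an illustrative name rather than broker API, and the flush-interval clause that follows in the real function is omitted.

package main

import "fmt"

// rollReason reports whether a spool covering [begin, end) with a target
// fragment length should roll, given an optional rollToOffset. The former
// special case on Begin == 0 is gone; an explicit rollToOffset only forces
// a roll when it differs from the spool's Begin.
func rollReason(begin, end, length, rollToOffset int64) (bool, string) {
	switch cl := end - begin; {
	case cl == 0:
		return true, "empty fragment is trivially rolled"
	case cl > length:
		return true, "over the target fragment length"
	case rollToOffset != 0 && rollToOffset != begin:
		return true, "roll to a requested larger offset"
	default:
		return false, "no roll required"
	}
}

func main() {
	// Mirrors the two new test cases below: rolling to offset 20 from a spool
	// at [10, 20) rolls; a spool already at [20, 30) with rollToOffset 20 does not.
	for _, c := range [][4]int64{{10, 20, 100, 20}, {20, 30, 100, 20}} {
		var ok, why = rollReason(c[0], c[1], c[2], c[3])
		fmt.Println(ok, why)
	}
}
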
50 changes: 32 additions & 18 deletions broker/replica_test.go
@@ -6,7 +6,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"
"go.gazette.dev/core/allocator"
"go.gazette.dev/core/broker/fragment"
pb "go.gazette.dev/core/broker/protocol"
@@ -109,32 +109,32 @@ func TestReplicaNextProposalCases(t *testing.T) {
defer func(f func() time.Time) { timeNow = f }(timeNow)

var testData = []struct {
prepArgs func(fragment.Spool, pb.JournalSpec_Fragment) (fragment.Spool, pb.JournalSpec_Fragment)
prepArgs func(fragment.Spool, pb.JournalSpec_Fragment) (fragment.Spool, int64, pb.JournalSpec_Fragment)
out pb.Fragment
description string
}{
{
prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, pb.JournalSpec_Fragment) {
spool.Begin, spool.End = 1, 100
prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, int64, pb.JournalSpec_Fragment) {
spool.Begin, spool.End = 0, 100
spool.FirstAppendTime = time.Time{}.Add(time.Hour * 2)
spec.Length = 200
spec.FlushInterval = time.Duration(time.Hour * 6)
return spool, spec
return spool, 0, spec
},
out: pb.Fragment{
Journal: "a/journal",
Begin: 1,
Begin: 0,
End: 100,
CompressionCodec: 1,
},
description: "Fragment does not need to be flushed",
},
{
prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, pb.JournalSpec_Fragment) {
prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, int64, pb.JournalSpec_Fragment) {
spool.Begin, spool.End = 1, 200
spool.FirstAppendTime = time.Time{}.Add(time.Hour * 2)
spec.Length = 100
return spool, spec
return spool, 0, spec
},
out: pb.Fragment{
Journal: "a/journal",
@@ -145,12 +145,12 @@ func TestReplicaNextProposalCases(t *testing.T) {
description: "Fragment exceeds length, get flush proposal",
},
{
prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, pb.JournalSpec_Fragment) {
prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, int64, pb.JournalSpec_Fragment) {
spool.Begin, spool.End = 1, 50
spool.FirstAppendTime = time.Time{}.Add(time.Minute)
spec.Length = 100
spec.FlushInterval = time.Duration(time.Minute * 30)
return spool, spec
return spool, 0, spec
},
out: pb.Fragment{
Journal: "a/journal",
Expand All @@ -161,28 +161,42 @@ func TestReplicaNextProposalCases(t *testing.T) {
description: "Fragment contains data from previous flush interval",
},
{
prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, pb.JournalSpec_Fragment) {
spool.Begin, spool.End = 0, 10
prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, int64, pb.JournalSpec_Fragment) {
spool.Begin, spool.End = 10, 20
spec.Length = 100
return spool, spec
return spool, 20, spec
},
out: pb.Fragment{
Journal: "a/journal",
Begin: 10,
End: 10,
Begin: 20,
End: 20,
CompressionCodec: 1,
},
description: "Fragment is non-empty at Begin == 0",
description: "Fragment is has roll-to-offset",
},
{
prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, int64, pb.JournalSpec_Fragment) {
spool.Begin, spool.End = 20, 30
spec.Length = 100
return spool, 20, spec
},
out: pb.Fragment{
Journal: "a/journal",
Begin: 20,
End: 30,
CompressionCodec: 1,
},
description: "Fragment is has already been rolled-to-offset",
},
}

timeNow = func() time.Time { return time.Time{}.Add(time.Hour) }
for _, test := range testData {
var spool, spec = test.prepArgs(
var spool, rollToOffset, spec = test.prepArgs(
fragment.NewSpool("a/journal", &testSpoolObserver{}),
pb.JournalSpec_Fragment{CompressionCodec: 1},
)
var proposal = maybeRollFragment(spool, 0, spec)
var proposal = maybeRollFragment(spool, rollToOffset, spec)
t.Log(test.description)
require.Equal(t, proposal, test.out)
}
6 changes: 4 additions & 2 deletions broker/replicate_api_test.go
@@ -39,7 +39,8 @@ func TestReplicateStreamAndCommit(t *testing.T) {
require.NoError(t, stream.Send(&pb.ReplicateRequest{Content: []byte("bazbing"), ContentDelta: 6}))

// Precondition: content not observable in the Fragment index.
require.Equal(t, int64(0), broker.replica("a/journal").index.EndOffset())
var _, eo = broker.replica("a/journal").index.OffsetRange()
require.Equal(t, int64(0), eo)

// Commit.
require.NoError(t, stream.Send(&pb.ReplicateRequest{
@@ -56,7 +57,8 @@ func TestReplicateStreamAndCommit(t *testing.T) {
expectReplResponse(t, stream, &pb.ReplicateResponse{Status: pb.Status_OK})

// Post-condition: content is now observable.
require.Equal(t, int64(13), broker.replica("a/journal").index.EndOffset())
_, eo = broker.replica("a/journal").index.OffsetRange()
require.Equal(t, int64(13), eo)

// Send EOF and expect it's returned.
require.NoError(t, stream.CloseSend())
