generalize first-write persistence and relax byte-zero read skipping #367

Merged · 4 commits · Mar 13, 2024
23 changes: 20 additions & 3 deletions broker/append_fsm.go
@@ -412,9 +412,15 @@ func (b *appendFSM) onValidatePreconditions() {
}
}

// It's possible a peer might have a larger end offset which is not
// reflected in our index, if a commit wasn't accepted by all peers.
// Such writes are reported as failed to the client and are retried
// (this failure mode is what makes journals at-least-once).
var indexMin, indexMax = b.resolved.replica.index.OffsetRange()

var maxOffset = b.pln.spool.End
if eo := b.resolved.replica.index.EndOffset(); eo > maxOffset {
maxOffset = eo
if indexMax > maxOffset {
maxOffset = indexMax
}

if b.req.CheckRegisters != nil &&
@@ -434,6 +440,17 @@ func (b *appendFSM) onValidatePreconditions() {
// Re-sync the pipeline at the explicitly requested |maxOffset|.
b.rollToOffset = maxOffset
b.state = stateSendPipelineSync
} else if b.pln.spool.Begin == indexMin {
// The spool holds the journal's first known write and should be rolled.
// This has the effect of "dirtying" the remote fragment index,
// and protects against data loss if N > R consistency is lost (eg, Etcd fails).
// When the remote index is dirty, recovering brokers are clued in that writes
// against this journal have already occurred (and `gazctl reset-head`
// must be run to recover). If the index were instead pristine,
// recovering brokers cannot distinguish this case from a newly-created
// journal, which risks double-writes to journal offsets.
b.rollToOffset = b.pln.spool.End
b.state = stateStreamContent
} else {
b.state = stateStreamContent
}
@@ -457,7 +474,7 @@ func (b *appendFSM) onStreamContent(req *pb.AppendRequest, err error) {
// Potentially roll the Fragment forward ahead of this append. Our
// pipeline is synchronized, so we expect this will always succeed
// and don't ask for an acknowledgement.
var proposal = maybeRollFragment(b.pln.spool, 0, b.resolved.journalSpec.Fragment)
var proposal = maybeRollFragment(b.pln.spool, b.rollToOffset, b.resolved.journalSpec.Fragment)

if b.pln.spool.Fragment.Fragment != proposal {
b.pln.scatter(&pb.ReplicateRequest{
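A minimal, runnable sketch of the precondition flow this hunk implements, using simplified stand-ins for the appendFSM internals (`planSync`, the local `spool` type, and the literal offsets are illustrative, and the explicit-offset check is collapsed into a plain comparison — this is not the actual Gazette API):

```go
package main

import "fmt"

type spool struct{ Begin, End int64 }

// planSync collapses the two cases above into one decision: a peer may have
// committed past our spool (indexMax), and a spool holding the journal's
// first known write (Begin == indexMin) is rolled to "dirty" the remote
// fragment index, so recovering brokers can't mistake it for a new journal.
func planSync(s spool, indexMin, indexMax int64) (rollToOffset int64) {
	maxOffset := s.End
	if indexMax > maxOffset {
		maxOffset = indexMax // A peer commit we never saw; its client was failed and retries.
	}
	if s.End != maxOffset {
		return maxOffset // Re-sync the pipeline at the larger offset.
	}
	if s.Begin == indexMin {
		return s.End // First write persisted: roll so the index is no longer pristine.
	}
	return 0 // No roll required; stream content directly.
}

func main() {
	fmt.Println(planSync(spool{Begin: 0, End: 2048}, 0, 2048))    // 2048: first-write roll.
	fmt.Println(planSync(spool{Begin: 2048, End: 2048}, 0, 4096)) // 4096: a peer is ahead.
	fmt.Println(planSync(spool{Begin: 2048, End: 4096}, 0, 4096)) // 0: proceed as-is.
}
```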
16 changes: 16 additions & 0 deletions broker/append_fsm_test.go
@@ -579,6 +579,22 @@ func TestFSMStreamAndReadAcknowledgements(t *testing.T) {
fsm.onStreamContent(&pb.AppendRequest{}, nil) // Intent to commit.
fsm.onStreamContent(nil, io.EOF) // Client EOF.

// We previously completed a first write of this journal,
// which causes the _next_ write to roll the spool forward.
expect = pb.ReplicateRequest{
Proposal: &pb.Fragment{
Journal: "a/journal",
Begin: 2054,
End: 2054,
CompressionCodec: pb.CompressionCodec_GZIP,
},
Registers: boxLabels("after", ""), // Union/subtract applied.
Acknowledge: false, // In-sync pipeline isn't acknowledged again.
}
peerRecv(expect)

// Now the Append validation error causes a rollback.
expect.Acknowledge = true
peerRecv(expect) // Rollback.
peerSend(pb.ReplicateResponse{Status: pb.Status_OK}) // Send & read ACK.
fsm.onReadAcknowledgements()
33 changes: 17 additions & 16 deletions broker/fragment/index.go
@@ -54,19 +54,20 @@ func (fi *Index) Query(ctx context.Context, req *pb.ReadRequest) (*pb.ReadRespon
var condCh = fi.condCh
var err error

// If the requested offset isn't covered by the index, but we do have a
// Fragment covering a *greater* offset, where that Fragment is also older
// than a large time.Duration, then: skip forward the request offset to
// the Fragment offset. This case allows us to recover from "holes" or
// deletions in the offset space of a Journal, while not impacting races
// which can occur between delayed persistence to the Fragment store
// vs hand-off of Journals to new brokers (eg, a new broker which isn't
// yet aware of a Fragment currently being uploaded, should block a read
// of an offset covered by that Fragment until it becomes available).
if !found && ind != len(fi.set) &&
fi.set[ind].ModTime != 0 &&
fi.set[ind].ModTime < timeNow().Add(-offsetJumpAgeThreshold).Unix() {

// If the requested offset isn't covered by the index, but we do have
// a persisted fragment with a *greater* offset...
if !found && ind != len(fi.set) && fi.set[ind].ModTime != 0 &&
// AND the client is reading from the very beginning of the journal,
// OR the next available fragment was persisted quite a while ago.
(req.Offset == 0 || (fi.set[ind].ModTime < timeNow().Add(-offsetJumpAgeThreshold).Unix())) {

// Then skip the read forward to the first or next available offset.
// This case allows us to recover from "holes" or deletions in the
// offset space of a Journal, while not impacting races which can occur
// between delayed persistence to the Fragment store vs hand-off of
// Journals to new brokers (eg, a new broker which isn't yet aware of
// a Fragment currently being uploaded, should block a read
// of an offset covered by that Fragment until it becomes available).
resp.Offset = fi.set[ind].Begin
found = true
}
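To make the relaxed skip rule concrete, here is a self-contained sketch (the `fragMeta` type, `skipTo` helper, and threshold value are illustrative stand-ins; only the name `offsetJumpAgeThreshold` comes from the broker source):

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative value; the real threshold is defined elsewhere in the broker.
const offsetJumpAgeThreshold = time.Hour

type fragMeta struct {
	Begin, End int64
	ModTime    int64 // Unix seconds; zero means not yet persisted to the store.
}

// skipTo returns the offset a read should jump forward to, or -1 when the
// read must instead wait for the covering fragment to become available.
func skipTo(reqOffset int64, next fragMeta, now time.Time) int64 {
	persisted := next.ModTime != 0
	agedOut := persisted && next.ModTime < now.Add(-offsetJumpAgeThreshold).Unix()

	// Skip if persisted AND (reading from byte zero OR the fragment is old).
	if persisted && (reqOffset == 0 || agedOut) {
		return next.Begin
	}
	return -1
}

func main() {
	now := time.Now()
	frag := fragMeta{Begin: 100, End: 200, ModTime: now.Unix()}

	fmt.Println(skipTo(0, frag, now))  // 100: byte-zero reads skip immediately.
	fmt.Println(skipTo(50, frag, now)) // -1: mid-journal holes still wait out the age threshold.
}
```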
@@ -124,12 +125,12 @@ func (fi *Index) Query(ctx context.Context, req *pb.ReadRequest) (*pb.ReadRespon
}
}

// EndOffset returns the last (largest) End offset in the index.
func (fi *Index) EndOffset() int64 {
// OffsetRange returns the [Begin, End) offset range of all Fragments in the index.
func (fi *Index) OffsetRange() (int64, int64) {
defer fi.mu.RUnlock()
fi.mu.RLock()

return fi.set.EndOffset()
return fi.set.BeginOffset(), fi.set.EndOffset()
}

// SpoolCommit adds local Spool Fragment |frag| to the index.
33 changes: 31 additions & 2 deletions broker/fragment/index_test.go
@@ -98,6 +98,31 @@ func (s *IndexSuite) TestQueryAtHead(c *gc.C) {
c.Check(err, gc.IsNil)
}

func (s *IndexSuite) TestQueryAtMissingByteZero(c *gc.C) {
var ind = NewIndex(context.Background())
var now = time.Now().Unix()

// Establish local fragment fixtures.
var set = buildSet(c, 100, 200, 200, 300)
ind.SpoolCommit(set[0])

// A read from byte zero cannot proceed.
var resp, _, _ = ind.Query(context.Background(), &pb.ReadRequest{Offset: 0, Block: false})
c.Check(resp.Status, gc.Equals, pb.Status_OFFSET_NOT_YET_AVAILABLE)

// Set ModTime, marking the fragment as very recently persisted.
// A read from byte zero now skips forward.
set[0].ModTime = now
ind.ReplaceRemote(set)

resp, _, _ = ind.Query(context.Background(), &pb.ReadRequest{Offset: 0, Block: false})
c.Check(resp, gc.DeepEquals, &pb.ReadResponse{
Offset: 100,
WriteHead: 300,
Fragment: &pb.Fragment{Begin: 100, End: 200, ModTime: now},
})
}

func (s *IndexSuite) TestQueryAtMissingMiddle(c *gc.C) {
var ind = NewIndex(context.Background())
var baseTime = time.Unix(1500000000, 0)
@@ -263,7 +288,9 @@ func (s *IndexSuite) TestWalkStoresAndURLSigning(c *gc.C) {
<-ind.FirstRefreshCh()

c.Check(ind.set, gc.HasLen, 3)
c.Check(ind.EndOffset(), gc.Equals, int64(0x255))
var bo, eo = ind.OffsetRange()
c.Check(bo, gc.Equals, int64(0x0))
c.Check(eo, gc.Equals, int64(0x255))

// Expect root/one provides Fragment 222-255.
var resp, _, _ = ind.Query(context.Background(), &pb.ReadRequest{Offset: 0x223})
@@ -279,7 +306,9 @@
ind.ReplaceRemote(set)

c.Check(ind.set, gc.HasLen, 4) // Combined Fragments are reflected.
c.Check(ind.EndOffset(), gc.Equals, int64(0x555))
bo, eo = ind.OffsetRange()
c.Check(bo, gc.Equals, int64(0x0))
c.Check(eo, gc.Equals, int64(0x555))

// Expect root/two now provides Fragment 222-333.
resp, _, _ = ind.Query(context.Background(), &pb.ReadRequest{Offset: 0x223})
2 changes: 1 addition & 1 deletion broker/protocol/protocol.pb.go

(Generated file; diff not rendered.)

16 changes: 3 additions & 13 deletions broker/replica.go
@@ -7,7 +7,7 @@ import (

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"
"go.gazette.dev/core/allocator"
"go.gazette.dev/core/broker/fragment"
pb "go.gazette.dev/core/broker/protocol"
@@ -228,18 +228,8 @@ func maybeRollFragment(cur fragment.Spool, rollToOffset int64, spec pb.JournalSp
flushFragment = true // Empty fragment is trivially rolled.
} else if cl > spec.Length {
flushFragment = true // Roll if over the target Fragment length.
} else if cur.Begin == 0 {
// We should roll after the journal's very first write. This has the
// effect of "dirtying" the remote fragment index, and protects against
// data loss if N > R consistency is lost (eg, Etcd fails). When the
// remote index is dirty, recovering brokers are clued in that writes
// against this journal have already occurred (and `gazctl reset-head`
// must be run to recover). If the index were instead pristine,
// recovering brokers cannot distinguish this case from a newly-created
// journal, which risks double-writes to journal offsets.
flushFragment = true
} else if rollToOffset != 0 {
flushFragment = true
} else if rollToOffset != 0 && rollToOffset != cur.Begin {
flushFragment = true // Roll to a requested larger offset.
}

// If the flush interval of the fragment differs from current number of
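Restating the updated roll conditions as a runnable sketch (`shouldFlush` and the simplified `spool` are stand-ins for `maybeRollFragment` and `fragment.Spool`, and the flush-interval case is omitted). The new `rollToOffset != cur.Begin` guard is what keeps an already-rolled spool from being rolled again:

```go
package main

import "fmt"

type spool struct{ Begin, End int64 }

func shouldFlush(cur spool, rollToOffset, specLength int64) bool {
	contentLen := cur.End - cur.Begin
	switch {
	case contentLen == 0:
		return true // Empty fragment is trivially rolled.
	case contentLen > specLength:
		return true // Over the target fragment length.
	case rollToOffset != 0 && rollToOffset != cur.Begin:
		return true // Roll to a requested larger offset.
	default:
		return false
	}
}

func main() {
	fmt.Println(shouldFlush(spool{Begin: 10, End: 20}, 20, 100)) // true: roll forward to offset 20.
	fmt.Println(shouldFlush(spool{Begin: 20, End: 30}, 20, 100)) // false: already begins at 20.
}
```

The two calls mirror the new cases added to TestReplicaNextProposalCases below.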
50 changes: 32 additions & 18 deletions broker/replica_test.go
@@ -6,7 +6,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"
"go.gazette.dev/core/allocator"
"go.gazette.dev/core/broker/fragment"
pb "go.gazette.dev/core/broker/protocol"
@@ -109,32 +109,32 @@ func TestReplicaNextProposalCases(t *testing.T) {
defer func(f func() time.Time) { timeNow = f }(timeNow)

var testData = []struct {
prepArgs func(fragment.Spool, pb.JournalSpec_Fragment) (fragment.Spool, pb.JournalSpec_Fragment)
prepArgs func(fragment.Spool, pb.JournalSpec_Fragment) (fragment.Spool, int64, pb.JournalSpec_Fragment)
out pb.Fragment
description string
}{
{
prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, pb.JournalSpec_Fragment) {
spool.Begin, spool.End = 1, 100
prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, int64, pb.JournalSpec_Fragment) {
spool.Begin, spool.End = 0, 100
spool.FirstAppendTime = time.Time{}.Add(time.Hour * 2)
spec.Length = 200
spec.FlushInterval = time.Duration(time.Hour * 6)
return spool, spec
return spool, 0, spec
},
out: pb.Fragment{
Journal: "a/journal",
Begin: 1,
Begin: 0,
End: 100,
CompressionCodec: 1,
},
description: "Fragment does not need to be flushed",
},
{
prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, pb.JournalSpec_Fragment) {
prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, int64, pb.JournalSpec_Fragment) {
spool.Begin, spool.End = 1, 200
spool.FirstAppendTime = time.Time{}.Add(time.Hour * 2)
spec.Length = 100
return spool, spec
return spool, 0, spec
},
out: pb.Fragment{
Journal: "a/journal",
@@ -145,12 +145,12 @@
description: "Fragment exceeds length, get flush proposal",
},
{
prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, pb.JournalSpec_Fragment) {
prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, int64, pb.JournalSpec_Fragment) {
spool.Begin, spool.End = 1, 50
spool.FirstAppendTime = time.Time{}.Add(time.Minute)
spec.Length = 100
spec.FlushInterval = time.Duration(time.Minute * 30)
return spool, spec
return spool, 0, spec
},
out: pb.Fragment{
Journal: "a/journal",
@@ -161,28 +161,42 @@
description: "Fragment contains data from previous flush interval",
},
{
prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, pb.JournalSpec_Fragment) {
spool.Begin, spool.End = 0, 10
prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, int64, pb.JournalSpec_Fragment) {
spool.Begin, spool.End = 10, 20
spec.Length = 100
return spool, spec
return spool, 20, spec
},
out: pb.Fragment{
Journal: "a/journal",
Begin: 10,
End: 10,
Begin: 20,
End: 20,
CompressionCodec: 1,
},
description: "Fragment is non-empty at Begin == 0",
description: "Fragment is has roll-to-offset",
},
{
prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, int64, pb.JournalSpec_Fragment) {
spool.Begin, spool.End = 20, 30
spec.Length = 100
return spool, 20, spec
},
out: pb.Fragment{
Journal: "a/journal",
Begin: 20,
End: 30,
CompressionCodec: 1,
},
description: "Fragment is has already been rolled-to-offset",
},
}

timeNow = func() time.Time { return time.Time{}.Add(time.Hour) }
for _, test := range testData {
var spool, spec = test.prepArgs(
var spool, rollToOffset, spec = test.prepArgs(
fragment.NewSpool("a/journal", &testSpoolObserver{}),
pb.JournalSpec_Fragment{CompressionCodec: 1},
)
var proposal = maybeRollFragment(spool, 0, spec)
var proposal = maybeRollFragment(spool, rollToOffset, spec)
t.Log(test.description)
require.Equal(t, proposal, test.out)
}
6 changes: 4 additions & 2 deletions broker/replicate_api_test.go
@@ -39,7 +39,8 @@ func TestReplicateStreamAndCommit(t *testing.T) {
require.NoError(t, stream.Send(&pb.ReplicateRequest{Content: []byte("bazbing"), ContentDelta: 6}))

// Precondition: content not observable in the Fragment index.
require.Equal(t, int64(0), broker.replica("a/journal").index.EndOffset())
var _, eo = broker.replica("a/journal").index.OffsetRange()
require.Equal(t, int64(0), eo)

// Commit.
require.NoError(t, stream.Send(&pb.ReplicateRequest{
@@ -56,7 +57,8 @@
expectReplResponse(t, stream, &pb.ReplicateResponse{Status: pb.Status_OK})

// Post-condition: content is now observable.
require.Equal(t, int64(13), broker.replica("a/journal").index.EndOffset())
_, eo = broker.replica("a/journal").index.OffsetRange()
require.Equal(t, int64(13), eo)

// Send EOF and expect it's returned.
require.NoError(t, stream.CloseSend())
2 changes: 1 addition & 1 deletion consumer/protocol/protocol.pb.go

(Generated file; diff not rendered.)

10 changes: 5 additions & 5 deletions consumer/recoverylog/recorded_op.pb.go

(Generated file; diff not rendered.)
