diff --git a/broker/append_fsm.go b/broker/append_fsm.go index 444ca54b..3ba9ee44 100644 --- a/broker/append_fsm.go +++ b/broker/append_fsm.go @@ -412,9 +412,15 @@ func (b *appendFSM) onValidatePreconditions() { } } + // It's possible a peer might have a larger end offset which is not + // reflected in our index, if a commit wasn't accepted by all peers. + // Such writes are reported as failed to the client and are retried + // (this failure mode is what makes journals at-least-once). + var indexMin, indexMax = b.resolved.replica.index.OffsetRange() + var maxOffset = b.pln.spool.End - if eo := b.resolved.replica.index.EndOffset(); eo > maxOffset { - maxOffset = eo + if indexMax > maxOffset { + maxOffset = indexMax } if b.req.CheckRegisters != nil && @@ -434,6 +440,17 @@ func (b *appendFSM) onValidatePreconditions() { // Re-sync the pipeline at the explicitly requested |maxOffset|. b.rollToOffset = maxOffset b.state = stateSendPipelineSync + } else if b.pln.spool.Begin == indexMin { + // The spool holds the journal's first known write and should be rolled. + // This has the effect of "dirtying" the remote fragment index, + // and protects against data loss if N > R consistency is lost (eg, Etcd fails). + // When the remote index is dirty, recovering brokers are clued in that writes + // against this journal have already occurred (and `gazctl reset-head` + // must be run to recover). If the index were instead pristine, + // recovering brokers cannot distinguish this case from a newly-created + // journal, which risks double-writes to journal offsets. + b.rollToOffset = b.pln.spool.End + b.state = stateStreamContent } else { b.state = stateStreamContent } @@ -457,7 +474,7 @@ func (b *appendFSM) onStreamContent(req *pb.AppendRequest, err error) { // Potentially roll the Fragment forward ahead of this append. Our // pipeline is synchronized, so we expect this will always succeed // and don't ask for an acknowledgement. 
- var proposal = maybeRollFragment(b.pln.spool, 0, b.resolved.journalSpec.Fragment) + var proposal = maybeRollFragment(b.pln.spool, b.rollToOffset, b.resolved.journalSpec.Fragment) if b.pln.spool.Fragment.Fragment != proposal { b.pln.scatter(&pb.ReplicateRequest{ diff --git a/broker/append_fsm_test.go b/broker/append_fsm_test.go index 11d4b7f7..3590fa12 100644 --- a/broker/append_fsm_test.go +++ b/broker/append_fsm_test.go @@ -579,6 +579,22 @@ func TestFSMStreamAndReadAcknowledgements(t *testing.T) { fsm.onStreamContent(&pb.AppendRequest{}, nil) // Intent to commit. fsm.onStreamContent(nil, io.EOF) // Client EOF. + // We previously completed a first write of this journal, + // which causes the _next_ write to roll the spool forward. + expect = pb.ReplicateRequest{ + Proposal: &pb.Fragment{ + Journal: "a/journal", + Begin: 2054, + End: 2054, + CompressionCodec: pb.CompressionCodec_GZIP, + }, + Registers: boxLabels("after", ""), // Union/subtract applied. + Acknowledge: false, // In-sync pipeline isn't acknowledged again. + } + peerRecv(expect) + + // Now the Append validation error causes a rollback. + expect.Acknowledge = true peerRecv(expect) // Rollback. peerSend(pb.ReplicateResponse{Status: pb.Status_OK}) // Send & read ACK. fsm.onReadAcknowledgements() diff --git a/broker/fragment/index.go b/broker/fragment/index.go index d4d8327b..2ed7620b 100644 --- a/broker/fragment/index.go +++ b/broker/fragment/index.go @@ -54,19 +54,20 @@ func (fi *Index) Query(ctx context.Context, req *pb.ReadRequest) (*pb.ReadRespon var condCh = fi.condCh var err error - // If the requested offset isn't covered by the index, but we do have a - // Fragment covering a *greater* offset, where that Fragment is also older - // than a large time.Duration, then: skip forward the request offset to - // the Fragment offset. 
This case allows us to recover from "holes" or - // deletions in the offset space of a Journal, while not impacting races - // which can occur between delayed persistence to the Fragment store - // vs hand-off of Journals to new brokers (eg, a new broker which isn't - // yet aware of a Fragment currently being uploaded, should block a read - // of an offset covered by that Fragment until it becomes available). - if !found && ind != len(fi.set) && - fi.set[ind].ModTime != 0 && - fi.set[ind].ModTime < timeNow().Add(-offsetJumpAgeThreshold).Unix() { - + // If the requested offset isn't covered by the index, but we do have + // a persisted fragment with a *greater* offset... + if !found && ind != len(fi.set) && fi.set[ind].ModTime != 0 && + // AND the client is reading from the very beginning of the journal, + // OR the next available fragment was persisted quite a while ago. + (req.Offset == 0 || (fi.set[ind].ModTime < timeNow().Add(-offsetJumpAgeThreshold).Unix())) { + + // Then skip the read forward to the first or next available offset. + // This case allows us to recover from "holes" or deletions in the + // offset space of a Journal, while not impacting races which can occur + // between delayed persistence to the Fragment store vs hand-off of + // Journals to new brokers (eg, a new broker which isn't yet aware of + // a Fragment currently being uploaded, should block a read + // of an offset covered by that Fragment until it becomes available). resp.Offset = fi.set[ind].Begin found = true } @@ -124,12 +125,12 @@ func (fi *Index) Query(ctx context.Context, req *pb.ReadRequest) (*pb.ReadRespon } } -// EndOffset returns the last (largest) End offset in the index. -func (fi *Index) EndOffset() int64 { +// OffsetRange returns the [Begin, End) offset range of all Fragments in the index. 
+func (fi *Index) OffsetRange() (int64, int64) { defer fi.mu.RUnlock() fi.mu.RLock() - return fi.set.EndOffset() + return fi.set.BeginOffset(), fi.set.EndOffset() } // SpoolCommit adds local Spool Fragment |frag| to the index. diff --git a/broker/fragment/index_test.go b/broker/fragment/index_test.go index 39acbbc8..c32dc592 100644 --- a/broker/fragment/index_test.go +++ b/broker/fragment/index_test.go @@ -98,6 +98,31 @@ func (s *IndexSuite) TestQueryAtHead(c *gc.C) { c.Check(err, gc.IsNil) } +func (s *IndexSuite) TestQueryAtMissingByteZero(c *gc.C) { + var ind = NewIndex(context.Background()) + var now = time.Now().Unix() + + // Establish local fragment fixtures. + var set = buildSet(c, 100, 200, 200, 300) + ind.SpoolCommit(set[0]) + + // A read from byte zero cannot proceed. + var resp, _, _ = ind.Query(context.Background(), &pb.ReadRequest{Offset: 0, Block: false}) + c.Check(resp.Status, gc.Equals, pb.Status_OFFSET_NOT_YET_AVAILABLE) + + // Set ModTime, marking the fragment as very recently persisted. + // A read from byte zero now skips forward. + set[0].ModTime = now + ind.ReplaceRemote(set) + + resp, _, _ = ind.Query(context.Background(), &pb.ReadRequest{Offset: 0, Block: false}) + c.Check(resp, gc.DeepEquals, &pb.ReadResponse{ + Offset: 100, + WriteHead: 300, + Fragment: &pb.Fragment{Begin: 100, End: 200, ModTime: now}, + }) +} + func (s *IndexSuite) TestQueryAtMissingMiddle(c *gc.C) { var ind = NewIndex(context.Background()) var baseTime = time.Unix(1500000000, 0) @@ -263,7 +288,9 @@ func (s *IndexSuite) TestWalkStoresAndURLSigning(c *gc.C) { <-ind.FirstRefreshCh() c.Check(ind.set, gc.HasLen, 3) - c.Check(ind.EndOffset(), gc.Equals, int64(0x255)) + var bo, eo = ind.OffsetRange() + c.Check(bo, gc.Equals, int64(0x0)) + c.Check(eo, gc.Equals, int64(0x255)) // Expect root/one provides Fragment 222-255. 
var resp, _, _ = ind.Query(context.Background(), &pb.ReadRequest{Offset: 0x223}) @@ -279,7 +306,9 @@ func (s *IndexSuite) TestWalkStoresAndURLSigning(c *gc.C) { ind.ReplaceRemote(set) c.Check(ind.set, gc.HasLen, 4) // Combined Fragments are reflected. - c.Check(ind.EndOffset(), gc.Equals, int64(0x555)) + bo, eo = ind.OffsetRange() + c.Check(bo, gc.Equals, int64(0x0)) + c.Check(eo, gc.Equals, int64(0x555)) // Expect root/two now provides Fragment 222-333. resp, _, _ = ind.Query(context.Background(), &pb.ReadRequest{Offset: 0x223}) diff --git a/broker/protocol/protocol.pb.go b/broker/protocol/protocol.pb.go index d3694d29..a1d20a99 100644 --- a/broker/protocol/protocol.pb.go +++ b/broker/protocol/protocol.pb.go @@ -12,10 +12,10 @@ import ( proto "github.com/gogo/protobuf/proto" github_com_gogo_protobuf_types "github.com/gogo/protobuf/types" golang_proto "github.com/golang/protobuf/proto" + _ "github.com/golang/protobuf/ptypes/duration" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" - _ "google.golang.org/protobuf/types/known/durationpb" io "io" math "math" math_bits "math/bits" diff --git a/broker/replica.go b/broker/replica.go index b5ffe6d8..153fdcf1 100644 --- a/broker/replica.go +++ b/broker/replica.go @@ -7,7 +7,7 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" - "go.etcd.io/etcd/client/v3" + clientv3 "go.etcd.io/etcd/client/v3" "go.gazette.dev/core/allocator" "go.gazette.dev/core/broker/fragment" pb "go.gazette.dev/core/broker/protocol" @@ -228,18 +228,8 @@ func maybeRollFragment(cur fragment.Spool, rollToOffset int64, spec pb.JournalSp flushFragment = true // Empty fragment is trivially rolled. } else if cl > spec.Length { flushFragment = true // Roll if over the target Fragment length. - } else if cur.Begin == 0 { - // We should roll after the journal's very first write. 
This has the - // effect of "dirtying" the remote fragment index, and protects against - // data loss if N > R consistency is lost (eg, Etcd fails). When the - // remote index is dirty, recovering brokers are clued in that writes - // against this journal have already occurred (and `gazctl reset-head` - // must be run to recover). If the index were instead pristine, - // recovering brokers cannot distinguish this case from a newly-created - // journal, which risks double-writes to journal offsets. - flushFragment = true - } else if rollToOffset != 0 { - flushFragment = true + } else if rollToOffset != 0 && rollToOffset != cur.Begin { + flushFragment = true // Roll to a requested larger offset. } // If the flush interval of the fragment differs from current number of diff --git a/broker/replica_test.go b/broker/replica_test.go index 5f84fc39..ed5e9810 100644 --- a/broker/replica_test.go +++ b/broker/replica_test.go @@ -6,7 +6,7 @@ import ( "time" "github.com/stretchr/testify/require" - "go.etcd.io/etcd/client/v3" + clientv3 "go.etcd.io/etcd/client/v3" "go.gazette.dev/core/allocator" "go.gazette.dev/core/broker/fragment" pb "go.gazette.dev/core/broker/protocol" @@ -109,32 +109,32 @@ func TestReplicaNextProposalCases(t *testing.T) { defer func(f func() time.Time) { timeNow = f }(timeNow) var testData = []struct { - prepArgs func(fragment.Spool, pb.JournalSpec_Fragment) (fragment.Spool, pb.JournalSpec_Fragment) + prepArgs func(fragment.Spool, pb.JournalSpec_Fragment) (fragment.Spool, int64, pb.JournalSpec_Fragment) out pb.Fragment description string }{ { - prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, pb.JournalSpec_Fragment) { - spool.Begin, spool.End = 1, 100 + prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, int64, pb.JournalSpec_Fragment) { + spool.Begin, spool.End = 0, 100 spool.FirstAppendTime = time.Time{}.Add(time.Hour * 2) spec.Length = 200 spec.FlushInterval = time.Duration(time.Hour * 
6) - return spool, spec + return spool, 0, spec }, out: pb.Fragment{ Journal: "a/journal", - Begin: 1, + Begin: 0, End: 100, CompressionCodec: 1, }, description: "Fragment does not need to be flushed", }, { - prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, pb.JournalSpec_Fragment) { + prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, int64, pb.JournalSpec_Fragment) { spool.Begin, spool.End = 1, 200 spool.FirstAppendTime = time.Time{}.Add(time.Hour * 2) spec.Length = 100 - return spool, spec + return spool, 0, spec }, out: pb.Fragment{ Journal: "a/journal", @@ -145,12 +145,12 @@ func TestReplicaNextProposalCases(t *testing.T) { description: "Fragment exceeds length, get flush proposal", }, { - prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, pb.JournalSpec_Fragment) { + prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, int64, pb.JournalSpec_Fragment) { spool.Begin, spool.End = 1, 50 spool.FirstAppendTime = time.Time{}.Add(time.Minute) spec.Length = 100 spec.FlushInterval = time.Duration(time.Minute * 30) - return spool, spec + return spool, 0, spec }, out: pb.Fragment{ Journal: "a/journal", @@ -161,28 +161,42 @@ func TestReplicaNextProposalCases(t *testing.T) { description: "Fragment contains data from previous flush interval", }, { - prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, pb.JournalSpec_Fragment) { - spool.Begin, spool.End = 0, 10 + prepArgs: func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, int64, pb.JournalSpec_Fragment) { + spool.Begin, spool.End = 10, 20 spec.Length = 100 - return spool, spec + return spool, 20, spec }, out: pb.Fragment{ Journal: "a/journal", - Begin: 10, - End: 10, + Begin: 20, + End: 20, CompressionCodec: 1, }, - description: "Fragment is non-empty at Begin == 0", + description: "Fragment is has roll-to-offset", + }, + { + prepArgs: 
func(spool fragment.Spool, spec pb.JournalSpec_Fragment) (fragment.Spool, int64, pb.JournalSpec_Fragment) { + spool.Begin, spool.End = 20, 30 + spec.Length = 100 + return spool, 20, spec + }, + out: pb.Fragment{ + Journal: "a/journal", + Begin: 20, + End: 30, + CompressionCodec: 1, + }, + description: "Fragment is has already been rolled-to-offset", }, } timeNow = func() time.Time { return time.Time{}.Add(time.Hour) } for _, test := range testData { - var spool, spec = test.prepArgs( + var spool, rollToOffset, spec = test.prepArgs( fragment.NewSpool("a/journal", &testSpoolObserver{}), pb.JournalSpec_Fragment{CompressionCodec: 1}, ) - var proposal = maybeRollFragment(spool, 0, spec) + var proposal = maybeRollFragment(spool, rollToOffset, spec) t.Log(test.description) require.Equal(t, proposal, test.out) } diff --git a/broker/replicate_api_test.go b/broker/replicate_api_test.go index 004ec34a..f3ef10f1 100644 --- a/broker/replicate_api_test.go +++ b/broker/replicate_api_test.go @@ -39,7 +39,8 @@ func TestReplicateStreamAndCommit(t *testing.T) { require.NoError(t, stream.Send(&pb.ReplicateRequest{Content: []byte("bazbing"), ContentDelta: 6})) // Precondition: content not observable in the Fragment index. - require.Equal(t, int64(0), broker.replica("a/journal").index.EndOffset()) + var _, eo = broker.replica("a/journal").index.OffsetRange() + require.Equal(t, int64(0), eo) // Commit. require.NoError(t, stream.Send(&pb.ReplicateRequest{ @@ -56,7 +57,8 @@ func TestReplicateStreamAndCommit(t *testing.T) { expectReplResponse(t, stream, &pb.ReplicateResponse{Status: pb.Status_OK}) // Post-condition: content is now observable. - require.Equal(t, int64(13), broker.replica("a/journal").index.EndOffset()) + _, eo = broker.replica("a/journal").index.OffsetRange() + require.Equal(t, int64(13), eo) // Send EOF and expect its returned. 
require.NoError(t, stream.CloseSend()) diff --git a/consumer/protocol/protocol.pb.go b/consumer/protocol/protocol.pb.go index 0ca62489..9b1a0e51 100644 --- a/consumer/protocol/protocol.pb.go +++ b/consumer/protocol/protocol.pb.go @@ -11,6 +11,7 @@ import ( proto "github.com/gogo/protobuf/proto" github_com_gogo_protobuf_types "github.com/gogo/protobuf/types" golang_proto "github.com/golang/protobuf/proto" + _ "github.com/golang/protobuf/ptypes/duration" go_gazette_dev_core_broker_protocol "go.gazette.dev/core/broker/protocol" protocol "go.gazette.dev/core/broker/protocol" recoverylog "go.gazette.dev/core/consumer/recoverylog" @@ -18,7 +19,6 @@ import ( grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" - _ "google.golang.org/protobuf/types/known/durationpb" io "io" math "math" math_bits "math/bits" diff --git a/consumer/recoverylog/recorded_op.pb.go b/consumer/recoverylog/recorded_op.pb.go index 5c5fecd0..b6ceea93 100644 --- a/consumer/recoverylog/recorded_op.pb.go +++ b/consumer/recoverylog/recorded_op.pb.go @@ -42,11 +42,11 @@ type RecordedOp struct { // // These are meta-fields: they're not literally serialized into written messages. // The offsets of a particular message will also vary over its lifetime: - // * When first recorded, the offsets at which the write will land within the journal - // cannot be known ahead of time, and Recorders use an approximate lower bound - // as |first_offset|. - // * During playback, players have the benefit of inspecting the committed log and - // attach exact byte offsets as they deserialized RecordedOps. + // - When first recorded, the offsets at which the write will land within the journal + // cannot be known ahead of time, and Recorders use an approximate lower bound + // as |first_offset|. + // - During playback, players have the benefit of inspecting the committed log and + // attach exact byte offsets as they deserialized RecordedOps. 
 FirstOffset int64 `protobuf:"varint,9,opt,name=first_offset,json=firstOffset,proto3" json:"first_offset,omitempty"` LastOffset int64 `protobuf:"varint,10,opt,name=last_offset,json=lastOffset,proto3" json:"last_offset,omitempty"` Log go_gazette_dev_core_broker_protocol.Journal `protobuf:"bytes,11,opt,name=log,proto3,casttype=go.gazette.dev/core/broker/protocol.Journal" json:"log,omitempty"` diff --git a/kustomize/README.md b/kustomize/README.md index c080a960..930e2de1 100644 --- a/kustomize/README.md +++ b/kustomize/README.md @@ -32,9 +32,18 @@ $ make as-ci target=ci-release-gazette-broker $ make as-ci target=ci-release-gazette-examples # Install kind (if needed). -$ go get sigs.k8s.io/kind +$ go get sigs.k8s.io/kind@latest + +# Install kail (if desired). +$ go get github.com/boz/kail/cmd/kail@latest + +# Create a local kubernetes cluster. $ kind create cluster +# Copy locally-built images into your `kind` cluster. +$ kind load docker-image gazette/broker +$ kind load docker-image gazette/examples + # Apply the complete soak test, running in namespace stream-sum. $ kubectl apply -k kustomize/test/deploy-stream-sum-with-crash-tests/