Skip to content

Commit

Permalink
broker: slightly relax offset skips from byte zero
Browse files Browse the repository at this point in the history
Seek forward a journal read at offset zero, to a first persisted
fragment of the journal.

This improves the ergonomics of a fairly common case where all data is
deleted from an idle journal (for example, because of bucket lifecycle),
and then new writes arrive, and a new reader is trying to read the
journal from "the beginning".

For this special case, we'd like reads to proceed forward as soon as a
persisted fragment is available in the index without waiting for the
full offsetJumpAgeThreshold.
  • Loading branch information
jgraettinger committed Mar 13, 2024
1 parent 44cd5a4 commit c0c7b24
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 13 deletions.
27 changes: 14 additions & 13 deletions broker/fragment/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,20 @@ func (fi *Index) Query(ctx context.Context, req *pb.ReadRequest) (*pb.ReadRespon
var condCh = fi.condCh
var err error

// If the requested offset isn't covered by the index, but we do have a
// Fragment covering a *greater* offset, where that Fragment is also older
// than a large time.Duration, then: skip forward the request offset to
// the Fragment offset. This case allows us to recover from "holes" or
// deletions in the offset space of a Journal, while not impacting races
// which can occur between delayed persistence to the Fragment store
// vs hand-off of Journals to new brokers (eg, a new broker which isn't
// yet aware of a Fragment currently being uploaded, should block a read
// of an offset covered by that Fragment until it becomes available).
if !found && ind != len(fi.set) &&
fi.set[ind].ModTime != 0 &&
fi.set[ind].ModTime < timeNow().Add(-offsetJumpAgeThreshold).Unix() {

// If the requested offset isn't covered by the index, but we do have
// a persisted fragment with a *greater* offset...
if !found && ind != len(fi.set) && fi.set[ind].ModTime != 0 &&
// AND the client is reading from the very beginning of the journal,
// OR the next available fragment was persisted quite a while ago.
(req.Offset == 0 || (fi.set[ind].ModTime < timeNow().Add(-offsetJumpAgeThreshold).Unix())) {

// Then skip the read forward to the first or next available offset.
// This case allows us to recover from "holes" or deletions in the
// offset space of a Journal, while not impacting races which can occur
// between delayed persistence to the Fragment store vs hand-off of
// Journals to new brokers (eg, a new broker which isn't yet aware of
// a Fragment currently being uploaded, should block a read
// of an offset covered by that Fragment until it becomes available).
resp.Offset = fi.set[ind].Begin
found = true
}
Expand Down
25 changes: 25 additions & 0 deletions broker/fragment/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,31 @@ func (s *IndexSuite) TestQueryAtHead(c *gc.C) {
c.Check(err, gc.IsNil)
}

func (s *IndexSuite) TestQueryAtMissingByteZero(c *gc.C) {
var ind = NewIndex(context.Background())
var now = time.Now().Unix()

// Establish local fragment fixtures.
var set = buildSet(c, 100, 200, 200, 300)
ind.SpoolCommit(set[0])

// A read from byte zero cannot proceed.
var resp, _, _ = ind.Query(context.Background(), &pb.ReadRequest{Offset: 0, Block: false})
c.Check(resp.Status, gc.Equals, pb.Status_OFFSET_NOT_YET_AVAILABLE)

// Set ModTime, marking the fragment as very recently persisted.
// A read from byte zero now skips forward.
set[0].ModTime = now
ind.ReplaceRemote(set)

resp, _, _ = ind.Query(context.Background(), &pb.ReadRequest{Offset: 0, Block: false})
c.Check(resp, gc.DeepEquals, &pb.ReadResponse{
Offset: 100,
WriteHead: 300,
Fragment: &pb.Fragment{Begin: 100, End: 200, ModTime: now},
})
}

func (s *IndexSuite) TestQueryAtMissingMiddle(c *gc.C) {
var ind = NewIndex(context.Background())
var baseTime = time.Unix(1500000000, 0)
Expand Down

0 comments on commit c0c7b24

Please sign in to comment.