From c0c7b241eb823216fc358a72a46195327930643f Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Tue, 12 Mar 2024 23:30:38 +0000 Subject: [PATCH] broker: slightly relax offset skips from byte zero Seek forward a journal read at offset zero, to a first persisted fragment of the journal. This improves the ergonomics of a fairly common case where all data is deleted from an idle journal (for example, because of bucket lifecycle), and then new writes arrive, and a new reader is trying to read the journal from "the beginning". For this special case, we'd like reads to proceed forward as soon as a persisted fragment is available in the index without waiting for the full offsetJumpAgeThreshold. --- broker/fragment/index.go | 27 ++++++++++++++------------- broker/fragment/index_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 13 deletions(-) diff --git a/broker/fragment/index.go b/broker/fragment/index.go index 85fe4cbf..2ed7620b 100644 --- a/broker/fragment/index.go +++ b/broker/fragment/index.go @@ -54,19 +54,20 @@ func (fi *Index) Query(ctx context.Context, req *pb.ReadRequest) (*pb.ReadRespon var condCh = fi.condCh var err error - // If the requested offset isn't covered by the index, but we do have a - // Fragment covering a *greater* offset, where that Fragment is also older - // than a large time.Duration, then: skip forward the request offset to - // the Fragment offset. This case allows us to recover from "holes" or - // deletions in the offset space of a Journal, while not impacting races - // which can occur between delayed persistence to the Fragment store - // vs hand-off of Journals to new brokers (eg, a new broker which isn't - // yet aware of a Fragment currently being uploaded, should block a read - // of an offset covered by that Fragment until it becomes available). - if !found && ind != len(fi.set) && - fi.set[ind].ModTime != 0 && - fi.set[ind].ModTime < timeNow().Add(-offsetJumpAgeThreshold).Unix() { - + // If the requested offset isn't covered by the index, but we do have + // a persisted fragment with a *greater* offset... + if !found && ind != len(fi.set) && fi.set[ind].ModTime != 0 && + // AND the client is reading from the very beginning of the journal, + // OR the next available fragment was persisted quite a while ago. + (req.Offset == 0 || (fi.set[ind].ModTime < timeNow().Add(-offsetJumpAgeThreshold).Unix())) { + + // Then skip the read forward to the first or next available offset. + // This case allows us to recover from "holes" or deletions in the + // offset space of a Journal, while not impacting races which can occur + // between delayed persistence to the Fragment store vs hand-off of + // Journals to new brokers (eg, a new broker which isn't yet aware of + // a Fragment currently being uploaded, should block a read + // of an offset covered by that Fragment until it becomes available). resp.Offset = fi.set[ind].Begin found = true } diff --git a/broker/fragment/index_test.go b/broker/fragment/index_test.go index ed385f20..c32dc592 100644 --- a/broker/fragment/index_test.go +++ b/broker/fragment/index_test.go @@ -98,6 +98,31 @@ func (s *IndexSuite) TestQueryAtHead(c *gc.C) { c.Check(err, gc.IsNil) } +func (s *IndexSuite) TestQueryAtMissingByteZero(c *gc.C) { + var ind = NewIndex(context.Background()) + var now = time.Now().Unix() + + // Establish local fragment fixtures. + var set = buildSet(c, 100, 200, 200, 300) + ind.SpoolCommit(set[0]) + + // A read from byte zero cannot proceed. + var resp, _, _ = ind.Query(context.Background(), &pb.ReadRequest{Offset: 0, Block: false}) + c.Check(resp.Status, gc.Equals, pb.Status_OFFSET_NOT_YET_AVAILABLE) + + // Set ModTime, marking the fragment as very recently persisted. + // A read from byte zero now skips forward. + set[0].ModTime = now + ind.ReplaceRemote(set) + + resp, _, _ = ind.Query(context.Background(), &pb.ReadRequest{Offset: 0, Block: false}) + c.Check(resp, gc.DeepEquals, &pb.ReadResponse{ + Offset: 100, + WriteHead: 300, + Fragment: &pb.Fragment{Begin: 100, End: 200, ModTime: now}, + }) +} + func (s *IndexSuite) TestQueryAtMissingMiddle(c *gc.C) { var ind = NewIndex(context.Background()) var baseTime = time.Unix(1500000000, 0)