From a6f7fe7fb8d3bb40ed742a569d6e6d4ff30579e0 Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Fri, 6 Sep 2024 00:36:22 -0500 Subject: [PATCH] broker/client: surface JOURNAL_NOT_FOUND from RetryReader JOURNAL_NOT_FOUND is generally not a retry-able condition. It was originally made so with a "shrug", on the premise that users may create journals after they had started their consumers. In practice, I'm not aware of any utility for it. Also tweak logging to reduce noise in environments that have many journals with no current append volume, where read re-authorizations are somewhat frequently obtained. --- broker/client/reader.go | 15 +++++++++------ broker/client/retry_reader.go | 17 ++++++++++++++--- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index 93222a5d..0f7d6dfe 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -26,9 +26,9 @@ import ( // seek to the requested offset, and read its content. // // Reader returns EOF if: -// * The broker closes the RPC, eg because its assignment has change or it's shutting down. -// * The requested EndOffset has been read through. -// * A Fragment being read by the Reader reaches EOF. +// - The broker closes the RPC, eg because its assignment has change or it's shutting down. +// - The requested EndOffset has been read through. +// - A Fragment being read by the Reader reaches EOF. // // If Block is true, Read may block indefinitely. Otherwise, ErrOffsetNotYetAvailable // is returned upon reaching the journal write head. @@ -179,6 +179,8 @@ func (r *Reader) Read(p []byte) (n int, err error) { if err != io.EOF { panic(err.Error()) // Status_OK implies graceful stream closure. 
} + case pb.Status_JOURNAL_NOT_FOUND: + err = ErrJournalNotFound case pb.Status_NOT_JOURNAL_BROKER: err = ErrNotJournalBroker case pb.Status_INSUFFICIENT_JOURNAL_BROKERS: @@ -199,9 +201,10 @@ func (r *Reader) AdjustedOffset(br *bufio.Reader) int64 { } // Seek provides a limited form of seeking support. Specifically, if: -// * A Fragment URL is being directly read, and -// * The Seek offset is ahead of the current Reader offset, and -// * The Fragment also covers the desired Seek offset +// - A Fragment URL is being directly read, and +// - The Seek offset is ahead of the current Reader offset, and +// - The Fragment also covers the desired Seek offset +// // Then a seek is performed by reading and discarding to the seeked offset. // Seek will otherwise return ErrSeekRequiresNewReader. func (r *Reader) Seek(offset int64, whence int) (int64, error) { diff --git a/broker/client/retry_reader.go b/broker/client/retry_reader.go index 72e731b5..c62da1ea 100644 --- a/broker/client/retry_reader.go +++ b/broker/client/retry_reader.go @@ -100,13 +100,24 @@ func (rr *RetryReader) Read(p []byte) (n int, err error) { } else { return // Surface to caller. } - case ErrInsufficientJournalBrokers, ErrNotJournalBroker, ErrJournalNotFound: + case ErrInsufficientJournalBrokers, ErrNotJournalBroker: // Suppress logging for expected errors on first read attempt. // We may be racing a concurrent Etcd watch and assignment of the broker cluster. squelch = attempt == 0 + case ErrJournalNotFound: + // If a Header was attached to the request, the journal was not found + // even after honoring its read-through Etcd revision. If not, + // we allow a handful of attempts to work around expected propagation + // delays of Etcd revisions if the journal was just created. + if rr.Reader.Request.Header != nil || attempt > 3 { + return // Surface to caller. + } case io.EOF: - // Repeated EOF is common if topology changes or authorizations - // expire on a journal with no active appends. 
+ // EOF means we had an established RPC, but it might not have sent + // any data before being closed server-side, so clear `attempts` to + // reduce log noise: it's common to next see ErrNotJournalBroker + // on broker topology changes or when authorizations are refreshed. + attempt = 0 squelch = true default: }