Skip to content

Commit

Permalink
broker/client: quiet more noisy logging
Browse files Browse the repository at this point in the history
Don't warn on a clean server-side close of a List RPC that doesn't ever
return a response. This is expected for a resumed listing which doesn't
change.

Use an attached Route of a previously completed Read, if available,
and more importantly: clear the last Response of the previous stream.
We could enter a condition where this was never cleared (for example, on
NOT_JOURNAL_BROKER) if the _following_ request went to the correct
broker, stayed open for a while with no data, and was then closed server-side.
This can cause mis-leading and false-positive logged warnings.

Update RetryReader to account for attempt being incremented on the next
loop iteration.
  • Loading branch information
jgraettinger committed Sep 8, 2024
1 parent 8e0bd21 commit d0c9d56
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 5 deletions.
3 changes: 3 additions & 0 deletions broker/client/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func (pl *WatchedList) watch() {
} else {
stream = nil // Must restart.

if err == io.EOF {
attempt = 0 // Clean server-side close.
}
// Wait for back-off timer or context cancellation.
select {
case <-pl.ctx.Done():
Expand Down
16 changes: 12 additions & 4 deletions broker/client/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,18 @@ func (r *Reader) Read(p []byte) (n int, err error) {

// Lazy initialization: begin the Read RPC.
if r.stream == nil {
if r.stream, err = r.client.Read(
pb.WithDispatchItemRoute(r.ctx, r.client, r.Request.Journal.String(), false),
&r.Request,
); err == nil {
var ctx context.Context

// Prefer a prior response header route, and fall back to the route cache.
if r.Response.Header != nil {
ctx = pb.WithDispatchRoute(r.ctx, r.Response.Header.Route, pb.ProcessSpec_ID{})
} else {
ctx = pb.WithDispatchItemRoute(r.ctx, r.client, r.Request.Journal.String(), false)
}
// Clear last response of previous stream.
r.Response = pb.ReadResponse{}

if r.stream, err = r.client.Read(ctx, &r.Request); err == nil {
n, err = r.Read(p) // Recurse to attempt read against opened |r.stream|.
} else {
err = mapGRPCCtxErr(r.ctx, err)
Expand Down
4 changes: 3 additions & 1 deletion broker/client/retry_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (rr *RetryReader) Read(p []byte) (n int, err error) {
// any data before being closed server-side, so clear `attempts` to
// reduce log noise: it's common to next see ErrNotJournalBroker
// on broker topology changes or when authorizations are refreshed.
attempt = 0
attempt = -1
squelch = true
default:
}
Expand Down Expand Up @@ -222,6 +222,8 @@ func backoff(attempt int) time.Duration {
// involves a couple of Nagle-like read delays (~30ms) as Etcd watch
// updates are applied by participants.
switch attempt {
case -1:
return 0
case 0, 1:
return time.Millisecond * 50
case 2, 3:
Expand Down

0 comments on commit d0c9d56

Please sign in to comment.