diff --git a/keyspace/key_space.go b/keyspace/key_space.go index f5600b6f..c326fb2e 100644 --- a/keyspace/key_space.go +++ b/keyspace/key_space.go @@ -15,6 +15,8 @@ import ( "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/mirror" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // A KeySpace is a local mirror of a decoded portion of the Etcd key/value space, @@ -179,13 +181,20 @@ func (ks *KeySpace) Watch(ctx context.Context, client clientv3.Watcher) error { case resp, ok := <-watchCh: if !ok { return ctx.Err() // Watch contract implies the context is cancelled. - } else if err := resp.Err(); err == rpctypes.ErrNoLeader { - // ErrNoLeader indicates our watched Etcd is in a partitioned - // minority. Retry, attempting to find a member in the majority. + } else if err := resp.Err(); err != nil && !resp.Canceled { + log.WithFields(log.Fields{"err": err, "attempt": attempt}). + Warn("non-terminal watch error (will continue)") + } else if status, _ := status.FromError(err); err == rpctypes.ErrNoLeader || status.Code() == codes.Unknown { + // ErrNoLeader indicates our watched Etcd is in a partitioned minority. + // Unknown gRPC status can happen during Etcd shutdowns, such as in: + // + // rpc error: code = Unknown desc = malformed header: missing HTTP content-type + // + // In both cases we restart the watch. watchCh = nil log.WithFields(log.Fields{"err": err, "attempt": attempt}). - Warn("watch failed (will retry)") + Warn("watch channel failed (will restart)") select { case <-time.After(backoff(attempt)): // Pass.