diff --git a/broker/client/reader.go b/broker/client/reader.go index fc1e5583..781fd26d 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -8,10 +8,14 @@ import ( "io" "io/ioutil" "net/http" + "sync" + "sync/atomic" "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" "go.gazette.dev/core/broker/codecs" pb "go.gazette.dev/core/broker/protocol" + "golang.org/x/net/http2" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -349,23 +353,22 @@ func (fr *FragmentReader) Close() error { // fragments are persisted, and to which this client also has access. The // returned cleanup function removes the handler and restores the prior http.Client. // -// const root = "/mnt/shared-nas-array/path/to/fragment-root" -// defer client.InstallFileTransport(root)() -// -// var rr = NewRetryReader(ctx, client, protocol.ReadRequest{ -// Journal: "a/journal/with/nas/fragment/store", -// DoNotProxy: true, -// }) -// // rr.Read will read Fragments directly from NAS. +// const root = "/mnt/shared-nas-array/path/to/fragment-root" +// defer client.InstallFileTransport(root)() // +// var rr = NewRetryReader(ctx, client, protocol.ReadRequest{ +// Journal: "a/journal/with/nas/fragment/store", +// DoNotProxy: true, +// }) +// // rr.Read will read Fragments directly from NAS. func InstallFileTransport(root string) (remove func()) { - var transport = http.DefaultTransport.(*http.Transport).Clone() - transport.RegisterProtocol("file", http.NewFileTransport(http.Dir(root))) - - var prevClient = httpClient - httpClient = &http.Client{Transport: transport} + maybeFileTransport.Store(root) + httpClient.swapClient(-1) // don't check the generation counter when swapping - return func() { httpClient = prevClient } + return func() { + maybeFileTransport.Store("") + httpClient.swapClient(-1) + } } // mapGRPCCtxErr returns ctx.Err() iff |err| represents a gRPC error with a @@ -389,6 +392,75 @@ func fragmentLabels(fragment pb.Fragment) prometheus.Labels { } } +// httpClientWrapper wraps an `http.Client`, which can be swapped out on the fly. +// The main purpose of doing this is to force it to abandon and re-create network connections +// in cases where the connections seem to be to misbehaving servers. We _like_ to have some logic like +// "if the server returns 3 INTERNAL_ERRORs in a row, then stop using this connection for future requests". +// Unfortunately, the Go APIs don't really allow for that sort of thing, and so this is the workaround +// that seems easiest for now. +type httpClientWrapper struct { + inner *http.Client + clientGen int64 + clientMu sync.RWMutex +} + +// swapClient triggers the immediate re-creation of the underlying http client. +// The current `clientGen` is compared to `expectClientGen`, and the client will _not_ be re-created +// if they are different. This is used to prevent unnecessary re-creations in cases where multiple +// goroutines all call swapClient simultaneously. Passing `-1` causes it to swap the client unconditionally, +// which is only used during tests. Returns a boolean indicating whether the client was actually re-created. +func (c *httpClientWrapper) swapClient(expectClientGen int64) (swapped bool) { + c.clientMu.Lock() + if expectClientGen == -1 || c.clientGen == expectClientGen { + c.inner = newInnerClient() + c.clientGen++ + swapped = true + } + c.clientMu.Unlock() + return +} + +func (c *httpClientWrapper) Do(req *http.Request) (*http.Response, error) { + c.clientMu.RLock() + var client = c.inner + var clientGen = c.clientGen + c.clientMu.RUnlock() + + var resp, err = client.Do(req) + var streamErr http2.StreamError + // Check for a specific error condition representing an internal error from a cloud storage + // server using http2. If this happens, then we'll re-create the client in order to force it to + // close existing connections and re-establish new ones. This is a smelly workaround for issues + // we've observed where Go's http2 client will keep re-using the same http connections when an + // unhealthy server just keeps sending `RSTStream` frames with internal error codes. + if errors.As(err, &streamErr) && streamErr.Code == http2.ErrCodeInternal { + // If swapClient returns false, then it just means that another goroutine beat us to it + if c.swapClient(clientGen) { + logrus.WithFields(logrus.Fields{ + "err": streamErr, + "prevClientGen": clientGen, + }).Warn("re-establishing fragment reader storage connections due to http2 stream error") + } + } + + return resp, err +} + +func newInnerClient() *http.Client { + var transport = http.DefaultTransport.(*http.Transport).Clone() + http2.ConfigureTransport(transport) + + var fileRoot = maybeFileTransport.Load() + if root, ok := fileRoot.(string); ok && root != "" { + logrus.Debug("installing file transport") + transport.RegisterProtocol("file", http.NewFileTransport(http.Dir(root))) + } + + return &http.Client{ + Transport: http.RoundTripper(transport), + } +} + var ( // Map common broker error statuses into named errors. ErrInsufficientJournalBrokers = errors.New(pb.Status_INSUFFICIENT_JOURNAL_BROKERS.String()) @@ -411,6 +483,10 @@ var ( // underlying file did not return EOF at the expected Fragment End offset. ErrDidNotReadExpectedEOF = errors.New("did not read EOF at expected Fragment.End") + maybeFileTransport atomic.Value + // httpClient is the http.Client used by OpenFragmentURL - httpClient = http.DefaultClient + httpClient = &httpClientWrapper{ + inner: newInnerClient(), + } )