Skip to content

Commit

Permalink
journal client: re-create http client after receiving internal errors
Browse files Browse the repository at this point in the history
This addresses an issue we've encountered in production that results in a large
number of request failures.  This happens when one of the TCP connections is to
a server that starts sending RSTStream frames with an `INTERNAL_ERROR` status
(http2 stream status, not http request status). In this case, the Go http
client will continue to re-use the connection for other requests, which also
fail in the same way. What we'd really like is to have some sort of logic that
says, "if connection x has returned such an internal error, then stop using
that connection for future requests and shut it down". Unfortunately, such
logic is impossible to express with the current `Transport` and
`ClientConnPool` APIs. So the next best thing is a workaround where we
re-create the entire http client, to force it to re-establish all connections.
That's what this does. Plus a little extra for dealing with the dynamic
enablement/disablement of the file protocol transport.
  • Loading branch information
psFried committed Mar 15, 2024
1 parent 7a8baff commit ab11240
Showing 1 changed file with 91 additions and 15 deletions.
106 changes: 91 additions & 15 deletions broker/client/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ import (
"io"
"io/ioutil"
"net/http"
"sync"
"sync/atomic"

"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.gazette.dev/core/broker/codecs"
pb "go.gazette.dev/core/broker/protocol"
"golang.org/x/net/http2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -349,23 +353,22 @@ func (fr *FragmentReader) Close() error {
// fragments are persisted, and to which this client also has access. The
// returned cleanup function removes the handler and restores the prior http.Client.
//
// const root = "/mnt/shared-nas-array/path/to/fragment-root"
// defer client.InstallFileTransport(root)()
//
// var rr = NewRetryReader(ctx, client, protocol.ReadRequest{
// Journal: "a/journal/with/nas/fragment/store",
// DoNotProxy: true,
// })
// // rr.Read will read Fragments directly from NAS.
// const root = "/mnt/shared-nas-array/path/to/fragment-root"
// defer client.InstallFileTransport(root)()
//
// var rr = NewRetryReader(ctx, client, protocol.ReadRequest{
// Journal: "a/journal/with/nas/fragment/store",
// DoNotProxy: true,
// })
// // rr.Read will read Fragments directly from NAS.
func InstallFileTransport(root string) (remove func()) {
var transport = http.DefaultTransport.(*http.Transport).Clone()
transport.RegisterProtocol("file", http.NewFileTransport(http.Dir(root)))

var prevClient = httpClient
httpClient = &http.Client{Transport: transport}
maybeFileTransport.Store(root)
httpClient.swapClient(-1) // don't check the generation counter when swapping

return func() { httpClient = prevClient }
return func() {
maybeFileTransport.Store("")
httpClient.swapClient(-1)
}
}

// mapGRPCCtxErr returns ctx.Err() iff |err| represents a gRPC error with a
Expand All @@ -389,6 +392,75 @@ func fragmentLabels(fragment pb.Fragment) prometheus.Labels {
}
}

// httpClientWrapper wraps an `http.Client`, which can be swapped out on the fly.
// The main purpose of doing this is to force it to abandon and re-create network connections
// in cases where the connections seem to be to misbehaving servers. We _like_ to have some logic like
// "if the server returns 3 INTERNAL_ERRORs in a row, then stop using this connection for future requests".
// Unfortunately, the Go APIs don't really allow for that sort of thing, and so this is the workaround
// that seems easiest for now.
type httpClientWrapper struct {
inner *http.Client
clientGen int64
clientMu sync.RWMutex
}

// swapClient triggers the immediate re-creation of the underlying http client.
// The current `clientGen` is compared to `expectClientGen`, and the client will _not_ be re-created
// if they are different. This is used to prevent unnecessary re-creations in cases where multiple
// goroutines all call swapClient simultaneously. Passing `-1` causes it to swap the client unconditionally,
// which is only used during tests. Returns a boolean indicating whether the client was actually re-created.
func (c *httpClientWrapper) swapClient(expectClientGen int64) (swapped bool) {
c.clientMu.Lock()
if expectClientGen == -1 || c.clientGen == expectClientGen {
c.inner = newInnerClient()
c.clientGen++
swapped = true
}
c.clientMu.Unlock()
return
}

func (c *httpClientWrapper) Do(req *http.Request) (*http.Response, error) {
c.clientMu.RLock()
var client = c.inner
var clientGen = c.clientGen
c.clientMu.RUnlock()

var resp, err = client.Do(req)
var streamErr http2.StreamError
// Check for a specific error condition representing an internal error from a cloud storage
// server using http2. If this happens, then we'll re-create the client in order to force it to
// close existing connections and re-establish new ones. This is a smelly workaround for issues
// we've observed where Go's http2 client will keep re-using the same http connections when an
// unhealthy server just keeps sending `RSTStream` frames with internal error codes.
if errors.As(err, &streamErr) && streamErr.Code == http2.ErrCodeInternal {
// If swapClient returns false, then it just means that another goroutine beat us to it
if c.swapClient(clientGen) {
logrus.WithFields(logrus.Fields{
"err": streamErr,
"prevClientGen": clientGen,
}).Warn("re-establishing fragment reader storage connections due to http2 stream error")
}
}

return resp, err
}

func newInnerClient() *http.Client {
var transport = http.DefaultTransport.(*http.Transport).Clone()
http2.ConfigureTransport(transport)

var fileRoot = maybeFileTransport.Load()
if root, ok := fileRoot.(string); ok && root != "" {
logrus.Debug("installing file transport")
transport.RegisterProtocol("file", http.NewFileTransport(http.Dir(root)))
}

return &http.Client{
Transport: http.RoundTripper(transport),
}
}

var (
// Map common broker error statuses into named errors.
ErrInsufficientJournalBrokers = errors.New(pb.Status_INSUFFICIENT_JOURNAL_BROKERS.String())
Expand All @@ -411,6 +483,10 @@ var (
// underlying file did not return EOF at the expected Fragment End offset.
ErrDidNotReadExpectedEOF = errors.New("did not read EOF at expected Fragment.End")

maybeFileTransport atomic.Value

// httpClient is the http.Client used by OpenFragmentURL
httpClient = http.DefaultClient
httpClient = &httpClientWrapper{
inner: newInnerClient(),
}
)

0 comments on commit ab11240

Please sign in to comment.