Skip to content

Commit

Permalink
change proxy as discussed in a meet
Browse files Browse the repository at this point in the history
Signed-off-by: Sandor Szücs <[email protected]>
  • Loading branch information
szuecs committed Sep 10, 2024
1 parent b7913a9 commit 904c64c
Showing 1 changed file with 95 additions and 43 deletions.
138 changes: 95 additions & 43 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,13 +862,43 @@ func WithParams(p Params) *Proxy {
}
}

var frURL *url.URL
log := &logging.DefaultLog{}

var (
frURL *url.URL
frChannel = make(chan struct{}, 10240)
)
if p.FlightRecorder != nil {
var err error
frURL, err = url.Parse(p.FlightRecorderTargetURL)
if err != nil {
p.FlightRecorder.Stop()
p.FlightRecorder = nil
} else {
go func() {
d := 7 * 24 * time.Hour
last := time.Now()

for {
select {
case <-frChannel:
// range through all notifications until 1ms there is no notification
d = time.Millisecond
continue
case <-quit:
p.FlightRecorder.Stop()
return
case <-time.After(d):
if time.Since(last) >= time.Hour {
writeTrace(p.FlightRecorder, frURL, log, tr)
}
last = time.Now()

// reset d
d = 7 * 24 * time.Hour
}
}
}()
}
}

Expand All @@ -891,7 +921,7 @@ func WithParams(p Params) *Proxy {
maxLoops: p.MaxLoopbacks,
breakers: p.CircuitBreakers,
limiters: p.RateLimiters,
log: &logging.DefaultLog{},
log: log,
defaultHTTPStatus: defaultHTTPStatus,
tracing: newProxyTracing(p.OpenTracing),
accessLogDisabled: p.AccessLogDisabled,
Expand All @@ -906,57 +936,81 @@ func WithParams(p Params) *Proxy {
}
}

func (p *Proxy) writeTraceIfTooSlow(ctx *context) {
if p.flightRecorder == nil || p.flightRecorderURL == nil {
func (p *Proxy) writeTraceIfTooSlow(ctx *context, span ot.Span) {
took := time.Since(ctx.startServe)
span.SetTag("proxy.took", took)

// signal too slow

d := p.flightRecorderPeriod
if d < 1*time.Millisecond && d > took {
return
}
}

d := p.flightRecorderPeriod
if e, ok := ctx.StateBag()[filters.TraceName]; ok {
d = e.(time.Duration)
func writeTraceTo(log logging.Logger, flightRecorder *trace.FlightRecorder, w io.Writer) (int, error) {
n, err := flightRecorder.WriteTo(w)
if err != nil {
switch err {
case trace.ErrSnapshotActive:
return 0, fmt.Errorf("flightRecorder already in progress")
default:
return 0, fmt.Errorf("failed to write FlightRecorder data: %w", err)
}
} else {
log.Infof("FlightRecorder wrote %d bytes", n)
}
if d < 1*time.Microsecond {

return n, err
}

func writeTrace(flightRecorder *trace.FlightRecorder, flightRecorderURL *url.URL, log logging.Logger, roundTripper http.RoundTripper) {
if flightRecorder == nil || flightRecorderURL == nil {
return
}

p.log.Infof("write trace if too slow: %s > %s", time.Since(ctx.startServe), d)
if time.Since(ctx.startServe) > d {
log.Info("write trace")

switch flightRecorderURL.Scheme {
case "file":
fd, err := os.Open(flightRecorderURL.Path)
if err != nil {
log.Errorf("Failed to write file %q: %v", err, flightRecorderURL.Path)
return
}

_, err = writeTraceTo(log, flightRecorder, fd)
if err != nil {
log.Errorf("Failed to write trace file %q: %v", flightRecorderURL.Path, err)
}

case "http", "https":
var b bytes.Buffer
_, err := p.flightRecorder.WriteTo(&b)
_, err := writeTraceTo(log, flightRecorder, &b)
if err != nil {
p.log.Errorf("Failed to write flightrecorder data: %v", err)
log.Errorf("Failed to write trace into in-memory buffer: %v", err)
return
}

switch p.flightRecorderURL.Scheme {
case "file":
if err := os.WriteFile(p.flightRecorderURL.Path, b.Bytes(), 0o644); err != nil {
p.log.Errorf("Failed to write file trace.out: %v", err)
return
} else {
p.log.Infof("FlightRecorder wrote %d bytes to trace file %q", b.Len(), p.flightRecorderURL.Path)
}
case "http", "https":
req, err := http.NewRequest("PUT", p.flightRecorderURL.String(), &b)
if err != nil {
p.log.Errorf("Failed to create request to %q to send a trace: %v", p.flightRecorderURL.String(), err)
}
req, err := http.NewRequest("PUT", flightRecorderURL.String(), &b)
if err != nil {
log.Errorf("Failed to create request to %q to send a trace: %v", flightRecorderURL.String(), err)
}

rsp, err := p.roundTripper.RoundTrip(req)
if err != nil {
p.log.Errorf("Failed to write trace to %q: %v", p.flightRecorderURL.String(), err)
} else {
rsp.Body.Close()
}
switch rsp.StatusCode {
case 200, 201, 204:
p.log.Infof("Successful send of a trace to %q", p.flightRecorderURL.String())
default:
p.log.Errorf("Failed to get successful response from %s: (%d) %s", p.flightRecorderURL.String(), rsp.StatusCode, rsp.Status)
}
rsp, err := roundTripper.RoundTrip(req)
if err != nil {
log.Errorf("Failed to write trace to %q: %v", flightRecorderURL.String(), err)
} else {
rsp.Body.Close()
}
switch rsp.StatusCode {
case 200, 201, 204:
log.Infof("Successful send of a trace to %q", flightRecorderURL.String())
default:
p.log.Errorf("Failed to write trace, unknown FlightRecorderURL scheme %q", p.flightRecorderURL.Scheme)
log.Errorf("Failed to get successful response from %s: (%d) %s", flightRecorderURL.String(), rsp.StatusCode, rsp.Status)
}
default:
log.Errorf("Failed to write trace, unknown FlightRecorderURL scheme %q", flightRecorderURL.Scheme)
}
}

Expand Down Expand Up @@ -1087,7 +1141,8 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
proxySpanOpts := []ot.StartSpanOption{ot.Tags{
SpanKindTag: SpanKindClient,
}}
if parentSpan := ot.SpanFromContext(req.Context()); parentSpan != nil {
parentSpan := ot.SpanFromContext(req.Context())
if parentSpan != nil {
proxySpanOpts = append(proxySpanOpts, ot.ChildOf(parentSpan.Context()))
}
ctx.proxySpan = p.tracing.tracer.StartSpan(spanName, proxySpanOpts...)
Expand All @@ -1108,7 +1163,7 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
ctx.proxySpan.LogKV("http_roundtrip", StartEvent)
req = injectClientTrace(req, ctx.proxySpan)

p.writeTraceIfTooSlow(ctx)
p.writeTraceIfTooSlow(ctx, parentSpan)

response, err := roundTripper.RoundTrip(req)
if endpointMetrics != nil {
Expand Down Expand Up @@ -1742,9 +1797,6 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (p *Proxy) Close() error {
close(p.quit)
p.registry.Close()
if p.flightRecorder != nil {
p.flightRecorder.Stop()
}

return nil
}
Expand Down

0 comments on commit 904c64c

Please sign in to comment.