Skip to content

Commit

Permalink
feature: allow configuration for Go x/trace.FlightRecorder
Browse files Browse the repository at this point in the history
Signed-off-by: Sandor Szücs <[email protected]>
  • Loading branch information
szuecs committed Jul 1, 2024
1 parent 00480c8 commit baa60cf
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 37 deletions.
5 changes: 4 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type Config struct {
BlockProfileRate int `yaml:"block-profile-rate"`
MutexProfileFraction int `yaml:"mutex-profile-fraction"`
MemProfileRate int `yaml:"memory-profile-rate"`
FlightRecorderTargetURL string `yaml:"flight-recorder-target-url"`
DebugGcMetrics bool `yaml:"debug-gc-metrics"`
RuntimeMetrics bool `yaml:"runtime-metrics"`
ServeRouteMetrics bool `yaml:"serve-route-metrics"`
Expand Down Expand Up @@ -369,6 +370,7 @@ func NewConfig() *Config {

// logging, metrics, tracing:
flag.BoolVar(&cfg.EnablePrometheusMetrics, "enable-prometheus-metrics", false, "*Deprecated*: use metrics-flavour. Switch to Prometheus metrics format to expose metrics")
flag.BoolVar(&cfg.EnablePrometheusStartLabel, "enable-prometheus-start-label", false, "adds start label to each prometheus counter with the value of counter creation timestamp as unix nanoseconds")
flag.StringVar(&cfg.OpenTracing, "opentracing", "noop", "list of arguments for opentracing (space separated), first argument is the tracer implementation")
flag.StringVar(&cfg.OpenTracingInitialSpan, "opentracing-initial-span", "ingress", "set the name of the initial, pre-routing, tracing span")
flag.StringVar(&cfg.OpenTracingExcludedProxyTags, "opentracing-excluded-proxy-tags", "", "set tags that should be excluded from spans created for proxy operation. must be a comma-separated list of strings.")
Expand All @@ -382,7 +384,7 @@ func NewConfig() *Config {
flag.IntVar(&cfg.BlockProfileRate, "block-profile-rate", 0, "block profile sample rate, see runtime.SetBlockProfileRate")
flag.IntVar(&cfg.MutexProfileFraction, "mutex-profile-fraction", 0, "mutex profile fraction rate, see runtime.SetMutexProfileFraction")
flag.IntVar(&cfg.MemProfileRate, "memory-profile-rate", 0, "memory profile rate, see runtime.SetMemProfileRate, keeps default 512 kB")
flag.BoolVar(&cfg.EnablePrometheusStartLabel, "enable-prometheus-start-label", false, "adds start label to each prometheus counter with the value of counter creation timestamp as unix nanoseconds")
flag.StringVar(&cfg.FlightRecorderTargetURL, "flight-recorder-target-url", "", "sets the flight recorder target URL that is used to write out the trace to.")
flag.BoolVar(&cfg.DebugGcMetrics, "debug-gc-metrics", false, "enables reporting of the Go garbage collector statistics exported in debug.GCStats")
flag.BoolVar(&cfg.RuntimeMetrics, "runtime-metrics", true, "enables reporting of the Go runtime statistics exported in runtime and specifically runtime.MemStats")
flag.BoolVar(&cfg.ServeRouteMetrics, "serve-route-metrics", false, "enables reporting total serve time metrics for each route")
Expand Down Expand Up @@ -755,6 +757,7 @@ func (c *Config) ToOptions() skipper.Options {
EnableProfile: c.EnableProfile,
BlockProfileRate: c.BlockProfileRate,
MutexProfileFraction: c.MutexProfileFraction,
FlightRecorderTargetURL: c.FlightRecorderTargetURL,
EnableDebugGcMetrics: c.DebugGcMetrics,
EnableRuntimeMetrics: c.RuntimeMetrics,
EnableServeRouteMetrics: c.ServeRouteMetrics,
Expand Down
1 change: 1 addition & 0 deletions filters/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func Filters() []filters.Spec {
diag.NewNormalResponseLatency(),
diag.NewHistogramRequestLatency(),
diag.NewHistogramResponseLatency(),
diag.NewTrace(),
tee.NewTee(),
tee.NewTeeDeprecated(),
tee.NewTeeNoFollow(),
Expand Down
52 changes: 52 additions & 0 deletions filters/diag/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package diag

import (
"time"

log "github.com/sirupsen/logrus"
"github.com/zalando/skipper/filters"
)

func getDurationArg(a interface{}) (time.Duration, error) {
if s, ok := a.(string); ok {
return time.ParseDuration(s)
}
return 0, filters.ErrInvalidFilterParameters
}

type traceSpec struct{}

type trace struct {
d time.Duration
}

// NewTrace creates a filter specification for the trace() filter
func NewTrace() filters.Spec {
return &traceSpec{}
}

func (*traceSpec) Name() string {
return filters.TraceName
}

func (ts *traceSpec) CreateFilter(args []interface{}) (filters.Filter, error) {
if len(args) != 1 {
return nil, filters.ErrInvalidFilterParameters
}

d, err := getDurationArg(args[0])
if err != nil {
log.Warnf("d failed on creation of trace(): %v", err)
return nil, filters.ErrInvalidFilterParameters
}

return &trace{
d: d,
}, nil
}

func (tr *trace) Request(ctx filters.FilterContext) {
ctx.StateBag()[filters.TraceName] = tr.d
}

func (*trace) Response(filters.FilterContext) {}
1 change: 1 addition & 0 deletions filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ const (
NormalResponseLatencyName = "normalResponseLatency"
HistogramRequestLatencyName = "histogramRequestLatency"
HistogramResponseLatencyName = "histogramResponseLatency"
TraceName = "trace"
LogBodyName = "logBody"
LogHeaderName = "logHeader"
TeeName = "tee"
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ require (
github.com/yuin/gopher-lua v1.1.1
go4.org/netipx v0.0.0-20220925034521-797b0c90d8ab
golang.org/x/crypto v0.24.0
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8
golang.org/x/net v0.26.0
golang.org/x/oauth2 v0.21.0
golang.org/x/sync v0.7.0
Expand Down Expand Up @@ -170,7 +170,6 @@ require (
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/tools v0.22.0 // indirect
golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f // indirect
gonum.org/v1/gonum v0.8.2 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -528,10 +528,8 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f h1:3CW0unweImhOzd5FmYuRsD4Y4oQFKZIjAnKbjV4WIrw=
golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc=
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 h1:yixxcjnhBmY0nkL253HFVIm0JsFHwrHdT3Yh6szTnfY=
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI=
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
Expand Down
76 changes: 76 additions & 0 deletions proxy/flightrecorder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package proxy_test

import (
"bytes"
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"

"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/filters/diag"
"github.com/zalando/skipper/proxy"
"github.com/zalando/skipper/proxy/proxytest"
xtrace "golang.org/x/exp/trace"
)

func TestFlightRecorder(t *testing.T) {
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != "PUT" {
w.WriteHeader(http.StatusMethodNotAllowed)
w.Write([]byte(http.StatusText(http.StatusMethodNotAllowed)))
return
}

var buf bytes.Buffer
n, err := io.Copy(&buf, r.Body)
if err != nil {
t.Fatalf("Failed to copy data: %v", err)
}
if n < 100 {
t.Fatalf("Failed to write enough data: %d bytes", n)
}
w.WriteHeader(http.StatusCreated)
w.Write([]byte(http.StatusText(http.StatusCreated)))

}))
defer service.Close()

backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
defer backend.Close()

flightRecorder := xtrace.NewFlightRecorder()
flightRecorder.Start()

spec := diag.NewTrace()
fr := make(filters.Registry)
fr.Register(spec)

doc := fmt.Sprintf(`r: * -> trace("20µs") -> "%s"`, backend.URL)
rr := eskip.MustParse(doc)

pr := proxytest.WithParams(fr, proxy.Params{
FlightRecorder: flightRecorder,
FlightRecorderTargetURL: service.URL,
}, rr...)
defer pr.Close()

rsp, err := pr.Client().Get(pr.URL)
if err != nil {
t.Fatalf("Failed to GET %q: %v", pr.URL, err)
}
defer rsp.Body.Close()
_, err = io.ReadAll(rsp.Body)
if err != nil {
t.Fatalf("Failed to read body: %v", err)
}

switch rsp.StatusCode {
case 200, 201, 204:
// ok
default:
t.Fatalf("Failed to get status OK: %d", rsp.StatusCode)
}
}
100 changes: 70 additions & 30 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"
"unicode/utf8"

Expand Down Expand Up @@ -362,6 +361,13 @@ type Params struct {

// PassiveHealthCheck defines the parameters for the healthy endpoints checker.
PassiveHealthCheck *PassiveHealthCheck

// FlightRecorder is a started instance of https://pkg.go.dev/golang.org/x/exp/trace#FlightRecorder
FlightRecorder *trace.FlightRecorder

// FlightRecorderTargetURL is the target to write the trace
// to. Supported targets are http URL and file URL.
FlightRecorderTargetURL string
}

type (
Expand Down Expand Up @@ -457,8 +463,7 @@ type Proxy struct {
hostname string
onPanicSometimes rate.Sometimes
flightRecorder *trace.FlightRecorder
traceOnce sync.Once
tooLong time.Duration
flightRecorderURL *url.URL
}

// proxyError is used to wrap errors during proxying and to indicate
Expand Down Expand Up @@ -850,13 +855,15 @@ func WithParams(p Params) *Proxy {
maxUnhealthyEndpointsRatio: p.PassiveHealthCheck.MaxUnhealthyEndpointsRatio,
}
}
// TODO(sszuecs): expose an option to start it
fr := trace.NewFlightRecorder()
//fr.SetPeriod(d)
//fr.SetSize(bytes int)
err := fr.Start()
if err != nil {
println("Failed to start FlightRecorder:", err.Error())

var frURL *url.URL
if p.FlightRecorder != nil {
var err error
frURL, err = url.Parse(p.FlightRecorderTargetURL)
if err != nil {
p.FlightRecorder.Stop()
p.FlightRecorder = nil
}
}

return &Proxy{
Expand Down Expand Up @@ -887,32 +894,62 @@ func WithParams(p Params) *Proxy {
clientTLS: tr.TLSClientConfig,
hostname: hostname,
onPanicSometimes: rate.Sometimes{First: 3, Interval: 1 * time.Minute},
flightRecorder: fr,
traceOnce: sync.Once{},
tooLong: 250 * time.Millisecond,
flightRecorder: p.FlightRecorder,
flightRecorderURL: frURL,
}
}

func (p *Proxy) writeTraceIfTooSlow(ctx *context) {
p.log.Infof("write trace if too slow: %s > %s", time.Since(ctx.startServe), p.tooLong)
if time.Since(ctx.startServe) > p.tooLong {
p.log.Info("too slow")
// Do it only once for simplicitly, but you can take more than one.
p.traceOnce.Do(func() {
p.log.Info("write trace because we were too slow")
// Grab the snapshot.
var b bytes.Buffer
_, err := p.flightRecorder.WriteTo(&b)
if err != nil {
p.log.Errorf("Failed to write flightrecorder data: %v", err)
if p.flightRecorder == nil || p.flightRecorderURL == nil {
return
}

var d time.Duration
if e, ok := ctx.StateBag()[filters.TraceName]; ok {
d = e.(time.Duration)
}
if d < 1*time.Microsecond {
return
}

p.log.Infof("write trace if too slow: %s > %s", time.Since(ctx.startServe), d)
if time.Since(ctx.startServe) > d {
var b bytes.Buffer
_, err := p.flightRecorder.WriteTo(&b)
if err != nil {
p.log.Errorf("Failed to write flightrecorder data: %v", err)
return
}

switch p.flightRecorderURL.Scheme {
case "file":
if err := os.WriteFile(p.flightRecorderURL.Path, b.Bytes(), 0o644); err != nil {
p.log.Errorf("Failed to write file trace.out: %v", err)
return
} else {
p.log.Infof("FlightRecorder wrote %d bytes to trace file %q", b.Len(), p.flightRecorderURL.Path)
}
// Write it to a file.
if err := os.WriteFile("trace.out", b.Bytes(), 0o755); err != nil {
p.log.Errorf("Failed to write trace.out: %v", err)
return
case "http", "https":
req, err := http.NewRequest("PUT", p.flightRecorderURL.String(), &b)
if err != nil {
p.log.Errorf("Failed to create request to %q to send a trace: %v", p.flightRecorderURL.String(), err)
}
})

rsp, err := p.roundTripper.RoundTrip(req)
if err != nil {
p.log.Errorf("Failed to write trace to %q: %v", p.flightRecorderURL.String(), err)
} else {
rsp.Body.Close()
}
switch rsp.StatusCode {
case 200, 201, 204:
p.log.Infof("Successful send of a trace to %q", p.flightRecorderURL.String())
default:
p.log.Errorf("Failed to get successful response from %s: (%d) %s", p.flightRecorderURL.String(), rsp.StatusCode, rsp.Status)
}
default:
p.log.Errorf("Failed to write trace, unknown FlightRecorderURL scheme %q", p.flightRecorderURL.Scheme)
}
}
}

Expand Down Expand Up @@ -1686,7 +1723,10 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (p *Proxy) Close() error {
close(p.quit)
p.registry.Close()
p.flightRecorder.Stop()
if p.flightRecorder != nil {
p.flightRecorder.Stop()
}

return nil
}

Expand Down
Loading

0 comments on commit baa60cf

Please sign in to comment.