diff --git a/config/config.go b/config/config.go index 20799722e1..a80048bf72 100644 --- a/config/config.go +++ b/config/config.go @@ -84,6 +84,7 @@ type Config struct { BlockProfileRate int `yaml:"block-profile-rate"` MutexProfileFraction int `yaml:"mutex-profile-fraction"` MemProfileRate int `yaml:"memory-profile-rate"` + FlightRecorderTargetURL string `yaml:"flight-recorder-target-url"` DebugGcMetrics bool `yaml:"debug-gc-metrics"` RuntimeMetrics bool `yaml:"runtime-metrics"` ServeRouteMetrics bool `yaml:"serve-route-metrics"` @@ -369,6 +370,7 @@ func NewConfig() *Config { // logging, metrics, tracing: flag.BoolVar(&cfg.EnablePrometheusMetrics, "enable-prometheus-metrics", false, "*Deprecated*: use metrics-flavour. Switch to Prometheus metrics format to expose metrics") + flag.BoolVar(&cfg.EnablePrometheusStartLabel, "enable-prometheus-start-label", false, "adds start label to each prometheus counter with the value of counter creation timestamp as unix nanoseconds") flag.StringVar(&cfg.OpenTracing, "opentracing", "noop", "list of arguments for opentracing (space separated), first argument is the tracer implementation") flag.StringVar(&cfg.OpenTracingInitialSpan, "opentracing-initial-span", "ingress", "set the name of the initial, pre-routing, tracing span") flag.StringVar(&cfg.OpenTracingExcludedProxyTags, "opentracing-excluded-proxy-tags", "", "set tags that should be excluded from spans created for proxy operation. 
must be a comma-separated list of strings.") @@ -382,7 +384,7 @@ func NewConfig() *Config { flag.IntVar(&cfg.BlockProfileRate, "block-profile-rate", 0, "block profile sample rate, see runtime.SetBlockProfileRate") flag.IntVar(&cfg.MutexProfileFraction, "mutex-profile-fraction", 0, "mutex profile fraction rate, see runtime.SetMutexProfileFraction") flag.IntVar(&cfg.MemProfileRate, "memory-profile-rate", 0, "memory profile rate, see runtime.SetMemProfileRate, keeps default 512 kB") - flag.BoolVar(&cfg.EnablePrometheusStartLabel, "enable-prometheus-start-label", false, "adds start label to each prometheus counter with the value of counter creation timestamp as unix nanoseconds") + flag.StringVar(&cfg.FlightRecorderTargetURL, "flight-recorder-target-url", "", "sets the flight recorder target URL that is used to write out the trace to.") flag.BoolVar(&cfg.DebugGcMetrics, "debug-gc-metrics", false, "enables reporting of the Go garbage collector statistics exported in debug.GCStats") flag.BoolVar(&cfg.RuntimeMetrics, "runtime-metrics", true, "enables reporting of the Go runtime statistics exported in runtime and specifically runtime.MemStats") flag.BoolVar(&cfg.ServeRouteMetrics, "serve-route-metrics", false, "enables reporting total serve time metrics for each route") @@ -755,6 +757,7 @@ func (c *Config) ToOptions() skipper.Options { EnableProfile: c.EnableProfile, BlockProfileRate: c.BlockProfileRate, MutexProfileFraction: c.MutexProfileFraction, + FlightRecorderTargetURL: c.FlightRecorderTargetURL, EnableDebugGcMetrics: c.DebugGcMetrics, EnableRuntimeMetrics: c.RuntimeMetrics, EnableServeRouteMetrics: c.ServeRouteMetrics, diff --git a/go.mod b/go.mod index 080cfb0050..9def1404c6 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( github.com/yuin/gopher-lua v1.1.1 go4.org/netipx v0.0.0-20220925034521-797b0c90d8ab golang.org/x/crypto v0.27.0 - golang.org/x/exp v0.0.0-20230905200255-921286631fa9 + golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 
golang.org/x/net v0.29.0 golang.org/x/oauth2 v0.23.0 golang.org/x/sync v0.8.0 @@ -169,10 +169,10 @@ require ( go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/automaxprocs v1.5.3 // indirect - golang.org/x/mod v0.19.0 // indirect + golang.org/x/mod v0.21.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect - golang.org/x/tools v0.23.0 // indirect + golang.org/x/tools v0.25.0 // indirect gonum.org/v1/gonum v0.8.2 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect @@ -182,4 +182,6 @@ require ( sigs.k8s.io/yaml v1.4.0 // indirect ) -go 1.22 +go 1.22.0 + +toolchain go1.23.0 diff --git a/go.sum b/go.sum index 9eb1b19331..1ab064d335 100644 --- a/go.sum +++ b/go.sum @@ -538,8 +538,8 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY= -golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= -golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image 
v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= @@ -552,8 +552,8 @@ golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.19.0 h1:fEdghXQSo20giMthA7cd28ZC+jts4amQ3YMXiP5oMQ8= -golang.org/x/mod v0.19.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.21.0 h1:vvrHzRwRfVKSiLrG+d4FMl/Qi4ukBCE6kZlTUkDYRT0= +golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -638,8 +638,8 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20200908211811-12e1bf57a112/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg= -golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI= +golang.org/x/tools v0.25.0 h1:oFU9pkj/iJgs+0DT+VMHrx+oBKs/LJMV+Uvg78sl+fE= +golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors 
v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
diff --git a/proxy/flightrecorder_test.go b/proxy/flightrecorder_test.go
new file mode 100644
index 0000000000..bbf2af6679
--- /dev/null
+++ b/proxy/flightrecorder_test.go
@@ -0,0 +1,102 @@
+package proxy_test
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+	"time"
+
+	"github.com/zalando/skipper/eskip"
+	"github.com/zalando/skipper/filters"
+	"github.com/zalando/skipper/filters/diag"
+	"github.com/zalando/skipper/proxy"
+	"github.com/zalando/skipper/proxy/proxytest"
+	xtrace "golang.org/x/exp/trace"
+)
+
+// TestFlightRecorder checks that a request, that is slower than the
+// configured FlightRecorderPeriod, makes the proxy upload a trace by
+// a PUT request to the FlightRecorderTargetURL.
+func TestFlightRecorder(t *testing.T) {
+	// buffered to never block the handler, if the test failed before
+	// receiving the status code
+	ch := make(chan int, 1)
+	service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != "PUT" {
+			w.WriteHeader(http.StatusMethodNotAllowed)
+			w.Write([]byte(http.StatusText(http.StatusMethodNotAllowed)))
+			ch <- http.StatusMethodNotAllowed
+			return
+		}
+
+		// t.Fatalf must only be called from the test goroutine, so
+		// report failures by t.Errorf and the sent status code
+		var buf bytes.Buffer
+		n, err := io.Copy(&buf, r.Body)
+		if err != nil || n < 100 {
+			t.Errorf("Failed to receive enough trace data: %d bytes, err: %v", n, err)
+			w.WriteHeader(http.StatusBadRequest)
+			w.Write([]byte(http.StatusText(http.StatusBadRequest)))
+			ch <- http.StatusBadRequest
+			return
+		}
+		w.WriteHeader(http.StatusCreated)
+		w.Write([]byte(http.StatusText(http.StatusCreated)))
+		ch <- http.StatusCreated
+	}))
+	defer service.Close()
+
+	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusOK)
+		w.Write([]byte(http.StatusText(http.StatusOK)))
+	}))
+	defer backend.Close()
+
+	flightRecorder := xtrace.NewFlightRecorder()
+	if err := flightRecorder.Start(); err != nil {
+		t.Fatalf("Failed to start FlightRecorder: %v", err)
+	}
+
+	spec := diag.NewLatency()
+	fr := make(filters.Registry)
+	fr.Register(spec)
+
+	doc := fmt.Sprintf(`r: * -> latency("100ms") -> "%s"`, backend.URL)
+	rr := eskip.MustParse(doc)
+
+	pr := proxytest.WithParams(fr, proxy.Params{
+		FlightRecorder:          flightRecorder,
+		FlightRecorderTargetURL: service.URL,
+		FlightRecorderPeriod:    90 * time.Millisecond,
+	}, rr...)
+	defer pr.Close()
+
+	rsp, err := pr.Client().Get(pr.URL)
+	if err != nil {
+		t.Fatalf("Failed to GET %q: %v", pr.URL, err)
+	}
+	defer rsp.Body.Close()
+	_, err = io.ReadAll(rsp.Body)
+	if err != nil {
+		t.Fatalf("Failed to read body: %v", err)
+	}
+
+	switch rsp.StatusCode {
+	case http.StatusOK:
+		// ok
+	default:
+		t.Fatalf("Failed to get status OK: %d", rsp.StatusCode)
+	}
+
+	select {
+	case statusCode := <-ch:
+		if statusCode != http.StatusCreated {
+			t.Fatalf("Failed to get status Created from trace upload: %d", statusCode)
+		}
+	case <-time.After(5 * time.Second):
+		t.Fatal("Timeout waiting for the trace upload")
+	}
+}
diff --git a/proxy/proxy.go b/proxy/proxy.go
index 0babad8da5..964665cbe9 100644
--- a/proxy/proxy.go
+++ b/proxy/proxy.go
@@ -22,6 +22,7 @@ import (
 	"unicode/utf8"
 
 	"golang.org/x/exp/maps"
+	"golang.org/x/exp/trace"
 	"golang.org/x/time/rate"
 
 	ot "github.com/opentracing/opentracing-go"
@@ -360,6 +361,18 @@ type Params struct {
 
 	// PassiveHealthCheck defines the parameters for the healthy endpoints checker.
 	PassiveHealthCheck *PassiveHealthCheck
+
+	// FlightRecorder is a started instance of https://pkg.go.dev/golang.org/x/exp/trace#FlightRecorder
+	FlightRecorder *trace.FlightRecorder
+
+	// FlightRecorderTargetURL is the target to write the trace
+	// to. Supported targets are http URL and file URL.
+	FlightRecorderTargetURL string
+
+	// FlightRecorderPeriod is the time.Duration that is used for
+	// a slow skipper. If skipper is detected to be slow it tries
+	// to write out a trace as configured by the FlightRecorderTargetURL.
+	FlightRecorderPeriod time.Duration
 }
 
 type (
@@ -454,6 +467,10 @@ type Proxy struct {
 	clientTLS                *tls.Config
 	hostname                 string
 	onPanicSometimes         rate.Sometimes
+	flightRecorder           *trace.FlightRecorder
+	flightRecorderURL        *url.URL
+	flightRecorderPeriod     time.Duration
+	flightRecorderCH         chan struct{}
 }
 
 // proxyError is used to wrap errors during proxying and to indicate
@@ -845,6 +862,54 @@ func WithParams(p Params) *Proxy {
 			maxUnhealthyEndpointsRatio: p.PassiveHealthCheck.MaxUnhealthyEndpointsRatio,
 		}
 	}
+
+	log := &logging.DefaultLog{}
+
+	var (
+		frURL *url.URL
+		// buffered channel size 10k to allow unblocked requests
+		frChannel = make(chan struct{}, 10240)
+	)
+	if p.FlightRecorder != nil {
+		var err error
+		frURL, err = url.Parse(p.FlightRecorderTargetURL)
+		if err != nil {
+			log.Errorf("Failed to parse FlightRecorderTargetURL %q, disable FlightRecorder: %v", p.FlightRecorderTargetURL, err)
+			p.FlightRecorder.Stop()
+			p.FlightRecorder = nil
+		} else {
+			go func() {
+				weekly := 7 * 24 * time.Hour
+				timer := time.NewTimer(weekly)
+				defer timer.Stop()
+
+				last := time.Now().Add(-time.Hour)
+
+				for {
+					select {
+					case <-frChannel:
+						// range through all notifications until 1ms there is no notification
+						// reset timer to write trace after handling all the notifications
+						timer.Reset(time.Millisecond)
+						continue
+					case <-quit:
+						p.FlightRecorder.Stop()
+						return
+					case <-timer.C:
+						// write at most one trace per hour, only a written
+						// trace starts a new rate limit window
+						if time.Since(last) >= time.Hour {
+							writeTrace(p.FlightRecorder, frURL, log, tr)
+							last = time.Now()
+						}
+
+						timer.Reset(weekly)
+					}
+				}
+			}()
+		}
+	}
+
 	return &Proxy{
 		routing:                  p.Routing,
 		registry:                 p.EndpointRegistry,
@@ -864,7 +929,7 @@ func WithParams(p Params) *Proxy {
 		maxLoops:                 p.MaxLoopbacks,
 		breakers:                 p.CircuitBreakers,
 		limiters:                 p.RateLimiters,
-		log:                      &logging.DefaultLog{},
+		log:                      log,
 		defaultHTTPStatus:        defaultHTTPStatus,
 		tracing:                  newProxyTracing(p.OpenTracing),
 		accessLogDisabled:        p.AccessLogDisabled,
@@ -873,6 +938,119 @@ func WithParams(p Params) *Proxy {
 		clientTLS:                tr.TLSClientConfig,
 		hostname:                 hostname,
 		onPanicSometimes:         rate.Sometimes{First: 3, Interval: 1 * time.Minute},
+		flightRecorder:           p.FlightRecorder,
+		flightRecorderURL:        frURL,
+		flightRecorderPeriod:     p.FlightRecorderPeriod,
+		flightRecorderCH:         frChannel,
+	}
+}
+
+// writeTraceIfTooSlow tags span, if not nil, with the time elapsed
+// since ctx.startServe and signals the flight recorder goroutine to
+// write out a trace, if the elapsed time reached the configured
+// flightRecorderPeriod. It never blocks the calling request.
+func (p *Proxy) writeTraceIfTooSlow(ctx *context, span ot.Span) {
+	took := time.Since(ctx.startServe)
+	// span is nil if the request context carries no parent span
+	if span != nil {
+		span.SetTag("proxy.took", took)
+	}
+
+	if p.flightRecorder == nil {
+		return
+	}
+
+	// period not configured, or the request was fast enough
+	d := p.flightRecorderPeriod
+	if d < 1*time.Millisecond || d > took {
+		return
+	}
+
+	// signal too slow, a full buffer already guarantees a pending
+	// trace, so drop the signal instead of blocking the request
+	select {
+	case p.flightRecorderCH <- struct{}{}:
+	default:
+	}
+}
+
+// writeTraceTo writes a snapshot of the flightRecorder data to w and
+// returns the number of bytes written. Errors are returned and not
+// logged, the caller decides how to report them.
+func writeTraceTo(log logging.Logger, flightRecorder *trace.FlightRecorder, w io.Writer) (int, error) {
+	n, err := flightRecorder.WriteTo(w)
+	switch err {
+	case nil:
+		log.Infof("FlightRecorder wrote %d bytes", n)
+		return n, nil
+	case trace.ErrSnapshotActive:
+		return 0, fmt.Errorf("flightRecorder already in progress")
+	default:
+		return 0, fmt.Errorf("failed to write FlightRecorder data: %w", err)
+	}
+}
+
+// writeTrace writes a flightRecorder snapshot to the target
+// flightRecorderURL. URLs of scheme file create or truncate the file
+// at the URL path, URLs of scheme http and https get the trace
+// uploaded by a PUT request sent through roundTripper. Failures are
+// logged and not returned. It is a noop, if flightRecorder or
+// flightRecorderURL is nil.
+func writeTrace(flightRecorder *trace.FlightRecorder, flightRecorderURL *url.URL, log logging.Logger, roundTripper http.RoundTripper) {
+	if flightRecorder == nil || flightRecorderURL == nil {
+		return
+	}
+
+	switch flightRecorderURL.Scheme {
+	case "file":
+		// os.Open would return a read-only file, create or truncate instead
+		fd, err := os.Create(flightRecorderURL.Path)
+		if err != nil {
+			log.Errorf("Failed to write file %q: %v", flightRecorderURL.Path, err)
+			return
+		}
+
+		_, err = writeTraceTo(log, flightRecorder, fd)
+		if err != nil {
+			log.Errorf("Failed to write trace file %q: %v", flightRecorderURL.Path, err)
+		}
+		// always release the file descriptor and report a failed close
+		if err := fd.Close(); err != nil {
+			log.Errorf("Failed to close trace file %q: %v", flightRecorderURL.Path, err)
+		}
+
+	case "http", "https":
+		var b bytes.Buffer
+		_, err := writeTraceTo(log, flightRecorder, &b)
+		if err != nil {
+			log.Errorf("Failed to write trace into in-memory buffer: %v", err)
+			return
+		}
+
+		req, err := http.NewRequest("PUT", flightRecorderURL.String(), &b)
+		if err != nil {
+			log.Errorf("Failed to create request to %q to send a trace: %v", flightRecorderURL.String(), err)
+			return
+		}
+
+		rsp, err := roundTripper.RoundTrip(req)
+		if err != nil {
+			// rsp must not be used if RoundTrip failed
+			log.Errorf("Failed to write trace to %q: %v", flightRecorderURL.String(), err)
+			return
+		}
+		// best effort drain of the body to allow connection reuse
+		io.Copy(io.Discard, rsp.Body)
+		rsp.Body.Close()
+
+		switch rsp.StatusCode {
+		case 200, 201, 204:
+			log.Infof("Successful send of a trace to %q", flightRecorderURL.String())
+		default:
+			log.Errorf("Failed to get successful response from %s: (%d) %s", flightRecorderURL.String(), rsp.StatusCode, rsp.Status)
+		}
+	default:
+		log.Errorf("Failed to write trace, unknown FlightRecorderURL scheme %q", flightRecorderURL.Scheme)
 	}
 }
 
@@ -1003,7 +1181,8 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
 	proxySpanOpts := []ot.StartSpanOption{ot.Tags{
 		SpanKindTag: SpanKindClient,
 	}}
-	if parentSpan := ot.SpanFromContext(req.Context()); parentSpan != nil {
+	parentSpan := ot.SpanFromContext(req.Context())
+	if parentSpan != nil {
 		proxySpanOpts = append(proxySpanOpts, ot.ChildOf(parentSpan.Context()))
 	}
 	ctx.proxySpan = p.tracing.tracer.StartSpan(spanName, proxySpanOpts...)
@@ -1024,6 +1203,8 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
 	ctx.proxySpan.LogKV("http_roundtrip", StartEvent)
 	req = injectClientTrace(req, ctx.proxySpan)
 
+	p.writeTraceIfTooSlow(ctx, parentSpan)
+
 	response, err := roundTripper.RoundTrip(req)
 	if endpointMetrics != nil {
 		endpointMetrics.IncRequests(routing.IncRequestsOptions{FailedRoundTrip: err != nil})
diff --git a/skipper.go b/skipper.go
index 83a12a673a..9da461fe62 100644
--- a/skipper.go
+++ b/skipper.go
@@ -22,6 +22,7 @@ import (
 	ot "github.com/opentracing/opentracing-go"
 	"github.com/prometheus/client_golang/prometheus"
 	log "github.com/sirupsen/logrus"
+	"golang.org/x/exp/trace"
 
 	"github.com/zalando/skipper/circuit"
 	"github.com/zalando/skipper/dataclients/kubernetes"
@@ -74,6 +75,9 @@ import (
 const (
 	defaultSourcePollTimeout   = 30 * time.Millisecond
 	defaultRoutingUpdateBuffer = 1 << 5
+
+	defaultFlightRecorderPeriod = 1 * time.Minute
+	defaultFlightRecorderSize   = 1 << 27 // 128 MB
 )
 
 const DefaultPluginDir = "./plugins"
@@ -462,6 +466,21 @@ type Options struct {
 	// MemProfileRate calls
runtime.SetMemProfileRate(MemProfileRate) if non zero value, deactivate with <0 MemProfileRate int + // FlightRecorderSize set size of the FlightRecorder https://pkg.go.dev/golang.org/x/exp/trace#FlightRecorder.SetSize + FlightRecorderSize int + + // FlightRecorderPeriod set period of the FlightRecorder https://pkg.go.dev/golang.org/x/exp/trace#FlightRecorder.SetPeriod + FlightRecorderPeriod time.Duration + + // FlightRecorderTargetURL is the target to write the trace + // to. Supported targets are http URL and file URL. Skipper + // will try to upload the trace data by an http PUT request to + // this http URL. This is required to set if you want to have + // trace.FlightRecorder + // https://pkg.go.dev/golang.org/x/exp/trace#FlightRecorder + // enabled to support Go tool trace. + FlightRecorderTargetURL string + // Flag that enables reporting of the Go garbage collector statistics exported in debug.GCStats EnableDebugGcMetrics bool @@ -2041,6 +2060,34 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { routing := routing.New(ro) defer routing.Close() + frPeriod := defaultFlightRecorderPeriod + var fr *trace.FlightRecorder + if o.FlightRecorderTargetURL != "" { + fr = trace.NewFlightRecorder() + + if o.FlightRecorderPeriod != 0 { + frPeriod = o.FlightRecorderPeriod + } + fr.SetPeriod(frPeriod) + + frSize := defaultFlightRecorderSize + if o.FlightRecorderSize != 0 { + fr.SetSize(o.FlightRecorderSize) + frSize = o.FlightRecorderSize + } else { + fr.SetSize(defaultFlightRecorderSize) + } + + err := fr.Start() + if err != nil { + log.Errorf("Failed to start FlightRecorder: %v", err) + fr.Stop() + fr = nil + } else { + log.Infof("FlightRecorder started with config (%s, %d) target: %s", frPeriod, frSize, o.FlightRecorderTargetURL) + } + } + proxyFlags := proxy.Flags(o.ProxyOptions) | o.ProxyFlags proxyParams := proxy.Params{ Routing: routing, @@ -2069,6 +2116,9 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { 
EndpointRegistry: endpointRegistry, EnablePassiveHealthCheck: passiveHealthCheckEnabled, PassiveHealthCheck: passiveHealthCheck, + FlightRecorder: fr, + FlightRecorderTargetURL: o.FlightRecorderTargetURL, + FlightRecorderPeriod: frPeriod, } if o.EnableBreakers || len(o.BreakerSettings) > 0 {