Skip to content

Commit

Permalink
feature: flightrecorder to enable Go trace
Browse files Browse the repository at this point in the history
feature: allow configuration for Go x/trace.FlightRecorder
pass default period to define what means skipper to be slow

Signed-off-by: Sandor Szücs <[email protected]>
  • Loading branch information
szuecs committed Sep 10, 2024
1 parent bd87e31 commit 40a3067
Show file tree
Hide file tree
Showing 6 changed files with 302 additions and 13 deletions.
5 changes: 4 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type Config struct {
BlockProfileRate int `yaml:"block-profile-rate"`
MutexProfileFraction int `yaml:"mutex-profile-fraction"`
MemProfileRate int `yaml:"memory-profile-rate"`
FlightRecorderTargetURL string `yaml:"flight-recorder-target-url"`
DebugGcMetrics bool `yaml:"debug-gc-metrics"`
RuntimeMetrics bool `yaml:"runtime-metrics"`
ServeRouteMetrics bool `yaml:"serve-route-metrics"`
Expand Down Expand Up @@ -369,6 +370,7 @@ func NewConfig() *Config {

// logging, metrics, tracing:
flag.BoolVar(&cfg.EnablePrometheusMetrics, "enable-prometheus-metrics", false, "*Deprecated*: use metrics-flavour. Switch to Prometheus metrics format to expose metrics")
flag.BoolVar(&cfg.EnablePrometheusStartLabel, "enable-prometheus-start-label", false, "adds start label to each prometheus counter with the value of counter creation timestamp as unix nanoseconds")
flag.StringVar(&cfg.OpenTracing, "opentracing", "noop", "list of arguments for opentracing (space separated), first argument is the tracer implementation")
flag.StringVar(&cfg.OpenTracingInitialSpan, "opentracing-initial-span", "ingress", "set the name of the initial, pre-routing, tracing span")
flag.StringVar(&cfg.OpenTracingExcludedProxyTags, "opentracing-excluded-proxy-tags", "", "set tags that should be excluded from spans created for proxy operation. must be a comma-separated list of strings.")
Expand All @@ -382,7 +384,7 @@ func NewConfig() *Config {
flag.IntVar(&cfg.BlockProfileRate, "block-profile-rate", 0, "block profile sample rate, see runtime.SetBlockProfileRate")
flag.IntVar(&cfg.MutexProfileFraction, "mutex-profile-fraction", 0, "mutex profile fraction rate, see runtime.SetMutexProfileFraction")
flag.IntVar(&cfg.MemProfileRate, "memory-profile-rate", 0, "memory profile rate, see runtime.SetMemProfileRate, keeps default 512 kB")
flag.BoolVar(&cfg.EnablePrometheusStartLabel, "enable-prometheus-start-label", false, "adds start label to each prometheus counter with the value of counter creation timestamp as unix nanoseconds")
flag.StringVar(&cfg.FlightRecorderTargetURL, "flight-recorder-target-url", "", "sets the flight recorder target URL that is used to write out the trace to.")
flag.BoolVar(&cfg.DebugGcMetrics, "debug-gc-metrics", false, "enables reporting of the Go garbage collector statistics exported in debug.GCStats")
flag.BoolVar(&cfg.RuntimeMetrics, "runtime-metrics", true, "enables reporting of the Go runtime statistics exported in runtime and specifically runtime.MemStats")
flag.BoolVar(&cfg.ServeRouteMetrics, "serve-route-metrics", false, "enables reporting total serve time metrics for each route")
Expand Down Expand Up @@ -755,6 +757,7 @@ func (c *Config) ToOptions() skipper.Options {
EnableProfile: c.EnableProfile,
BlockProfileRate: c.BlockProfileRate,
MutexProfileFraction: c.MutexProfileFraction,
FlightRecorderTargetURL: c.FlightRecorderTargetURL,
EnableDebugGcMetrics: c.DebugGcMetrics,
EnableRuntimeMetrics: c.RuntimeMetrics,
EnableServeRouteMetrics: c.ServeRouteMetrics,
Expand Down
10 changes: 6 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ require (
github.com/yuin/gopher-lua v1.1.1
go4.org/netipx v0.0.0-20220925034521-797b0c90d8ab
golang.org/x/crypto v0.27.0
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0
golang.org/x/net v0.29.0
golang.org/x/oauth2 v0.23.0
golang.org/x/sync v0.8.0
Expand Down Expand Up @@ -169,10 +169,10 @@ require (
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/automaxprocs v1.5.3 // indirect
golang.org/x/mod v0.19.0 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
golang.org/x/tools v0.23.0 // indirect
golang.org/x/tools v0.25.0 // indirect
gonum.org/v1/gonum v0.8.2 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
Expand All @@ -182,4 +182,6 @@ require (
sigs.k8s.io/yaml v1.4.0 // indirect
)

go 1.22
go 1.22.0

toolchain go1.23.0
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,8 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk=
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY=
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
Expand All @@ -552,8 +552,8 @@ golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.19.0 h1:fEdghXQSo20giMthA7cd28ZC+jts4amQ3YMXiP5oMQ8=
golang.org/x/mod v0.19.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.21.0 h1:vvrHzRwRfVKSiLrG+d4FMl/Qi4ukBCE6kZlTUkDYRT0=
golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -638,8 +638,8 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20200908211811-12e1bf57a112/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg=
golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI=
golang.org/x/tools v0.25.0 h1:oFU9pkj/iJgs+0DT+VMHrx+oBKs/LJMV+Uvg78sl+fE=
golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
91 changes: 91 additions & 0 deletions proxy/flightrecorder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package proxy_test

import (
"bytes"
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/filters/diag"
"github.com/zalando/skipper/proxy"
"github.com/zalando/skipper/proxy/proxytest"
xtrace "golang.org/x/exp/trace"
)

func TestFlightRecorder(t *testing.T) {
ch := make(chan int)
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != "PUT" {
w.WriteHeader(http.StatusMethodNotAllowed)
w.Write([]byte(http.StatusText(http.StatusMethodNotAllowed)))
ch <- http.StatusMethodNotAllowed
return
}

var buf bytes.Buffer
n, err := io.Copy(&buf, r.Body)
if err != nil {
t.Fatalf("Failed to copy data: %v", err)
}
if n < 100 {
t.Fatalf("Failed to write enough data: %d bytes", n)
}
w.WriteHeader(http.StatusCreated)
w.Write([]byte(http.StatusText(http.StatusCreated)))
ch <- http.StatusCreated
}))
defer service.Close()

backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(http.StatusText(http.StatusOK)))
}))
defer backend.Close()

flightRecorder := xtrace.NewFlightRecorder()
flightRecorder.Start()

spec := diag.NewLatency()
fr := make(filters.Registry)
fr.Register(spec)

doc := fmt.Sprintf(`r: * -> latency("100ms") -> "%s"`, backend.URL)
rr := eskip.MustParse(doc)

pr := proxytest.WithParams(fr, proxy.Params{
FlightRecorder: flightRecorder,
FlightRecorderTargetURL: service.URL,
FlightRecorderPeriod: 90 * time.Millisecond,
}, rr...)
defer pr.Close()

rsp, err := pr.Client().Get(pr.URL)
if err != nil {
t.Fatalf("Failed to GET %q: %v", pr.URL, err)
}
defer rsp.Body.Close()
_, err = io.ReadAll(rsp.Body)
if err != nil {
t.Fatalf("Failed to read body: %v", err)
}

switch rsp.StatusCode {
case http.StatusOK:
// ok
default:
t.Fatalf("Failed to get status OK: %d", rsp.StatusCode)
}

statusCode := <-ch
switch statusCode {
case http.StatusCreated:
// ok
default:
t.Fatalf("Failed to get status OK: %d", rsp.StatusCode)
}
}
147 changes: 145 additions & 2 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"unicode/utf8"

"golang.org/x/exp/maps"
"golang.org/x/exp/trace"
"golang.org/x/time/rate"

ot "github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -360,6 +361,18 @@ type Params struct {

// PassiveHealthCheck defines the parameters for the healthy endpoints checker.
PassiveHealthCheck *PassiveHealthCheck

// FlightRecorder is a started instance of https://pkg.go.dev/golang.org/x/exp/trace#FlightRecorder
FlightRecorder *trace.FlightRecorder

// FlightRecorderTargetURL is the target to write the trace
// to. Supported targets are http URL and file URL.
FlightRecorderTargetURL string

// FlightRecorderPeriod is the time.Duration that is used for
// a slow skipper. If skipper is detected to be slow it tries
// to write out a trace as configured by the FlightRecorderTargetURL.
FlightRecorderPeriod time.Duration
}

type (
Expand Down Expand Up @@ -454,6 +467,10 @@ type Proxy struct {
clientTLS *tls.Config
hostname string
onPanicSometimes rate.Sometimes
flightRecorder *trace.FlightRecorder
flightRecorderURL *url.URL
flightRecorderPeriod time.Duration
flightRecorderCH chan struct{}
}

// proxyError is used to wrap errors during proxying and to indicate
Expand Down Expand Up @@ -845,6 +862,48 @@ func WithParams(p Params) *Proxy {
maxUnhealthyEndpointsRatio: p.PassiveHealthCheck.MaxUnhealthyEndpointsRatio,
}
}

log := &logging.DefaultLog{}

var (
frURL *url.URL
// buffered channel size 10k to allow unblocked requests
frChannel = make(chan struct{}, 10240)
)
if p.FlightRecorder != nil {
var err error
frURL, err = url.Parse(p.FlightRecorderTargetURL)
if err != nil {
p.FlightRecorder.Stop()
p.FlightRecorder = nil
} else {
go func() {
d := 7 * 24 * time.Hour
last := time.Now().Add(-time.Hour)

for {
select {
case <-frChannel:
// range through all notifications until 1ms there is no notification
d = time.Millisecond
continue
case <-quit:
p.FlightRecorder.Stop()
return
case <-time.After(d):
if time.Since(last) >= time.Hour {
writeTrace(p.FlightRecorder, frURL, log, tr)
}
last = time.Now()

// reset d
d = 7 * 24 * time.Hour
}
}
}()
}
}

return &Proxy{
routing: p.Routing,
registry: p.EndpointRegistry,
Expand All @@ -864,7 +923,7 @@ func WithParams(p Params) *Proxy {
maxLoops: p.MaxLoopbacks,
breakers: p.CircuitBreakers,
limiters: p.RateLimiters,
log: &logging.DefaultLog{},
log: log,
defaultHTTPStatus: defaultHTTPStatus,
tracing: newProxyTracing(p.OpenTracing),
accessLogDisabled: p.AccessLogDisabled,
Expand All @@ -873,6 +932,87 @@ func WithParams(p Params) *Proxy {
clientTLS: tr.TLSClientConfig,
hostname: hostname,
onPanicSometimes: rate.Sometimes{First: 3, Interval: 1 * time.Minute},
flightRecorder: p.FlightRecorder,
flightRecorderURL: frURL,
flightRecorderPeriod: p.FlightRecorderPeriod,
flightRecorderCH: frChannel,
}
}

func (p *Proxy) writeTraceIfTooSlow(ctx *context, span ot.Span) {
took := time.Since(ctx.startServe)
span.SetTag("proxy.took", took)

d := p.flightRecorderPeriod
if d < 1*time.Millisecond && d > took {
return
}

// signal too slow
p.flightRecorderCH <- struct{}{}
}

func writeTraceTo(log logging.Logger, flightRecorder *trace.FlightRecorder, w io.Writer) (int, error) {
n, err := flightRecorder.WriteTo(w)
if err != nil {
switch err {
case trace.ErrSnapshotActive:
return 0, fmt.Errorf("flightRecorder already in progress")
default:
return 0, fmt.Errorf("failed to write FlightRecorder data: %w", err)
}
} else {
log.Infof("FlightRecorder wrote %d bytes", n)
}

return n, err
}

func writeTrace(flightRecorder *trace.FlightRecorder, flightRecorderURL *url.URL, log logging.Logger, roundTripper http.RoundTripper) {
if flightRecorder == nil || flightRecorderURL == nil {
return
}

switch flightRecorderURL.Scheme {
case "file":
fd, err := os.Open(flightRecorderURL.Path)
if err != nil {
log.Errorf("Failed to write file %q: %v", err, flightRecorderURL.Path)
return
}

_, err = writeTraceTo(log, flightRecorder, fd)
if err != nil {
log.Errorf("Failed to write trace file %q: %v", flightRecorderURL.Path, err)
}

case "http", "https":
var b bytes.Buffer
_, err := writeTraceTo(log, flightRecorder, &b)
if err != nil {
log.Errorf("Failed to write trace into in-memory buffer: %v", err)
return
}

req, err := http.NewRequest("PUT", flightRecorderURL.String(), &b)
if err != nil {
log.Errorf("Failed to create request to %q to send a trace: %v", flightRecorderURL.String(), err)
}

rsp, err := roundTripper.RoundTrip(req)
if err != nil {
log.Errorf("Failed to write trace to %q: %v", flightRecorderURL.String(), err)
} else {
rsp.Body.Close()
}
switch rsp.StatusCode {
case 200, 201, 204:
log.Infof("Successful send of a trace to %q", flightRecorderURL.String())
default:
log.Errorf("Failed to get successful response from %s: (%d) %s", flightRecorderURL.String(), rsp.StatusCode, rsp.Status)
}
default:
log.Errorf("Failed to write trace, unknown FlightRecorderURL scheme %q", flightRecorderURL.Scheme)
}
}

Expand Down Expand Up @@ -1003,7 +1143,8 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
proxySpanOpts := []ot.StartSpanOption{ot.Tags{
SpanKindTag: SpanKindClient,
}}
if parentSpan := ot.SpanFromContext(req.Context()); parentSpan != nil {
parentSpan := ot.SpanFromContext(req.Context())
if parentSpan != nil {
proxySpanOpts = append(proxySpanOpts, ot.ChildOf(parentSpan.Context()))
}
ctx.proxySpan = p.tracing.tracer.StartSpan(spanName, proxySpanOpts...)
Expand All @@ -1024,6 +1165,8 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
ctx.proxySpan.LogKV("http_roundtrip", StartEvent)
req = injectClientTrace(req, ctx.proxySpan)

p.writeTraceIfTooSlow(ctx, parentSpan)

response, err := roundTripper.RoundTrip(req)
if endpointMetrics != nil {
endpointMetrics.IncRequests(routing.IncRequestsOptions{FailedRoundTrip: err != nil})
Expand Down
Loading

0 comments on commit 40a3067

Please sign in to comment.