Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: flightrecorder to enable Go trace #2985

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type Config struct {
BlockProfileRate int `yaml:"block-profile-rate"`
MutexProfileFraction int `yaml:"mutex-profile-fraction"`
MemProfileRate int `yaml:"memory-profile-rate"`
FlightRecorderTargetURL string `yaml:"flight-recorder-target-url"`
DebugGcMetrics bool `yaml:"debug-gc-metrics"`
RuntimeMetrics bool `yaml:"runtime-metrics"`
ServeRouteMetrics bool `yaml:"serve-route-metrics"`
Expand Down Expand Up @@ -369,6 +370,7 @@ func NewConfig() *Config {

// logging, metrics, tracing:
flag.BoolVar(&cfg.EnablePrometheusMetrics, "enable-prometheus-metrics", false, "*Deprecated*: use metrics-flavour. Switch to Prometheus metrics format to expose metrics")
flag.BoolVar(&cfg.EnablePrometheusStartLabel, "enable-prometheus-start-label", false, "adds start label to each prometheus counter with the value of counter creation timestamp as unix nanoseconds")
szuecs marked this conversation as resolved.
Show resolved Hide resolved
flag.StringVar(&cfg.OpenTracing, "opentracing", "noop", "list of arguments for opentracing (space separated), first argument is the tracer implementation")
flag.StringVar(&cfg.OpenTracingInitialSpan, "opentracing-initial-span", "ingress", "set the name of the initial, pre-routing, tracing span")
flag.StringVar(&cfg.OpenTracingExcludedProxyTags, "opentracing-excluded-proxy-tags", "", "set tags that should be excluded from spans created for proxy operation. must be a comma-separated list of strings.")
Expand All @@ -382,7 +384,7 @@ func NewConfig() *Config {
flag.IntVar(&cfg.BlockProfileRate, "block-profile-rate", 0, "block profile sample rate, see runtime.SetBlockProfileRate")
flag.IntVar(&cfg.MutexProfileFraction, "mutex-profile-fraction", 0, "mutex profile fraction rate, see runtime.SetMutexProfileFraction")
flag.IntVar(&cfg.MemProfileRate, "memory-profile-rate", 0, "memory profile rate, see runtime.SetMemProfileRate, keeps default 512 kB")
flag.BoolVar(&cfg.EnablePrometheusStartLabel, "enable-prometheus-start-label", false, "adds start label to each prometheus counter with the value of counter creation timestamp as unix nanoseconds")
flag.StringVar(&cfg.FlightRecorderTargetURL, "flight-recorder-target-url", "", "sets the flight recorder target URL that is used to write out the trace to.")
flag.BoolVar(&cfg.DebugGcMetrics, "debug-gc-metrics", false, "enables reporting of the Go garbage collector statistics exported in debug.GCStats")
flag.BoolVar(&cfg.RuntimeMetrics, "runtime-metrics", true, "enables reporting of the Go runtime statistics exported in runtime and specifically runtime.MemStats")
flag.BoolVar(&cfg.ServeRouteMetrics, "serve-route-metrics", false, "enables reporting total serve time metrics for each route")
Expand Down Expand Up @@ -755,6 +757,7 @@ func (c *Config) ToOptions() skipper.Options {
EnableProfile: c.EnableProfile,
BlockProfileRate: c.BlockProfileRate,
MutexProfileFraction: c.MutexProfileFraction,
FlightRecorderTargetURL: c.FlightRecorderTargetURL,
EnableDebugGcMetrics: c.DebugGcMetrics,
EnableRuntimeMetrics: c.RuntimeMetrics,
EnableServeRouteMetrics: c.ServeRouteMetrics,
Expand Down
10 changes: 6 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ require (
github.com/yuin/gopher-lua v1.1.1
go4.org/netipx v0.0.0-20220925034521-797b0c90d8ab
golang.org/x/crypto v0.27.0
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0
golang.org/x/net v0.29.0
golang.org/x/oauth2 v0.23.0
golang.org/x/sync v0.8.0
Expand Down Expand Up @@ -169,10 +169,10 @@ require (
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/automaxprocs v1.5.3 // indirect
golang.org/x/mod v0.19.0 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
golang.org/x/tools v0.23.0 // indirect
golang.org/x/tools v0.25.0 // indirect
gonum.org/v1/gonum v0.8.2 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
Expand All @@ -182,4 +182,6 @@ require (
sigs.k8s.io/yaml v1.4.0 // indirect
)

go 1.22
go 1.22.0

toolchain go1.23.0
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,8 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk=
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY=
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
Expand All @@ -552,8 +552,8 @@ golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.19.0 h1:fEdghXQSo20giMthA7cd28ZC+jts4amQ3YMXiP5oMQ8=
golang.org/x/mod v0.19.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.21.0 h1:vvrHzRwRfVKSiLrG+d4FMl/Qi4ukBCE6kZlTUkDYRT0=
golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -638,8 +638,8 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20200908211811-12e1bf57a112/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg=
golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI=
golang.org/x/tools v0.25.0 h1:oFU9pkj/iJgs+0DT+VMHrx+oBKs/LJMV+Uvg78sl+fE=
golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
91 changes: 91 additions & 0 deletions proxy/flightrecorder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package proxy_test

import (
"bytes"
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/filters/diag"
"github.com/zalando/skipper/proxy"
"github.com/zalando/skipper/proxy/proxytest"
xtrace "golang.org/x/exp/trace"
)

func TestFlightRecorder(t *testing.T) {
ch := make(chan int)
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != "PUT" {
w.WriteHeader(http.StatusMethodNotAllowed)
w.Write([]byte(http.StatusText(http.StatusMethodNotAllowed)))
ch <- http.StatusMethodNotAllowed
return
}

var buf bytes.Buffer
n, err := io.Copy(&buf, r.Body)
if err != nil {
t.Fatalf("Failed to copy data: %v", err)
}
if n < 100 {
t.Fatalf("Failed to write enough data: %d bytes", n)
}
w.WriteHeader(http.StatusCreated)
w.Write([]byte(http.StatusText(http.StatusCreated)))
ch <- http.StatusCreated
}))
defer service.Close()

backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(http.StatusText(http.StatusOK)))
}))
defer backend.Close()

flightRecorder := xtrace.NewFlightRecorder()
flightRecorder.Start()

spec := diag.NewLatency()
fr := make(filters.Registry)
fr.Register(spec)

doc := fmt.Sprintf(`r: * -> latency("100ms") -> "%s"`, backend.URL)
rr := eskip.MustParse(doc)

pr := proxytest.WithParams(fr, proxy.Params{
FlightRecorder: flightRecorder,
FlightRecorderTargetURL: service.URL,
FlightRecorderPeriod: 90 * time.Millisecond,
}, rr...)
defer pr.Close()

rsp, err := pr.Client().Get(pr.URL)
if err != nil {
t.Fatalf("Failed to GET %q: %v", pr.URL, err)
}
defer rsp.Body.Close()
_, err = io.ReadAll(rsp.Body)
if err != nil {
t.Fatalf("Failed to read body: %v", err)
}

switch rsp.StatusCode {
case http.StatusOK:
// ok
default:
t.Fatalf("Failed to get status OK: %d", rsp.StatusCode)
}

statusCode := <-ch
switch statusCode {
case http.StatusCreated:
// ok
default:
t.Fatalf("Failed to get status OK: %d", rsp.StatusCode)
}
}
154 changes: 152 additions & 2 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"unicode/utf8"

"golang.org/x/exp/maps"
"golang.org/x/exp/trace"
"golang.org/x/time/rate"

ot "github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -360,6 +361,18 @@ type Params struct {

// PassiveHealthCheck defines the parameters for the healthy endpoints checker.
PassiveHealthCheck *PassiveHealthCheck

// FlightRecorder is a started instance of https://pkg.go.dev/golang.org/x/exp/trace#FlightRecorder
FlightRecorder *trace.FlightRecorder

// FlightRecorderTargetURL is the target to write the trace
// to. Supported targets are http URL and file URL.
FlightRecorderTargetURL string

// FlightRecorderPeriod is the time.Duration that is used for
// a slow skipper. If skipper is detected to be slow it tries
// to write out a trace as configured by the FlightRecorderTargetURL.
FlightRecorderPeriod time.Duration
}

type (
Expand Down Expand Up @@ -454,6 +467,10 @@ type Proxy struct {
clientTLS *tls.Config
hostname string
onPanicSometimes rate.Sometimes
flightRecorder *trace.FlightRecorder
flightRecorderURL *url.URL
flightRecorderPeriod time.Duration
flightRecorderCH chan struct{}
}

// proxyError is used to wrap errors during proxying and to indicate
Expand Down Expand Up @@ -845,6 +862,51 @@ func WithParams(p Params) *Proxy {
maxUnhealthyEndpointsRatio: p.PassiveHealthCheck.MaxUnhealthyEndpointsRatio,
}
}

log := &logging.DefaultLog{}

var (
frURL *url.URL
// buffered channel size 10k to allow unblocked requests
frChannel = make(chan struct{}, 10240)
)
if p.FlightRecorder != nil {
var err error
frURL, err = url.Parse(p.FlightRecorderTargetURL)
if err != nil {
p.FlightRecorder.Stop()
p.FlightRecorder = nil
} else {
go func() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forever runs and tries to be an efficient way of handling multiple goroutines fire "we are slow" and write only once in an hour trace data to check.

weekly := 7 * 24 * time.Hour
timer := time.NewTimer(weekly)
defer timer.Stop()

last := time.Now().Add(-time.Hour)

for {
select {
case <-frChannel:
// range through all notifications until 1ms there is no notification
// reset timer to write trace after handling all the notifications
timer.Reset(time.Millisecond)
continue
case <-quit:
p.FlightRecorder.Stop()
return
case <-timer.C:
if time.Since(last) >= time.Hour {
writeTrace(p.FlightRecorder, frURL, log, tr)
}
last = time.Now()

timer.Reset(weekly)
}
}
}()
}
}

return &Proxy{
routing: p.Routing,
registry: p.EndpointRegistry,
Expand All @@ -864,7 +926,7 @@ func WithParams(p Params) *Proxy {
maxLoops: p.MaxLoopbacks,
breakers: p.CircuitBreakers,
limiters: p.RateLimiters,
log: &logging.DefaultLog{},
log: log,
defaultHTTPStatus: defaultHTTPStatus,
tracing: newProxyTracing(p.OpenTracing),
accessLogDisabled: p.AccessLogDisabled,
Expand All @@ -873,6 +935,91 @@ func WithParams(p Params) *Proxy {
clientTLS: tr.TLSClientConfig,
hostname: hostname,
onPanicSometimes: rate.Sometimes{First: 3, Interval: 1 * time.Minute},
flightRecorder: p.FlightRecorder,
flightRecorderURL: frURL,
flightRecorderPeriod: p.FlightRecorderPeriod,
flightRecorderCH: frChannel,
}
}

func (p *Proxy) writeTraceIfTooSlow(ctx *context, span ot.Span) {
took := time.Since(ctx.startServe)
span.SetTag("proxy.took", took)

if p.flightRecorder == nil {
return
}

d := p.flightRecorderPeriod
if d < 1*time.Millisecond && d > took {
return
}

// signal too slow
p.flightRecorderCH <- struct{}{}
}

func writeTraceTo(log logging.Logger, flightRecorder *trace.FlightRecorder, w io.Writer) (int, error) {
n, err := flightRecorder.WriteTo(w)
if err != nil {
switch err {
case trace.ErrSnapshotActive:
return 0, fmt.Errorf("flightRecorder already in progress")
default:
return 0, fmt.Errorf("failed to write FlightRecorder data: %w", err)
}
} else {
log.Infof("FlightRecorder wrote %d bytes", n)
}

return n, err
}

func writeTrace(flightRecorder *trace.FlightRecorder, flightRecorderURL *url.URL, log logging.Logger, roundTripper http.RoundTripper) {
if flightRecorder == nil || flightRecorderURL == nil {
return
}

switch flightRecorderURL.Scheme {
case "file":
fd, err := os.Open(flightRecorderURL.Path)
if err != nil {
log.Errorf("Failed to write file %q: %v", err, flightRecorderURL.Path)
return
}

_, err = writeTraceTo(log, flightRecorder, fd)
if err != nil {
log.Errorf("Failed to write trace file %q: %v", flightRecorderURL.Path, err)
}

case "http", "https":
var b bytes.Buffer
_, err := writeTraceTo(log, flightRecorder, &b)
if err != nil {
log.Errorf("Failed to write trace into in-memory buffer: %v", err)
return
}

req, err := http.NewRequest("PUT", flightRecorderURL.String(), &b)
if err != nil {
log.Errorf("Failed to create request to %q to send a trace: %v", flightRecorderURL.String(), err)
}

rsp, err := roundTripper.RoundTrip(req)
if err != nil {
log.Errorf("Failed to write trace to %q: %v", flightRecorderURL.String(), err)
} else {
rsp.Body.Close()
}
switch rsp.StatusCode {
case 200, 201, 204:
log.Infof("Successful send of a trace to %q", flightRecorderURL.String())
default:
log.Errorf("Failed to get successful response from %s: (%d) %s", flightRecorderURL.String(), rsp.StatusCode, rsp.Status)
}
default:
log.Errorf("Failed to write trace, unknown FlightRecorderURL scheme %q", flightRecorderURL.Scheme)
}
}

Expand Down Expand Up @@ -1003,7 +1150,8 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
proxySpanOpts := []ot.StartSpanOption{ot.Tags{
SpanKindTag: SpanKindClient,
}}
if parentSpan := ot.SpanFromContext(req.Context()); parentSpan != nil {
parentSpan := ot.SpanFromContext(req.Context())
if parentSpan != nil {
proxySpanOpts = append(proxySpanOpts, ot.ChildOf(parentSpan.Context()))
}
ctx.proxySpan = p.tracing.tracer.StartSpan(spanName, proxySpanOpts...)
Expand All @@ -1024,6 +1172,8 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
ctx.proxySpan.LogKV("http_roundtrip", StartEvent)
req = injectClientTrace(req, ctx.proxySpan)

p.writeTraceIfTooSlow(ctx, parentSpan)

response, err := roundTripper.RoundTrip(req)
if endpointMetrics != nil {
endpointMetrics.IncRequests(routing.IncRequestsOptions{FailedRoundTrip: err != nil})
Expand Down
Loading
Loading