Skip to content

Commit

Permalink
feat: use a global tracer (#1822)
Browse files Browse the repository at this point in the history
  • Loading branch information
chronark authored Jun 28, 2024
1 parent 34d4306 commit 4127ac2
Show file tree
Hide file tree
Showing 12 changed files with 57 additions and 64 deletions.
12 changes: 4 additions & 8 deletions apps/agent/cmd/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,9 @@ func run(c *cli.Context) error {

logger.Info().Str("file", configFile).Msg("configuration loaded")

tracer := tracing.NewNoop()
{
if cfg.Tracing != nil && cfg.Tracing.Axiom != nil {
t, closeTracer, err := tracing.New(context.Background(), tracing.Config{
closeTracer, err := tracing.Init(context.Background(), tracing.Config{
Dataset: cfg.Tracing.Axiom.Dataset,
Application: "agent",
Version: "1.0.0",
Expand All @@ -91,7 +90,6 @@ func run(c *cli.Context) error {
logger.Error().Err(err).Msg("failed to close tracer")
}
}()
tracer = t
logger.Info().Msg("tracing to axiom")
}
}
Expand Down Expand Up @@ -127,7 +125,7 @@ func run(c *cli.Context) error {
}
}

srv, err := connect.New(connect.Config{Logger: logger, Tracer: tracer, Image: cfg.Image})
srv, err := connect.New(connect.Config{Logger: logger, Image: cfg.Image})
if err != nil {
return err
}
Expand Down Expand Up @@ -160,7 +158,6 @@ func run(c *cli.Context) error {
if cfg.Services.EventRouter != nil {
er, err := eventrouter.New(eventrouter.Config{
Logger: logger,
Tracer: tracer,
BatchSize: cfg.Services.EventRouter.Tinybird.BatchSize,
BufferSize: cfg.Services.EventRouter.Tinybird.BufferSize,
FlushInterval: time.Duration(cfg.Services.EventRouter.Tinybird.FlushInterval) * time.Second,
Expand Down Expand Up @@ -214,15 +211,14 @@ func run(c *cli.Context) error {
if cfg.Services.Ratelimit != nil {
rl, err := ratelimit.New(ratelimit.Config{
Logger: logger,
Tracer: tracer,
Cluster: clus,
})
if err != nil {
logger.Fatal().Err(err).Msg("failed to create service")
}
rl = ratelimit.WithTracing(tracer)(rl)
rl = ratelimit.WithTracing(rl)

srv.AddService(connect.NewRatelimitServer(rl, logger, tracer))
srv.AddService(connect.NewRatelimitServer(rl, logger))
logger.Info().Msg("started ratelimit service")
}

Expand Down
21 changes: 10 additions & 11 deletions apps/agent/pkg/cache/middleware/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,15 @@ import (
)

type tracingMiddleware[T any] struct {
next cache.Cache[T]
tracer tracing.Tracer
next cache.Cache[T]
}

func WithTracing[T any](c cache.Cache[T], t tracing.Tracer) cache.Cache[T] {
return &tracingMiddleware[T]{next: c, tracer: t}
func WithTracing[T any](c cache.Cache[T]) cache.Cache[T] {
return &tracingMiddleware[T]{next: c}
}

func (mw *tracingMiddleware[T]) Get(ctx context.Context, key string) (T, cache.CacheHit) {
ctx, span := mw.tracer.Start(ctx, "cache.Get", trace.WithAttributes(attribute.String("key", key)))
ctx, span := tracing.Start(ctx, "cache.Get", trace.WithAttributes(attribute.String("key", key)))
defer span.End()

value, hit := mw.next.Get(ctx, key)
Expand All @@ -29,43 +28,43 @@ func (mw *tracingMiddleware[T]) Get(ctx context.Context, key string) (T, cache.C
return value, hit
}
func (mw *tracingMiddleware[T]) Set(ctx context.Context, key string, value T) {
ctx, span := mw.tracer.Start(ctx, "cache.Set", trace.WithAttributes(attribute.String("key", key)))
ctx, span := tracing.Start(ctx, "cache.Set", trace.WithAttributes(attribute.String("key", key)))
defer span.End()

mw.next.Set(ctx, key, value)

}
func (mw *tracingMiddleware[T]) SetNull(ctx context.Context, key string) {
ctx, span := mw.tracer.Start(ctx, "cache.SetNull", trace.WithAttributes(attribute.String("key", key)))
ctx, span := tracing.Start(ctx, "cache.SetNull", trace.WithAttributes(attribute.String("key", key)))
defer span.End()

mw.next.SetNull(ctx, key)

}
func (mw *tracingMiddleware[T]) Remove(ctx context.Context, key string) {
ctx, span := mw.tracer.Start(ctx, "cache.Remove", trace.WithAttributes(attribute.String("key", key)))
ctx, span := tracing.Start(ctx, "cache.Remove", trace.WithAttributes(attribute.String("key", key)))
defer span.End()

mw.next.Remove(ctx, key)

}

func (mw *tracingMiddleware[T]) Dump(ctx context.Context) ([]byte, error) {
ctx, span := mw.tracer.Start(ctx, "cache.Dump")
ctx, span := tracing.Start(ctx, "cache.Dump")
defer span.End()

return mw.next.Dump(ctx)
}

func (mw *tracingMiddleware[T]) Restore(ctx context.Context, data []byte) error {
ctx, span := mw.tracer.Start(ctx, "cache.Restore")
ctx, span := tracing.Start(ctx, "cache.Restore")
defer span.End()

return mw.next.Restore(ctx, data)
}

func (mw *tracingMiddleware[T]) Clear(ctx context.Context) {
ctx, span := mw.tracer.Start(ctx, "cache.Clear")
ctx, span := tracing.Start(ctx, "cache.Clear")
defer span.End()

mw.next.Clear(ctx)
Expand Down
7 changes: 3 additions & 4 deletions apps/agent/pkg/connect/middleware_tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,15 @@ import (
)

type tracingMiddleware struct {
tracer tracing.Tracer
handler http.Handler
}

func newTracingMiddleware(handler http.Handler, tracer tracing.Tracer) http.Handler {
return &tracingMiddleware{handler: handler, tracer: tracer}
func newTracingMiddleware(handler http.Handler) http.Handler {
return &tracingMiddleware{handler: handler}
}

func (h *tracingMiddleware) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx, span := h.tracer.Start(r.Context(), "request")
ctx, span := tracing.Start(r.Context(), "request")
defer span.End()

h.handler.ServeHTTP(w, r.WithContext(ctx))
Expand Down
8 changes: 3 additions & 5 deletions apps/agent/pkg/connect/ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ import (
type ratelimitServer struct {
svc ratelimit.Service
logger logging.Logger
tracer tracing.Tracer
ratelimitv1connect.UnimplementedRatelimitServiceHandler
}

func NewRatelimitServer(svc ratelimit.Service, logger logging.Logger, tracer tracing.Tracer) *ratelimitServer {
func NewRatelimitServer(svc ratelimit.Service, logger logging.Logger) *ratelimitServer {

return &ratelimitServer{
svc: svc,
logger: logger,
tracer: tracer,
}

}
Expand All @@ -48,7 +46,7 @@ func (s *ratelimitServer) Ratelimit(
Msg("connect.Ratelimit")
}()

ctx, span := s.tracer.Start(ctx, "ratelimit.Ratelimit")
ctx, span := tracing.Start(ctx, "ratelimit.Ratelimit")
defer span.End()
authorization := req.Header().Get("Authorization")
err := auth.Authorize(ctx, authorization)
Expand All @@ -75,7 +73,7 @@ func (s *ratelimitServer) PushPull(
Int64("latency", time.Since(start).Milliseconds()).
Msg("connect.PushPull")
}()
ctx, span := s.tracer.Start(ctx, "ratelimit.PushPull")
ctx, span := tracing.Start(ctx, "ratelimit.PushPull")
defer span.End()
authorization := req.Header().Get("Authorization")
err := auth.Authorize(ctx, authorization)
Expand Down
6 changes: 1 addition & 5 deletions apps/agent/pkg/connect/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/bufbuild/connect-go"
ratelimitv1 "github.com/unkeyed/unkey/apps/agent/gen/proto/ratelimit/v1"
"github.com/unkeyed/unkey/apps/agent/pkg/logging"
"github.com/unkeyed/unkey/apps/agent/pkg/tracing"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)
Expand All @@ -22,7 +21,6 @@ type Service interface {
type Server struct {
sync.Mutex
logger logging.Logger
tracer tracing.Tracer
mux *http.ServeMux
shutdownC chan struct{}
isShuttingDown bool
Expand All @@ -32,15 +30,13 @@ type Server struct {

type Config struct {
Logger logging.Logger
Tracer tracing.Tracer
Image string
}

func New(cfg Config) (*Server, error) {

return &Server{
logger: cfg.Logger,
tracer: cfg.Tracer,
isListening: false,
isShuttingDown: false,
mux: http.NewServeMux(),
Expand All @@ -54,7 +50,7 @@ func (s *Server) AddService(svc Service) {
pattern, handler := svc.CreateHandler()
s.logger.Info().Str("pattern", pattern).Msg("adding service")

h := newTracingMiddleware(newHeaderMiddleware(newLoggingMiddleware(handler, s.logger)), s.tracer)
h := newTracingMiddleware(newHeaderMiddleware(newLoggingMiddleware(handler, s.logger)))
s.mux.Handle(pattern, h)
}

Expand Down
20 changes: 8 additions & 12 deletions apps/agent/pkg/tracing/axiom.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,21 @@ import (
"context"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"

axiom "github.com/axiomhq/axiom-go/axiom/otel"
"go.opentelemetry.io/otel"
)

type Tracer trace.Tracer

type Config struct {
Dataset string
Application string
Version string
AxiomToken string
}

func New(ctx context.Context, config Config) (Tracer, func() error, error) {
// Coser is a function that closes the global tracer.
type Closer func() error

func Init(ctx context.Context, config Config) (Closer, error) {

close, err := axiom.InitTracing(
ctx,
Expand All @@ -31,12 +30,9 @@ func New(ctx context.Context, config Config) (Tracer, func() error, error) {
)

if err != nil {
return nil, nil, fmt.Errorf("unable to init tracing: %w", err)
return nil, fmt.Errorf("unable to init tracing: %w", err)
}

return otel.Tracer("main"), close, nil
}

func NewNoop() Tracer {
return trace.NewNoopTracerProvider().Tracer("noop")
globalTracer = otel.Tracer("main")
return close, nil
}
18 changes: 18 additions & 0 deletions apps/agent/pkg/tracing/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package tracing

import (
"context"

"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
)

var globalTracer trace.Tracer

func init() {
globalTracer = noop.NewTracerProvider().Tracer("noop")
}

func Start(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return globalTracer.Start(ctx, name, opts...)
}
5 changes: 1 addition & 4 deletions apps/agent/services/eventrouter/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@ type Config struct {

Tinybird *tinybird.Client
Logger logging.Logger
Tracer tracing.Tracer
}

type service struct {
logger logging.Logger
tracer tracing.Tracer
batcher batch.BatchProcessor[event]
tb *tinybird.Client
}
Expand Down Expand Up @@ -69,7 +67,6 @@ func New(config Config) (*service, error) {
return &service{
logger: config.Logger,
batcher: *batcher,
tracer: config.Tracer,
tb: config.Tinybird,
}, nil
}
Expand All @@ -80,7 +77,7 @@ func (s *service) CreateHandler() (string, http.Handler) {
defer func() {
s.logger.Info().Str("method", r.Method).Str("path", r.URL.Path).Int64("serviceLatency", time.Since(start).Milliseconds()).Msg("request")
}()
ctx, span := s.tracer.Start(r.Context(), "events")
ctx, span := tracing.Start(r.Context(), "events")
defer span.End()

err := auth.Authorize(ctx, r.Header.Get("Authorization"))
Expand Down
14 changes: 5 additions & 9 deletions apps/agent/services/ratelimit/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,17 @@ import (
"github.com/unkeyed/unkey/apps/agent/pkg/tracing"
)

func WithTracing(tracer tracing.Tracer) Middleware {

return func(svc Service) Service {
return &tracingMiddleware{next: svc, tracer: tracer}
}
func WithTracing(svc Service) Service {
return &tracingMiddleware{next: svc}
}

type tracingMiddleware struct {
next Service
tracer tracing.Tracer
next Service
}

func (mw *tracingMiddleware) Ratelimit(ctx context.Context, req *ratelimitv1.RatelimitRequest) (res *ratelimitv1.RatelimitResponse, err error) {

ctx, span := mw.tracer.Start(ctx, tracing.NewSpanName("ratelimit", "Ratelimit"))
ctx, span := tracing.Start(ctx, tracing.NewSpanName("ratelimit", "Ratelimit"))
defer span.End()

res, err = mw.next.Ratelimit(ctx, req)
Expand All @@ -32,7 +28,7 @@ func (mw *tracingMiddleware) Ratelimit(ctx context.Context, req *ratelimitv1.Rat
}

func (mw *tracingMiddleware) PushPull(ctx context.Context, req *ratelimitv1.PushPullRequest) (res *ratelimitv1.PushPullResponse, err error) {
ctx, span := mw.tracer.Start(ctx, tracing.NewSpanName("ratelimit", "PushPull"))
ctx, span := tracing.Start(ctx, tracing.NewSpanName("ratelimit", "PushPull"))
defer span.End()

res, err = mw.next.PushPull(ctx, req)
Expand Down
3 changes: 2 additions & 1 deletion apps/agent/services/ratelimit/pushpull.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

ratelimitv1 "github.com/unkeyed/unkey/apps/agent/gen/proto/ratelimit/v1"
"github.com/unkeyed/unkey/apps/agent/pkg/ratelimit"
"github.com/unkeyed/unkey/apps/agent/pkg/tracing"
)

func (s *service) PushPull(ctx context.Context, req *ratelimitv1.PushPullRequest) (*ratelimitv1.PushPullResponse, error) {
Expand All @@ -15,7 +16,7 @@ func (s *service) PushPull(ctx context.Context, req *ratelimitv1.PushPullRequest
Int64("latency", time.Since(start).Milliseconds()).
Msg("service.PushPull")
}()
ctx, span := s.tracer.Start(ctx, "PushPull")
ctx, span := tracing.Start(ctx, "PushPull")
defer span.End()
res := s.ratelimiter.Take(ratelimit.RatelimitRequest{
Identifier: req.Identifier,
Expand Down
3 changes: 2 additions & 1 deletion apps/agent/services/ratelimit/ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

ratelimitv1 "github.com/unkeyed/unkey/apps/agent/gen/proto/ratelimit/v1"
"github.com/unkeyed/unkey/apps/agent/pkg/ratelimit"
"github.com/unkeyed/unkey/apps/agent/pkg/tracing"
)

func (s *service) Ratelimit(ctx context.Context, req *ratelimitv1.RatelimitRequest) (*ratelimitv1.RatelimitResponse, error) {
Expand All @@ -15,7 +16,7 @@ func (s *service) Ratelimit(ctx context.Context, req *ratelimitv1.RatelimitReque
Int64("latency", time.Since(start).Milliseconds()).
Msg("service.Ratelimit")
}()
_, span := s.tracer.Start(ctx, "Ratelimit")
_, span := tracing.Start(ctx, "Ratelimit")
defer span.End()
res := s.ratelimiter.Take(ratelimit.RatelimitRequest{
Identifier: req.Identifier,
Expand Down
Loading

0 comments on commit 4127ac2

Please sign in to comment.