diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 573b1fad0..17a6b90d9 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -23,6 +23,6 @@ jobs: run: go build $(go list ./... | grep -v /example) - name: Upload coverage - uses: codecov/codecov-action@v4 + uses: codecov/codecov-action@v3 with: file: coverage.txt diff --git a/cli/serve.go b/cli/serve.go index effe3f8ff..28d881f84 100644 --- a/cli/serve.go +++ b/cli/serve.go @@ -25,7 +25,6 @@ import ( "github.com/yomorun/yomo/core/router" pkgconfig "github.com/yomorun/yomo/pkg/config" "github.com/yomorun/yomo/pkg/log" - "github.com/yomorun/yomo/pkg/trace" "github.com/yomorun/yomo/pkg/bridge/ai" "github.com/yomorun/yomo/pkg/bridge/ai/provider/azopenai" @@ -51,16 +50,10 @@ var serveCmd = &cobra.Command{ return } ctx := context.Background() - // trace - tp, shutdown, err := trace.NewTracerProvider("yomo-zipper") - if err == nil { - log.InfoStatusEvent(os.Stdout, "[zipper] 🛰 trace enabled") - } - defer shutdown(ctx) // listening address. listenAddr := fmt.Sprintf("%s:%d", conf.Host, conf.Port) - options := []yomo.ZipperOption{yomo.WithZipperTracerProvider(tp)} + options := []yomo.ZipperOption{} tokenString := "" if _, ok := conf.Auth["type"]; ok { if tokenString, ok = conf.Auth["token"]; ok { diff --git a/cli/serverless/deno/runtime.go b/cli/serverless/deno/runtime.go index d2f825c1e..738b6e6a5 100644 --- a/cli/serverless/deno/runtime.go +++ b/cli/serverless/deno/runtime.go @@ -2,7 +2,6 @@ package deno import ( - "context" "encoding/binary" "errors" "io" @@ -15,7 +14,6 @@ import ( "github.com/yomorun/yomo" "github.com/yomorun/yomo/core/frame" "github.com/yomorun/yomo/pkg/file" - "github.com/yomorun/yomo/pkg/trace" "github.com/yomorun/yomo/serverless" ) @@ -90,17 +88,10 @@ func runDeno(jsPath string, socketPath string, errCh chan<- error) { } func startSfn(name string, zipperAddr string, credential string, observed []frame.Tag, conn net.Conn, errCh chan<- error) (yomo.StreamFunction, error) { - // trace - tp, shutdown, err := trace.NewTracerProvider("yomo-sfn") - if err == nil { - log.Println("[sfn] 🛰 trace enabled") - } - defer shutdown(context.Background()) sfn := yomo.NewStreamFunction( name, zipperAddr, yomo.WithSfnCredential(credential), - yomo.WithSfnTracerProvider(tp), ) // init @@ -168,7 +159,7 @@ func startSfn(name string, zipperAddr string, credential string, observed []fram }, ) - err = sfn.Connect() + err := sfn.Connect() if err != nil { return nil, err } diff --git a/cli/serverless/golang/templates/main.tmpl b/cli/serverless/golang/templates/main.tmpl index f4c89521b..be686b5aa 100644 --- a/cli/serverless/golang/templates/main.tmpl +++ b/cli/serverless/golang/templates/main.tmpl @@ -24,17 +24,10 @@ func main() { } func runSFN(name string, addr string, credential string) (closeFn func() error, err error) { - // trace - tp, shutdown, e := trace.NewTracerProvider("yomo-sfn") - if e == nil { - log.Println("[sfn] 🛰 trace enabled") - } - defer shutdown(context.Background()) sfn := yomo.NewStreamFunction( name, addr, yomo.WithCredential(credential), - yomo.WithTracerProvider(tp), ) closeFn = sfn.Close diff --git a/cli/serverless/golang/templates/main_rx.tmpl b/cli/serverless/golang/templates/main_rx.tmpl index 55ae2eba1..e291434d7 100644 --- a/cli/serverless/golang/templates/main_rx.tmpl +++ b/cli/serverless/golang/templates/main_rx.tmpl @@ -24,17 +24,11 @@ func main() { } func runSFN(name string, addr string, credential string) (closeFn func() error, err error) { - // trace - tp, shutdown, e := trace.NewTracerProvider("yomo-sfn") - if e == nil { - log.Println("[sfn] 🛰 trace enabled") - } defer shutdown(context.Background()) sfn := yomo.NewStreamFunction( name, addr, yomo.WithCredential(credential), - yomo.WithTracerProvider(tp), ) closeFn = sfn.Close diff --git a/cli/serverless/wasm/serverless.go b/cli/serverless/wasm/serverless.go index 002c734c9..1a40e12dd 100644 --- a/cli/serverless/wasm/serverless.go +++ b/cli/serverless/wasm/serverless.go @@ -2,7 +2,6 @@ package wasm import ( - "context" "log" "os" "sync" @@ -10,7 +9,6 @@ import ( "github.com/yomorun/yomo" cli "github.com/yomorun/yomo/cli/serverless" pkglog "github.com/yomorun/yomo/pkg/log" - "github.com/yomorun/yomo/pkg/trace" "github.com/yomorun/yomo/serverless" ) @@ -53,21 +51,13 @@ func (s *wasmServerless) Build(clean bool) error { // Run the wasm serverless function func (s *wasmServerless) Run(verbose bool) error { - // trace - tp, shutdown, err := trace.NewTracerProvider("yomo-sfn") - if err == nil { - pkglog.InfoStatusEvent(os.Stdout, "[sfn] 🛰 trace enabled") - } - defer shutdown(context.Background()) - sfn := yomo.NewStreamFunction( s.name, s.zipperAddr, yomo.WithSfnCredential(s.credential), - yomo.WithSfnTracerProvider(tp), ) // init - err = sfn.Init(func() error { + err := sfn.Init(func() error { return s.runtime.RunInit() }) if err != nil { diff --git a/core/client.go b/core/client.go index dbb15dc7b..cd879023b 100644 --- a/core/client.go +++ b/core/client.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "reflect" "runtime" "time" @@ -16,24 +15,22 @@ import ( "github.com/yomorun/yomo/pkg/frame-codec/y3codec" "github.com/yomorun/yomo/pkg/id" yquic "github.com/yomorun/yomo/pkg/listener/quic" - oteltrace "go.opentelemetry.io/otel/trace" "golang.org/x/exp/slog" ) // Client is the abstraction of a YoMo-Client. a YoMo-Client can be // Source, Upstream Zipper or StreamFunction. type Client struct { - zipperAddr string - name string // name of the client - clientID string // id of the client - reconnCounter uint // counter for reconnection - clientType ClientType // type of the client - processor func(*frame.DataFrame) // function to invoke when data arrived - errorfn func(error) // function to invoke when error occured - wantedTarget string - opts *clientOptions - Logger *slog.Logger - tracerProvider oteltrace.TracerProvider + zipperAddr string + name string // name of the client + clientID string // id of the client + reconnCounter uint // counter for reconnection + clientType ClientType // type of the client + processor func(*frame.DataFrame) // function to invoke when data arrived + errorfn func(error) // function to invoke when error occured + wantedTarget string + opts *clientOptions + Logger *slog.Logger // ctx and ctxCancel manage the lifecycle of client. ctx context.Context @@ -68,16 +65,15 @@ func NewClient(appName, zipperAddr string, clientType ClientType, opts ...Client ctx, ctxCancel := context.WithCancelCause(context.Background()) return &Client{ - zipperAddr: zipperAddr, - name: appName, - clientID: clientID, - processor: func(df *frame.DataFrame) { logger.Warn("the processor has not been set") }, - clientType: clientType, - opts: option, - Logger: logger, - tracerProvider: option.tracerProvider, - ctx: ctx, - ctxCancel: ctxCancel, + zipperAddr: zipperAddr, + name: appName, + clientID: clientID, + processor: func(df *frame.DataFrame) { logger.Warn("the processor has not been set") }, + clientType: clientType, + opts: option, + Logger: logger, + ctx: ctx, + ctxCancel: ctxCancel, done: make(chan struct{}), wrCh: make(chan frame.Frame), @@ -429,14 +425,3 @@ type Downstream interface { Close() error Connect(context.Context) error } - -// TracerProvider returns the tracer provider of client. -func (c *Client) TracerProvider() oteltrace.TracerProvider { - if c.tracerProvider == nil { - return nil - } - if reflect.ValueOf(c.tracerProvider).IsNil() { - return nil - } - return c.tracerProvider -} diff --git a/core/client_options.go b/core/client_options.go index 3388f9993..3f41572e4 100644 --- a/core/client_options.go +++ b/core/client_options.go @@ -16,7 +16,6 @@ import ( "github.com/yomorun/yomo/core/frame" "github.com/yomorun/yomo/core/ylog" pkgtls "github.com/yomorun/yomo/pkg/tls" - "go.opentelemetry.io/otel/trace" "golang.org/x/exp/slog" ) @@ -32,7 +31,6 @@ type clientOptions struct { reconnect bool nonBlockWrite bool logger *slog.Logger - tracerProvider trace.TracerProvider // ai function aiFunctionInputModel any aiFunctionDescription string @@ -107,13 +105,6 @@ func WithLogger(logger *slog.Logger) ClientOption { } } -// WithTracerProvider sets tracer provider for the client. -func WithTracerProvider(tp trace.TracerProvider) ClientOption { - return func(o *clientOptions) { - o.tracerProvider = tp - } -} - // WithAIFunctionDefinition sets AI function definition for the client. func WithAIFunctionDefinition(description string, inputModel any) ClientOption { return func(o *clientOptions) { diff --git a/core/client_test.go b/core/client_test.go index 0f45beb57..2d0900c2a 100644 --- a/core/client_test.go +++ b/core/client_test.go @@ -163,7 +163,7 @@ func TestFrameRoundTrip(t *testing.T) { exited = checkClientExited(sfn2, time.Second) assert.False(t, exited, "sfn stream should not exited") - sfnMd := NewMetadata(source.clientID, "tid", "trace-id", "span-id", false) + sfnMd := NewMetadata(source.clientID, "tid") sfnMetaBytes, _ := sfnMd.Encode() @@ -186,7 +186,7 @@ func TestFrameRoundTrip(t *testing.T) { assert.ElementsMatch(t, nameList, []string{"source", "sfn-1", "sfn-2"}) md := metadata.New( - NewMetadata(source.clientID, "tid", "trace-id", "span-id", false), + NewMetadata(source.clientID, "tid"), metadata.M{ "foo": "bar", }, diff --git a/core/metadata.go b/core/metadata.go index c1f8f2a11..9994a3256 100644 --- a/core/metadata.go +++ b/core/metadata.go @@ -2,21 +2,15 @@ package core import ( "github.com/yomorun/yomo/core/metadata" - "github.com/yomorun/yomo/pkg/id" - "github.com/yomorun/yomo/pkg/trace" - oteltrace "go.opentelemetry.io/otel/trace" - "golang.org/x/exp/slog" ) // NewMetadata returns metadata for yomo working. -func NewMetadata(sourceID, tid string, traceID string, spanID string, traced bool) metadata.M { - return metadata.M{ +func NewMetadata(sourceID, tid string) metadata.M { + md := metadata.M{ metadata.SourceIDKey: sourceID, metadata.TIDKey: tid, - metadata.TraceIDKey: traceID, - metadata.SpanIDKey: spanID, - metadata.TracedKey: tracedString(traced), } + return md } // GetTIDFromMetadata gets TID from metadata. @@ -25,152 +19,7 @@ func GetTIDFromMetadata(m metadata.M) string { return tid } -// GetTracedFromMetadata gets traced from metadata. -func GetTracedFromMetadata(m metadata.M) bool { - tracedString, _ := m.Get(metadata.TracedKey) - return tracedString == "true" -} - // SetMetadataTarget sets target in metadata. func SetMetadataTarget(m metadata.M, target string) { m.Set(metadata.TargetKey, target) } - -// InitialSourceMetadata generates initial source metadata with trace information. -// the span name typically corresponds to the source's name. -func InitialSourceMetadata( - sourceID, tid string, - spanName string, - tp oteltrace.TracerProvider, logger *slog.Logger, -) (metadata.M, func()) { - return initialMetadata(sourceID, tid, "Source", spanName, tp, logger) -} - -// InitialSfnMetadata generates initial sfn metadata with trace information. -// the span name typically corresponds to the sfn's name. -func InitialSfnMetadata( - sourceID, tid string, - spanName string, - tp oteltrace.TracerProvider, logger *slog.Logger, -) (metadata.M, func()) { - return initialMetadata(sourceID, tid, "StreamFunction", spanName, tp, logger) -} - -// initialMetadata builds a metadata with trace information. -// the tracer name is `Source` or `StreamFunction`. -// span name typically corresponds to the source's name or sfn's name. -func initialMetadata( - sourceID, tid string, - tracerName string, - spanName string, - tp oteltrace.TracerProvider, logger *slog.Logger, -) (metadata.M, func()) { - var ( - traceID string - spanID string - traced bool - endFn = func() {} - ) - if tp != nil { - span, err := trace.NewSpan(tp, "Source", spanName, "", "") - if err != nil { - logger.Debug("trace error", "tracer_name", "Source", "span_name", spanName, "err", err) - } else { - endFn = func() { span.End() } - traceID = span.SpanContext().TraceID().String() - spanID = span.SpanContext().SpanID().String() - traced = true - } - } - if traceID == "" { - logger.Debug("create new traceID", "tracer_name", "Source", "span_name", spanName, "trace_id", traceID) - traceID = id.NewTraceID() - } - if spanID == "" { - logger.Debug("create new spanID", "tracer_name", "Source", "span_name", spanName, "span_id", spanID) - spanID = id.NewSpanID() - } - logger.Debug( - "trace metadata", - "tracer_name", "Source", "span_name", spanName, - "trace_id", traceID, "span_id", spanID, "traced", traced, - ) - md := NewMetadata(sourceID, id.New(), traceID, spanID, traced) - - return md, endFn -} - -// ExtendTraceMetadata extends source metadata with trace information. -func ExtendTraceMetadata( - md metadata.M, - tracerName string, // the tracer name is `StreamFunction` or `Zipper`. - spanName string, // the span name usually is the sfn name. - tp oteltrace.TracerProvider, logger *slog.Logger, -) (metadata.M, func()) { - var ( - traceID, _ = md.Get(metadata.TraceIDKey) - spanID, _ = md.Get(metadata.SpanIDKey) - parentTraced = GetTracedFromMetadata(md) - endFn = func() {} - ) - traced := false - if tp != nil { - var span oteltrace.Span - var err error - // set parent span, if not traced, use empty string - if parentTraced { - span, err = trace.NewSpan(tp, string(tracerName), spanName, traceID, spanID) - } else { - span, err = trace.NewSpan(tp, string(tracerName), spanName, "", "") - } - if err != nil { - logger.Debug("trace error", "tracer_name", tracerName, "span_name", spanName, "err", err) - } else { - endFn = func() { span.End() } - traceID = span.SpanContext().TraceID().String() - spanID = span.SpanContext().SpanID().String() - traced = true - } - } - if traceID == "" { - logger.Debug("create new traceID", "tracer_name", tracerName, "span_name", spanName, "trace_id", traceID) - traceID = id.NewTraceID() - } - if spanID == "" { - logger.Debug("create new spanID", "tracer_name", tracerName, "span_name", spanName, "span_id", spanID) - spanID = id.NewSpanID() - } - logger.Debug( - "trace metadata", - "tracer_name", tracerName, "span_name", spanName, - "trace_id", traceID, "span_id", spanID, "traced", traced, "parent_traced", parentTraced, - ) - - if tracerName == "Zipper" { - traced = traced || parentTraced - } - - // reallocate metadata with new TraceID and SpanID - md.Set(metadata.TraceIDKey, traceID) - md.Set(metadata.SpanIDKey, spanID) - md.Set(metadata.TracedKey, tracedString(traced)) - - return md, endFn -} - -// SfnTraceMetadata extends metadata for StreamFunction. -func SfnTraceMetadata(md metadata.M, sfnName string, tp oteltrace.TracerProvider, logger *slog.Logger) (metadata.M, func()) { - return ExtendTraceMetadata(md, "StreamFunction", sfnName, tp, logger) -} - -// ZipperTraceMetadata extends metadata for Zipper. -func ZipperTraceMetadata(md metadata.M, tp oteltrace.TracerProvider, logger *slog.Logger) (metadata.M, func()) { - return ExtendTraceMetadata(md, "Zipper", "zipper endpoint", tp, logger) -} - -func tracedString(traced bool) string { - if traced { - return "true" - } - return "false" -} diff --git a/core/metadata/metadata.go b/core/metadata/metadata.go index e133083c0..fdd0da83a 100644 --- a/core/metadata/metadata.go +++ b/core/metadata/metadata.go @@ -89,7 +89,6 @@ const ( // the keys for tracing. TraceIDKey = "yomo-trace-id" SpanIDKey = "yomo-span-id" - TracedKey = "yomo-traced" // the keys for target system working. TargetKey = "yomo-target" diff --git a/core/metadata_test.go b/core/metadata_test.go index f3943bfe5..f3532ce8f 100644 --- a/core/metadata_test.go +++ b/core/metadata_test.go @@ -8,7 +8,7 @@ import ( ) func TestMetadata(t *testing.T) { - md := NewMetadata("source", "tid", "traceID", "spanID", true) + md := NewMetadata("source", "tid") SetMetadataTarget(md, "target") v, ok := md.Get(metadata.TargetKey) @@ -16,5 +16,4 @@ func TestMetadata(t *testing.T) { assert.Equal(t, "target", v) assert.Equal(t, "tid", GetTIDFromMetadata(md)) - assert.Equal(t, true, GetTracedFromMetadata(md)) } diff --git a/core/server.go b/core/server.go index 886ecf2cc..29198735e 100644 --- a/core/server.go +++ b/core/server.go @@ -6,7 +6,6 @@ import ( "fmt" "net" "os" - "reflect" "sync" "sync/atomic" @@ -21,7 +20,8 @@ import ( "github.com/yomorun/yomo/pkg/frame-codec/y3codec" yquic "github.com/yomorun/yomo/pkg/listener/quic" pkgtls "github.com/yomorun/yomo/pkg/tls" - oteltrace "go.opentelemetry.io/otel/trace" + "github.com/yomorun/yomo/pkg/trace" + "go.opentelemetry.io/otel/attribute" ) // ErrServerClosed is returned by the Server's Serve and ListenAndServe methods after a call to Shutdown or Close. @@ -58,7 +58,6 @@ type Server struct { connHandler ConnHandler listener frame.Listener logger *slog.Logger - tracerProvider oteltrace.TracerProvider versionNegotiateFunc VersionNegotiateFunc } @@ -81,7 +80,6 @@ func NewServer(name string, opts ...ServerOption) *Server { router: router.Default(), downstreams: make(map[string]Downstream), logger: logger, - tracerProvider: options.tracerProvider, codec: y3codec.Codec(), packetReadWriter: y3codec.PacketReadWriter(), opts: options, @@ -322,10 +320,15 @@ func (s *Server) routingDataFrame(c *Context) error { // counter +1 atomic.AddInt64(&s.counterOfDataFrame, 1) - md, endFn := ZipperTraceMetadata(c.FrameMetadata, s.TracerProvider(), c.Logger) - defer endFn() - - c.FrameMetadata = md + // add trace + tracer := trace.NewTracer("Zipper") + span := tracer.Start(c.FrameMetadata, "zipper endpoint") + defer tracer.End( + c.FrameMetadata, + span, + attribute.Key("routing_data_tag").Int(int(dataFrame.Tag)), + attribute.Key("routing_data_len").Int(dataLength), + ) mdBytes, err := c.FrameMetadata.Encode() if err != nil { @@ -335,7 +338,7 @@ func (s *Server) routingDataFrame(c *Context) error { dataFrame.Metadata = mdBytes // find stream function ids from the router. - connIDs := s.router.Route(dataFrame.Tag, md) + connIDs := s.router.Route(dataFrame.Tag, c.FrameMetadata) if len(connIDs) == 0 { c.Logger.Info("no observed", "tag", dataFrame.Tag, "data_length", dataLength) } @@ -499,17 +502,6 @@ func (s *Server) authNames() []string { // Name returns the name of server. func (s *Server) Name() string { return s.name } -// TracerProvider returns the tracer provider of server. -func (s *Server) TracerProvider() oteltrace.TracerProvider { - if s.tracerProvider == nil { - return nil - } - if reflect.ValueOf(s.tracerProvider).IsNil() { - return nil - } - return s.tracerProvider -} - func composeFrameHandler(handler FrameHandler, middlewares ...FrameMiddleware) FrameHandler { for i := len(middlewares) - 1; i >= 0; i-- { handler = middlewares[i](handler) diff --git a/core/server_options.go b/core/server_options.go index cd04bf6ca..71031ab3f 100644 --- a/core/server_options.go +++ b/core/server_options.go @@ -7,7 +7,6 @@ import ( "github.com/quic-go/quic-go" "github.com/yomorun/yomo/core/auth" "github.com/yomorun/yomo/core/ylog" - oteltrace "go.opentelemetry.io/otel/trace" "golang.org/x/exp/slog" ) @@ -33,7 +32,6 @@ type serverOptions struct { tlsConfig *tls.Config auths map[string]auth.Authentication logger *slog.Logger - tracerProvider oteltrace.TracerProvider connMiddlewares []ConnMiddleware frameMiddlewares []FrameMiddleware } @@ -84,13 +82,6 @@ func WithServerLogger(logger *slog.Logger) ServerOption { } } -// WithServerTracerProvider sets tracer provider for the server. -func WithServerTracerProvider(tp oteltrace.TracerProvider) ServerOption { - return func(o *serverOptions) { - o.tracerProvider = tp - } -} - // WithFrameMiddleware sets frame middleware for the client. func WithFrameMiddleware(mws ...FrameMiddleware) ServerOption { return func(o *serverOptions) { diff --git a/example/3-multi-sfn/source/main.go b/example/3-multi-sfn/source/main.go index 896656c1d..b554ef92b 100644 --- a/example/3-multi-sfn/source/main.go +++ b/example/3-multi-sfn/source/main.go @@ -1,7 +1,6 @@ package main import ( - "context" "encoding/json" "log" "math/rand" @@ -9,7 +8,6 @@ import ( "time" "github.com/yomorun/yomo" - "github.com/yomorun/yomo/pkg/trace" ) type noiseData struct { @@ -19,19 +17,12 @@ type noiseData struct { } func main() { - // trace - tp, shutdown, err := trace.NewTracerProvider("yomo-source") - if err == nil { - log.Println("[source] 🛰 trace enabled") - } - defer shutdown(context.Background()) // connect to YoMo-Zipper. source := yomo.NewSource( "yomo-source", "localhost:9000", - yomo.WithTracerProvider(tp), ) - err = source.Connect() + err := source.Connect() if err != nil { log.Printf("❌ Emit the data to YoMo-Zipper failure with err: %v", err) return diff --git a/example/3-multi-sfn/stream-fn-1/app.go b/example/3-multi-sfn/stream-fn-1/app.go index 3d8dea22c..e2b3cb40f 100644 --- a/example/3-multi-sfn/stream-fn-1/app.go +++ b/example/3-multi-sfn/stream-fn-1/app.go @@ -10,7 +10,6 @@ import ( "time" "github.com/yomorun/yomo" - "github.com/yomorun/yomo/pkg/trace" "github.com/yomorun/yomo/serverless" ) @@ -24,24 +23,17 @@ type noiseData struct { // main will observe data with SeqID=0x10, and tranform to SeqID=0x14 with Noise value // to downstream sfn. func main() { - // trace - tp, shutdown, err := trace.NewTracerProvider("yomo-sfn") - if err == nil { - log.Println("[fn1] 🛰 trace enabled") - } - defer shutdown(context.Background()) // sfn sfn := yomo.NewStreamFunction( "Noise-1", "localhost:9000", - yomo.WithSfnTracerProvider(tp), ) sfn.SetObserveDataTags(0x10) defer sfn.Close() sfn.SetHandler(handler) - err = sfn.Connect() + err := sfn.Connect() if err != nil { log.Printf("[fn1] connect err=%v", err) os.Exit(1) diff --git a/example/3-multi-sfn/stream-fn-2/app.go b/example/3-multi-sfn/stream-fn-2/app.go index c47f25f5e..18dc909a7 100644 --- a/example/3-multi-sfn/stream-fn-2/app.go +++ b/example/3-multi-sfn/stream-fn-2/app.go @@ -9,7 +9,6 @@ import ( "os" "github.com/yomorun/yomo" - "github.com/yomorun/yomo/pkg/trace" "github.com/yomorun/yomo/serverless" ) @@ -31,24 +30,17 @@ var computePeek = func(_ context.Context, value float32) (float32, error) { // main will observe data with SeqID=0x14, and tranform to SeqID=0x15 with Noise value // to downstream sfn. func main() { - // trace - tp, shutdown, err := trace.NewTracerProvider("yomo-sfn") - if err == nil { - log.Println("[fn2] 🛰 trace enabled") - } - defer shutdown(context.Background()) // sfn sfn := yomo.NewStreamFunction( "Noise-2", "localhost:9000", - yomo.WithSfnTracerProvider(tp), ) sfn.SetObserveDataTags(0x14) defer sfn.Close() sfn.SetHandler(handler) - err = sfn.Connect() + err := sfn.Connect() if err != nil { log.Printf("[fn2] connect err=%v", err) os.Exit(1) diff --git a/example/3-multi-sfn/stream-fn-3/app.go b/example/3-multi-sfn/stream-fn-3/app.go index b5e2f0972..981ed15af 100644 --- a/example/3-multi-sfn/stream-fn-3/app.go +++ b/example/3-multi-sfn/stream-fn-3/app.go @@ -10,7 +10,6 @@ import ( "time" "github.com/yomorun/yomo" - "github.com/yomorun/yomo/pkg/trace" "github.com/yomorun/yomo/serverless" ) @@ -43,24 +42,17 @@ var slidingAvg = func(i interface{}) error { var observe = make(chan float32, 1) func main() { - // trace - tp, shutdown, err := trace.NewTracerProvider("yomo-sfn") - if err == nil { - log.Println("[fn3] 🛰 trace enabled") - } - defer shutdown(context.Background()) // sfn sfn := yomo.NewStreamFunction( "Noise-3", "localhost:9000", - yomo.WithSfnTracerProvider(tp), ) sfn.SetObserveDataTags(0x15) defer sfn.Close() sfn.SetHandler(handler) - err = sfn.Connect() + err := sfn.Connect() if err != nil { log.Printf("[fn3] connect err=%v", err) os.Exit(1) diff --git a/example/3-multi-sfn/stream-fn-4/app.go b/example/3-multi-sfn/stream-fn-4/app.go index 880afe8e7..98fddc4a5 100644 --- a/example/3-multi-sfn/stream-fn-4/app.go +++ b/example/3-multi-sfn/stream-fn-4/app.go @@ -1,34 +1,25 @@ package main import ( - "context" "log" "os" "github.com/yomorun/yomo" - "github.com/yomorun/yomo/pkg/trace" "github.com/yomorun/yomo/serverless" ) func main() { - // trace - tp, shutdown, err := trace.NewTracerProvider("yomo-sfn") - if err == nil { - log.Println("[fn4] 🛰 trace enabled") - } - defer shutdown(context.Background()) // sfn sfn := yomo.NewStreamFunction( "Noise-4", "localhost:9000", - yomo.WithSfnTracerProvider(tp), ) sfn.SetObserveDataTags(0x10) defer sfn.Close() sfn.SetHandler(handler) - err = sfn.Connect() + err := sfn.Connect() if err != nil { log.Printf("[fn3] connect err=%v", err) os.Exit(1) diff --git a/example/9-cli/source/main.go b/example/9-cli/source/main.go index 404571870..543665141 100644 --- a/example/9-cli/source/main.go +++ b/example/9-cli/source/main.go @@ -1,14 +1,12 @@ package main import ( - "context" "encoding/json" "log" "math/rand" "time" "github.com/yomorun/yomo" - "github.com/yomorun/yomo/pkg/trace" "github.com/yomorun/yomo/serverless" ) @@ -19,18 +17,10 @@ type noiseData struct { } func main() { - // trace - tp, shutdown, err := trace.NewTracerProvider("yomo-source") - if err == nil { - log.Println("[source] 🛰 trace enabled") - } - defer shutdown(context.Background()) - addr := "localhost:9000" source := yomo.NewSource( "source", addr, - yomo.WithTracerProvider(tp), ) if err := source.Connect(); err != nil { log.Fatalln(err) diff --git a/options.go b/options.go index ccc3db5bd..86fb29283 100644 --- a/options.go +++ b/options.go @@ -5,7 +5,6 @@ import ( "github.com/quic-go/quic-go" "github.com/yomorun/yomo/core" - "go.opentelemetry.io/otel/trace" "golang.org/x/exp/slog" ) @@ -33,9 +32,6 @@ var ( // WithSourceReConnect makes source Connect until success, unless authentication fails. WithSourceReConnect = func() SourceOption { return SourceOption(core.WithReConnect()) } - - // WithTracerProvider sets tracer provider for the Source. - WithTracerProvider = func(tp trace.TracerProvider) SourceOption { return SourceOption(core.WithTracerProvider(tp)) } ) // Sfn Options. @@ -55,9 +51,6 @@ var ( // WithSfnReConnect makes sfn Connect until success, unless authentication fails. WithSfnReConnect = func() SfnOption { return SfnOption(core.WithReConnect()) } - // WithSfnTracerProvider sets tracer provider for the Sfn. - WithSfnTracerProvider = func(tp trace.TracerProvider) SfnOption { return SfnOption(core.WithTracerProvider(tp)) } - // WithSfnAIFunctionDefinition sets AI function definition for the Sfn. WithSfnAIFunctionDefinition = func(description string, inputModel any) SfnOption { return SfnOption(core.WithAIFunctionDefinition(description, inputModel)) @@ -111,13 +104,6 @@ var ( } } - // WithZipperTracerProvider sets tracer provider for the zipper. - WithZipperTracerProvider = func(tp trace.TracerProvider) ZipperOption { - return func(o *zipperOptions) { - o.serverOption = append(o.serverOption, core.WithServerTracerProvider(tp)) - } - } - // WithConnMiddleware sets conn middleware for the zipper. WithZipperConnMiddleware = func(mw ...core.ConnMiddleware) ZipperOption { return func(o *zipperOptions) { diff --git a/pkg/id/id.go b/pkg/id/id.go index 99abc0178..380fbd997 100644 --- a/pkg/id/id.go +++ b/pkg/id/id.go @@ -2,8 +2,6 @@ package id import ( - "crypto/rand" - "encoding/hex" "strconv" "time" @@ -18,21 +16,3 @@ func New(l ...int) string { } return tid } - -// NewTraceID returns a trace id. -func NewTraceID() string { - bytes := make([]byte, 16) - if _, err := rand.Read(bytes); err != nil { - return "" - } - return hex.EncodeToString(bytes) -} - -// NewSpanID returns a span id. -func NewSpanID() string { - bytes := make([]byte, 8) - if _, err := rand.Read(bytes); err != nil { - return "" - } - return hex.EncodeToString(bytes) -} diff --git a/pkg/id/id_test.go b/pkg/id/id_test.go index 56a5116d2..9727713bd 100644 --- a/pkg/id/id_test.go +++ b/pkg/id/id_test.go @@ -10,17 +10,6 @@ func TestNew(t *testing.T) { t.Run("new a random id", func(t *testing.T) { str := New() assert.IsType(t, "", str) - }) - - t.Run("new trace id", func(t *testing.T) { - traceID := NewTraceID() - assert.IsType(t, "", traceID) - assert.Equal(t, 32, len(traceID)) - }) - - t.Run("new span id", func(t *testing.T) { - spanID := NewSpanID() - assert.IsType(t, "", spanID) - assert.Equal(t, 16, len(spanID)) + assert.Equal(t, 21, len(str)) }) } diff --git a/pkg/trace/trace.go b/pkg/trace/trace.go index cf2445475..cb8d94187 100644 --- a/pkg/trace/trace.go +++ b/pkg/trace/trace.go @@ -1,13 +1,11 @@ -// Package trace provides open tracing. package trace import ( "context" - "errors" - "log" "os" - "time" + "github.com/yomorun/yomo/core/metadata" + "github.com/yomorun/yomo/core/ylog" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" @@ -17,110 +15,128 @@ import ( tracesdk "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.17.0" "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" ) -// tracerProvider returns an OpenTelemetry TracerProvider configured to use -// the Jaeger exporter that will send spans to the provided url. The returned +func init() { + SetTracerProvider("yomo") +} + +// SetTracerProvider sets an OpenTelemetry TracerProvider configured to use +// the Jaeger exporter that will send spans to the provided url. The global // TracerProvider will also use a Resource configured with all the information // about the application. -func tracerProvider(service string, exp tracesdk.SpanExporter) *tracesdk.TracerProvider { +func SetTracerProvider(service string) { + endpoint := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") + if endpoint == "" { + otel.SetTracerProvider(noop.NewTracerProvider()) + return + } + ylog.Info("enable tracing", "endpoint", endpoint) + + client := otlptracehttp.NewClient() + exp, err := otlptrace.New(context.Background(), client) + if err != nil { + panic("failed to create trace exporter: " + err.Error()) + } tp := tracesdk.NewTracerProvider( - // Always be sure to batch in production. tracesdk.WithBatcher(exp), tracesdk.WithSampler(tracesdk.AlwaysSample()), - // tracesdk.WithSyncer(exp), - // Record information about this application in an Resource. tracesdk.WithResource(resource.NewWithAttributes( semconv.SchemaURL, semconv.ServiceNameKey.String(service), - // attribute.String("environment", environment), - // attribute.Int64("ID", id), )), ) - return tp + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.TraceContext{}) } -// NewTracerProvider creates a new tracer provider used by OTLP. -func NewTracerProvider(service string) (*tracesdk.TracerProvider, func(ctx context.Context), error) { - // tracer provider - endpoint := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") - if endpoint == "" { - return nil, func(context.Context) {}, errors.New("tracing disabled") - } - // Create the OTLP exporter - client := otlptracehttp.NewClient() - exp, err := otlptrace.New(context.Background(), client) - if err != nil { - return nil, func(context.Context) {}, err - } - // tracer provider - tp := tracerProvider(service, exp) - // shutdown - shutdown := func(ctx context.Context) { - // Do not make the application hang when it is shutdown. - ctx, cancel := context.WithTimeout(ctx, time.Second*5) +// ShutdownTracerProvider shutdown the global TracerProvider. +func ShutdownTracerProvider() { + tp := otel.GetTracerProvider() + switch i := tp.(type) { + case *tracesdk.TracerProvider: + ctx, cancel := context.WithTimeout(context.Background(), 5) defer cancel() - if err := tp.Shutdown(ctx); err != nil { - log.Printf("[trace] shutdown err: %v\n", err) - } + i.Shutdown(ctx) + case *noop.TracerProvider: + return } - // Register our TracerProvider as the global so any imported - // instrumentation in the future will default to using it. - otel.SetTracerProvider(tp) - otel.SetTextMapPropagator(propagation.TraceContext{}) - // otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) +} + +// Tracer is otel span tracer. +type Tracer struct { + tracer trace.Tracer + tracerName string + tracerProvider trace.TracerProvider +} + +// NewTracer create tracer instance. +func NewTracer(name string) *Tracer { + tp := otel.GetTracerProvider() + + return &Tracer{ + tracer: tp.Tracer(name), + tracerName: name, + tracerProvider: tp, + } +} - return tp, shutdown, nil +// Start start tracing span. +func (t *Tracer) Start(md metadata.M, operation string) trace.Span { + _, span := t.tracer.Start(NewContextWithMetadata(md), + operation, + ) + propagateTrace(md, span) + return span } -// NewSpan creates a new span of OpenTelemetry. -func NewSpan(tp trace.TracerProvider, tracerName string, spanName string, traceID string, spanID string) (trace.Span, error) { - return NewSpanWithAttrs(tp, tracerName, spanName, traceID, spanID, false) +// yomo uses metadata to propagate the trace info. +func propagateTrace(md metadata.M, span trace.Span) { + if span.SpanContext().TraceID().IsValid() { + md.Set(metadata.TraceIDKey, span.SpanContext().TraceID().String()) + } + + if span.SpanContext().SpanID().IsValid() { + md.Set(metadata.SpanIDKey, span.SpanContext().SpanID().String()) + } } -// NewRemoteSpan creates a new span of OpenTelemetry from remote parent tracing. -func NewRemoteSpan(tp trace.TracerProvider, tracerName string, spanName string, traceID string, spanID string) (trace.Span, error) { - return NewSpanWithAttrs(tp, tracerName, spanName, traceID, spanID, true) +// End finish tracing span. +func (t *Tracer) End(md metadata.M, span trace.Span, kv ...attribute.KeyValue) { + for _, v := range kv { + span.SetAttributes(v) + } + span.End() } -// NewSpanWithAttrs creates a new span of OpenTelemetry with attributes. -func NewSpanWithAttrs(tp trace.TracerProvider, tracerName string, spanName string, traceID string, spanID string, remote bool, attrs ...map[string]string) (trace.Span, error) { - if tp == nil { - return nil, errors.New("tracer provider is nil") +// NewContextWithMetadata create new context with metadata for tracer starting. +// In yomo, we use metadata from dataFrame as the trace Propagator. And yomo only +// carries traceID and spanID in metadata. +func NewContextWithMetadata(md metadata.M) context.Context { + traceID, ok := md.Get(metadata.TraceIDKey) + if !ok { + return context.Background() } - ctx := context.Background() - // root span - if traceID == "" && spanID == "" { - tr := tp.Tracer(tracerName) - _, span := tr.Start(ctx, spanName) - if len(attrs) > 0 { - for k, v := range attrs[0] { - span.SetAttributes(attribute.Key(k).String(v)) - } - } - return span, nil + spanID, ok := md.Get(metadata.SpanIDKey) + if !ok { + return context.Background() } - // child span + tid, err := trace.TraceIDFromHex(traceID) if err != nil { - return nil, err + return context.Background() } sid, err := trace.SpanIDFromHex(spanID) if err != nil { - return nil, err + return context.Background() } + scc := trace.SpanContextConfig{ TraceID: tid, SpanID: sid, - Remote: remote, } - ctx = trace.ContextWithSpanContext(ctx, trace.NewSpanContext(scc)) - tr := tp.Tracer(tracerName) - _, span := tr.Start(ctx, spanName) - if len(attrs) > 0 { - for k, v := range attrs[0] { - span.SetAttributes(attribute.Key(k).String(v)) - } - } - return span, nil + spanContext := trace.NewSpanContext(scc) + + return trace.ContextWithSpanContext(context.Background(), spanContext) } diff --git a/pkg/trace/trace_test.go b/pkg/trace/trace_test.go new file mode 100644 index 000000000..240fdb51c --- /dev/null +++ b/pkg/trace/trace_test.go @@ -0,0 +1,53 @@ +package trace + +import ( + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/yomorun/yomo/core/metadata" +) + +func TestTraceProvider(t *testing.T) { + os.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:43118") + SetTracerProvider("yomo-test") + + tracer := NewTracer("Source") + + md := metadata.New() + + span := tracer.Start(md, "source") + time.Sleep(time.Millisecond * 100) + tracer.End(md, span) + tid1 := assertMd(t, md) + + tracer = NewTracer("Zipper") + + span = tracer.Start(md, "zipper-endpoint") + time.Sleep(time.Millisecond * 150) + tracer.End(md, span) + tid2 := assertMd(t, md) + + tracer = NewTracer("StreamFunction") + + span = tracer.Start(md, "sink") + time.Sleep(time.Millisecond * 200) + tracer.End(md, span) + tid3 := assertMd(t, md) + + assert.True(t, tid1 == tid2 && tid2 == tid3 && tid3 == tid1) + ShutdownTracerProvider() +} + +func assertMd(t *testing.T, md metadata.M) string { + traceID, ok1 := md.Get(metadata.TraceIDKey) + assert.True(t, ok1) + assert.Equal(t, 32, len(traceID)) + + spanID, ok2 := md.Get(metadata.SpanIDKey) + assert.True(t, ok2) + assert.Equal(t, 16, len(spanID)) + + return traceID +} diff --git a/sfn.go b/sfn.go index 7e4aa3cc5..0adce9629 100644 --- a/sfn.go +++ b/sfn.go @@ -11,7 +11,8 @@ import ( "github.com/yomorun/yomo/core/metadata" "github.com/yomorun/yomo/core/serverless" "github.com/yomorun/yomo/pkg/id" - oteltrace "go.opentelemetry.io/otel/trace" + "github.com/yomorun/yomo/pkg/trace" + "go.opentelemetry.io/otel/attribute" ) // StreamFunction defines serverless streaming functions. @@ -125,14 +126,11 @@ func (s *streamFunction) Connect() error { if hasCron { s.cron = cron.New() s.cron.AddFunc(s.cronSpec, func() { - md, deferFunc := core.InitialSfnMetadata( - s.client.ClientID(), - id.New(), - s.name, - s.client.TracerProvider(), - s.client.Logger, - ) - defer deferFunc() + md := core.NewMetadata(s.client.ClientID(), id.New()) + // add trace + tracer := trace.NewTracer("StreamFunction") + span := tracer.Start(md, s.name) + defer tracer.End(md, span, attribute.String("sfn_handler_type", "corn_handler")) cronCtx := serverless.NewCronContext(s.client, md) s.cronFn(cronCtx) @@ -172,15 +170,23 @@ func (s *streamFunction) Connect() error { break } - newMd, endFn := core.SfnTraceMetadata(md, s.client.Name(), s.client.TracerProvider(), s.client.Logger) - defer endFn() - - newMetadata, err := newMd.Encode() + // add trace + tracer := trace.NewTracer("StreamFunction") + span := tracer.Start(md, s.name) + defer tracer.End( + md, + span, + attribute.String("sfn_handler_type", "pipe_handler"), + attribute.Int("recv_data_tag", int(data.Tag)), + attribute.Int("recv_data_len", len(data.Payload)), + ) + + rawMd, err := md.Encode() if err != nil { s.client.Logger.Error("sfn encode metadata error", "err", err) break } - data.Metadata = newMetadata + data.Metadata = rawMd frame := &frame.DataFrame{ Tag: data.Tag, @@ -219,6 +225,8 @@ func (s *streamFunction) Close() error { } } + trace.ShutdownTracerProvider() + return nil } @@ -231,20 +239,27 @@ func (s *streamFunction) Wait() { // func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) { func (s *streamFunction) onDataFrame(dataFrame *frame.DataFrame) { if s.fn != nil { - tp := s.client.TracerProvider() - go func(tp oteltrace.TracerProvider, dataFrame *frame.DataFrame) { + go func(dataFrame *frame.DataFrame) { md, err := metadata.Decode(dataFrame.Metadata) if err != nil { s.client.Logger.Error("sfn decode metadata error", "err", err) return } - newMd, endFn := core.SfnTraceMetadata(md, s.client.Name(), s.client.TracerProvider(), s.client.Logger) - defer endFn() + // add trace + tracer := trace.NewTracer("StreamFunction") + span := tracer.Start(md, s.name) + defer tracer.End( + md, + span, + attribute.String("sfn_handler_type", "async_handler"), + attribute.Int("recv_data_tag", int(dataFrame.Tag)), + attribute.Int("recv_data_len", len(dataFrame.Payload)), + ) - serverlessCtx := serverless.NewContext(s.client, dataFrame.Tag, newMd, dataFrame.Payload) + serverlessCtx := serverless.NewContext(s.client, dataFrame.Tag, md, dataFrame.Payload) s.fn(serverlessCtx) - }(tp, dataFrame) + }(dataFrame) } else if s.pfn != nil { data := dataFrame.Payload s.client.Logger.Debug("pipe sfn receive", "data_len", len(data), "data", data) diff --git a/source.go b/source.go index 3cc29ba90..6500253c8 100644 --- a/source.go +++ b/source.go @@ -6,6 +6,8 @@ import ( "github.com/yomorun/yomo/core" "github.com/yomorun/yomo/core/frame" "github.com/yomorun/yomo/pkg/id" + "github.com/yomorun/yomo/pkg/trace" + "go.opentelemetry.io/otel/attribute" ) // Source is responsible for sending data to yomo. @@ -60,6 +62,7 @@ func (s *yomoSource) Close() error { s.client.Logger.Error("failed to close the source", "err", err) return err } + trace.ShutdownTracerProvider() s.client.Logger.Debug("the source is closed") return nil } @@ -71,8 +74,16 @@ func (s *yomoSource) Connect() error { // Write writes data with specified tag. func (s *yomoSource) Write(tag uint32, data []byte) error { - md, deferFunc := core.InitialSourceMetadata(s.client.ClientID(), id.New(), s.name, s.client.TracerProvider(), s.client.Logger) - defer deferFunc() + md := core.NewMetadata(s.client.ClientID(), id.New()) + // add trace + tracer := trace.NewTracer("Source") + span := tracer.Start(md, s.name) + defer tracer.End( + md, + span, + attribute.Int("send_data_tag", int(tag)), + attribute.Int("send_data_len", len(data)), + ) mdBytes, err := md.Encode() // metadata @@ -93,8 +104,17 @@ func (s *yomoSource) WriteWithTarget(tag uint32, data []byte, target string) err if data == nil { return nil } - md, deferFunc := core.InitialSourceMetadata(s.client.ClientID(), id.New(), s.name, s.client.TracerProvider(), s.client.Logger) - defer deferFunc() + md := core.NewMetadata(s.client.ClientID(), id.New()) + // add trace + tracer := trace.NewTracer("Source") + span := tracer.Start(md, s.name) + defer tracer.End( + md, + span, + attribute.Int("send_data_tag", int(tag)), + attribute.String("send_data_target", target), + attribute.Int("send_data_len", len(data)), + ) if target != "" { core.SetMetadataTarget(md, target) diff --git a/zipper_notwindows.go b/zipper_notwindows.go index c5e44945d..61a33c020 100644 --- a/zipper_notwindows.go +++ b/zipper_notwindows.go @@ -11,6 +11,7 @@ import ( "github.com/yomorun/yomo/core" "github.com/yomorun/yomo/core/ylog" + "github.com/yomorun/yomo/pkg/trace" ) // initialize when zipper running as server. support inspection: @@ -27,6 +28,7 @@ func waitSignalForShutdownServer(server *core.Server) { ylog.Debug("graceful shutting down ...", "sign", p1) // waiting for the server to finish processing the current request server.Close() + trace.ShutdownTracerProvider() os.Exit(0) } else if p1 == syscall.SIGUSR2 { var m runtime.MemStats diff --git a/zipper_windows.go b/zipper_windows.go index 6a0a5f90b..8a646720f 100644 --- a/zipper_windows.go +++ b/zipper_windows.go @@ -10,6 +10,7 @@ import ( "github.com/yomorun/yomo/core" "github.com/yomorun/yomo/core/ylog" + "github.com/yomorun/yomo/pkg/trace" ) // initialize when zipper running as server. support inspection: @@ -22,6 +23,7 @@ func waitSignalForShutdownServer(server *core.Server) { ylog.Debug("Received signal", "signal", p1) if p1 == syscall.SIGTERM || p1 == syscall.SIGINT { server.Close() + trace.ShutdownTracerProvider() ylog.Debug("graceful shutting down ...", "sign", p1) os.Exit(0) }