From 5e32151e9ba22c2f1d4df7a1716413347b306ccd Mon Sep 17 00:00:00 2001 From: laminar Date: Wed, 23 Mar 2022 19:33:02 +0800 Subject: [PATCH] Set the exit span before sending the payload to the target. (#45) Signed-off-by: laminar --- context/context.go | 48 ++++++++++++++++++- .../test/binding-event/expected.data.yml | 2 +- .../test/binding-event/provider/provider.go | 25 ++-------- .../test/topic-event/expected.data.yml | 2 +- .../test/topic-event/publish/publish.go | 25 ++-------- 5 files changed, 54 insertions(+), 48 deletions(-) diff --git a/context/context.go b/context/context.go index e1ff846..805a8e4 100644 --- a/context/context.go +++ b/context/context.go @@ -12,11 +12,12 @@ import ( "sync" "time" + "github.com/SkyAPM/go2sky" cloudevents "github.com/cloudevents/sdk-go/v2" + dapr "github.com/dapr/go-sdk/client" "github.com/dapr/go-sdk/service/common" "k8s.io/klog/v2" - - dapr "github.com/dapr/go-sdk/client" + agentv3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3" ) var ( @@ -54,6 +55,7 @@ const ( SelfHostMode = "self-host" TestModeOn = "on" innerEventTypePrefix = "io.openfunction.function" + tracingProviderSkywalking = "skywalking" ) type Runtime string @@ -154,6 +156,9 @@ type RuntimeContext interface { // GetPluginsTracingCfg returns the TracingConfig interface. GetPluginsTracingCfg() TracingConfig + + // HasPluginsTracingCfg returns nil if there is no TracingConfig. + HasPluginsTracingCfg() bool } type Context interface { @@ -361,6 +366,12 @@ func (ctx *FunctionContext) Send(outputName string, data []byte) ([]byte, error) ie := NewInnerEvent(ctx) ie.MergeMetadata(ctx.GetInnerEvent()) ie.SetUserData(data) + + // Set the exit span for tracing + if err := setExitSpan(ctx, ie, outputName); err != nil { + klog.Warningf("failed to set exit span: %v", err) + } + payload = ie.GetCloudEventJSON() } @@ -573,6 +584,10 @@ func (ctx *FunctionContext) GetPluginsTracingCfg() TracingConfig { return ctx.PluginsTracing } +func (ctx *FunctionContext) HasPluginsTracingCfg() bool { + return ctx.PluginsTracing != nil +} + func (ctx *FunctionContext) WithOut(out *FunctionOut) RuntimeContext { ctx.mu.Lock() defer ctx.mu.Unlock() @@ -824,6 +839,35 @@ func getBuildingBlockType(componentType string) (ResourceType, error) { return "", errors.New("invalid component type") } +func setExitSpan(ctx *FunctionContext, innerEvent InnerEvent, target string) error { + if !ctx.HasPluginsTracingCfg() || !ctx.GetPluginsTracingCfg().IsEnabled() { + return nil + } + + switch ctx.GetPluginsTracingCfg().ProviderName() { + case tracingProviderSkywalking: + tracer := go2sky.GetGlobalTracer() + if tracer == nil { + return errors.New("skywalking is not enabled") + } + + span, err := tracer.CreateExitSpan(ctx.GetNativeContext(), ctx.GetName(), target, func(headerKey, headerValue string) error { + innerEvent.SetMetadata(headerKey, headerValue) + return nil + }) + if err != nil { + return err + } + defer span.End() + + span.SetSpanLayer(agentv3.SpanLayer_FAAS) + span.SetComponent(5013) + return nil + default: + return nil + } +} + func ConvertUserDataToBytes(data interface{}) []byte { if d, ok := data.([]byte); ok { return d diff --git a/plugin/skywalking/test/binding-event/expected.data.yml b/plugin/skywalking/test/binding-event/expected.data.yml index 560b62f..c7a73b2 100644 --- a/plugin/skywalking/test/binding-event/expected.data.yml +++ b/plugin/skywalking/test/binding-event/expected.data.yml @@ -7,7 +7,7 @@ segmentItems: - segmentId: {{ notEmpty .segmentId }} spans: {{- contains .spans }} - - operationName: sample-topic + - operationName: provider parentSpanId: 0 spanId: 1 spanLayer: FAAS diff --git a/plugin/skywalking/test/binding-event/provider/provider.go b/plugin/skywalking/test/binding-event/provider/provider.go index 7576c89..896f7cf 100644 --- a/plugin/skywalking/test/binding-event/provider/provider.go +++ b/plugin/skywalking/test/binding-event/provider/provider.go @@ -4,36 +4,17 @@ import ( "context" "time" + "k8s.io/klog/v2" + ofctx "github.com/OpenFunction/functions-framework-go/context" "github.com/OpenFunction/functions-framework-go/framework" "github.com/OpenFunction/functions-framework-go/plugin" "github.com/OpenFunction/functions-framework-go/plugin/skywalking" - "github.com/SkyAPM/go2sky" - "k8s.io/klog/v2" - agentv3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3" ) func bindingsFunction(ofCtx ofctx.Context, in []byte) (ofctx.Out, error) { - tracer := go2sky.GetGlobalTracer() - if tracer == nil { - klog.Warning("go2sky is not enabled") - return ofCtx.ReturnOnInternalError().WithData([]byte("go2sky is not enabled")), nil - } - - span, err := tracer.CreateExitSpan(ofCtx.GetNativeContext(), "sample-topic", "sample-topic", func(headerKey, headerValue string) error { - ofCtx.GetInnerEvent().SetMetadata(headerKey, headerValue) - return nil - }) - if err != nil { - klog.Error(err) - return ofCtx.ReturnOnInternalError().WithData([]byte(err.Error())), err - } - defer span.End() - - span.SetSpanLayer(agentv3.SpanLayer_FAAS) - span.SetComponent(5013) - _, err = ofCtx.Send("sample-topic", []byte(time.Now().String())) + _, err := ofCtx.Send("sample-topic", []byte(time.Now().String())) if err != nil { klog.Error(err) return ofCtx.ReturnOnInternalError().WithData([]byte(err.Error())), err diff --git a/plugin/skywalking/test/topic-event/expected.data.yml b/plugin/skywalking/test/topic-event/expected.data.yml index 0fee412..155135d 100644 --- a/plugin/skywalking/test/topic-event/expected.data.yml +++ b/plugin/skywalking/test/topic-event/expected.data.yml @@ -7,7 +7,7 @@ segmentItems: - segmentId: {{ notEmpty .segmentId }} spans: {{- contains .spans }} - - operationName: publish-topic + - operationName: publish parentSpanId: 0 spanId: 1 spanLayer: FAAS diff --git a/plugin/skywalking/test/topic-event/publish/publish.go b/plugin/skywalking/test/topic-event/publish/publish.go index dd8ade6..000e04c 100644 --- a/plugin/skywalking/test/topic-event/publish/publish.go +++ b/plugin/skywalking/test/topic-event/publish/publish.go @@ -4,37 +4,18 @@ import ( "context" "time" + "k8s.io/klog/v2" + ofctx "github.com/OpenFunction/functions-framework-go/context" "github.com/OpenFunction/functions-framework-go/framework" "github.com/OpenFunction/functions-framework-go/plugin" "github.com/OpenFunction/functions-framework-go/plugin/skywalking" - "github.com/SkyAPM/go2sky" - "k8s.io/klog/v2" - agentv3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3" ) func pubsubFunction(ofCtx ofctx.Context, in []byte) (ofctx.Out, error) { - tracer := go2sky.GetGlobalTracer() - if tracer == nil { - klog.Warningf("go2sky is not enabled") - return ofCtx.ReturnOnInternalError().WithData([]byte("go2sky is not enabled")), nil - } - - span, err := tracer.CreateExitSpan(ofCtx.GetNativeContext(), "publish-topic", "publish-topic", func(headerKey, headerValue string) error { - ofCtx.GetInnerEvent().SetMetadata(headerKey, headerValue) - return nil - }) - if err != nil { - klog.Error(err) - return ofCtx.ReturnOnInternalError().WithData([]byte(err.Error())), err - } - defer span.End() - - span.SetSpanLayer(agentv3.SpanLayer_FAAS) - span.SetComponent(5013) // topic - _, err = ofCtx.Send("publish-topic", []byte(time.Now().String())) + _, err := ofCtx.Send("publish-topic", []byte(time.Now().String())) if err != nil { klog.Error(err)