Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zipkin-specific observer support #157

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package zipkintracer

import (
"time"

"github.com/openzipkin/zipkin-go"
"github.com/openzipkin/zipkin-go/model"
)

// ZipkinStartSpanOptions allows ZipkinObserver.OnStartSpan() to inspect
// options used during zipkin.Span creation
type ZipkinStartSpanOptions struct {
// Parent span context reference, if any
Parent *model.SpanContext

// Span's start time
StartTime time.Time

// Kind clarifies context of timestamp, duration and remoteEndpoint in a span.
Kind model.Kind

// Tags used during span creation
Tags map[string]string

// RemoteEndpoint used during span creation
RemoteEndpoint *model.Endpoint
}

// ZipkinObserver may be registered with a Tracer to receive notifications about new Spans
type ZipkinObserver interface {
// OnStartSpan is called when new Span is created. Creates and returns span observer.
// If the observer is not interested in the given span, it must return nil.
OnStartSpan(sp zipkin.Span, operationName string, options *ZipkinStartSpanOptions) ZipkinSpanObserver
}

// ZipkinSpanObserver is created by the ZipkinObserver and receives notifications about
// other Span events.
type ZipkinSpanObserver interface {
// Callback called from zipkin.Span.SetName()
OnSetName(operationName string)

// Callback called from zipkin.Span.SetTag()
OnSetTag(key, value string)

// Callback called from zipkin.Span.SetRemoteEndpoint()
OnSetRemoteEndpoint(remote *model.Endpoint)

// Callback called from zipkin.Span.Annotate()
OnAnnotate(t time.Time, annotation string)

// Callback called from zipkin.Span.Finish()
OnFinish()

// Callback called from zipkin.Span.FinishedWithDuration()
OnFinishedWithDuration(dur time.Duration)
}
111 changes: 85 additions & 26 deletions span.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package zipkintracer

import (
"fmt"
"net"
"time"

otobserver "github.com/opentracing-contrib/go-observer"
Expand All @@ -25,23 +26,23 @@ import (
"github.com/openzipkin/zipkin-go"
)

// FinisherWithDuration allows to finish span with given duration
type FinisherWithDuration interface {
FinishedWithDuration(d time.Duration)
}

type spanImpl struct {
tracer *tracerImpl
zipkinSpan zipkin.Span
startTime time.Time
observer otobserver.SpanObserver
tracer *tracerImpl
zipkinSpan zipkin.Span
observer otobserver.SpanObserver
zipkinObserver ZipkinSpanObserver
options ZipkinStartSpanOptions
}

func (s *spanImpl) SetOperationName(operationName string) opentracing.Span {
if s.observer != nil {
s.observer.OnSetOperationName(operationName)
}

if s.zipkinObserver != nil {
s.zipkinObserver.OnSetName(operationName)
}

s.zipkinSpan.SetName(operationName)
return s
}
Expand All @@ -51,23 +52,52 @@ func (s *spanImpl) SetTag(key string, value interface{}) opentracing.Span {
s.observer.OnSetTag(key, value)
}

if key == string(ext.SamplingPriority) {
endpointChanged := false

switch key {
case string(ext.SamplingPriority):
// there are no means for now to change the sampling decision
// but when finishedSpanHandler is in place we could change this.
return s
case string(ext.SpanKind):
// this tag is translated into kind which can
// only be set on span creation
return s
case string(ext.PeerService):
serviceName, _ := value.(string)
s.options.RemoteEndpoint.ServiceName = serviceName
endpointChanged = true
case string(ext.PeerHostIPv4):
ipv4, _ := value.(string)
s.options.RemoteEndpoint.IPv4 = net.ParseIP(ipv4)
endpointChanged = true
case string(ext.PeerHostIPv6):
ipv6, _ := value.(string)
s.options.RemoteEndpoint.IPv6 = net.ParseIP(ipv6)
endpointChanged = true
case string(ext.PeerPort):
port, _ := value.(uint16)
s.options.RemoteEndpoint.Port = port
endpointChanged = true
}

if key == string(ext.SpanKind) ||
key == string(ext.PeerService) ||
key == string(ext.PeerHostIPv4) ||
key == string(ext.PeerHostIPv6) ||
key == string(ext.PeerPort) {
// this tags are translated into kind and remoteEndpoint which can
// only be set on span creation
if endpointChanged {
s.zipkinSpan.SetRemoteEndpoint(s.options.RemoteEndpoint)

if s.zipkinObserver != nil {
s.zipkinObserver.OnSetRemoteEndpoint(s.options.RemoteEndpoint)
}

return s
}

s.zipkinSpan.Tag(key, fmt.Sprint(value))
strValue := fmt.Sprint(value)

if s.zipkinObserver != nil {
s.zipkinObserver.OnSetTag(key, strValue)
}

s.zipkinSpan.Tag(key, strValue)
return s
}

Expand All @@ -78,7 +108,14 @@ func (s *spanImpl) LogKV(keyValues ...interface{}) {
}

for _, field := range fields {
s.zipkinSpan.Annotate(time.Now(), field.String())
t := time.Now()
fieldValue := field.String()

if s.zipkinObserver != nil {
s.zipkinObserver.OnAnnotate(t, fieldValue)
}

s.zipkinSpan.Annotate(t, fieldValue)
}
}

Expand All @@ -88,7 +125,13 @@ func (s *spanImpl) LogFields(fields ...log.Field) {

func (s *spanImpl) logFields(t time.Time, fields ...log.Field) {
for _, field := range fields {
s.zipkinSpan.Annotate(t, field.String())
annotation := field.String()

if s.zipkinObserver != nil {
s.zipkinObserver.OnAnnotate(t, annotation)
}

s.zipkinSpan.Annotate(t, annotation)
}
}

Expand All @@ -110,14 +153,24 @@ func (s *spanImpl) Log(ld opentracing.LogData) {
ld.Timestamp = time.Now()
}

s.zipkinSpan.Annotate(ld.Timestamp, fmt.Sprintf("%s:%s", ld.Event, ld.Payload))
annotation := fmt.Sprintf("%s:%s", ld.Event, ld.Payload)

if s.zipkinObserver != nil {
s.zipkinObserver.OnAnnotate(ld.Timestamp, annotation)
}

s.zipkinSpan.Annotate(ld.Timestamp, annotation)
}

func (s *spanImpl) Finish() {
if s.observer != nil {
s.observer.OnFinish(opentracing.FinishOptions{})
}

if s.zipkinObserver != nil {
s.zipkinObserver.OnFinish()
}

s.zipkinSpan.Finish()
}

Expand All @@ -131,15 +184,21 @@ func (s *spanImpl) FinishWithOptions(opts opentracing.FinishOptions) {
}

if !opts.FinishTime.IsZero() {
f, ok := s.zipkinSpan.(FinisherWithDuration)
if !ok {
return
dur := opts.FinishTime.Sub(s.options.StartTime)

if s.zipkinObserver != nil {
s.zipkinObserver.OnFinishedWithDuration(dur)
}
f.FinishedWithDuration(opts.FinishTime.Sub(s.startTime))

s.zipkinSpan.FinishedWithDuration(dur)
return
}

s.Finish()
if s.zipkinObserver != nil {
s.zipkinObserver.OnFinish()
}

s.zipkinSpan.Finish()
}

func (s *spanImpl) Tracer() opentracing.Tracer {
Expand Down
51 changes: 28 additions & 23 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,43 +58,47 @@ func (t *tracerImpl) StartSpan(operationName string, opts ...opentracing.StartSp

zopts := make([]zipkin.SpanOption, 0)

sp := &spanImpl{
tracer: t,
}

// Parent
if len(startSpanOptions.References) > 0 {
parent, ok := (startSpanOptions.References[0].ReferencedContext).(SpanContext)
if ok {
zopts = append(zopts, zipkin.Parent(model.SpanContext(parent)))
sp.options.Parent = (*model.SpanContext)(&parent)
}
}

startTime := time.Now()
// Time
sp.options.StartTime = time.Now()
if !startSpanOptions.StartTime.IsZero() {
zopts = append(zopts, zipkin.StartTime(startSpanOptions.StartTime))
startTime = startSpanOptions.StartTime
sp.options.StartTime = startSpanOptions.StartTime
zopts = append(zopts, zipkin.StartTime(sp.options.StartTime))
}

zopts = append(zopts, parseTagsAsZipkinOptions(startSpanOptions.Tags)...)
zopts = append(zopts, parseTagsAsZipkinOptions(startSpanOptions.Tags, &sp.options)...)

newSpan := t.zipkinTracer.StartSpan(operationName, zopts...)
sp.zipkinSpan = t.zipkinTracer.StartSpan(operationName, zopts...)

sp := &spanImpl{
zipkinSpan: newSpan,
tracer: t,
startTime: startTime,
}
if t.opts.observer != nil {
observer, _ := t.opts.observer.OnStartSpan(sp, operationName, startSpanOptions)
sp.observer = observer
}

if t.opts.zipkinObserver != nil {
sp.zipkinObserver = t.opts.zipkinObserver.OnStartSpan(sp.zipkinSpan, operationName, &sp.options)
}

return sp
}

func parseTagsAsZipkinOptions(t map[string]interface{}) []zipkin.SpanOption {
func parseTagsAsZipkinOptions(t map[string]interface{}, options *ZipkinStartSpanOptions) []zipkin.SpanOption {
zopts := make([]zipkin.SpanOption, 0)

tags := map[string]string{}
remoteEndpoint := &model.Endpoint{}
options.Tags = map[string]string{}
options.RemoteEndpoint = &model.Endpoint{}

var kind string
if val, ok := t[string(ext.SpanKind)]; ok {
Expand All @@ -112,29 +116,30 @@ func parseTagsAsZipkinOptions(t map[string]interface{}) []zipkin.SpanOption {
mKind == model.Producer ||
mKind == model.Consumer {
zopts = append(zopts, zipkin.Kind(mKind))
options.Kind = mKind
} else {
tags["span.kind"] = kind
options.Tags["span.kind"] = kind
}
}

if val, ok := t[string(ext.PeerService)]; ok {
serviceName, _ := val.(string)
remoteEndpoint.ServiceName = serviceName
options.RemoteEndpoint.ServiceName = serviceName
}

if val, ok := t[string(ext.PeerHostIPv4)]; ok {
ipv4, _ := val.(string)
remoteEndpoint.IPv4 = net.ParseIP(ipv4)
options.RemoteEndpoint.IPv4 = net.ParseIP(ipv4)
}

if val, ok := t[string(ext.PeerHostIPv6)]; ok {
ipv6, _ := val.(string)
remoteEndpoint.IPv6 = net.ParseIP(ipv6)
options.RemoteEndpoint.IPv6 = net.ParseIP(ipv6)
}

if val, ok := t[string(ext.PeerPort)]; ok {
port, _ := val.(uint16)
remoteEndpoint.Port = port
options.RemoteEndpoint.Port = port
}

for key, val := range t {
Expand All @@ -146,15 +151,15 @@ func parseTagsAsZipkinOptions(t map[string]interface{}) []zipkin.SpanOption {
continue
}

tags[key] = fmt.Sprint(val)
options.Tags[key] = fmt.Sprint(val)
}

if len(tags) > 0 {
zopts = append(zopts, zipkin.Tags(tags))
if len(options.Tags) > 0 {
zopts = append(zopts, zipkin.Tags(options.Tags))
}

if !remoteEndpoint.Empty() {
zopts = append(zopts, zipkin.RemoteEndpoint(remoteEndpoint))
if !options.RemoteEndpoint.Empty() {
zopts = append(zopts, zipkin.RemoteEndpoint(options.RemoteEndpoint))
}

return zopts
Expand Down
12 changes: 10 additions & 2 deletions tracer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ const (

// TracerOptions allows creating a customized Tracer.
type TracerOptions struct {
observer otobserver.Observer
b3InjectOpt B3InjectOption
observer otobserver.Observer
b3InjectOpt B3InjectOption
zipkinObserver ZipkinObserver
}

// TracerOption allows for functional options.
Expand All @@ -46,6 +47,13 @@ func WithObserver(observer otobserver.Observer) TracerOption {
}
}

// WithZipkinObserver assigns an initialized zipkin observer to opts.zipkinObserver
func WithZipkinObserver(zipkinObserver ZipkinObserver) TracerOption {
return func(opts *TracerOptions) {
opts.zipkinObserver = zipkinObserver
}
}

// WithB3InjectOption sets the B3 injection style if using the native OpenTracing HTTPHeadersCarrier
func WithB3InjectOption(b3InjectOption B3InjectOption) TracerOption {
return func(opts *TracerOptions) {
Expand Down
Loading