Skip to content
This repository has been archived by the owner on Nov 7, 2022. It is now read-only.

Commit

Permalink
Jaeger grpc exporter (#433)
Browse files Browse the repository at this point in the history
  • Loading branch information
annanay25 authored and songy23 committed Mar 14, 2019
1 parent 392ee4a commit b597ff0
Show file tree
Hide file tree
Showing 13 changed files with 1,112 additions and 21 deletions.
19 changes: 19 additions & 0 deletions cmd/occollector/app/builder/processor_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
ThriftTChannelSenderType SenderType = "jaeger-thrift-tchannel"
// ThriftHTTPSenderType represents a thrift-format http-transport sender
ThriftHTTPSenderType = "jaeger-thrift-http"
// ProtoGRPCSenderType represents a proto-format grpc-transport sender
ProtoGRPCSenderType = "jaeger-proto-grpc"
// InvalidSenderType represents an invalid sender
InvalidSenderType = "invalid"
)
Expand Down Expand Up @@ -67,6 +69,16 @@ func NewJaegerThriftHTTPSenderCfg() *JaegerThriftHTTPSenderCfg {
return opts
}

// JaegerProtoGRPCSenderCfg holds configuration for Jaeger Proto GRPC sender
type JaegerProtoGRPCSenderCfg struct {
CollectorEndpoint string `mapstructure:"collector-endpoint"`
}

// NewJaegerProtoGRPCSenderCfg returns an instance of JaegerProtoGRPCSenderCfg with default values
func NewJaegerProtoGRPCSenderCfg() *JaegerProtoGRPCSenderCfg {
return &JaegerProtoGRPCSenderCfg{}
}

// BatchingConfig contains configuration around the queueing batching.
// It contains some advanced configurations, which should not be used
// by a typical user, but are provided as advanced features to increase
Expand Down Expand Up @@ -155,6 +167,13 @@ func (qOpts *QueuedSpanProcessorCfg) InitFromViper(v *viper.Viper) *QueuedSpanPr
vthsOpts.Unmarshal(thsOpts)
}
qOpts.SenderConfig = thsOpts
case ProtoGRPCSenderType:
pgopts := NewJaegerProtoGRPCSenderCfg()
vpgopts := v.Sub(string(ProtoGRPCSenderType))
if vpgopts != nil {
vpgopts.Unmarshal(pgopts)
}
qOpts.SenderConfig = pgopts
}
qOpts.RawConfig = v
return qOpts
Expand Down
11 changes: 9 additions & 2 deletions cmd/occollector/app/collector/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

tchReporter "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel"
"github.com/spf13/viper"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder"
Expand Down Expand Up @@ -74,7 +73,7 @@ func buildQueuedSpanProcessor(
DiscoveryMinPeers: thriftTChannelSenderOpts.DiscoveryMinPeers,
ConnCheckTimeout: thriftTChannelSenderOpts.DiscoveryConnCheckTimeout,
}
tchreporter, err := tchrepbuilder.CreateReporter(metrics.NullFactory, logger)
tchreporter, err := tchrepbuilder.CreateReporter(logger)
if err != nil {
logger.Fatal("Cannot create tchannel reporter.", zap.Error(err))
return nil, nil, err
Expand All @@ -90,6 +89,14 @@ func buildQueuedSpanProcessor(
logger,
sender.HTTPTimeout(thriftHTTPSenderOpts.Timeout),
)
case builder.ProtoGRPCSenderType:
protoGRPCSenderOpts := opts.SenderConfig.(*builder.JaegerProtoGRPCSenderCfg)
logger.Info("Initializing proto-GRPC sender",
zap.String("url", protoGRPCSenderOpts.CollectorEndpoint))
spanSender = sender.NewJaegerProtoGRPCSender(
protoGRPCSenderOpts.CollectorEndpoint,
logger,
)
}
doneFns, traceExporters, _ := createExporters(opts.RawConfig, logger)

Expand Down
68 changes: 68 additions & 0 deletions cmd/occollector/app/sender/jaeger_proto_grpc_sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sender

import (
"context"

"go.uber.org/zap"
"google.golang.org/grpc"

jaegerproto "github.com/jaegertracing/jaeger/proto-gen/api_v2"

"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data"
jaegertranslator "github.com/census-instrumentation/opencensus-service/translator/trace/jaeger"
)

// JaegerProtoGRPCSender forwards spans encoded in the jaeger proto
// format, to a grpc server.
type JaegerProtoGRPCSender struct {
client jaegerproto.CollectorServiceClient
logger *zap.Logger
}

var _ consumer.TraceConsumer = (*JaegerThriftHTTPSender)(nil)

// NewJaegerProtoGRPCSender returns a new GRPC-backend span sender.
// The collector endpoint should be of the form "hostname:14250".
func NewJaegerProtoGRPCSender(collectorEndpoint string, zlogger *zap.Logger) *JaegerProtoGRPCSender {
client, err := grpc.Dial(collectorEndpoint, grpc.WithInsecure())
zlogger.Fatal("Failed to dail grpc connection", zap.Error(err))
collectorServiceClient := jaegerproto.NewCollectorServiceClient(client)
s := &JaegerProtoGRPCSender{
client: collectorServiceClient,
logger: zlogger,
}

return s
}

// ConsumeTraceData receives data.TraceData for processing by the JaegerProtoGRPCSender.
func (s *JaegerProtoGRPCSender) ConsumeTraceData(ctx context.Context, td data.TraceData) error {
protoBatch, err := jaegertranslator.OCProtoToJaegerProto(td)
if err != nil {
s.logger.Warn("Error translating OC proto batch to Jaeger proto", zap.Error(err))
return err
}

_, err = s.client.PostSpans(context.Background(), &jaegerproto.PostSpansRequest{Batch: *protoBatch})
if err != nil {
s.logger.Warn("Error sending grpc batch", zap.Error(err))
return err
}

return nil
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/honeycombio/libhoney-go v1.8.2 // indirect
github.com/honeycombio/opencensus-exporter v0.0.0-20181101214123-9be2bb327b5a
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jaegertracing/jaeger v1.8.2
github.com/jaegertracing/jaeger v1.9.0
github.com/json-iterator/go v1.1.5 // indirect
github.com/mitchellh/mapstructure v1.1.2 // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
Expand Down Expand Up @@ -65,7 +65,7 @@ require (
github.com/tinylib/msgp v1.0.2 // indirect
github.com/uber-go/atomic v1.3.2 // indirect
github.com/uber/jaeger-client-go v2.15.0+incompatible // indirect
github.com/uber/jaeger-lib v1.5.0
github.com/uber/jaeger-lib v2.0.0+incompatible
github.com/uber/tchannel-go v1.10.0
github.com/yancl/opencensus-go-exporter-kafka v0.0.0-20181029030031-9c471c1bfbeb
go.opencensus.io v0.19.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt
github.com/influxdata/influxdb v0.0.0-20170331210902-15e594fc09f1/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY=
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ=
github.com/jackc/pgx v3.2.0+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I=
github.com/jaegertracing/jaeger v1.8.2 h1:VNrL7qDS7IUJtjSJ3aa3+UU2Shhk26n4B/xi7A99mQc=
github.com/jaegertracing/jaeger v1.8.2/go.mod h1:LUWPSnzNPGRubM8pk0inANGitpiMOOxihXx0+53llXI=
github.com/jaegertracing/jaeger v1.9.0 h1:xtwGp/+H8kvT6q8LRrEOxMhZZGKO4Hsziy4pRmLIczM=
github.com/jaegertracing/jaeger v1.9.0/go.mod h1:LUWPSnzNPGRubM8pk0inANGitpiMOOxihXx0+53llXI=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8 h1:12VvqtR6Aowv3l/EQUlocDHW2Cp4G9WJVH7uyH8QFJE=
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
Expand Down Expand Up @@ -447,8 +447,8 @@ github.com/uber-go/atomic v1.3.2 h1:Azu9lPBWRNKzYXSIwRfgRuDuS0YKsK4NFhiQv98gkxo=
github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g=
github.com/uber/jaeger-client-go v2.15.0+incompatible h1:NP3qsSqNxh8VYr956ur1N/1C1PjvOJnJykCzcD5QHbk=
github.com/uber/jaeger-client-go v2.15.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v1.5.0 h1:OHbgr8l656Ub3Fw5k9SWnBfIEwvoHQ+W2y+Aa9D1Uyo=
github.com/uber/jaeger-lib v1.5.0/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/uber/jaeger-lib v2.0.0+incompatible h1:iMSCV0rmXEogjNWPh2D0xk9YVKvrtGoHJNe9ebLu/pw=
github.com/uber/jaeger-lib v2.0.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/uber/tchannel-go v1.10.0 h1:YOihLHuvkwT3nzvpgqFtexFW+pb5vD1Tz7h/bIWApgE=
github.com/uber/tchannel-go v1.10.0/go.mod h1:Rrgz1eL8kMjW/nEzZos0t+Heq0O4LhnUJVA32OvWKHo=
github.com/yancl/opencensus-go-exporter-kafka v0.0.0-20181029030031-9c471c1bfbeb h1:DSch+h+LW/9zO8ImnA2KzFylC/ShRAAgRPJVlx6FMSA=
Expand Down
4 changes: 2 additions & 2 deletions receiver/jaegerreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

"github.com/gorilla/mux"
agentapp "github.com/jaegertracing/jaeger/cmd/agent/app"
"github.com/jaegertracing/jaeger/cmd/agent/app/httpserver"
"github.com/jaegertracing/jaeger/cmd/agent/app/configmanager"
"github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
"github.com/jaegertracing/jaeger/cmd/collector/app"
"github.com/jaegertracing/jaeger/thrift-gen/baggage"
Expand Down Expand Up @@ -289,7 +289,7 @@ func (jr *jReceiver) GetReporter() reporter.Reporter {
return jr
}

func (jr *jReceiver) GetManager() httpserver.ClientConfigManager {
func (jr *jReceiver) GetManager() configmanager.ClientConfigManager {
return jr
}

Expand Down
43 changes: 43 additions & 0 deletions translator/trace/jaeger/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jaeger

import (
"errors"
)

const (
// Jaeger Tags
ocTimeEventUnknownType = "oc.timeevent.unknown.type"
ocTimeEventAnnotationDescription = "oc.timeevent.annotation.description"
ocTimeEventMessageEventType = "oc.timeevent.messageevent.type"
ocTimeEventMessageEventID = "oc.timeevent.messageevent.id"
ocTimeEventMessageEventUSize = "oc.timeevent.messageevent.usize"
ocTimeEventMessageEventCSize = "oc.timeevent.messageevent.csize"
ocSameProcessAsParentSpan = "oc.sameprocessasparentspan"
ocSpanChildCount = "oc.span.childcount"
opencensusLanguage = "opencensus.language"
opencensusExporterVersion = "opencensus.exporterversion"
opencensusCoreLibVersion = "opencensus.corelibversion"
)

var (
errZeroTraceID = errors.New("OC span has an all zeros trace ID")
errNilTraceID = errors.New("OC trace ID is nil")
errWrongLenTraceID = errors.New("TraceID does not have 16 bytes")
errZeroSpanID = errors.New("OC span has an all zeros span ID")
errNilID = errors.New("OC ID is nil")
errWrongLenID = errors.New("ID does not have 8 bytes")
)
Loading

0 comments on commit b597ff0

Please sign in to comment.