Skip to content

Commit

Permalink
Migrate supported otel components to xray service
Browse files Browse the repository at this point in the history
  • Loading branch information
jcreixell committed Jan 5, 2024
1 parent f61a967 commit 9bd6224
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 62 deletions.
17 changes: 12 additions & 5 deletions component/otelcol/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package exporter
import (
"context"
"errors"
"fmt"
"os"

"github.com/grafana/agent/component"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/grafana/agent/component/otelcol/internal/views"
"github.com/grafana/agent/pkg/build"
"github.com/grafana/agent/pkg/util/zapadapter"
"github.com/grafana/agent/service/xray"
"github.com/prometheus/client_golang/prometheus"
otelcomponent "go.opentelemetry.io/collector/component"
otelexporter "go.opentelemetry.io/collector/exporter"
Expand Down Expand Up @@ -101,9 +103,18 @@ var (
// The registered component must be registered to export the
// otelcol.ConsumerExports type, otherwise New will panic.
func New(opts component.Options, f otelexporter.Factory, args Arguments, supportedSignals TypeSignal) (*Exporter, error) {
data, err := opts.GetServiceData(xray.ServiceName)
if err != nil {
return nil, fmt.Errorf("failed to get information about X-Ray service: %w", err)
}
xray := data.(*xray.Service)
debugStreamCallback := func() func(string) {
return xray.GetDebugStream(opts.ID)
}

ctx, cancel := context.WithCancel(context.Background())

consumer := lazyexporterconsumer.New(ctx)
consumer := lazyexporterconsumer.New(ctx, debugStreamCallback)

// Create a lazy collector where metrics from the upstream component will be
// forwarded.
Expand Down Expand Up @@ -235,7 +246,3 @@ func (e *Exporter) Update(args component.Arguments) error {
func (e *Exporter) CurrentHealth() component.Health {
return e.sched.CurrentHealth()
}

func (e *Exporter) HookDebugStream(active bool, debugStreamCallback func(computeDataFunc func() string)) {
e.consumer.HookDebugStream(active, debugStreamCallback)
}
25 changes: 14 additions & 11 deletions component/otelcol/exporter/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/exporter/loki/internal/convert"
"github.com/grafana/agent/component/otelcol/internal/lazyconsumer"
"github.com/grafana/agent/service/xray"
)

func init() {
Expand All @@ -38,6 +39,7 @@ type Component struct {

converter *convert.Converter
logsReceiverStreamDebug *logsReceiverStreamDebug
xray *xray.Service
}

var (
Expand All @@ -48,15 +50,21 @@ var (
func New(o component.Options, c Arguments) (*Component, error) {
converter := convert.New(o.Logger, o.Registerer, c.ForwardTo)

data, err := o.GetServiceData(xray.ServiceName)
if err != nil {
return nil, fmt.Errorf("failed to get information about X-Ray service: %w", err)
}
xray := data.(*xray.Service)

res := &Component{
log: o.Logger,
opts: o,

converter: converter,
logsReceiverStreamDebug: &logsReceiverStreamDebug{
entries: make(chan loki.Entry),
debugStreamCallback: func(func() string) {},
entries: make(chan loki.Entry),
},
xray: xray,
}
if err := res.Update(c); err != nil {
return nil, err
Expand All @@ -79,9 +87,9 @@ func (c *Component) Run(ctx context.Context) error {
case <-ctx.Done():
return nil
case entry := <-c.logsReceiverStreamDebug.Chan():
c.logsReceiverStreamDebug.debugStreamCallback(func() string {
return fmt.Sprintf("ts=%s, labels=%s, entry=%s", entry.Timestamp.Format(time.RFC3339Nano), entry.Labels.String(), entry.Line)
})
if ds := c.xray.GetDebugStream(c.opts.ID); ds != nil {
ds(fmt.Sprintf("ts=%s, labels=%s, entry=%s", entry.Timestamp.Format(time.RFC3339Nano), entry.Labels.String(), entry.Line))
}
}
}
}
Expand All @@ -94,14 +102,9 @@ func (c *Component) Update(newConfig component.Arguments) error {
}

type logsReceiverStreamDebug struct {
entries chan loki.Entry
debugStreamCallback func(computeDataFunc func() string)
entries chan loki.Entry
}

func (l *logsReceiverStreamDebug) Chan() chan loki.Entry {
return l.entries
}

func (c *Component) HookDebugStream(active bool, debugStreamCallback func(computeDataFunc func() string)) {
c.logsReceiverStreamDebug.debugStreamCallback = debugStreamCallback
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,17 @@ import (
)

type Consumer struct {
debugStreamCallback func(computeDataFunc func() string)
debugStreamCallback func() func(string)
logsMarshaler plog.Marshaler
metricsMarshaler pmetric.Marshaler
tracesMarshaler ptrace.Marshaler
isActive bool
}

var _ otelcol.Consumer = (*Consumer)(nil)

func New() *Consumer {
func New(debugStreamCallback func() func(string)) *Consumer {
return &Consumer{
debugStreamCallback: func(computeDataFunc func() string) {},
debugStreamCallback: debugStreamCallback,
logsMarshaler: NewTextLogsMarshaler(),
metricsMarshaler: NewTextMetricsMarshaler(),
tracesMarshaler: NewTextTracesMarshaler(),
Expand All @@ -37,32 +36,27 @@ func (c *Consumer) Capabilities() otelconsumer.Capabilities {

// ConsumeTraces implements otelcol.ConsumeTraces.
func (c *Consumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
if c.isActive {
if cb := c.debugStreamCallback(); cb != nil {
data, _ := c.tracesMarshaler.MarshalTraces(td)
c.debugStreamCallback(func() string { return string(data) })
cb(string(data))
}
return nil
}

// ConsumeMetrics implements otelcol.ConsumeMetrics.
func (c *Consumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
if c.isActive {
if cb := c.debugStreamCallback(); cb != nil {
data, _ := c.metricsMarshaler.MarshalMetrics(md)
c.debugStreamCallback(func() string { return string(data) })
cb(string(data))
}
return nil
}

// ConsumeLogs implements otelcol.ConsumeLogs.
func (c *Consumer) ConsumeLogs(ctx context.Context, md plog.Logs) error {
if c.isActive {
if cb := c.debugStreamCallback(); cb != nil {
data, _ := c.logsMarshaler.MarshalLogs(md)
c.debugStreamCallback(func() string { return string(data) })
cb(string(data))
}
return nil
}

func (c *Consumer) HookDebugStream(active bool, debugStreamCallback func(computeDataFunc func() string)) {
c.debugStreamCallback = debugStreamCallback
c.isActive = active
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ var (
// New creates a new Consumer. The provided ctx is used to determine when the
// Consumer should stop accepting data; if the ctx is closed, no further data
// will be accepted.
func New(ctx context.Context) *Consumer {
return &Consumer{ctx: ctx, debugStreamConsumer: debugstreamconsumer.New()}
func New(ctx context.Context, debugStreamCallback func() func(string)) *Consumer {
return &Consumer{ctx: ctx, debugStreamConsumer: debugstreamconsumer.New(debugStreamCallback)}
}

// Capabilities implements otelconsumer.baseConsumer.
Expand Down Expand Up @@ -127,7 +127,3 @@ func (c *Consumer) SetConsumers(t otelconsumer.Traces, m otelconsumer.Metrics, l
c.logsConsumer = l
c.tracesConsumer = t
}

func (c *Consumer) HookDebugStream(active bool, debugStreamCallback func(computeDataFunc func() string)) {
c.debugStreamConsumer.HookDebugStream(active, debugStreamCallback)
}
15 changes: 10 additions & 5 deletions component/otelcol/processor/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/grafana/agent/component/otelcol/internal/lazyconsumer"
"github.com/grafana/agent/pkg/flow/logging/level"
promsdconsumer "github.com/grafana/agent/pkg/traces/promsdprocessor/consumer"
"github.com/grafana/agent/service/xray"
"github.com/grafana/river"
)

Expand Down Expand Up @@ -91,7 +92,15 @@ func New(o component.Options, c Arguments) (*Component, error) {
level.Warn(o.Logger).Log("msg", "non-trace output detected; this component only works for traces")
}

debugStreamConsumer := debugstreamconsumer.New()
data, err := o.GetServiceData(xray.ServiceName)
if err != nil {
return nil, fmt.Errorf("failed to get information about X-Ray service: %w", err)
}
xray := data.(*xray.Service)
debugStreamCallback := func() func(string) {
return xray.GetDebugStream(o.ID)
}
debugStreamConsumer := debugstreamconsumer.New(debugStreamCallback)

nextTraces := fanoutconsumer.Traces(append(c.Output.Traces, debugStreamConsumer))

Expand Down Expand Up @@ -163,7 +172,3 @@ func (c *Component) Update(newConfig component.Arguments) error {

return nil
}

func (c *Component) HookDebugStream(active bool, debugStreamCallback func(computeDataFunc func() string)) {
c.debugStreamConsumer.HookDebugStream(active, debugStreamCallback)
}
17 changes: 12 additions & 5 deletions component/otelcol/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package processor
import (
"context"
"errors"
"fmt"
"os"

"github.com/grafana/agent/component"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/grafana/agent/component/otelcol/internal/scheduler"
"github.com/grafana/agent/pkg/build"
"github.com/grafana/agent/pkg/util/zapadapter"
"github.com/grafana/agent/service/xray"
"github.com/prometheus/client_golang/prometheus"
otelcomponent "go.opentelemetry.io/collector/component"
otelextension "go.opentelemetry.io/collector/extension"
Expand Down Expand Up @@ -73,6 +75,15 @@ var (
// The registered component must be registered to export the
// otelcol.ConsumerExports type, otherwise New will panic.
func New(opts component.Options, f otelprocessor.Factory, args Arguments) (*Processor, error) {
data, err := opts.GetServiceData(xray.ServiceName)
if err != nil {
return nil, fmt.Errorf("failed to get information about X-Ray service: %w", err)
}
xray := data.(*xray.Service)
debugStreamCallback := func() func(string) {
return xray.GetDebugStream(opts.ID)
}

ctx, cancel := context.WithCancel(context.Background())

consumer := lazyconsumer.New(ctx)
Expand All @@ -99,7 +110,7 @@ func New(opts component.Options, f otelprocessor.Factory, args Arguments) (*Proc

sched: scheduler.New(opts.Logger),
collector: collector,
debugStreamConsumer: debugstreamconsumer.New(),
debugStreamConsumer: debugstreamconsumer.New(debugStreamCallback),
}
if err := p.Update(args); err != nil {
return nil, err
Expand Down Expand Up @@ -208,7 +219,3 @@ func (p *Processor) Update(args component.Arguments) error {
func (p *Processor) CurrentHealth() component.Health {
return p.sched.CurrentHealth()
}

func (p *Processor) HookDebugStream(active bool, debugStreamCallback func(computeDataFunc func() string)) {
p.debugStreamConsumer.HookDebugStream(active, debugStreamCallback)
}
18 changes: 13 additions & 5 deletions component/otelcol/receiver/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package loki

import (
"context"
"fmt"
"path"
"strings"
"sync"
Expand All @@ -14,6 +15,7 @@ import (
debugstreamconsumer "github.com/grafana/agent/component/otelcol/internal/debugStreamConsumer"
"github.com/grafana/agent/component/otelcol/internal/fanoutconsumer"
"github.com/grafana/agent/pkg/flow/logging/level"
"github.com/grafana/agent/service/xray"
loki_translator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
Expand Down Expand Up @@ -62,10 +64,20 @@ var _ component.Component = (*Component)(nil)
func New(o component.Options, c Arguments) (*Component, error) {
// TODO(@tpaschalis) Create a metrics struct to count
// total/successful/errored log entries?

data, err := o.GetServiceData(xray.ServiceName)
if err != nil {
return nil, fmt.Errorf("failed to get information about X-Ray service: %w", err)
}
xray := data.(*xray.Service)
debugStreamCallback := func() func(string) {
return xray.GetDebugStream(o.ID)
}

res := &Component{
log: o.Logger,
opts: o,
debugStreamConsumer: debugstreamconsumer.New(),
debugStreamConsumer: debugstreamconsumer.New(debugStreamCallback),
}

// Create and immediately export the receiver which remains the same for
Expand Down Expand Up @@ -150,7 +162,3 @@ func convertLokiEntryToPlog(lokiEntry loki.Entry) plog.Logs {

return logs
}

func (c *Component) HookDebugStream(active bool, debugStreamCallback func(computeDataFunc func() string)) {
c.debugStreamConsumer.HookDebugStream(active, debugStreamCallback)
}
17 changes: 12 additions & 5 deletions component/otelcol/receiver/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package prometheus

import (
"context"
"fmt"
"os"
"regexp"
"sync"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/grafana/agent/component/otelcol/receiver/prometheus/internal"
"github.com/grafana/agent/pkg/build"
"github.com/grafana/agent/pkg/util/zapadapter"
"github.com/grafana/agent/service/xray"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
otelcomponent "go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -65,10 +67,19 @@ var (

// New creates a new otelcol.receiver.prometheus component.
func New(o component.Options, c Arguments) (*Component, error) {
data, err := o.GetServiceData(xray.ServiceName)
if err != nil {
return nil, fmt.Errorf("failed to get information about X-Ray service: %w", err)
}
xray := data.(*xray.Service)
debugStreamCallback := func() func(string) {
return xray.GetDebugStream(o.ID)
}

res := &Component{
log: o.Logger,
opts: o,
debugStreamConsumer: debugstreamconsumer.New(),
debugStreamConsumer: debugstreamconsumer.New(debugStreamCallback),
}

if err := res.Update(c); err != nil {
Expand Down Expand Up @@ -159,7 +170,3 @@ func (c *Component) Update(newConfig component.Arguments) error {

return nil
}

func (c *Component) HookDebugStream(active bool, debugStreamCallback func(computeDataFunc func() string)) {
c.debugStreamConsumer.HookDebugStream(active, debugStreamCallback)
}
Loading

0 comments on commit 9bd6224

Please sign in to comment.