Skip to content

Commit

Permalink
Add EarlyReturn feature for legacy compatibility (#253)
Browse files Browse the repository at this point in the history
This feature would allow a user to emulate the legacy core batch
processor's behavior. I expect this to be controlled by a feature flag
in the upstream repository.
  • Loading branch information
jmacd authored Sep 25, 2024
1 parent df78dcb commit 3f9c778
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 2 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## Unreleased

- Concurrent batch processor: synchronize with upstream; removes in-flight bytes metric,
- Concurrent batch processor: EarlyReturn legacy compat feature. [#253](https://github.com/open-telemetry/otel-arrow/pull/253)
- Concurrent batch processor: Synchronize with upstream; removes in-flight bytes metric,
removes panic recovery as unnecessary divergence. [#251](https://github.com/open-telemetry/otel-arrow/pull/251)
- Update collector dependencies to v0.110.0/v1.16.0; remove validation connector
- Update collector dependencies to v0.110.0/v1.16.0; remove validation connector [#252](https://github.com/open-telemetry/otel-arrow/pull/252)

## [0.26.0](https://github.com/open-telemetry/otel-arrow/releases/tag/v0.26.0) - 2024-09-06

Expand Down
3 changes: 3 additions & 0 deletions collector/processor/concurrentbatchprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ ignored as data will be sent immediately, subject to only `send_batch_max_size`.
not empty, this setting limits the number of unique combinations of
metadata key values that will be processed over the lifetime of the
process.
- `early_return` (default = false): When enabled, this pipeline component
will return immediate success to the caller after enqueuing the item
for eventual delivery.

See notes about metadata batching below.

Expand Down
10 changes: 10 additions & 0 deletions collector/processor/concurrentbatchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type batchProcessor struct {
// metadataLimit is the limiting size of the batchers map.
metadataLimit int

// earlyReturn is the value of Config.EarlyReturn.
earlyReturn bool

shutdownC chan struct{}
goroutines sync.WaitGroup

Expand Down Expand Up @@ -182,6 +185,7 @@ func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() bat
shutdownC: make(chan struct{}, 1),
metadataKeys: mks,
metadataLimit: int(cfg.MetadataCardinalityLimit),
earlyReturn: cfg.EarlyReturn,
tracer: set.TelemetrySettings.TracerProvider.Tracer(metadata.ScopeName),
}

Expand Down Expand Up @@ -370,7 +374,10 @@ func (b *shard) sendItems(trigger trigger) {

b.totalSent = numItemsAfter

b.processor.goroutines.Add(1)

go func() {
defer b.processor.goroutines.Done()
var err error

var parentSpan trace.Span
Expand Down Expand Up @@ -487,6 +494,9 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error {
case <-ctx.Done():
return ctx.Err()
case b.newItem <- item:
if b.processor.earlyReturn {
return nil
}
}

var err error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1754,3 +1754,58 @@ func TestErrorPropagation(t *testing.T) {
require.NoError(t, batcher.Shutdown(context.Background()))
}
}

func TestBatchProcessorEarlyReturn(t *testing.T) {
bg := context.Background()
sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
cfg.EarlyReturn = true
cfg.Timeout = time.Minute
creationSet := processortest.NewNopSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
require.NoError(t, err)

start := time.Now()
require.NoError(t, batcher.Start(bg, componenttest.NewNopHost()))

requestCount := 1000
spansPerRequest := 100
sentResourceSpans := ptrace.NewTraces().ResourceSpans()
var wg sync.WaitGroup
for requestNum := 0; requestNum < requestCount; requestNum++ {
td := testdata.GenerateTraces(spansPerRequest)
spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans()
for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ {
spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex))
}
td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty())
// Note: not using sendTraces()-- this test is synchronous.
require.NoError(t, batcher.ConsumeTraces(bg, td))
}

// This should take very little time.
require.Less(t, time.Since(start), cfg.Timeout/2)

// Shutdown, then wait for callers. Note that Shutdown is not
// properly synchronized when EarlyReturn is true, so up to
// the capacity of the channel times spansPerRequest may go
// missing.
require.NoError(t, batcher.Shutdown(context.Background()))

wg.Wait()

// Despite the early return, we expect 100% completion because
// Shutdown flushes the nextItem channel and waits for pending exports.
require.LessOrEqual(t, requestCount*spansPerRequest, sink.SpanCount())
receivedTraces := sink.AllTraces()
spansReceivedByName := spansReceivedByName(receivedTraces)
for requestNum := 0; requestNum < requestCount; requestNum++ {
spans := sentResourceSpans.At(requestNum).ScopeSpans().At(0).Spans()
for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ {
require.EqualValues(t,
spans.At(spanIndex),
spansReceivedByName[getTestSpanName(requestNum, spanIndex)])
}
}
}
6 changes: 6 additions & 0 deletions collector/processor/concurrentbatchprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ type Config struct {
// batcher instances that will be created through a distinct
// combination of MetadataKeys.
MetadataCardinalityLimit uint32 `mapstructure:"metadata_cardinality_limit"`

// EarlyReturn dictates whether the batch processor will
// return success as soon as the data item has been accepted
// into a pending batch. When set, the return will be
// unconditional success, not determined by the actual outcome.
EarlyReturn bool `mapstructure:"early_return"`
}

var _ component.Config = (*Config)(nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func TestUnmarshalConfig(t *testing.T) {
SendBatchMaxSize: uint32(11000),
Timeout: time.Second * 10,
MetadataCardinalityLimit: 1000,
EarlyReturn: true,
}, cfg)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
timeout: 10s
send_batch_size: 10000
send_batch_max_size: 11000
early_return: true

0 comments on commit 3f9c778

Please sign in to comment.