Skip to content

Commit

Permalink
Implement blocking queue
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Dec 18, 2024
1 parent e5294cf commit bcd3fe8
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 6 deletions.
19 changes: 17 additions & 2 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,23 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
}
}

if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled ||
usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled {
if usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled {
be.queueFactory = exporterqueue.NewBlockingMemoryQueue[internal.Request]()
be.queueCfg.QueueSize = 20
q := be.queueFactory(
context.Background(),
exporterqueue.Settings{
Signal: be.Signal,
ExporterSettings: be.Set,
},
be.queueCfg)
be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg)
for _, op := range options {
err = multierr.Append(err, op(be))
}
}

if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled {
bs := NewBatchSender(be.BatcherCfg, be.Set)
be.BatchSender = bs
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/internal/batch_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func TestBatchSender_PostShutdown(t *testing.T) {
assert.Equal(t, int64(8), sink.itemsCount.Load())
})
}
runTest("enable_queue_batcher", true)
// We don't expect the same behavior when disable_queue_batcher is true
runTest("disable_queue_batcher", false)
}

Expand Down
8 changes: 5 additions & 3 deletions exporter/exporterhelper/internal/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,12 @@ func (qs *QueueSender) Shutdown(ctx context.Context) error {
func (qs *QueueSender) Send(ctx context.Context, req internal.Request) error {
// Prevent cancellation and deadline to propagate to the context stored in the queue.
// The grpc/http based receivers will cancel the request context after this function returns.
c := context.WithoutCancel(ctx)
if !usePullingBasedExporterQueueBatcher.IsEnabled() && !qs.queue.IsBlocking() {
ctx = context.WithoutCancel(ctx)
}

span := trace.SpanFromContext(c)
if err := qs.queue.Offer(c, req); err != nil {
span := trace.SpanFromContext(ctx)
if err := qs.queue.Offer(ctx, req); err != nil {
span.AddEvent("Failed to enqueue item.", trace.WithAttributes(qs.traceAttribute))
return err
}
Expand Down
12 changes: 12 additions & 0 deletions exporter/exporterqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ func NewMemoryQueueFactory[T any]() Factory[T] {
}
}

// NewBlockingMemoryQueue returns a factory to create a new blocking memory queue.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func NewBlockingMemoryQueue[T any]() Factory[T] {
return func(_ context.Context, _ Settings, cfg Config) Queue[T] {
return queue.NewBlockingMemoryQueue[T](queue.BlockingMemoryQueueSettings[T]{
Sizer: &queue.RequestSizer[T]{},
Capacity: int64(cfg.QueueSize),
})
}
}

// PersistentQueueSettings defines developer settings for the persistent queue factory.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
Expand Down
93 changes: 93 additions & 0 deletions exporter/internal/queue/blocking_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"

import (
"context"
"sync"

"go.opentelemetry.io/collector/component"
)

// boundedMemoryQueue blocks insert until the batch containing the request is sent out.
type blockingMemoryQueue[T any] struct {
component.StartFunc
*sizedChannel[blockingMemQueueEl[T]]
sizer Sizer[T]

mu sync.Mutex
nextIndex uint64
doneCh map[uint64](chan error)
}

// MemoryQueueSettings defines internal parameters for boundedMemoryQueue creation.
type BlockingMemoryQueueSettings[T any] struct {
Sizer Sizer[T]
Capacity int64
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBlockingMemoryQueue[T any](set BlockingMemoryQueueSettings[T]) Queue[T] {
return &blockingMemoryQueue[T]{
sizedChannel: newSizedChannel[blockingMemQueueEl[T]](set.Capacity, nil, 0),
sizer: set.Sizer,
nextIndex: 0,
doneCh: make(map[uint64](chan error)),
}
}

// Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic.
func (q *blockingMemoryQueue[T]) Offer(ctx context.Context, req T) error {
q.mu.Lock()
index := q.nextIndex
q.nextIndex++
done := make(chan error)
q.doneCh[index] = done

if err := q.sizedChannel.push(
blockingMemQueueEl[T]{ctx: ctx, req: req, index: index},
q.sizer.Sizeof(req),
nil); err != nil {
delete(q.doneCh, index)
q.mu.Unlock()
return err
}

Check warning on line 56 in exporter/internal/queue/blocking_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/blocking_queue.go#L53-L56

Added lines #L53 - L56 were not covered by tests

q.mu.Unlock()
err := <-done
return err
}

func (q *blockingMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) {
item, ok := q.sizedChannel.pop(func(el blockingMemQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) })
return item.index, item.ctx, item.req, ok
}

// OnProcessingFinished should be called to remove the item of the given index from the queue once processing is finished.
// For in memory queue, this function is noop.
func (q *blockingMemoryQueue[T]) OnProcessingFinished(index uint64, err error) {
q.mu.Lock()
q.doneCh[index] <- err
delete(q.doneCh, index)
q.mu.Unlock()
}

// Shutdown closes the queue channel to initiate draining of the queue.
func (q *blockingMemoryQueue[T]) Shutdown(context.Context) error {
q.mu.Lock()
defer q.mu.Unlock()
q.sizedChannel.shutdown()
return nil
}

func (q *blockingMemoryQueue[T]) IsBlocking() bool {
return true

Check warning on line 86 in exporter/internal/queue/blocking_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/blocking_queue.go#L85-L86

Added lines #L85 - L86 were not covered by tests
}

type blockingMemQueueEl[T any] struct {
index uint64
req T
ctx context.Context
}
12 changes: 12 additions & 0 deletions exporter/internal/queue/blocking_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue

import (
"testing"
)

func TestBlockingMemoryQueue(t *testing.T) {

}
4 changes: 4 additions & 0 deletions exporter/internal/queue/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
return nil
}

func (q *boundedMemoryQueue[T]) IsBlocking() bool {
return false
}

type memQueueEl[T any] struct {
req T
ctx context.Context
Expand Down
4 changes: 4 additions & 0 deletions exporter/internal/queue/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,3 +560,7 @@ func bytesToItemIndexArray(buf []byte) ([]uint64, error) {
}
return val, nil
}

func (pq *persistentQueue[T]) IsBlocking() bool {
return false
}
2 changes: 2 additions & 0 deletions exporter/internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type Queue[T any] interface {
Read(context.Context) (uint64, context.Context, T, bool)
// OnProcessingFinished should be called to remove the item of the given index from the queue once processing is finished.
OnProcessingFinished(index uint64, consumeErr error)
// Returns a boolean to tell whether the queue is blocking
IsBlocking() bool
}

// Sizer is an interface that returns the size of the given element.
Expand Down

0 comments on commit bcd3fe8

Please sign in to comment.