sdk/log: Processor.OnEmit accepts a Record pointer #5636

Merged: 23 commits merged on Aug 1, 2024

Changes from 9 commits

Commits (23)
65934b2 sdk/log: Change Processor and Record.Clone to operate on pointers (pellared, Jul 22, 2024)
7c70511 Update BenchmarkProcessor (pellared, Jul 22, 2024)
eae4fa9 Update unit tests (pellared, Jul 22, 2024)
fd713c8 gofumpt (pellared, Jul 22, 2024)
07d7af9 Update DESIGN.md (pellared, Jul 22, 2024)
94f3c6e Make BatchProcessor safer (pellared, Jul 22, 2024)
3fe660b Update CHANGELOG.md (pellared, Jul 22, 2024)
4f5cdb6 Refine commments (pellared, Jul 22, 2024)
3c0a490 Merge branch 'pointer-processor' of https://github.com/pellared/opent… (pellared, Jul 22, 2024)
6a944cf Merge branch 'main' into pointer-processor (pellared, Jul 22, 2024)
8235dc1 Merge branch 'main' into pointer-processor (pellared, Jul 23, 2024)
947aeaf Merge branch 'main' into pointer-processor (pellared, Jul 24, 2024)
8b64c68 Apply suggestions from code review (pellared, Jul 24, 2024)
790ab22 Revert Record.Clone changes (pellared, Jul 24, 2024)
55dafb5 Add comment why we are cloning the record (pellared, Jul 24, 2024)
82d5cf7 Merge branch 'main' into pointer-processor (pellared, Jul 24, 2024)
ac5e2c4 Merge branch 'main' into pointer-processor (pellared, Jul 25, 2024)
16fad13 Merge branch 'main' into pointer-processor (pellared, Jul 29, 2024)
b8b7fb8 Update sdk/log/example_test.go (pellared, Jul 30, 2024)
79f79ee Merge branch 'main' into pointer-processor (pellared, Jul 31, 2024)
d304818 Merge branch 'main' into pointer-processor (pellared, Jul 31, 2024)
dfa72a6 Processor.Enabled to accept Record value (pellared, Jul 31, 2024)
76c6d0a Merge branch 'main' into pointer-processor (pellared, Aug 1, 2024)
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -16,6 +16,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
This module is unstable and breaking changes may be introduced.
See our [versioning policy](VERSIONING.md) for more information about these stability guarantees. (#5629)

### Changed

- `Processor` in `go.opentelemetry.io/otel/sdk/log` now accepts a pointer to `Record` instead of a value so that the record modifications done in a processor are propagated to subsequent registered processors. (#5636)
- `Record.Clone` in `go.opentelemetry.io/otel/sdk/log` now returns a pointer to `Record`. (#5636)

### Fixed

- Correct comments for the priority of the `WithEndpoint` and `WithEndpointURL` options and their corresponding environment variables in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#5584)
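The following sketch (illustrative only, not part of this PR's diff) shows what the pointer-based chaining described in the `### Changed` entries above looks like in user code, using the signatures from this revision in which both `OnEmit` and `Enabled` take a `*Record` (a later commit in this PR moves `Enabled` back to a `Record` value). The `redactor` and `dropExporter` types are hypothetical names invented for the example.

```go
package main

import (
	"context"

	"go.opentelemetry.io/otel/log"
	sdklog "go.opentelemetry.io/otel/sdk/log"
)

// redactor rewrites the record body in place. Because OnEmit receives a
// *Record, the change is visible to every processor registered after it.
type redactor struct{}

func (redactor) OnEmit(_ context.Context, r *sdklog.Record) error {
	r.SetBody(log.StringValue("[redacted]"))
	return nil
}
func (redactor) Enabled(context.Context, *sdklog.Record) bool { return true }
func (redactor) Shutdown(context.Context) error               { return nil }
func (redactor) ForceFlush(context.Context) error             { return nil }

// dropExporter discards records; it exists only to keep the sketch self-contained.
type dropExporter struct{}

func (dropExporter) Export(context.Context, []sdklog.Record) error { return nil }
func (dropExporter) Shutdown(context.Context) error                { return nil }
func (dropExporter) ForceFlush(context.Context) error              { return nil }

func main() {
	ctx := context.Background()
	provider := sdklog.NewLoggerProvider(
		sdklog.WithProcessor(redactor{}),
		sdklog.WithProcessor(sdklog.NewBatchProcessor(dropExporter{})),
	)
	defer func() { _ = provider.Shutdown(ctx) }()

	var rec log.Record
	rec.SetBody(log.StringValue("secret"))
	// The batch processor receives the same *Record the redactor mutated,
	// so the queued record carries "[redacted]" rather than "secret".
	provider.Logger("example").Emit(ctx, rec)
}
```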
18 changes: 18 additions & 0 deletions sdk/log/DESIGN.md
@@ -146,7 +146,25 @@ provided via API.
Moreover it is safer to have these abstraction decoupled.
E.g. there can be a need for some fields that can be set via API and cannot be modified by the processors.

### Processor to accept Record values

There was a proposal to make the [Processor](#processor) accept a
[Record](#record) value instead of a pointer.

There have been long discussions within the OpenTelemetry Specification SIG[^5]
about whether such a design would comply with the specification. The summary
was that the current processor design flaws are present in other languages as
well. Therefore, it would be favorable to introduce new processing concepts
(e.g. chaining processors) in the specification that would coexist with the
current "mutable" processor design.

The performance disadvantages caused by using a pointer (which at the time of
writing causes an additional heap allocation) may be mitigated by future
versions of the Go compiler, thanks to improved escape analysis and
profile-guided optimization (PGO).
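
As an illustration of the allocation cost discussed above, a hypothetical micro-benchmark (not part of this PR) could be placed next to `bench_test.go` in the `sdk/log` package; it assumes the package's existing `noopExporter` test helper:

```go
package log

import (
	"context"
	"testing"

	"go.opentelemetry.io/otel/log"
)

func BenchmarkOnEmitPointer(b *testing.B) {
	bp := NewBatchProcessor(noopExporter{})
	ctx := context.Background()
	b.Cleanup(func() { _ = bp.Shutdown(ctx) })

	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		r := new(Record) // passing a pointer may force r to escape to the heap
		r.SetBody(log.BoolValue(true))
		_ = bp.OnEmit(ctx, r)
	}
}
```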

[^1]: [A Guide to the Go Garbage Collector](https://tip.golang.org/doc/gc-guide)
[^2]: [OpenTelemetry Logging](https://opentelemetry.io/docs/specs/otel/logs)
[^3]: [Conversation on representing LogRecordProcessor and LogRecordExporter via a single Expoter interface](https://github.com/open-telemetry/opentelemetry-go/pull/4954#discussion_r1515050480)
[^4]: [Introduce Processor](https://github.com/pellared/opentelemetry-go/pull/9)
[^5]: [Log record mutations do not have to be visible in next registered processors](https://github.com/open-telemetry/opentelemetry-specification/pull/4067)
6 changes: 3 additions & 3 deletions sdk/log/batch.go
@@ -176,11 +176,11 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {
}

// OnEmit batches provided log record.
func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error {
func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error {
if b.stopped.Load() || b.q == nil {
return nil
}
if n := b.q.Enqueue(r); n >= b.batchSize {
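// Clone the record before queuing it so the batched copy is not affected if
// *r is modified later (e.g. by another registered processor).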
if n := b.q.Enqueue(*r.Clone()); n >= b.batchSize {
select {
case b.pollTrigger <- struct{}{}:
default:
@@ -193,7 +193,7 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error {
}

// Enabled returns if b is enabled.
func (b *BatchProcessor) Enabled(context.Context, Record) bool {
func (b *BatchProcessor) Enabled(context.Context, *Record) bool {
return !b.stopped.Load() && b.q != nil
}

38 changes: 19 additions & 19 deletions sdk/log/batch_test.go
@@ -45,7 +45,7 @@ func TestEmptyBatchConfig(t *testing.T) {
assert.NotPanics(t, func() {
var bp BatchProcessor
ctx := context.Background()
var record Record
record := new(Record)
assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit")
assert.False(t, bp.Enabled(ctx, record), "Enabled")
assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush")
@@ -197,8 +197,8 @@ func TestBatchProcessor(t *testing.T) {
WithExportInterval(time.Nanosecond),
WithExportTimeout(time.Hour),
)
for _, r := range make([]Record, size) {
assert.NoError(t, b.OnEmit(ctx, r))
for i := 0; i < size; i++ {
assert.NoError(t, b.OnEmit(ctx, new(Record)))
}
var got []Record
assert.Eventually(t, func() bool {
@@ -220,8 +220,8 @@ func TestBatchProcessor(t *testing.T) {
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
for _, r := range make([]Record, 10*batch) {
assert.NoError(t, b.OnEmit(ctx, r))
for i := 0; i < 10*batch; i++ {
assert.NoError(t, b.OnEmit(ctx, new(Record)))
}
assert.Eventually(t, func() bool {
return e.ExportN() > 1
@@ -243,8 +243,8 @@ func TestBatchProcessor(t *testing.T) {
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
for _, r := range make([]Record, 2*batch) {
assert.NoError(t, b.OnEmit(ctx, r))
for i := 0; i < 2*batch; i++ {
assert.NoError(t, b.OnEmit(ctx, new(Record)))
}

var n int
@@ -255,7 +255,7 @@ func TestBatchProcessor(t *testing.T) {

var err error
require.Eventually(t, func() bool {
err = b.OnEmit(ctx, Record{})
err = b.OnEmit(ctx, new(Record))
return true
}, time.Second, time.Microsecond, "OnEmit blocked")
assert.NoError(t, err)
@@ -272,10 +272,10 @@ func TestBatchProcessor(t *testing.T) {

t.Run("Enabled", func(t *testing.T) {
b := NewBatchProcessor(defaultNoopExporter)
assert.True(t, b.Enabled(ctx, Record{}))
assert.True(t, b.Enabled(ctx, new(Record)))

_ = b.Shutdown(ctx)
assert.False(t, b.Enabled(ctx, Record{}))
assert.False(t, b.Enabled(ctx, new(Record)))
})

t.Run("Shutdown", func(t *testing.T) {
@@ -303,15 +303,15 @@ func TestBatchProcessor(t *testing.T) {
assert.NoError(t, b.Shutdown(ctx))

want := e.ExportN()
assert.NoError(t, b.OnEmit(ctx, Record{}))
assert.NoError(t, b.OnEmit(ctx, new(Record)))
assert.Equal(t, want, e.ExportN(), "Export called after shutdown")
})

t.Run("ForceFlush", func(t *testing.T) {
e := newTestExporter(nil)
b := NewBatchProcessor(e)

assert.NoError(t, b.OnEmit(ctx, Record{}))
assert.NoError(t, b.OnEmit(ctx, new(Record)))
assert.NoError(t, b.Shutdown(ctx))

assert.NoError(t, b.ForceFlush(ctx))
@@ -344,7 +344,7 @@ func TestBatchProcessor(t *testing.T) {
)
t.Cleanup(func() { _ = b.Shutdown(ctx) })

var r Record
r := new(Record)
r.SetBody(log.BoolValue(true))
require.NoError(t, b.OnEmit(ctx, r))

@@ -353,7 +353,7 @@ func TestBatchProcessor(t *testing.T) {
if assert.Equal(t, 1, e.ExportN(), "exporter Export calls") {
got := e.Records()
if assert.Len(t, got[0], 1, "records received") {
assert.Equal(t, r, got[0][0])
assert.Equal(t, *r, got[0][0])
}
}
})
@@ -381,7 +381,7 @@ func TestBatchProcessor(t *testing.T) {

// Enqueue 10 x "batch size" amount of records.
for i := 0; i < 10*batch; i++ {
require.NoError(t, b.OnEmit(ctx, Record{}))
require.NoError(t, b.OnEmit(ctx, new(Record)))
}
assert.Eventually(t, func() bool {
return e.ExportN() > 0 && len(b.exporter.input) == cap(b.exporter.input)
@@ -423,7 +423,7 @@ func TestBatchProcessor(t *testing.T) {
b := NewBatchProcessor(e)
t.Cleanup(func() { _ = b.Shutdown(ctx) })

var r Record
r := new(Record)
r.SetBody(log.BoolValue(true))
_ = b.OnEmit(ctx, r)
t.Cleanup(func() { _ = b.Shutdown(ctx) })
@@ -453,7 +453,7 @@ func TestBatchProcessor(t *testing.T) {
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
var r Record
r := new(Record)
// First record will be blocked by testExporter.Export
assert.NoError(t, b.OnEmit(ctx, r), "exported record")
require.Eventually(t, func() bool {
@@ -497,7 +497,7 @@ func TestBatchProcessor(t *testing.T) {
case <-ctx.Done():
return
default:
assert.NoError(t, b.OnEmit(ctx, Record{}))
assert.NoError(t, b.OnEmit(ctx, new(Record)))
// Ignore partial flush errors.
_ = b.ForceFlush(ctx)
}
@@ -642,7 +642,7 @@ func TestQueue(t *testing.T) {
}

func BenchmarkBatchProcessorOnEmit(b *testing.B) {
var r Record
r := new(Record)
body := log.BoolValue(true)
r.SetBody(body)

113 changes: 79 additions & 34 deletions sdk/log/bench_test.go
@@ -16,59 +16,77 @@ import (
func BenchmarkProcessor(b *testing.B) {
for _, tc := range []struct {
name string
f func() Processor
f func() []LoggerProviderOption
}{
{
name: "Simple",
f: func() Processor {
return NewSimpleProcessor(noopExporter{})
f: func() []LoggerProviderOption {
return []LoggerProviderOption{WithProcessor(NewSimpleProcessor(noopExporter{}))}
},
},
{
name: "Batch",
f: func() Processor {
return NewBatchProcessor(noopExporter{})
f: func() []LoggerProviderOption {
return []LoggerProviderOption{WithProcessor(NewBatchProcessor(noopExporter{}))}
},
},
{
name: "SetTimestampSimple",
f: func() Processor {
return timestampDecorator{NewSimpleProcessor(noopExporter{})}
f: func() []LoggerProviderOption {
return []LoggerProviderOption{
WithProcessor(timestampProcessor{}),
WithProcessor(NewSimpleProcessor(noopExporter{})),
}
},
},
{
name: "SetTimestampBatch",
f: func() Processor {
return timestampDecorator{NewBatchProcessor(noopExporter{})}
f: func() []LoggerProviderOption {
return []LoggerProviderOption{
WithProcessor(timestampProcessor{}),
WithProcessor(NewBatchProcessor(noopExporter{})),
}
},
},
{
name: "AddAttributesSimple",
f: func() Processor {
return attrAddDecorator{NewSimpleProcessor(noopExporter{})}
f: func() []LoggerProviderOption {
return []LoggerProviderOption{
WithProcessor(attrAddProcessor{}),
WithProcessor(NewSimpleProcessor(noopExporter{})),
}
},
},
{
name: "AddAttributesBatch",
f: func() Processor {
return attrAddDecorator{NewBatchProcessor(noopExporter{})}
f: func() []LoggerProviderOption {
return []LoggerProviderOption{
WithProcessor(attrAddProcessor{}),
WithProcessor(NewBatchProcessor(noopExporter{})),
}
},
},
{
name: "SetAttributesSimple",
f: func() Processor {
return attrSetDecorator{NewSimpleProcessor(noopExporter{})}
f: func() []LoggerProviderOption {
return []LoggerProviderOption{
WithProcessor(attrSetDecorator{}),
WithProcessor(NewSimpleProcessor(noopExporter{})),
}
},
},
{
name: "SetAttributesBatch",
f: func() Processor {
return attrSetDecorator{NewBatchProcessor(noopExporter{})}
f: func() []LoggerProviderOption {
return []LoggerProviderOption{
WithProcessor(attrSetDecorator{}),
WithProcessor(NewBatchProcessor(noopExporter{})),
}
},
},
} {
b.Run(tc.name, func(b *testing.B) {
provider := NewLoggerProvider(WithProcessor(tc.f()))
provider := NewLoggerProvider(tc.f()...)
b.Cleanup(func() { assert.NoError(b, provider.Shutdown(context.Background())) })
logger := provider.Logger(b.Name())

@@ -91,32 +91,59 @@ func BenchmarkProcessor(b *testing.B) {
}
}

type timestampDecorator struct {
Processor
}
type timestampProcessor struct{}

func (e timestampDecorator) OnEmit(ctx context.Context, r Record) error {
r = r.Clone()
func (p timestampProcessor) OnEmit(ctx context.Context, r *Record) error {
r.SetObservedTimestamp(time.Date(1988, time.November, 17, 0, 0, 0, 0, time.UTC))
return e.Processor.OnEmit(ctx, r)
return nil
}

func (p timestampProcessor) Enabled(context.Context, *Record) bool {
return true
}

type attrAddDecorator struct {
Processor
func (p timestampProcessor) Shutdown(ctx context.Context) error {
return nil
}

func (e attrAddDecorator) OnEmit(ctx context.Context, r Record) error {
r = r.Clone()
func (p timestampProcessor) ForceFlush(ctx context.Context) error {
return nil
}

type attrAddProcessor struct{}

func (p attrAddProcessor) OnEmit(ctx context.Context, r *Record) error {
r.AddAttributes(log.String("add", "me"))
return e.Processor.OnEmit(ctx, r)
return nil
}

type attrSetDecorator struct {
Processor
func (p attrAddProcessor) Enabled(context.Context, *Record) bool {
return true
}

func (e attrSetDecorator) OnEmit(ctx context.Context, r Record) error {
r = r.Clone()
func (p attrAddProcessor) Shutdown(ctx context.Context) error {
return nil
}

func (p attrAddProcessor) ForceFlush(ctx context.Context) error {
return nil
}

type attrSetDecorator struct{}

func (p attrSetDecorator) OnEmit(ctx context.Context, r *Record) error {
r.SetAttributes(log.String("replace", "me"))
return e.Processor.OnEmit(ctx, r)
return nil
}

func (p attrSetDecorator) Enabled(context.Context, *Record) bool {
return true
}

func (p attrSetDecorator) Shutdown(ctx context.Context) error {
return nil
}

func (p attrSetDecorator) ForceFlush(ctx context.Context) error {
return nil
}