Skip to content

Commit

Permalink
feat(open-telemetry#5408): use stdout as default output
Browse files Browse the repository at this point in the history
Signed-off-by: thomasgouveia <[email protected]>
  • Loading branch information
thomasgouveia committed Sep 6, 2024
1 parent fc7212e commit 7244adc
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 79 deletions.
83 changes: 51 additions & 32 deletions exporters/otlp/otlplog/otlplogfile/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,71 @@

package otlplogfile // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile"

import "time"
import (
"errors"
"fmt"
"io"
"os"
"time"
)

type fnOpt func(config) config

func (f fnOpt) applyOption(c config) config { return f(c) }

// Option sets the configuration value for an Exporter.
type Option interface {
applyOption(config) config
}
// Option configures a field of the configuration or return an error if needed.
type Option func(*config) (*config, error)

// config contains options for the OTLP Log file exporter.
type config struct {
// Path to a file on disk where records must be appended.
// This file is preferably a json line file as stated in the specification.
// See: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/file-exporter.md#json-lines-file
// See: https://jsonlines.org
path string
// Out is the output where the records should be written.
out io.WriteCloser
// Duration represents the interval when the buffer should be flushed.
flushInterval time.Duration
}

func newConfig(options []Option) config {
c := config{
path: "/var/log/opentelemetry/logs.jsonl",
func newConfig(options []Option) (*config, error) {
c := &config{
out: os.Stdout,
flushInterval: 5 * time.Second,
}

var configErr error
for _, opt := range options {
c = opt.applyOption(c)
if _, err := opt(c); err != nil {
configErr = errors.Join(configErr, err)
}
}
return c

if configErr != nil {
return nil, configErr
}

return c, nil
}

// WithFlushInterval configures the duration after which the buffer is periodically flushed to the disk.
func WithFlushInterval(flushInterval time.Duration) Option {
return fnOpt(func(c config) config {
c.flushInterval = flushInterval
return c
})
// WithFile configures a file where the records will be exported.
// An error is returned if the file could not be created or opened.
func WithFile(path string) Option {
return func(c *config) (*config, error) {
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o644)
if err != nil {
return nil, fmt.Errorf("failed to open file: %w", err)
}

return WithWriter(file)(c)
}
}

// WithWriter configures the destination where the exporter should output
// the records. By default, if not specified, stdout is used.
func WithWriter(w io.WriteCloser) Option {
return func(c *config) (*config, error) {
c.out = w
return c, nil
}
}

// WithPath defines a path to a file where the log records will be written.
// If not set, will default to /var/log/opentelemetry/logs.jsonl.
func WithPath(path string) Option {
return fnOpt(func(c config) config {
c.path = path
return c
})
// WithFlushInterval configures the duration after which the buffer is periodically flushed to the output.
func WithFlushInterval(flushInterval time.Duration) Option {
return func(c *config) (*config, error) {
c.flushInterval = flushInterval
return c, nil
}
}
2 changes: 1 addition & 1 deletion exporters/otlp/otlplog/otlplogfile/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
func Example() {
ctx := context.Background()
exp, err := otlplogfile.New(
otlplogfile.WithPath("/tmp/otlp-logs.jsonl"),
otlplogfile.WithFile("/tmp/otlp-logs.jsonl"),
otlplogfile.WithFlushInterval(time.Second),
)
if err != nil {
Expand Down
17 changes: 10 additions & 7 deletions exporters/otlp/otlplog/otlplogfile/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
// defined here: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.36.0/specification/protocol/file-exporter.md
type Exporter struct {
mu sync.Mutex
fw *writer.FileWriter
w *writer.Writer
stopped bool
}

Expand All @@ -29,15 +29,18 @@ var _ log.Exporter = &Exporter{}

// New returns a new [Exporter].
func New(options ...Option) (*Exporter, error) {
cfg := newConfig(options)
cfg, err := newConfig(options)
if err != nil {
return nil, err
}

fw, err := writer.NewFileWriter(cfg.path, cfg.flushInterval)
w, err := writer.New(cfg.out, cfg.flushInterval)
if err != nil {
return nil, err
}

return &Exporter{
fw: fw,
w: w,
stopped: false,
}, nil
}
Expand Down Expand Up @@ -65,7 +68,7 @@ func (e *Exporter) Export(ctx context.Context, records []log.Record) error {
return err
}

return e.fw.Export(by)
return e.w.Export(by)
}

// ForceFlush flushes data to the file.
Expand All @@ -77,7 +80,7 @@ func (e *Exporter) ForceFlush(_ context.Context) error {
return nil
}

return e.fw.Flush()
return e.w.Flush()
}

// Shutdown shuts down the exporter. Buffered data is written to disk,
Expand All @@ -91,5 +94,5 @@ func (e *Exporter) Shutdown(_ context.Context) error {
}

e.stopped = true
return e.fw.Shutdown()
return e.w.Shutdown()
}
12 changes: 6 additions & 6 deletions exporters/otlp/otlplog/otlplogfile/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (

// tempFile creates a temporary file for the given test case and returns its path on disk.
// The file is automatically cleaned up when the test ends.
func tempFile(tb testing.TB) string {
func tempFile(tb testing.TB) *os.File {
f, err := os.CreateTemp(tb.TempDir(), tb.Name())
assert.NoError(tb, err, "must not error when creating temporary file")
tb.Cleanup(func() {
assert.NoError(tb, os.RemoveAll(path.Dir(f.Name())), "must clean up files after being written")
})
return f.Name()
return f
}

// makeRecords is a helper function to generate an array of log record with the desired size.
Expand All @@ -48,10 +48,10 @@ func makeRecords(count int, message string) []sdklog.Record {
}

func TestExporter(t *testing.T) {
filepath := tempFile(t)
file := tempFile(t)
records := makeRecords(1, "hello, world!")

exporter, err := New(WithPath(filepath))
exporter, err := New(WithWriter(file))
assert.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, exporter.Shutdown(context.TODO()))
Expand All @@ -64,8 +64,8 @@ func TestExporter(t *testing.T) {
}

func TestExporterConcurrentSafe(t *testing.T) {
filepath := tempFile(t)
exporter, err := New(WithPath(filepath))
file := tempFile(t)
exporter, err := New(WithWriter(file))
require.NoError(t, err, "New()")

const goroutines = 10
Expand Down
47 changes: 19 additions & 28 deletions exporters/otlp/otlplog/otlplogfile/internal/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,29 @@
package writer // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile/internal/writer"

import (
"fmt"
"io"
"os"
"sync"
"time"
)

// FileWriter writes data to a configured file.
// Writer writes data to the configured io.WriteCloser.
// It is buffered to reduce I/O operations to improve performance.
type FileWriter struct {
path string
file io.WriteCloser
mu sync.Mutex
type Writer struct {
out io.WriteCloser
mu sync.Mutex

flushInterval time.Duration
flushTicker *time.Ticker
stopTicker chan struct{}
}

var _ flusher = (*FileWriter)(nil)
var _ flusher = (*Writer)(nil)

// NewFileWriter initializes a file writer for the file at the given path.
func NewFileWriter(path string, flushInterval time.Duration) (*FileWriter, error) {
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o644)
if err != nil {
return nil, fmt.Errorf("failed to open file: %w", err)
}

fw := &FileWriter{
path: path,
// New initializes a writer for the given io.WriteCloser.
func New(w io.WriteCloser, flushInterval time.Duration) (*Writer, error) {
fw := &Writer{
flushInterval: flushInterval,
file: newBufferedWriteCloser(file),
out: newBufferedWriteCloser(w),
}

if fw.flushInterval > 0 {
Expand All @@ -46,37 +37,37 @@ func NewFileWriter(path string, flushInterval time.Duration) (*FileWriter, error
}

// Export writes the given data in the file.
func (w *FileWriter) Export(data []byte) error {
func (w *Writer) Export(data []byte) error {
w.mu.Lock()
defer w.mu.Unlock()

if _, err := w.file.Write(data); err != nil {
if _, err := w.out.Write(data); err != nil {
return err
}

// As stated in the specification, line separator is \n.
// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.36.0/specification/protocol/file-exporter.md#json-lines-file
if _, err := io.WriteString(w.file, "\n"); err != nil {
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/file-exporter.md#json-lines-file
if _, err := io.WriteString(w.out, "\n"); err != nil {
return err
}

return nil
}

// Shutdown stops the flusher. It also stops the flush ticker if set.
func (w *FileWriter) Shutdown() error {
func (w *Writer) Shutdown() error {
w.mu.Lock()
defer w.mu.Unlock()

if w.flushTicker != nil {
close(w.stopTicker)
}
return w.file.Close()
return w.out.Close()
}

// Flush writes buffered data to disk.
func (w *FileWriter) Flush() error {
ff, ok := w.file.(flusher)
func (w *Writer) Flush() error {
ff, ok := w.out.(flusher)
if !ok {
return nil
}
Expand All @@ -88,11 +79,11 @@ func (w *FileWriter) Flush() error {
}

// startFlusher starts the flusher to periodically flush the buffer.
func (w *FileWriter) startFlusher() {
func (w *Writer) startFlusher() {
w.mu.Lock()
defer w.mu.Unlock()

ff, ok := w.file.(flusher)
ff, ok := w.out.(flusher)
if !ok {
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@ func tempFile(tb testing.TB) *os.File {
func TestNewFileWriter(t *testing.T) {
f := tempFile(t)

writer, err := NewFileWriter(f.Name(), 0)
writer, err := New(f, 0)
// nolint: errcheck
defer writer.Shutdown()

assert.NoError(t, err, "must not error when creating the file writer")
assert.Equal(t, f.Name(), writer.path, "writer file path must be the same than the file path")

// Ensure file was created
_, err = os.Stat(f.Name())
Expand All @@ -46,7 +45,7 @@ func TestNewFileWriter(t *testing.T) {
func TestFileWriterExport(t *testing.T) {
f := tempFile(t)

writer, err := NewFileWriter(f.Name(), 0)
writer, err := New(f, 0)
// nolint: errcheck
defer writer.Shutdown()
require.NoError(t, err, "must not error when creating the file writer")
Expand All @@ -66,15 +65,15 @@ func TestFileWriterExport(t *testing.T) {
func TestFileWriterShutdown(t *testing.T) {
f := tempFile(t)

writer, err := NewFileWriter(f.Name(), 0)
writer, err := New(f, 0)
require.NoError(t, err, "must not error when creating the file writer")
assert.NoError(t, writer.Shutdown(), "must not error when calling Shutdown()")
}

func TestFileWriterConcurrentSafe(t *testing.T) {
f := tempFile(t)

writer, err := NewFileWriter(f.Name(), 0)
writer, err := New(f, 0)
require.NoError(t, err, "must not error when creating the file writer")

const goroutines = 10
Expand Down

0 comments on commit 7244adc

Please sign in to comment.