diff --git a/exporters/otlp/otlplog/otlplogfile/config.go b/exporters/otlp/otlplog/otlplogfile/config.go index 5b0cf3507ab..d765601f8af 100644 --- a/exporters/otlp/otlplog/otlplogfile/config.go +++ b/exporters/otlp/otlplog/otlplogfile/config.go @@ -3,52 +3,71 @@ package otlplogfile // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile" -import "time" +import ( + "errors" + "fmt" + "io" + "os" + "time" +) -type fnOpt func(config) config - -func (f fnOpt) applyOption(c config) config { return f(c) } - -// Option sets the configuration value for an Exporter. -type Option interface { - applyOption(config) config -} +// Option configures a field of the configuration or return an error if needed. +type Option func(*config) (*config, error) // config contains options for the OTLP Log file exporter. type config struct { - // Path to a file on disk where records must be appended. - // This file is preferably a json line file as stated in the specification. - // See: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/file-exporter.md#json-lines-file - // See: https://jsonlines.org - path string + // Out is the output where the records should be written. + out io.WriteCloser // Duration represents the interval when the buffer should be flushed. flushInterval time.Duration } -func newConfig(options []Option) config { - c := config{ - path: "/var/log/opentelemetry/logs.jsonl", +func newConfig(options []Option) (*config, error) { + c := &config{ + out: os.Stdout, flushInterval: 5 * time.Second, } + + var configErr error for _, opt := range options { - c = opt.applyOption(c) + if _, err := opt(c); err != nil { + configErr = errors.Join(configErr, err) + } } - return c + + if configErr != nil { + return nil, configErr + } + + return c, nil } -// WithFlushInterval configures the duration after which the buffer is periodically flushed to the disk. -func WithFlushInterval(flushInterval time.Duration) Option { - return fnOpt(func(c config) config { - c.flushInterval = flushInterval - return c - }) +// WithFile configures a file where the records will be exported. +// An error is returned if the file could not be created or opened. +func WithFile(path string) Option { + return func(c *config) (*config, error) { + file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o644) + if err != nil { + return nil, fmt.Errorf("failed to open file: %w", err) + } + + return WithWriter(file)(c) + } +} + +// WithWriter configures the destination where the exporter should output +// the records. By default, if not specified, stdout is used. +func WithWriter(w io.WriteCloser) Option { + return func(c *config) (*config, error) { + c.out = w + return c, nil + } } -// WithPath defines a path to a file where the log records will be written. -// If not set, will default to /var/log/opentelemetry/logs.jsonl. -func WithPath(path string) Option { - return fnOpt(func(c config) config { - c.path = path - return c - }) +// WithFlushInterval configures the duration after which the buffer is periodically flushed to the output. +func WithFlushInterval(flushInterval time.Duration) Option { + return func(c *config) (*config, error) { + c.flushInterval = flushInterval + return c, nil + } } diff --git a/exporters/otlp/otlplog/otlplogfile/example_test.go b/exporters/otlp/otlplog/otlplogfile/example_test.go index a46b9fc1e2a..a6a2fda925b 100644 --- a/exporters/otlp/otlplog/otlplogfile/example_test.go +++ b/exporters/otlp/otlplog/otlplogfile/example_test.go @@ -15,7 +15,7 @@ import ( func Example() { ctx := context.Background() exp, err := otlplogfile.New( - otlplogfile.WithPath("/tmp/otlp-logs.jsonl"), + otlplogfile.WithFile("/tmp/otlp-logs.jsonl"), otlplogfile.WithFlushInterval(time.Second), ) if err != nil { diff --git a/exporters/otlp/otlplog/otlplogfile/exporter.go b/exporters/otlp/otlplog/otlplogfile/exporter.go index 1fed76a4920..12c24086bac 100644 --- a/exporters/otlp/otlplog/otlplogfile/exporter.go +++ b/exporters/otlp/otlplog/otlplogfile/exporter.go @@ -20,7 +20,7 @@ import ( // defined here: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.36.0/specification/protocol/file-exporter.md type Exporter struct { mu sync.Mutex - fw *writer.FileWriter + w *writer.Writer stopped bool } @@ -29,15 +29,18 @@ var _ log.Exporter = &Exporter{} // New returns a new [Exporter]. func New(options ...Option) (*Exporter, error) { - cfg := newConfig(options) + cfg, err := newConfig(options) + if err != nil { + return nil, err + } - fw, err := writer.NewFileWriter(cfg.path, cfg.flushInterval) + w, err := writer.New(cfg.out, cfg.flushInterval) if err != nil { return nil, err } return &Exporter{ - fw: fw, + w: w, stopped: false, }, nil } @@ -65,7 +68,7 @@ func (e *Exporter) Export(ctx context.Context, records []log.Record) error { return err } - return e.fw.Export(by) + return e.w.Export(by) } // ForceFlush flushes data to the file. @@ -77,7 +80,7 @@ func (e *Exporter) ForceFlush(_ context.Context) error { return nil } - return e.fw.Flush() + return e.w.Flush() } // Shutdown shuts down the exporter. Buffered data is written to disk, @@ -91,5 +94,5 @@ func (e *Exporter) Shutdown(_ context.Context) error { } e.stopped = true - return e.fw.Shutdown() + return e.w.Shutdown() } diff --git a/exporters/otlp/otlplog/otlplogfile/exporter_test.go b/exporters/otlp/otlplog/otlplogfile/exporter_test.go index 7102761d4e7..4d1da14ac75 100644 --- a/exporters/otlp/otlplog/otlplogfile/exporter_test.go +++ b/exporters/otlp/otlplog/otlplogfile/exporter_test.go @@ -23,13 +23,13 @@ import ( // tempFile creates a temporary file for the given test case and returns its path on disk. // The file is automatically cleaned up when the test ends. -func tempFile(tb testing.TB) string { +func tempFile(tb testing.TB) *os.File { f, err := os.CreateTemp(tb.TempDir(), tb.Name()) assert.NoError(tb, err, "must not error when creating temporary file") tb.Cleanup(func() { assert.NoError(tb, os.RemoveAll(path.Dir(f.Name())), "must clean up files after being written") }) - return f.Name() + return f } // makeRecords is a helper function to generate an array of log record with the desired size. @@ -48,10 +48,10 @@ func makeRecords(count int, message string) []sdklog.Record { } func TestExporter(t *testing.T) { - filepath := tempFile(t) + file := tempFile(t) records := makeRecords(1, "hello, world!") - exporter, err := New(WithPath(filepath)) + exporter, err := New(WithWriter(file)) assert.NoError(t, err) t.Cleanup(func() { assert.NoError(t, exporter.Shutdown(context.TODO())) @@ -64,8 +64,8 @@ func TestExporter(t *testing.T) { } func TestExporterConcurrentSafe(t *testing.T) { - filepath := tempFile(t) - exporter, err := New(WithPath(filepath)) + file := tempFile(t) + exporter, err := New(WithWriter(file)) require.NoError(t, err, "New()") const goroutines = 10 diff --git a/exporters/otlp/otlplog/otlplogfile/internal/writer/writer.go b/exporters/otlp/otlplog/otlplogfile/internal/writer/writer.go index dc44eb54c86..ad77ffa1287 100644 --- a/exporters/otlp/otlplog/otlplogfile/internal/writer/writer.go +++ b/exporters/otlp/otlplog/otlplogfile/internal/writer/writer.go @@ -4,38 +4,29 @@ package writer // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile/internal/writer" import ( - "fmt" "io" - "os" "sync" "time" ) -// FileWriter writes data to a configured file. +// Writer writes data to the configured io.WriteCloser. // It is buffered to reduce I/O operations to improve performance. -type FileWriter struct { - path string - file io.WriteCloser - mu sync.Mutex +type Writer struct { + out io.WriteCloser + mu sync.Mutex flushInterval time.Duration flushTicker *time.Ticker stopTicker chan struct{} } -var _ flusher = (*FileWriter)(nil) +var _ flusher = (*Writer)(nil) -// NewFileWriter initializes a file writer for the file at the given path. -func NewFileWriter(path string, flushInterval time.Duration) (*FileWriter, error) { - file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o644) - if err != nil { - return nil, fmt.Errorf("failed to open file: %w", err) - } - - fw := &FileWriter{ - path: path, +// New initializes a writer for the given io.WriteCloser. +func New(w io.WriteCloser, flushInterval time.Duration) (*Writer, error) { + fw := &Writer{ flushInterval: flushInterval, - file: newBufferedWriteCloser(file), + out: newBufferedWriteCloser(w), } if fw.flushInterval > 0 { @@ -46,17 +37,17 @@ func NewFileWriter(path string, flushInterval time.Duration) (*FileWriter, error } // Export writes the given data in the file. -func (w *FileWriter) Export(data []byte) error { +func (w *Writer) Export(data []byte) error { w.mu.Lock() defer w.mu.Unlock() - if _, err := w.file.Write(data); err != nil { + if _, err := w.out.Write(data); err != nil { return err } // As stated in the specification, line separator is \n. - // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.36.0/specification/protocol/file-exporter.md#json-lines-file - if _, err := io.WriteString(w.file, "\n"); err != nil { + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/file-exporter.md#json-lines-file + if _, err := io.WriteString(w.out, "\n"); err != nil { return err } @@ -64,19 +55,19 @@ func (w *FileWriter) Export(data []byte) error { } // Shutdown stops the flusher. It also stops the flush ticker if set. -func (w *FileWriter) Shutdown() error { +func (w *Writer) Shutdown() error { w.mu.Lock() defer w.mu.Unlock() if w.flushTicker != nil { close(w.stopTicker) } - return w.file.Close() + return w.out.Close() } // Flush writes buffered data to disk. -func (w *FileWriter) Flush() error { - ff, ok := w.file.(flusher) +func (w *Writer) Flush() error { + ff, ok := w.out.(flusher) if !ok { return nil } @@ -88,11 +79,11 @@ func (w *FileWriter) Flush() error { } // startFlusher starts the flusher to periodically flush the buffer. -func (w *FileWriter) startFlusher() { +func (w *Writer) startFlusher() { w.mu.Lock() defer w.mu.Unlock() - ff, ok := w.file.(flusher) + ff, ok := w.out.(flusher) if !ok { return } diff --git a/exporters/otlp/otlplog/otlplogfile/internal/writer/writer_test.go b/exporters/otlp/otlplog/otlplogfile/internal/writer/writer_test.go index e9374997872..1c450f86bb1 100644 --- a/exporters/otlp/otlplog/otlplogfile/internal/writer/writer_test.go +++ b/exporters/otlp/otlplog/otlplogfile/internal/writer/writer_test.go @@ -31,12 +31,11 @@ func tempFile(tb testing.TB) *os.File { func TestNewFileWriter(t *testing.T) { f := tempFile(t) - writer, err := NewFileWriter(f.Name(), 0) + writer, err := New(f, 0) // nolint: errcheck defer writer.Shutdown() assert.NoError(t, err, "must not error when creating the file writer") - assert.Equal(t, f.Name(), writer.path, "writer file path must be the same than the file path") // Ensure file was created _, err = os.Stat(f.Name()) @@ -46,7 +45,7 @@ func TestNewFileWriter(t *testing.T) { func TestFileWriterExport(t *testing.T) { f := tempFile(t) - writer, err := NewFileWriter(f.Name(), 0) + writer, err := New(f, 0) // nolint: errcheck defer writer.Shutdown() require.NoError(t, err, "must not error when creating the file writer") @@ -66,7 +65,7 @@ func TestFileWriterExport(t *testing.T) { func TestFileWriterShutdown(t *testing.T) { f := tempFile(t) - writer, err := NewFileWriter(f.Name(), 0) + writer, err := New(f, 0) require.NoError(t, err, "must not error when creating the file writer") assert.NoError(t, writer.Shutdown(), "must not error when calling Shutdown()") } @@ -74,7 +73,7 @@ func TestFileWriterShutdown(t *testing.T) { func TestFileWriterConcurrentSafe(t *testing.T) { f := tempFile(t) - writer, err := NewFileWriter(f.Name(), 0) + writer, err := New(f, 0) require.NoError(t, err, "must not error when creating the file writer") const goroutines = 10