From efcf13c363db8b164ad9327fc643af8d16f0f8e9 Mon Sep 17 00:00:00 2001
From: Devan
Date: Wed, 8 Jan 2025 14:17:23 -0600
Subject: [PATCH 1/3] feat: Add query-log-path configuration

This PR adds the ability for an admin to set 'query-log-path' as a
configuration field. A new configuration option is added:

```toml
[data]
query-log-path = "/var/influx/query.log"
```

When set, this enables query logging to a log file on disk. Example
logged queries:

```
{"level":"info","ts":1735248393.084461,"msg":"Executing query","query":"SHOW DATABASES"}
{"level":"info","ts":1735248395.092188,"msg":"Executing query","query":"SHOW DATABASES"}
{"level":"info","ts":1735248398.58039,"msg":"Executing query","query":"SELECT * FROM stress.autogen.m0 LIMIT 20"}
```
---
 cmd/influxd/run/server.go        |  10 ++
 etc/config.sample.toml           |   1 +
 go.mod                           |   1 +
 go.sum                           |   1 +
 logger/logger.go                 |   8 +-
 query/executor.go                | 109 ++++++++++++
 query/executor_test.go           | 277 +++++++++++++++++++++++++++++++
 query/file_log_watcher.go        | 176 ++++++++++++++++++++
 tsdb/config.go                   |   3 +
 tsdb/engine/tsm1/compact.go      |  93 +++++------
 tsdb/engine/tsm1/compact_test.go | 130 ++++-----------
 tsdb/engine/tsm1/writer.go       |  66 ++++----
 12 files changed, 695 insertions(+), 180 deletions(-)
 create mode 100644 query/file_log_watcher.go

diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go
index c0340450271..5647939620d 100644
--- a/cmd/influxd/run/server.go
+++ b/cmd/influxd/run/server.go
@@ -487,6 +487,16 @@ func (s *Server) Open() error {
 			// so it only logs as is appropriate.
 			s.QueryExecutor.TaskManager.Logger = s.Logger
 		}
+		if s.config.Data.QueryLogPath != "" {
+			path := s.config.Data.QueryLogPath
+			flw := query.NewFileLogWatcher(s.QueryExecutor, path, s.Logger, s.config.Logging.Format)
+			if flw != nil {
+				s.QueryExecutor.WithLogWriter(flw, context.Background())
+			} else {
+				s.Logger.Error("error creating log writer", zap.String("path", path))
+			}
+		}
+
 		s.PointsWriter.WithLogger(s.Logger)
 		s.Subscriber.WithLogger(s.Logger)
 		for _, svc := range s.Services {
diff --git a/etc/config.sample.toml b/etc/config.sample.toml
index e66d9290c3d..a065bc72e0d 100644
--- a/etc/config.sample.toml
+++ b/etc/config.sample.toml
@@ -66,6 +66,7 @@
   # Whether queries should be logged before execution. Very useful for troubleshooting, but will
   # log any sensitive data contained within a query.
   # query-log-enabled = true
+  # query-log-path = ""
 
   # It is possible to collect statistics of points written per-measurement and/or per-login.
   # These can be accessed via the monitoring subsystem.
diff --git a/go.mod b/go.mod
index 29b01bde70f..afd5365f7ce 100644
--- a/go.mod
+++ b/go.mod
@@ -11,6 +11,7 @@ require (
 	github.com/cespare/xxhash/v2 v2.3.0
 	github.com/davecgh/go-spew v1.1.1
 	github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8
+	github.com/fsnotify/fsnotify v1.4.7
 	github.com/go-chi/chi v4.1.0+incompatible
 	github.com/golang-jwt/jwt v3.2.1+incompatible
 	github.com/golang/mock v1.5.0
diff --git a/go.sum b/go.sum
index 9bb9846a7e5..c3bf60eac3c 100644
--- a/go.sum
+++ b/go.sum
@@ -311,6 +311,7 @@ github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P
 github.com/frankban/quicktest v1.11.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
 github.com/frankban/quicktest v1.13.0 h1:yNZif1OkDfNoDfb9zZa9aXIpejNR4F23Wely0c+Qdqk=
 github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU=
+github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/gabriel-vasile/mimetype v1.4.0 h1:Cn9dkdYsMIu56tGho+fqzh7XmvY2YyGU0FnbhiOsEro=
 github.com/gabriel-vasile/mimetype v1.4.0/go.mod h1:fA8fi6KUiG7MgQQ+mEWotXoEOvmxRtOJlERCzSmRvr8=
diff --git a/logger/logger.go b/logger/logger.go
index 74b0b40b095..b4859f4b157 100644
--- a/logger/logger.go
+++ b/logger/logger.go
@@ -40,7 +40,7 @@ func (c *Config) New(defaultOutput io.Writer) (*zap.Logger, error) {
 		}
 	}
 
-	encoder, err := newEncoder(format)
+	encoder, err := NewEncoder(format)
 	if err != nil {
 		return nil, err
 	}
@@ -51,8 +51,8 @@ func (c *Config) New(defaultOutput io.Writer) (*zap.Logger, error) {
 	), zap.Fields(zap.String("log_id", nextID()))), nil
 }
 
-func newEncoder(format string) (zapcore.Encoder, error) {
-	config := newEncoderConfig()
+func NewEncoder(format string) (zapcore.Encoder, error) {
+	config := NewEncoderConfig()
 	switch format {
 	case "json":
 		return zapcore.NewJSONEncoder(config), nil
@@ -65,7 +65,7 @@ func newEncoder(format string) (zapcore.Encoder, error) {
 	}
 }
 
-func newEncoderConfig() zapcore.EncoderConfig {
+func NewEncoderConfig() zapcore.EncoderConfig {
 	config := zap.NewProductionEncoderConfig()
 	config.EncodeTime = func(ts time.Time, encoder zapcore.PrimitiveArrayEncoder) {
 		encoder.AppendString(ts.UTC().Format(TimeFormat))
diff --git a/query/executor.go b/query/executor.go
index 0afd6922b03..1ec7c80b82f 100644
--- a/query/executor.go
+++ b/query/executor.go
@@ -11,9 +11,11 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/fsnotify/fsnotify"
 	"github.com/influxdata/influxdb/models"
 	"github.com/influxdata/influxql"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
 )
 
 var (
@@ -228,6 +230,19 @@ type StatementNormalizer interface {
 	NormalizeStatement(stmt influxql.Statement, database, retentionPolicy string) error
 }
 
+// WatcherInterface is used for any file watch functionality using fsnotify
+type WatcherInterface interface {
+	Event() <-chan fsnotify.Event
+	Error() <-chan error
+	FileChangeCapture() error
+	GetLogger() *zap.Logger
+	GetLogPath() string
+	// Methods from fsnotify
+	Close() error
+	Add(name string) error
+	Remove(name string) error
+}
+
 // Executor executes every statement in an Query.
 type Executor struct {
 	// Used for executing a statement in the query.
@@ -242,6 +257,8 @@ type Executor struct {
 
 	// expvar-based stats.
 	stats *Statistics
+
+	mu sync.Mutex
 }
 
 // NewExecutor returns a new instance of Executor.
@@ -289,6 +306,96 @@ func (e *Executor) WithLogger(log *zap.Logger) {
 	e.TaskManager.Logger = e.Logger
 }
 
+func startFileLogWatcher(w WatcherInterface, e *Executor, ctx context.Context) error {
+	path := w.GetLogPath()
+
+	e.mu.Lock()
+	if err := w.Add(path); err != nil {
+		return fmt.Errorf("add watch path: %w", err)
+	}
+	e.mu.Unlock()
+
+	for {
+		select {
+		case event, ok := <-w.Event():
+			if !ok {
+				return fmt.Errorf("watcher event channel closed")
+			}
+
+			e.Logger.Debug("received file event", zap.String("event", event.Name))
+
+			if err := func() error {
+				if event.Op&(fsnotify.Remove|fsnotify.Rename) != 0 {
+					e.mu.Lock()
+					defer e.mu.Unlock()
+					e.Logger.Info("log file altered, creating new Watcher",
+						zap.String("event", event.Name),
+						zap.String("path", path))
+
+					if err := w.FileChangeCapture(); err != nil {
+						return fmt.Errorf("handle file change: %w", err)
+					}
+
+					logger := w.GetLogger()
+					e.Logger = logger
+					e.TaskManager.Logger = e.Logger
+				}
+				return nil
+			}(); err != nil {
+				return err
+			}
+
+		case err, ok := <-w.Error():
+			errs := make([]error, 0, 3)
+			errs = append(errs, err)
+
+			closeErr := w.Close()
+			if closeErr != nil {
+				errs = append(errs, fmt.Errorf("failed to close watcher: %w", closeErr))
+			}
+			if !ok {
+				errs = append(errs, fmt.Errorf("watcher error channel closed"))
+			}
+
+			return errors.Join(errs...)
+		case <-ctx.Done():
+			e.Logger.Info("closing file Watcher")
+			if closeErr := w.Close(); closeErr != nil {
+				return fmt.Errorf("failed to close watcher: %w", closeErr)
+			}
+			return ctx.Err()
+		}
+	}
+}
+
+func (e *Executor) WithLogWriter(w WatcherInterface, ctx context.Context) {
+	e.Logger.Info("starting file watcher", zap.String("path", w.GetLogPath()))
+	errs, ctx := errgroup.WithContext(ctx)
+
+	e.mu.Lock()
+	e.Logger = w.GetLogger()
+	e.TaskManager.Logger = e.Logger
+	e.mu.Unlock()
+
+	errs.Go(func() error {
+		return startFileLogWatcher(w, e, ctx)
+	})
+
+	go func() {
+		if err := errs.Wait(); err != nil {
+			e.mu.Lock()
+			defer e.mu.Unlock()
+			e.Logger.Error("file log Watcher error", zap.Error(err), zap.String("path", w.GetLogPath()))
+			err := w.Close()
+			if err != nil {
+				e.Logger.Error("failed to close file watcher", zap.Error(err), zap.String("path", w.GetLogPath()))
+			}
+		}
+	}()
+}
+
 // ExecuteQuery executes each statement within a query.
 func (e *Executor) ExecuteQuery(query *influxql.Query, opt ExecutionOptions, closing chan struct{}) <-chan *Result {
 	results := make(chan *Result)
@@ -386,7 +493,9 @@ LOOP:
 
 		// Log each normalized statement.
 		if !ctx.Quiet {
+			e.mu.Lock()
 			e.Logger.Info("Executing query", zap.Stringer("query", stmt))
+			e.mu.Unlock()
 		}
 
 		// Send any other statements to the underlying statement executor.
diff --git a/query/executor_test.go b/query/executor_test.go
index 9237c7e53d9..7d5c4d213f1 100644
--- a/query/executor_test.go
+++ b/query/executor_test.go
@@ -1,9 +1,17 @@
 package query_test
 
 import (
+	"bytes"
+	"context"
 	"errors"
 	"fmt"
+	"github.com/fsnotify/fsnotify"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
 	"strings"
+	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -595,6 +603,275 @@ func TestQueryExecutor_InvalidSource(t *testing.T) {
 	}
 }
 
+type bufferWriterSync struct {
+	buf *bytes.Buffer
+	mu  sync.Mutex
+}
+
+func (w *bufferWriterSync) Sync() error {
+	return nil
+}
+
+func (w *bufferWriterSync) Write(p []byte) (int, error) {
+	w.mu.Lock()
+	out, err := w.buf.Write(p)
+	w.mu.Unlock()
+	return out, err
+}
+
+func (w *bufferWriterSync) Read() {}
+
+type mockWatcher struct {
+	ChangeEvents atomic.Uint64
+	RemovedPaths atomic.Uint64
+	AddedPaths   atomic.Uint64
+	Wg           sync.WaitGroup
+	MockEvents   chan fsnotify.Event
+	MockErr      chan error
+	path         string
+	logBuffer    *bytes.Buffer
+	log          *zap.Logger
+	t            *testing.T
+	Mu           sync.Mutex
+}
+
+func newMockWatcher(t *testing.T, path string) *mockWatcher {
+	encoderConfig := zap.NewProductionEncoderConfig()
+
+	logBuffer := &bytes.Buffer{}
+	buf := &bufferWriterSync{
+		buf: logBuffer,
+	}
+
+	fileCore := zapcore.NewCore(
+		zapcore.NewJSONEncoder(encoderConfig),
+		zapcore.Lock(buf),
+		zapcore.InfoLevel,
+	)
+
+	logger := zap.New(fileCore)
+
+	return &mockWatcher{
+		ChangeEvents: atomic.Uint64{},
+		RemovedPaths: atomic.Uint64{},
+		AddedPaths:   atomic.Uint64{},
+		Wg:           sync.WaitGroup{},
+		MockEvents:   make(chan fsnotify.Event),
+		MockErr:      make(chan error),
+		path:         path,
+		logBuffer:    logBuffer,
+		log:          logger,
+		t:            t,
+		Mu:           sync.Mutex{},
+	}
+}
+
+func (m *mockWatcher) Event() <-chan fsnotify.Event {
+	return m.MockEvents
+}
+
+func (m *mockWatcher) Error() <-chan error {
+	return m.MockErr
+}
+
+func (m *mockWatcher) Close() error {
+	if err := m.log.Sync(); err != nil {
+		return err
+	}
+	m.logBuffer.Reset()
+	m.RemovedPaths.Store(0)
+	m.AddedPaths.Store(0)
+	m.ChangeEvents.Store(0)
+
+	return nil
+}
+
+func (m *mockWatcher) Add(name string) error {
+	m.AddedPaths.Add(1)
+	return nil
+}
+
+func (m *mockWatcher) Remove(name string) error {
+	m.RemovedPaths.Add(1)
+	return nil
+}
+
+func (m *mockWatcher) GetLogger() *zap.Logger {
+	m.Mu.Lock()
+	defer m.Mu.Unlock()
+	return m.log
+}
+
+func (m *mockWatcher) GetLogPath() string {
+	m.Mu.Lock()
+	defer m.Mu.Unlock()
+	return m.path
+}
+
+func (m *mockWatcher) FileChangeCapture() error {
+	m.Mu.Lock()
+	defer m.Wg.Done()
+	defer m.Mu.Unlock()
+	err := m.Remove(m.path)
+	if err != nil {
+		return err
+	}
+	m.ChangeEvents.Add(1)
+	return nil
+}
+
+func (m *mockWatcher) Contains(substr string) bool {
+	m.Mu.Lock()
+	defer m.Mu.Unlock()
+	cont := strings.Contains(m.logBuffer.String(), substr)
+	return cont
+}
+
+func TestQueryExecutor_WriteQueryToLog(t *testing.T) {
+	q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
+	require.NoError(t, err, "parse query")
+
+	e := NewQueryExecutor()
+	mockWatcher := newMockWatcher(t, "query.log")
+	defer mockWatcher.Close()
+	e.WithLogWriter(mockWatcher, context.Background())
+
+	e.StatementExecutor = &StatementExecutor{
+		ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
+			require.Equal(t, uint64(1), ctx.QueryID, "query ID")
+			return nil
+		},
+	}
+
+	discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil))
+
+	require.True(t, mockWatcher.Contains(`SELECT count(value) FROM cpu`))
+	err = mockWatcher.Close()
+	require.NoError(t, err)
+}
+
+func TestQueryExecutor_IgnoreWriteEvent(t *testing.T) {
+	q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
+	require.NoError(t, err, "parse query")
+
+	e := NewQueryExecutor()
+	mockWatcher := newMockWatcher(t, "query.log")
+	defer mockWatcher.Close()
+	e.WithLogWriter(mockWatcher, context.Background())
+
+	e.StatementExecutor = &StatementExecutor{
+		ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
+			require.Equal(t, uint64(1), ctx.QueryID, "query ID")
+			return nil
+		},
+	}
+
+	discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil))
+
+	require.True(t, mockWatcher.Contains(`SELECT count(value) FROM cpu`))
+	require.Equal(t, uint64(0), mockWatcher.RemovedPaths.Load())
+	require.Equal(t, uint64(0), mockWatcher.ChangeEvents.Load())
+
+	mockWatcher.MockEvents <- fsnotify.Event{
+		Name: "Write",
+		Op:   fsnotify.Write,
+	}
+	// Should not register a write event as a change event
+	require.Equal(t, uint64(0), mockWatcher.ChangeEvents.Load())
+}
+
+func TestQueryExecutor_ChangeEvent(t *testing.T) {
+	q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
+	require.NoError(t, err, "parse query")
+
+	e := NewQueryExecutor()
+	mockWatcher := newMockWatcher(t, "query.log")
+	defer mockWatcher.Close()
+	e.WithLogWriter(mockWatcher, context.Background())
+
+	e.StatementExecutor = &StatementExecutor{
+		ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
+			require.Equal(t, uint64(1), ctx.QueryID, "query ID")
+			return nil
+		},
+	}
+
+	discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil))
+
+	require.True(t, mockWatcher.Contains(`SELECT count(value) FROM cpu`))
+	require.Equal(t, uint64(0), mockWatcher.RemovedPaths.Load())
+	require.Equal(t, uint64(0), mockWatcher.ChangeEvents.Load())
+
+	mockWatcher.Wg.Add(1)
+	mockWatcher.MockEvents <- fsnotify.Event{
+		Name: "Rename",
+		Op:   fsnotify.Rename,
+	}
+	mockWatcher.Wg.Wait()
+
+	q, err = influxql.ParseQuery(`SELECT count(value) FROM mem`)
+	require.NoError(t, err, "parse query")
+
+	e.StatementExecutor = &StatementExecutor{
+		ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
+			require.Equal(t, uint64(2), ctx.QueryID, "query ID")
+			return nil
+		},
+	}
+
+	discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil))
+
+	require.True(t, mockWatcher.Contains(`SELECT count(value) FROM mem`))
+
+	require.Equal(t, uint64(1), mockWatcher.ChangeEvents.Load())
+	require.Equal(t, uint64(1), mockWatcher.RemovedPaths.Load())
+}
+
+func TestQueryExecutor_RemoveEvent(t *testing.T) {
+	q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
+	require.NoError(t, err, "parse query")
+
+	e := NewQueryExecutor()
+	mockWatcher := newMockWatcher(t, "query.log")
+	defer mockWatcher.Close()
+	e.WithLogWriter(mockWatcher, context.Background())
+
+	e.StatementExecutor = &StatementExecutor{
+		ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
+			require.Equal(t, uint64(1), ctx.QueryID, "query ID")
+			return nil
+		},
+	}
+
+	discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil))
+
+	require.True(t, mockWatcher.Contains(`SELECT count(value) FROM cpu`))
+	require.Equal(t, uint64(0), mockWatcher.RemovedPaths.Load())
+	require.Equal(t, uint64(0), mockWatcher.ChangeEvents.Load())
+
+	mockWatcher.Wg.Add(1)
+	mockWatcher.MockEvents <- fsnotify.Event{
+		Name: "Remove",
+		Op:   fsnotify.Remove,
+	}
+	mockWatcher.Wg.Wait()
+
+	q, err = influxql.ParseQuery(`SELECT count(value) FROM mem`)
+	require.NoError(t, err, "parse query")
+
+	e.StatementExecutor = &StatementExecutor{
+		ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
+			require.Equal(t, uint64(2), ctx.QueryID, "query ID")
+			return nil
+		},
+	}
+
+	discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil))
+
+	require.True(t, mockWatcher.Contains(`SELECT count(value) FROM mem`))
+
+	require.Equal(t, uint64(1), mockWatcher.ChangeEvents.Load())
+	require.Equal(t, uint64(1), mockWatcher.RemovedPaths.Load())
+}
+
 func discardOutput(results <-chan *query.Result) {
 	for range results {
 		// Read all results and discard.
diff --git a/query/file_log_watcher.go b/query/file_log_watcher.go
new file mode 100644
index 00000000000..ee2fde2be73
--- /dev/null
+++ b/query/file_log_watcher.go
@@ -0,0 +1,176 @@
+package query
+
+import (
+	"errors"
+	"os"
+	"sync"
+
+	"github.com/fsnotify/fsnotify"
+	l "github.com/influxdata/influxdb/logger"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+)
+
+type FileLogWatcher struct {
+	path            string
+	currFile        *os.File
+	logger          *zap.Logger
+	formatterConfig string
+	executor        *Executor
+	mu              sync.Mutex
+	fsWatcher       *fsnotify.Watcher
+}
+
+func (f *FileLogWatcher) Event() <-chan fsnotify.Event {
+	return f.fsWatcher.Events
+}
+
+func (f *FileLogWatcher) Error() <-chan error {
+	return f.fsWatcher.Errors
+}
+
+func (f *FileLogWatcher) Add(name string) error {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	return f.fsWatcher.Add(name)
+}
+
+func (f *FileLogWatcher) Remove(name string) error {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	return f.fsWatcher.Remove(name)
+}
+
+func NewFileLogWatcher(e *Executor, path string, logger *zap.Logger, format string) *FileLogWatcher {
+	// FileWatcher is not meant to be used with interactive shell sessions so
+	// console as a logging format will not be supported.
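+	//
+	// An empty or "auto" format falls back to logfmt. The constructor opens
+	// the log file in append mode, tees a file-backed core onto the existing
+	// logger core, and creates an fsnotify watcher so the file can be
+	// reopened after it is removed or renamed (e.g. by log rotation). Any
+	// failure below is logged and nil is returned.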
+	if format == "console" {
+		logger.Error("unknown logging format", zap.String("format", format), zap.String("path", path))
+		return nil
+	}
+
+	if format == "" || format == "auto" {
+		format = "logfmt"
+	}
+
+	logFile, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
+	if err != nil {
+		logger.Error("failed to open log file", zap.Error(err), zap.String("path", path))
+		return nil
+	}
+
+	existingCore := logger.Core()
+	encoder, err := l.NewEncoder(format)
+	if err != nil {
+		logger.Error("failed to create log encoder", zap.Error(err), zap.String("format", format), zap.String("path", path))
+		_ = logFile.Close()
+		return nil
+	}
+
+	fileCore := zapcore.NewCore(
+		encoder,
+		zapcore.Lock(logFile),
+		zapcore.InfoLevel,
+	)
+
+	newCore := zapcore.NewTee(existingCore, fileCore)
+	logger = zap.New(newCore)
+
+	fsWatcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		logger.Error("failed to create fsnotify watcher", zap.Error(err), zap.String("path", path))
+		_ = logFile.Close()
+		return nil
+	}
+
+	return &FileLogWatcher{
+		logger:          logger,
+		path:            path,
+		currFile:        logFile,
+		executor:        e,
+		formatterConfig: format,
+		mu:              sync.Mutex{},
+		fsWatcher:       fsWatcher,
+	}
+}
+
+func (f *FileLogWatcher) GetLogger() *zap.Logger {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	return f.logger
+}
+
+func (f *FileLogWatcher) GetLogPath() string {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	return f.path
+}
+
+func (f *FileLogWatcher) FileChangeCapture() error {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	if err := f.currFile.Sync(); err != nil {
+		f.logger.Warn("failed to sync log file", zap.Error(err), zap.String("path", f.path))
+	}
+	if err := f.currFile.Close(); err != nil {
+		return err
+	}
+
+	logFile, err := os.OpenFile(f.path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
+	if err != nil {
+		f.logger.Error("failed to open log file", zap.Error(err), zap.String("path", f.path))
+		return err
+	}
+	f.currFile = logFile
+	existingCore := f.logger.Core()
+	encoder, err := l.NewEncoder(f.formatterConfig)
+	if err != nil {
+		errs := make([]error, 0, 2)
+		f.logger.Error("failed to create log encoder", zap.Error(err), zap.String("format", f.formatterConfig))
+		logCloseErr := logFile.Close()
+
+		errs = append(errs, logCloseErr)
+		errs = append(errs, err)
+
+		return errors.Join(errs...)
+	}
+
+	fileCore := zapcore.NewCore(
+		encoder,
+		zapcore.Lock(logFile),
+		zapcore.InfoLevel,
+	)
+
+	newCore := zapcore.NewTee(existingCore, fileCore)
+	f.logger = zap.New(newCore)
+
+	if err := f.fsWatcher.Add(f.path); err != nil {
+		errs := make([]error, 0, 2)
+		f.logger.Error("failed to add file to watcher", zap.Error(err), zap.String("path", f.path))
+		logCloseErr := logFile.Close()
+
+		errs = append(errs, logCloseErr)
+		errs = append(errs, err)
+
+		return errors.Join(errs...)
+	}
+
+	return nil
+}
+
+func (f *FileLogWatcher) Close() error {
+	errs := make([]error, 0, 3)
+	if err := f.currFile.Sync(); err != nil {
+		f.logger.Error("failed to sync log file", zap.Error(err), zap.String("path", f.path))
+		errs = append(errs, err)
+	}
+	if err := f.currFile.Close(); err != nil {
+		f.logger.Error("failed to close log file", zap.Error(err), zap.String("path", f.path))
+		errs = append(errs, err)
+	}
+	if err := f.fsWatcher.Close(); err != nil {
+		f.logger.Error("failed to close fsnotify watcher", zap.Error(err), zap.String("path", f.path))
+		errs = append(errs, err)
+	}
+
+	return errors.Join(errs...)
+}
diff --git a/tsdb/config.go b/tsdb/config.go
index feb4927783a..2588b13fdc3 100644
--- a/tsdb/config.go
+++ b/tsdb/config.go
@@ -105,6 +105,9 @@ type Config struct {
 	// Query logging
 	QueryLogEnabled bool `toml:"query-log-enabled"`
 
+	// Query logging directed to a log file
+	QueryLogPath string `toml:"query-log-path"`
+
 	// Compaction options for tsm1 (descriptions above with defaults)
 	CacheMaxMemorySize      toml.Size `toml:"cache-max-memory-size"`
 	CacheSnapshotMemorySize toml.Size `toml:"cache-snapshot-memory-size"`
diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index 6da71e136cf..c4be1156766 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -17,7 +17,6 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	"io/fs"
 	"math"
 	"os"
 	"path/filepath"
@@ -66,10 +65,6 @@ func (e errCompactionInProgress) Error() string {
 	return "compaction in progress"
 }
 
-func (e errCompactionInProgress) Unwrap() error {
-	return e.err
-}
-
 type errCompactionAborted struct {
 	err error
 }
@@ -1056,7 +1051,6 @@ func (c *Compactor) removeTmpFiles(files []string) error {
 func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool, logger *zap.Logger) ([]string, error) {
 	// These are the new TSM files written
 	var files []string
-	var eInProgress errCompactionInProgress
 
 	for {
 		sequence++
@@ -1066,15 +1060,15 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
 		logger.Debug("Compacting files", zap.Int("file_count", len(src)), zap.String("output_file", fileName))
 
 		// Write as much as possible to this file
-		rollToNext, err := c.write(fileName, iter, throttle, logger)
+		err := c.write(fileName, iter, throttle, logger)
 
-		if rollToNext {
-			// We've hit the max file limit and there is more to write. Create a new file
-			// and continue.
+		// We've hit the max file limit and there is more to write. Create a new file
+		// and continue.
+		if err == errMaxFileExceeded || err == ErrMaxBlocksExceeded {
 			files = append(files, fileName)
 			logger.Debug("file size or block count exceeded, opening another output file", zap.String("output_file", fileName))
 			continue
-		} else if errors.Is(err, ErrNoValues) {
+		} else if err == ErrNoValues {
 			logger.Debug("Dropping empty file", zap.String("output_file", fileName))
 			// If the file only contained tombstoned entries, then it would be a 0 length
 			// file that we can drop.
@@ -1082,14 +1076,9 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
 				return nil, err
 			}
 			break
-		} else if errors.As(err, &eInProgress) {
-			if !errors.Is(eInProgress.err, fs.ErrExist) {
-				logger.Error("error creating compaction file", zap.String("output_file", fileName), zap.Error(err))
-			} else {
-				// Don't clean up the file as another compaction is using it. This should not happen as the
-				// planner keeps track of which files are assigned to compaction plans now.
-				logger.Warn("file exists, compaction in progress already", zap.String("output_file", fileName))
-			}
+		} else if _, ok := err.(errCompactionInProgress); ok {
+			// Don't clean up the file as another compaction is using it. This should not happen as the
+			// planner keeps track of which files are assigned to compaction plans now.
 			return nil, err
 		} else if err != nil {
 			// We hit an error and didn't finish the compaction. Abort.
@@ -1111,10 +1100,10 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
 	return files, nil
 }
 
-func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *zap.Logger) (rollToNext bool, err error) {
+func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *zap.Logger) (err error) {
 	fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
 	if err != nil {
-		return false, errCompactionInProgress{err: err}
+		return errCompactionInProgress{err: err}
 	}
 
 	// syncingWriter ensures that whatever we wrap the above file descriptor in
@@ -1139,31 +1128,33 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *
 	// in memory.
 	if iter.EstimatedIndexSize() > 64*1024*1024 {
 		w, err = NewTSMWriterWithDiskBuffer(limitWriter)
+		if err != nil {
+			return err
+		}
 	} else {
 		w, err = NewTSMWriter(limitWriter)
+		if err != nil {
+			return err
+		}
 	}
-	if err != nil {
-		// Close the file and return if we can't create the TSMWriter
-		return false, errors.Join(err, fd.Close())
-	}
-	defer func() {
-		var eInProgress errCompactionInProgress
-		errs := make([]error, 0, 3)
-		errs = append(errs, err)
 
+	defer func() {
 		closeErr := w.Close()
-		errs = append(errs, closeErr)
+		if err == nil {
+			err = closeErr
+		}
 
-		// Check for conditions where we should not remove the file
-		inProgress := errors.As(err, &eInProgress) && errors.Is(eInProgress.err, fs.ErrExist)
-		if (closeErr == nil) && (inProgress || rollToNext) {
-			// do not join errors, there is only the one.
+		// Check for errors where we should not remove the file
+		_, inProgress := err.(errCompactionInProgress)
+		maxBlocks := err == ErrMaxBlocksExceeded
+		maxFileSize := err == errMaxFileExceeded
+		if inProgress || maxBlocks || maxFileSize {
 			return
-		} else if err != nil || closeErr != nil {
-			// Remove the file, we have had a problem
-			errs = append(errs, w.Remove())
 		}
-		err = errors.Join(errs...)
+
+		if err != nil {
+			_ = w.Remove()
+		}
 	}()
 
 	lastLogSize := w.Size()
@@ -1173,38 +1164,38 @@
 		c.mu.RUnlock()
 
 		if !enabled {
-			return false, errCompactionAborted{}
+			return errCompactionAborted{}
 		}
 		// Each call to read returns the next sorted key (or the prior one if there are
 		// more values to write). The size of values will be less than or equal to our
 		// chunk size (1000)
 		key, minTime, maxTime, block, err := iter.Read()
 		if err != nil {
-			return false, err
+			return err
 		}
 
 		if minTime > maxTime {
-			return false, fmt.Errorf("invalid index entry for block. min=%d, max=%d", minTime, maxTime)
+			return fmt.Errorf("invalid index entry for block. min=%d, max=%d", minTime, maxTime)
 		}
 
 		// Write the key and value
-		if err := w.WriteBlock(key, minTime, maxTime, block); errors.Is(err, ErrMaxBlocksExceeded) {
+		if err := w.WriteBlock(key, minTime, maxTime, block); err == ErrMaxBlocksExceeded {
 			if err := w.WriteIndex(); err != nil {
-				return false, err
+				return err
 			}
-			return true, err
+			return err
 		} else if err != nil {
-			return false, err
+			return err
 		}
 
-		// If we're over maxTSMFileSize, close out the file
+		// If we have a max file size configured and we're over it, close out the file
 		// and return the error.
 		if w.Size() > maxTSMFileSize {
 			if err := w.WriteIndex(); err != nil {
-				return false, err
+				return err
 			}
-			return true, errMaxFileExceeded
+			return errMaxFileExceeded
 		} else if (w.Size() - lastLogSize) > logEvery {
 			logger.Debug("Compaction progress", zap.String("output_file", path), zap.Uint32("size", w.Size()))
 			lastLogSize = w.Size()
@@ -1213,15 +1204,15 @@
 
 	// Were there any errors encountered during iteration?
 	if err := iter.Err(); err != nil {
-		return false, err
+		return err
 	}
 
 	// We're all done. Close out the file.
 	if err := w.WriteIndex(); err != nil {
-		return false, err
+		return err
 	}
 	logger.Debug("Compaction finished", zap.String("output_file", path), zap.Uint32("size", w.Size()))
-	return false, nil
+	return nil
 }
 
 func (c *Compactor) add(files []string) bool {
diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go
index ccfa6d4f469..c90beb6b1cf 100644
--- a/tsdb/engine/tsm1/compact_test.go
+++ b/tsdb/engine/tsm1/compact_test.go
@@ -1,9 +1,7 @@
 package tsm1_test
 
 import (
-	"errors"
 	"fmt"
-	"io/fs"
 	"math"
 	"os"
 	"path/filepath"
@@ -14,7 +12,6 @@ import (
 	"github.com/influxdata/influxdb/tsdb"
 	"github.com/influxdata/influxdb/tsdb/engine/tsm1"
-	"github.com/stretchr/testify/assert"
 	"go.uber.org/zap"
 )
@@ -116,11 +113,11 @@ func TestCompactor_CompactFullLastTimestamp(t *testing.T) {
 	}
 	f2 := MustWriteTSM(dir, 2, writes)
 
-	ffs := &fakeFileStore{}
-	defer ffs.Close()
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := tsm1.NewCompactor()
 	compactor.Dir = dir
-	compactor.FileStore = ffs
+	compactor.FileStore = fs
 	compactor.Open()
 
 	files, err := compactor.CompactFull([]string{f1, f2}, zap.NewNop())
@@ -173,11 +170,11 @@ func TestCompactor_CompactFull(t *testing.T) {
 	}
 	f3 := MustWriteTSM(dir, 3, writes)
 
-	ffs := &fakeFileStore{}
-	defer ffs.Close()
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := tsm1.NewCompactor()
 	compactor.Dir = dir
-	compactor.FileStore = ffs
+	compactor.FileStore = fs
 
 	files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
 	if err == nil {
@@ -283,11 +280,11 @@ func TestCompactor_DecodeError(t *testing.T) {
 	f.WriteAt([]byte("ffff"), 10) // skip over header
 	f.Close()
 
-	ffs := &fakeFileStore{}
-	defer ffs.Close()
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := tsm1.NewCompactor()
 	compactor.Dir = dir
-	compactor.FileStore = ffs
+	compactor.FileStore = fs
 
 	files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
 	if err == nil {
@@ -329,11 +326,11 @@ func TestCompactor_Compact_OverlappingBlocks(t *testing.T) {
 	}
 	f3 := MustWriteTSM(dir, 3, writes)
 
-	ffs := &fakeFileStore{}
-	defer ffs.Close()
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := tsm1.NewCompactor()
 	compactor.Dir = dir
-	compactor.FileStore = ffs
+	compactor.FileStore = fs
 	compactor.Size = 2
 
 	compactor.Open()
@@ -409,11 +406,11 @@ func TestCompactor_Compact_OverlappingBlocksMultiple(t *testing.T) {
 	}
 	f3 := MustWriteTSM(dir, 3, writes)
 
-	ffs := &fakeFileStore{}
-	defer ffs.Close()
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := tsm1.NewCompactor()
 	compactor.Dir = dir
-	compactor.FileStore = ffs
+	compactor.FileStore = fs
 	compactor.Size = 2
 
 	compactor.Open()
@@ -623,11 +620,11 @@ func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) {
 	}
 	f3 := MustWriteTSM(dir, 3, writes)
 
-	ffs := &fakeFileStore{}
-	defer ffs.Close()
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := tsm1.NewCompactor()
 	compactor.Dir = dir
-	compactor.FileStore = ffs
+	compactor.FileStore = fs
 	compactor.Size = 2
 	compactor.Open()
 
@@ -725,11 +722,11 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) {
 	}
 	f3 := MustWriteTSM(dir, 3, writes)
 
-	ffs := &fakeFileStore{}
-	defer ffs.Close()
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := tsm1.NewCompactor()
 	compactor.Dir = dir
-	compactor.FileStore = ffs
+	compactor.FileStore = fs
 	compactor.Size = 2
 	compactor.Open()
 
@@ -828,11 +825,11 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) {
 	}
 	f3 := MustWriteTSM(dir, 3, writes)
 
-	ffs := &fakeFileStore{}
-	defer ffs.Close()
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := tsm1.NewCompactor()
 	compactor.Dir = dir
-	compactor.FileStore = ffs
+	compactor.FileStore = fs
 	compactor.Size = 2
 	compactor.Open()
 
@@ -936,11 +933,11 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) {
 	}
 	f3 := MustWriteTSM(dir, 3, writes)
 
-	ffs := &fakeFileStore{}
-	defer ffs.Close()
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := tsm1.NewCompactor()
 	compactor.Dir = dir
-	compactor.FileStore = ffs
+	compactor.FileStore = fs
 	compactor.Size = 2
 	compactor.Open()
 
@@ -1052,11 +1049,11 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) {
 	}
 	f2.Close()
 
-	ffs := &fakeFileStore{}
-	defer ffs.Close()
+	fs := &fakeFileStore{}
+	defer fs.Close()
 	compactor := tsm1.NewCompactor()
 	compactor.Dir = dir
-	compactor.FileStore = ffs
+	compactor.FileStore = fs
 	compactor.Open()
 
 	// Compact both files, should get 2 files back
@@ -1089,61 +1086,6 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) {
 	}
 }
 
-func TestCompactor_CompactFull_InProgress(t *testing.T) {
-	// This test creates a lot of data and causes timeout failures for these envs
-	if testing.Short() || os.Getenv("CI") != "" || os.Getenv("GORACE") != "" {
-		t.Skip("Skipping in progress compaction test")
-	}
-	dir := MustTempDir()
-	defer os.RemoveAll(dir)
-
-	f2Name := func() string {
-		values := make([]tsm1.Value, 1000)
-
-		// Write a new file with 2 blocks
-		f2, f2Name := MustTSMWriter(dir, 2)
-		defer func() {
-			assert.NoError(t, f2.Close(), "closing TSM file %s", f2Name)
-		}()
-		for i := 0; i < 2; i++ {
-			values = values[:0]
-			for j := 0; j < 1000; j++ {
-				values = append(values, tsm1.NewValue(int64(i*1000+j), int64(1)))
-			}
-			assert.NoError(t, f2.Write([]byte("cpu,host=A#!~#value"), values), "writing TSM file: %s", f2Name)
-		}
-		assert.NoError(t, f2.WriteIndex(), "writing TSM file index for %s", f2Name)
-		return f2Name
-	}()
-	ffs := &fakeFileStore{}
-	defer ffs.Close()
-	compactor := tsm1.NewCompactor()
-	compactor.Dir = dir
-	compactor.FileStore = ffs
-	compactor.Open()
-
-	expGen, expSeq, err := tsm1.DefaultParseFileName(f2Name)
-	assert.NoError(t, err, "unexpected error parsing file name %s", f2Name)
-	expSeq = expSeq + 1
-
-	fileName := filepath.Join(compactor.Dir, tsm1.DefaultFormatFileName(expGen, expSeq)+"."+tsm1.TSMFileExtension+"."+tsm1.TmpTSMFileExtension)
-
-	// Create a temp file to simulate an in progress compaction
-	f, err := os.Create(fileName)
-	assert.NoError(t, err, "creating in-progress compaction file %s", fileName)
-	defer func() {
-		assert.NoError(t, f.Close(), "closing in-progress compaction file %s", fileName)
-	}()
-	_, err = compactor.CompactFull([]string{f2Name}, zap.NewNop())
-	assert.Errorf(t, err, "expected an error writing snapshot for %s", f2Name)
-	e := errors.Unwrap(err)
-	assert.NotNil(t, e, "expected an error wrapped by errCompactionInProgress")
-	assert.Truef(t, errors.Is(e, fs.ErrExist), "error did not indicate file existence: %v", e)
-	pathErr := &os.PathError{}
-	assert.Truef(t, errors.As(e, &pathErr), "expected path error, got %v", e)
-	assert.Truef(t, errors.Is(pathErr, fs.ErrExist), "error did not indicate file existence: %v", pathErr)
-}
-
 func newTSMKeyIterator(size int, fast bool, interrupt chan struct{}, readers ...*tsm1.TSMReader) (tsm1.KeyIterator, error) {
 	files := []string{}
 	for _, r := range readers {
@@ -2587,14 +2529,14 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) {
 		},
 	}
 
-	ffs := &fakeFileStore{
+	fs := &fakeFileStore{
 		PathsFn: func() []tsm1.FileStat {
 			return testSet
 		},
 		blockCount: 1000,
 	}
 
-	cp := tsm1.NewDefaultPlanner(ffs, time.Nanosecond)
+	cp := tsm1.NewDefaultPlanner(fs, time.Nanosecond)
 	plan, pLen := cp.Plan(time.Now().Add(-time.Second))
 	// first verify that our test set would return files
 	if exp, got := 4, len(plan[0]); got != exp {
@@ -2653,9 +2595,9 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) {
 	}
 	cp.Release(plan)
 
-	cp.FileStore = ffs
+	cp.FileStore = fs
 	// ensure that it will plan if last modified has changed
-	ffs.lastModified = time.Now()
+	fs.lastModified = time.Now()
 	cGroups, pLen := cp.Plan(time.Now())
 
 	if exp, got := 4, len(cGroups[0]); got != exp {
@@ -2751,7 +2693,7 @@ func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) {
 		},
 	}
 
-	ffs := &fakeFileStore{
+	fs := &fakeFileStore{
 		PathsFn: func() []tsm1.FileStat {
 			return testSet
 		},
 	}
 
 	cp := tsm1.NewDefaultPlanner(
-		ffs,
+		fs,
 		time.Nanosecond,
 	)
 
diff --git a/tsdb/engine/tsm1/writer.go b/tsdb/engine/tsm1/writer.go
index 70f0fd77e22..4784dc6d558 100644
--- a/tsdb/engine/tsm1/writer.go
+++ b/tsdb/engine/tsm1/writer.go
@@ -66,7 +66,6 @@ import (
 	"bufio"
 	"bytes"
 	"encoding/binary"
-	"errors"
 	"fmt"
 	"hash/crc32"
 	"io"
@@ -512,15 +511,19 @@ func (d *directIndex) Size() uint32 {
 }
 
 func (d *directIndex) Close() error {
-	errs := make([]error, 0, 3)
 	// Flush anything remaining in the index
-	errs = append(errs, d.w.Flush())
-	if d.fd != nil {
-		// Close and remove the temporary index file
-		errs = append(errs, d.fd.Close())
-		errs = append(errs, os.Remove(d.fd.Name()))
+	if err := d.w.Flush(); err != nil {
+		return err
+	}
+
+	if d.fd == nil {
+		return nil
 	}
-	return errors.Join(errs...)
+
+	if err := d.fd.Close(); err != nil {
+		return err
+	}
+	return os.Remove(d.fd.Name())
 }
 
 // Remove removes the index from any tempory storage
@@ -529,14 +532,11 @@ func (d *directIndex) Remove() error {
 		return nil
 	}
 
-	errs := make([]error, 0, 2)
-	// Close the file handle to prevent leaking.
-	// We don't let an error stop the removal.
-	if err := d.fd.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
-		errs = append(errs, err)
-	}
-	errs = append(errs, os.Remove(d.fd.Name()))
-	return errors.Join(errs...)
+	// Close the file handle to prevent leaking. We ignore the error because
+	// we just want to cleanup and remove the file.
+	_ = d.fd.Close()
+
+	return os.Remove(d.fd.Name())
 }
 
 // tsmWriter writes keys and values in the TSM format
@@ -756,19 +756,25 @@ func (t *tsmWriter) sync() error {
 }
 
 func (t *tsmWriter) Close() error {
-	errs := make([]error, 0, 3)
-	errs = append(errs, t.Flush())
-	errs = append(errs, t.index.Close())
+	if err := t.Flush(); err != nil {
+		return err
+	}
+
+	if err := t.index.Close(); err != nil {
+		return err
+	}
+
 	if c, ok := t.wrapped.(io.Closer); ok {
-		errs = append(errs, c.Close())
+		return c.Close()
 	}
-	return errors.Join(errs...)
+	return nil
 }
 
 // Remove removes any temporary storage used by the writer.
 func (t *tsmWriter) Remove() error {
-	errs := make([]error, 0, 3)
-	errs = append(errs, t.index.Remove())
+	if err := t.index.Remove(); err != nil {
+		return err
+	}
 
 	// nameCloser is the most permissive interface we can close the wrapped
 	// value with.
@@ -777,16 +783,14 @@ func (t *tsmWriter) Remove() error {
 		Name() string
 	}
 
-	// If the writer is not a memory buffer, we can remove the file.
 	if f, ok := t.wrapped.(nameCloser); ok {
-		// Close the file handle to prevent leaking.
-		if err := f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
-			errs = append(errs, err)
-		}
-		// Remove the file
-		errs = append(errs, os.Remove(f.Name()))
+		// Close the file handle to prevent leaking. We ignore the error because
+		// we just want to cleanup and remove the file.
+		_ = f.Close()
+
+		return os.Remove(f.Name())
 	}
-	return errors.Join(errs...)
+	return nil
 }
 
 func (t *tsmWriter) Size() uint32 {

From a059c07bfdc70e14f793592887dede953df00f96 Mon Sep 17 00:00:00 2001
From: Devan
Date: Wed, 8 Jan 2025 15:01:18 -0600
Subject: [PATCH 2/3] feat: Add mutex protection for file log watcher Close()

---
 query/file_log_watcher.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/query/file_log_watcher.go b/query/file_log_watcher.go
index ee2fde2be73..882ab2c3e7b 100644
--- a/query/file_log_watcher.go
+++ b/query/file_log_watcher.go
@@ -158,6 +158,8 @@ func (f *FileLogWatcher) FileChangeCapture() error {
 }
 
 func (f *FileLogWatcher) Close() error {
+	f.mu.Lock()
+	defer f.mu.Unlock()
 	errs := make([]error, 0, 3)
 	if err := f.currFile.Sync(); err != nil {
 		f.logger.Error("failed to sync log file", zap.Error(err), zap.String("path", f.path))

From 28b580777e93aeea7b144fec1b6ac00d7554e860 Mon Sep 17 00:00:00 2001
From: Devan
Date: Wed, 8 Jan 2025 15:07:49 -0600
Subject: [PATCH 3/3] feat: Remove unneeded mutex from startFileLogWatcher

---
 query/executor.go | 2 --
 1 file changed, 2 deletions(-)

diff --git a/query/executor.go b/query/executor.go
index 1ec7c80b82f..29bd86f77b0 100644
--- a/query/executor.go
+++ b/query/executor.go
@@ -309,11 +309,9 @@ func (e *Executor) WithLogger(log *zap.Logger) {
 func startFileLogWatcher(w WatcherInterface, e *Executor, ctx context.Context) error {
 	path := w.GetLogPath()
 
-	e.mu.Lock()
 	if err := w.Add(path); err != nil {
 		return fmt.Errorf("add watch path: %w", err)
 	}
-	e.mu.Unlock()
 
 	for {
 		select {