-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add query-log-path
configuration
#25710
base: master-1.x
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -487,6 +487,16 @@ func (s *Server) Open() error { | |
// so it only logs as is appropriate. | ||
s.QueryExecutor.TaskManager.Logger = s.Logger | ||
} | ||
if s.config.Data.QueryLogPath != "" { | ||
path := s.config.Data.QueryLogPath | ||
flw := query.NewFileLogWatcher(s.QueryExecutor, path, s.Logger, s.config.Logging.Format) | ||
if flw != nil { | ||
s.QueryExecutor.WithLogWriter(flw, context.Background()) | ||
Comment on lines
+492
to
+494
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @devan I had an idea last night on how to make this way more flexible and reusable. First, change Second, change |
||
} else { | ||
s.Logger.Error("error creating log writer", zap.String("path", path)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, if |
||
} | ||
} | ||
|
||
s.PointsWriter.WithLogger(s.Logger) | ||
s.Subscriber.WithLogger(s.Logger) | ||
for _, svc := range s.Services { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,7 +40,7 @@ func (c *Config) New(defaultOutput io.Writer) (*zap.Logger, error) { | |
} | ||
} | ||
|
||
encoder, err := newEncoder(format) | ||
encoder, err := NewEncoder(format) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -51,8 +51,8 @@ func (c *Config) New(defaultOutput io.Writer) (*zap.Logger, error) { | |
), zap.Fields(zap.String("log_id", nextID()))), nil | ||
} | ||
|
||
func newEncoder(format string) (zapcore.Encoder, error) { | ||
config := newEncoderConfig() | ||
func NewEncoder(format string) (zapcore.Encoder, error) { | ||
config := NewEncoderConfig() | ||
switch format { | ||
case "json": | ||
return zapcore.NewJSONEncoder(config), nil | ||
|
@@ -65,7 +65,7 @@ func newEncoder(format string) (zapcore.Encoder, error) { | |
} | ||
} | ||
|
||
func newEncoderConfig() zapcore.EncoderConfig { | ||
func NewEncoderConfig() zapcore.EncoderConfig { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can't find any uses of |
||
config := zap.NewProductionEncoderConfig() | ||
config.EncodeTime = func(ts time.Time, encoder zapcore.PrimitiveArrayEncoder) { | ||
encoder.AppendString(ts.UTC().Format(TimeFormat)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,9 +11,11 @@ import ( | |
"sync/atomic" | ||
"time" | ||
|
||
"github.com/fsnotify/fsnotify" | ||
"github.com/influxdata/influxdb/models" | ||
"github.com/influxdata/influxql" | ||
"go.uber.org/zap" | ||
"golang.org/x/sync/errgroup" | ||
) | ||
|
||
var ( | ||
|
@@ -228,6 +230,19 @@ type StatementNormalizer interface { | |
NormalizeStatement(stmt influxql.Statement, database, retentionPolicy string) error | ||
} | ||
|
||
// WatcherInterface is used for any file watch functionality using fsnotify | ||
type WatcherInterface interface { | ||
Event() <-chan fsnotify.Event | ||
Error() <-chan error | ||
FileChangeCapture() error | ||
GetLogger() *zap.Logger | ||
GetLogPath() string | ||
// Methods from fsnotify | ||
Close() error | ||
Add(name string) error | ||
Remove(name string) error | ||
} | ||
|
||
// Executor executes every statement in an Query. | ||
type Executor struct { | ||
// Used for executing a statement in the query. | ||
|
@@ -242,6 +257,8 @@ type Executor struct { | |
|
||
// expvar-based stats. | ||
stats *Statistics | ||
|
||
mu sync.Mutex | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would not put My preference is to put |
||
} | ||
|
||
// NewExecutor returns a new instance of Executor. | ||
|
@@ -289,6 +306,94 @@ func (e *Executor) WithLogger(log *zap.Logger) { | |
e.TaskManager.Logger = e.Logger | ||
} | ||
|
||
func startFileLogWatcher(w WatcherInterface, e *Executor, ctx context.Context) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
path := w.GetLogPath() | ||
|
||
if err := w.Add(path); err != nil { | ||
return fmt.Errorf("add watch path: %w", err) | ||
} | ||
|
||
for { | ||
select { | ||
case event, ok := <-w.Event(): | ||
if !ok { | ||
return fmt.Errorf("watcher event channel closed") | ||
} | ||
|
||
e.Logger.Debug("received file event", zap.String("event", event.Name)) | ||
|
||
if err := func() error { | ||
if event.Op == fsnotify.Remove || event.Op == fsnotify.Rename { | ||
e.mu.Lock() | ||
defer e.mu.Unlock() | ||
e.Logger.Info("log file altered, creating new Watcher", | ||
zap.String("event", event.Name), | ||
zap.String("path", path)) | ||
|
||
if err := w.FileChangeCapture(); err != nil { | ||
e.mu.Unlock() | ||
return fmt.Errorf("handle file change: %w", err) | ||
} | ||
|
||
logger := w.GetLogger() | ||
e.Logger = logger | ||
e.TaskManager.Logger = e.Logger | ||
} | ||
return nil | ||
}(); err != nil { | ||
return err | ||
} | ||
|
||
case err, ok := <-w.Error(): | ||
errs := make([]error, 3) | ||
errs = append(errs, err) | ||
|
||
closeErr := w.Close() | ||
if closeErr != nil { | ||
errs = append(errs, fmt.Errorf("failed to close watcher: %w", closeErr)) | ||
} | ||
if !ok { | ||
errs = append(errs, fmt.Errorf("watcher error channel closed")) | ||
} | ||
|
||
return errors.Join(errs...) | ||
case <-ctx.Done(): | ||
closeErr := w.Close() | ||
if closeErr != nil { | ||
return fmt.Errorf("failed to close watcher: %w", closeErr) | ||
} | ||
e.Logger.Info("closing file Watcher") | ||
return ctx.Err() | ||
} | ||
} | ||
} | ||
|
||
func (e *Executor) WithLogWriter(w WatcherInterface, ctx context.Context) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
e.Logger.Info("starting file watcher", zap.String("path", w.GetLogPath())) | ||
errs, ctx := errgroup.WithContext(ctx) | ||
|
||
e.mu.Lock() | ||
e.Logger = w.GetLogger() | ||
e.TaskManager.Logger = e.Logger | ||
e.mu.Unlock() | ||
|
||
errs.Go(func() error { | ||
return startFileLogWatcher(w, e, ctx) | ||
}) | ||
|
||
go func() { | ||
if err := errs.Wait(); err != nil { | ||
e.mu.Lock() | ||
defer e.mu.Unlock() | ||
e.Logger.Error("file log Watcher error", zap.Error(err), zap.String("path", w.GetLogPath())) | ||
err := w.Close() | ||
if err != nil { | ||
e.Logger.Error("failed to close file watcher", zap.Error(err), zap.String("path", w.GetLogPath())) | ||
} | ||
} | ||
}() | ||
} | ||
|
||
// ExecuteQuery executes each statement within a query. | ||
func (e *Executor) ExecuteQuery(query *influxql.Query, opt ExecutionOptions, closing chan struct{}) <-chan *Result { | ||
results := make(chan *Result) | ||
|
@@ -386,7 +491,9 @@ LOOP: | |
|
||
// Log each normalized statement. | ||
if !ctx.Quiet { | ||
e.mu.Lock() | ||
e.Logger.Info("Executing query", zap.Stringer("query", stmt)) | ||
e.mu.Unlock() | ||
} | ||
|
||
// Send any other statements to the underlying statement executor. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Either
query.NewFileLogWatcher
is misnamed, orQueryExecutor.WithLogWriter
is misnamed. It looks like the wrong thing is getting passed around.