Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add query-log-path configuration #25710

Open
wants to merge 3 commits into
base: master-1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,16 @@ func (s *Server) Open() error {
// so it only logs as is appropriate.
s.QueryExecutor.TaskManager.Logger = s.Logger
}
if s.config.Data.QueryLogPath != "" {
path := s.config.Data.QueryLogPath
flw := query.NewFileLogWatcher(s.QueryExecutor, path, s.Logger, s.config.Logging.Format)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either query.NewFileLogWatcher is misnamed, or QueryExecutor.WithLogWriter is misnamed. It looks like the wrong thing is getting passed around.

if flw != nil {
s.QueryExecutor.WithLogWriter(flw, context.Background())
Comment on lines +492 to +494
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@devan I had an idea last night on how to make this way more flexible and reusable.

First, change QueryExecutor.WithLogWriter(*FileLogWatcher) to QueryExecutor.WithQueryLogger(*zap.Logger). By taking a *zap.Logger, we have way more flexibility in how queries can be logged. It will also decouple log rotation logic from the QueryExecutor if the code ends up being split between OSS and Enterprise.

Second, change query.NewFileLogWatcher(...) *FileLogWatcher to query.NewRotatableLogger(...) (*zap.Logger, error) (or logger.NewRotatableLogger...). This logger can then be used anywhere we need a log file that can be rotated by an external log rotator without making changes to the client code using the logger. Note: The goroutine to watch the signal channel will keep the references to the required data so it won't be garbage collected.

} else {
s.Logger.Error("error creating log writer", zap.String("path", path))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, if NewFileLogWatcher returned an error, you could log the error right here instead of this generic log message.

}
}

s.PointsWriter.WithLogger(s.Logger)
s.Subscriber.WithLogger(s.Logger)
for _, svc := range s.Services {
Expand Down
1 change: 1 addition & 0 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
# Whether queries should be logged before execution. Very useful for troubleshooting, but will
# log any sensitive data contained within a query.
# query-log-enabled = true
# query-log-path = ""

# It is possible to collect statistics of points written per-measurement and/or per-login.
# These can be accessed via the monitoring subsystem.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0
github.com/davecgh/go-spew v1.1.1
github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8
github.com/fsnotify/fsnotify v1.4.7
github.com/go-chi/chi v4.1.0+incompatible
github.com/golang-jwt/jwt v3.2.1+incompatible
github.com/golang/mock v1.5.0
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P
github.com/frankban/quicktest v1.11.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
github.com/frankban/quicktest v1.13.0 h1:yNZif1OkDfNoDfb9zZa9aXIpejNR4F23Wely0c+Qdqk=
github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gabriel-vasile/mimetype v1.4.0 h1:Cn9dkdYsMIu56tGho+fqzh7XmvY2YyGU0FnbhiOsEro=
github.com/gabriel-vasile/mimetype v1.4.0/go.mod h1:fA8fi6KUiG7MgQQ+mEWotXoEOvmxRtOJlERCzSmRvr8=
Expand Down
8 changes: 4 additions & 4 deletions logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (c *Config) New(defaultOutput io.Writer) (*zap.Logger, error) {
}
}

encoder, err := newEncoder(format)
encoder, err := NewEncoder(format)
if err != nil {
return nil, err
}
Expand All @@ -51,8 +51,8 @@ func (c *Config) New(defaultOutput io.Writer) (*zap.Logger, error) {
), zap.Fields(zap.String("log_id", nextID()))), nil
}

func newEncoder(format string) (zapcore.Encoder, error) {
config := newEncoderConfig()
func NewEncoder(format string) (zapcore.Encoder, error) {
config := NewEncoderConfig()
switch format {
case "json":
return zapcore.NewJSONEncoder(config), nil
Expand All @@ -65,7 +65,7 @@ func newEncoder(format string) (zapcore.Encoder, error) {
}
}

func newEncoderConfig() zapcore.EncoderConfig {
func NewEncoderConfig() zapcore.EncoderConfig {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't find any uses of NewEncodingConfig outside of the logger package. Is there a reason to change it to public?

config := zap.NewProductionEncoderConfig()
config.EncodeTime = func(ts time.Time, encoder zapcore.PrimitiveArrayEncoder) {
encoder.AppendString(ts.UTC().Format(TimeFormat))
Expand Down
107 changes: 107 additions & 0 deletions query/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
"sync/atomic"
"time"

"github.com/fsnotify/fsnotify"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxql"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

var (
Expand Down Expand Up @@ -228,6 +230,19 @@ type StatementNormalizer interface {
NormalizeStatement(stmt influxql.Statement, database, retentionPolicy string) error
}

// WatcherInterface is used for any file watch functionality using fsnotify
type WatcherInterface interface {
Event() <-chan fsnotify.Event
Error() <-chan error
FileChangeCapture() error
GetLogger() *zap.Logger
GetLogPath() string
// Methods from fsnotify
Close() error
Add(name string) error
Remove(name string) error
}

// Executor executes every statement in an Query.
type Executor struct {
// Used for executing a statement in the query.
Expand All @@ -242,6 +257,8 @@ type Executor struct {

// expvar-based stats.
stats *Statistics

mu sync.Mutex
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not put mu at the bottom of the struct. A developer going into this code might miss mu and not lock appropriately.

My preference is to put mu above all the members that it will protect, along with a comment that mu protects all following members. Members that don't require protection can go above mu.

}

// NewExecutor returns a new instance of Executor.
Expand Down Expand Up @@ -289,6 +306,94 @@ func (e *Executor) WithLogger(log *zap.Logger) {
e.TaskManager.Logger = e.Logger
}

func startFileLogWatcher(w WatcherInterface, e *Executor, ctx context.Context) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

startFileLogWatcher made me think this was going to spawn a goroutine, but it doesn't. It's the body of a goroutine. Maybe either doFileLogWatch or simply fileLogWatch?

path := w.GetLogPath()

if err := w.Add(path); err != nil {
return fmt.Errorf("add watch path: %w", err)
}

for {
select {
case event, ok := <-w.Event():
if !ok {
return fmt.Errorf("watcher event channel closed")
}

e.Logger.Debug("received file event", zap.String("event", event.Name))

if err := func() error {
if event.Op == fsnotify.Remove || event.Op == fsnotify.Rename {
e.mu.Lock()
defer e.mu.Unlock()
e.Logger.Info("log file altered, creating new Watcher",
zap.String("event", event.Name),
zap.String("path", path))

if err := w.FileChangeCapture(); err != nil {
e.mu.Unlock()
return fmt.Errorf("handle file change: %w", err)
}

logger := w.GetLogger()
e.Logger = logger
e.TaskManager.Logger = e.Logger
}
return nil
}(); err != nil {
return err
}

case err, ok := <-w.Error():
errs := make([]error, 3)
errs = append(errs, err)

closeErr := w.Close()
if closeErr != nil {
errs = append(errs, fmt.Errorf("failed to close watcher: %w", closeErr))
}
if !ok {
errs = append(errs, fmt.Errorf("watcher error channel closed"))
}

return errors.Join(errs...)
case <-ctx.Done():
closeErr := w.Close()
if closeErr != nil {
return fmt.Errorf("failed to close watcher: %w", closeErr)
}
e.Logger.Info("closing file Watcher")
return ctx.Err()
}
}
}

func (e *Executor) WithLogWriter(w WatcherInterface, ctx context.Context) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ctx should be the first parameter. From the docs:

The Context should be the first parameter, typically named ctx.

e.Logger.Info("starting file watcher", zap.String("path", w.GetLogPath()))
errs, ctx := errgroup.WithContext(ctx)

e.mu.Lock()
e.Logger = w.GetLogger()
e.TaskManager.Logger = e.Logger
e.mu.Unlock()

errs.Go(func() error {
return startFileLogWatcher(w, e, ctx)
})

go func() {
if err := errs.Wait(); err != nil {
e.mu.Lock()
defer e.mu.Unlock()
e.Logger.Error("file log Watcher error", zap.Error(err), zap.String("path", w.GetLogPath()))
err := w.Close()
if err != nil {
e.Logger.Error("failed to close file watcher", zap.Error(err), zap.String("path", w.GetLogPath()))
}
}
}()
}

// ExecuteQuery executes each statement within a query.
func (e *Executor) ExecuteQuery(query *influxql.Query, opt ExecutionOptions, closing chan struct{}) <-chan *Result {
results := make(chan *Result)
Expand Down Expand Up @@ -386,7 +491,9 @@ LOOP:

// Log each normalized statement.
if !ctx.Quiet {
e.mu.Lock()
e.Logger.Info("Executing query", zap.Stringer("query", stmt))
e.mu.Unlock()
}

// Send any other statements to the underlying statement executor.
Expand Down
Loading
Loading