diff --git a/.golangci.yml b/.golangci.yml index d830af46b..4ff358815 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -764,7 +764,7 @@ linters-settings: - github.com/jmoiron/sqlx sloglint: # Enforce not mixing key-value pairs and attributes. Default: true - no-mixed-args: true + no-mixed-args: false # Enforce using key-value pairs only (overrides no-mixed-args, incompatible with attr-only). Default: false kv-only: false # Enforce using attributes only (overrides no-mixed-args, incompatible with kv-only). Default: false diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index 5a98a8919..a2171756b 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -56,6 +56,8 @@ type ( } ) +var logOrigin = slog.String("log_origin", "grpc.go") + var ( serviceConfig = `{ "healthCheckConfig": { @@ -83,7 +85,7 @@ func NewGrpcConnection(ctx context.Context, agentConfig *config.Config) (*GrpcCo fmt.Sprint(agentConfig.Command.Server.Port), ) - slog.InfoContext(ctx, "Dialing grpc server", "server_addr", serverAddr) + slog.InfoContext(ctx, "Dialing grpc server", "server_addr", serverAddr, logOrigin) info := host.NewInfo() resourceID := info.ResourceID(ctx) @@ -120,7 +122,7 @@ func (gc *GrpcConnection) Close(ctx context.Context) error { defer gc.mutex.Unlock() if gc.conn != nil { - slog.InfoContext(ctx, "Closing grpc connection") + slog.InfoContext(ctx, "Closing grpc connection", logOrigin) err := gc.conn.Close() gc.conn = nil if err != nil { @@ -166,14 +168,14 @@ func GetDialOptions(agentConfig *config.Config, resourceID string) []grpc.DialOp protoValidatorStreamClientInterceptor, err := ProtoValidatorStreamClientInterceptor() if err != nil { - slog.Error("Unable to add proto validation stream interceptor", "error", err) + slog.Error("Unable to add proto validation stream interceptor", "error", err, logOrigin) } else { streamClientInterceptors = append(streamClientInterceptors, protoValidatorStreamClientInterceptor) } protoValidatorUnaryClientInterceptor, err := ProtoValidatorUnaryClientInterceptor() if err != nil { - slog.Error("Unable to add proto validation unary interceptor", "error", err) + slog.Error("Unable to add proto validation unary interceptor", "error", err, logOrigin) } else { unaryClientInterceptors = append(unaryClientInterceptors, protoValidatorUnaryClientInterceptor) } @@ -223,15 +225,18 @@ func GetDialOptions(agentConfig *config.Config, resourceID string) []grpc.DialOp func addTransportCredentials(agentConfig *config.Config, opts []grpc.DialOption) ([]grpc.DialOption, bool) { transportCredentials, err := getTransportCredentials(agentConfig) if err != nil { - slog.Error("Unable to add transport credentials to gRPC dial options, adding "+ - "default transport credentials", "error", err) + slog.Error( + "Unable to add transport credentials to gRPC dial options, adding default transport credentials", + "error", err, + logOrigin, + ) opts = append(opts, grpc.WithTransportCredentials(defaultCredentials), ) return opts, true } - slog.Debug("Adding transport credentials to gRPC dial options") + slog.Debug("Adding transport credentials to gRPC dial options", logOrigin) opts = append(opts, grpc.WithTransportCredentials(transportCredentials), ) @@ -243,16 +248,16 @@ func addPerRPCCredentials(agentConfig *config.Config, resourceID string, opts [] token := agentConfig.Command.Auth.Token if agentConfig.Command.Auth.TokenPath != "" { - slog.Debug("Reading token from file", "path", agentConfig.Command.Auth.TokenPath) + slog.Debug("Reading token from file", "path", agentConfig.Command.Auth.TokenPath, logOrigin) tk, err := file.ReadFromFile(agentConfig.Command.Auth.TokenPath) if err == nil { token = tk } else { - slog.Error("Unable to add token to gRPC dial options", "error", err) + slog.Error("Unable to add token to gRPC dial options", "error", err, logOrigin) } } - slog.Debug("Adding RPC credentials") + slog.Debug("Adding RPC credentials", logOrigin) opts = append(opts, grpc.WithPerRPCCredentials( &PerRPCCredentials{ @@ -365,7 +370,7 @@ func getTransportCredentials(agentConfig *config.Config) (credentials.TransportC } if agentConfig.Command.TLS.SkipVerify { - slog.Warn("Verification of the server's certificate chain and host name is disabled") + slog.Warn("Verification of the server's certificate chain and host name is disabled", logOrigin) } tlsConfig := &tls.Config{ @@ -385,7 +390,7 @@ func getTransportCredentials(agentConfig *config.Config) (credentials.TransportC err = appendRootCAs(tlsConfig, agentConfig.Command.TLS.Ca) if err != nil { - slog.Debug("Unable to append root CA", "error", err) + slog.Debug("Unable to append root CA", "error", err, logOrigin) } return credentials.NewTLS(tlsConfig), nil diff --git a/internal/logger/logger.go b/internal/logger/logger.go index 9a6d7b064..7ce8a93f4 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -24,6 +24,8 @@ const ( CorrelationIDKey = "correlation_id" ) +var logOrigin = slog.String("log_origin", "logger.go") + var ( logLevels = map[string]slog.Level{ "debug": slog.LevelDebug, @@ -73,7 +75,11 @@ func getLogWriter(logFile string) io.Writer { if logFile != "" { fileInfo, err := os.Stat(logPath) if err != nil { - slog.Error("Error reading log directory, proceeding to log only to stdout/stderr", "error", err) + slog.Error( + "Error reading log directory, proceeding to log only to stdout/stderr", + "error", err, + logOrigin, + ) return os.Stderr } @@ -84,7 +90,11 @@ func getLogWriter(logFile string) io.Writer { logFileHandle, err := os.OpenFile(logPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, filePermission) if err != nil { - slog.Error("Failed to open log file, proceeding to log only to stdout/stderr", "error", err) + slog.Error( + "Failed to open log file, proceeding to log only to stdout/stderr", + "error", err, + logOrigin, + ) return os.Stderr } @@ -135,7 +145,9 @@ func GetCorrelationIDAttr(ctx context.Context) slog.Attr { slog.Debug( "Correlation ID not found in context, generating new correlation ID", "correlation_id", - correlationID) + correlationID, + logOrigin, + ) return GenerateCorrelationID() } diff --git a/internal/plugin/plugin_manager.go b/internal/plugin/plugin_manager.go index 6a53a9c85..a5b073eb5 100644 --- a/internal/plugin/plugin_manager.go +++ b/internal/plugin/plugin_manager.go @@ -22,6 +22,8 @@ import ( "github.com/nginx/agent/v3/internal/watcher" ) +var logOrigin = slog.String("log_origin", "plugin_manager.go") + func LoadPlugins(ctx context.Context, agentConfig *config.Config) []bus.Plugin { plugins := make([]bus.Plugin, 0) @@ -44,7 +46,7 @@ func addCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin, agentCo if agentConfig.IsGrpcClientConfigured() { grpcConnection, err := grpc.NewGrpcConnection(ctx, agentConfig) if err != nil { - slog.WarnContext(ctx, "Failed to create gRPC connection", "error", err) + slog.WarnContext(ctx, "Failed to create gRPC connection", "error", err, logOrigin) } else { commandPlugin := command.NewCommandPlugin(agentConfig, grpcConnection) plugins = append(plugins, commandPlugin) @@ -53,7 +55,7 @@ func addCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin, agentCo } } else { slog.InfoContext(ctx, "Agent is not connected to a management plane. "+ - "Configure a command server to establish a connection with a management plane.") + "Configure a command server to establish a connection with a management plane.", logOrigin) } return plugins @@ -71,11 +73,14 @@ func addCollectorPlugin(ctx context.Context, agentConfig *config.Config, plugins if err == nil { plugins = append(plugins, oTelCollector) } else { - slog.ErrorContext(ctx, "Failed to initialize collector plugin", "error", err) + slog.ErrorContext(ctx, "Failed to initialize collector plugin", "error", err, logOrigin) } } else { - slog.InfoContext(ctx, "Agent OTel collector isn't started. "+ - "Configure a collector to begin collecting metrics.") + slog.InfoContext( + ctx, + "Agent OTel collector isn't started. Configure a collector to begin collecting metrics.", + logOrigin, + ) } return plugins diff --git a/internal/resource/nginx_instance_operator.go b/internal/resource/nginx_instance_operator.go index 70fb3af0e..47c72ef61 100644 --- a/internal/resource/nginx_instance_operator.go +++ b/internal/resource/nginx_instance_operator.go @@ -25,6 +25,8 @@ type NginxInstanceOperator struct { var _ instanceOperator = (*NginxInstanceOperator)(nil) +var logOrigin = slog.String("log_origin", "nginx_instance_operator.go") + func NewInstanceOperator(agentConfig *config.Config) *NginxInstanceOperator { return &NginxInstanceOperator{ executer: &exec.Exec{}, @@ -34,7 +36,7 @@ func NewInstanceOperator(agentConfig *config.Config) *NginxInstanceOperator { } func (i *NginxInstanceOperator) Validate(ctx context.Context, instance *mpi.Instance) error { - slog.DebugContext(ctx, "Validating NGINX config") + slog.DebugContext(ctx, "Validating NGINX config", logOrigin) exePath := instance.GetInstanceRuntime().GetBinaryPath() out, err := i.executer.RunCmd(ctx, exePath, "-t") @@ -47,7 +49,7 @@ func (i *NginxInstanceOperator) Validate(ctx context.Context, instance *mpi.Inst return err } - slog.InfoContext(ctx, "NGINX config tested", "output", out) + slog.InfoContext(ctx, "NGINX config tested", "output", out, logOrigin) return nil } @@ -68,10 +70,19 @@ func (i *NginxInstanceOperator) validateConfigCheckResponse(out []byte) error { func (i *NginxInstanceOperator) Reload(ctx context.Context, instance *mpi.Instance) error { var errorsFound error - slog.InfoContext(ctx, "Reloading NGINX PID", "pid", - instance.GetInstanceRuntime().GetProcessId()) - - slog.InfoContext(ctx, "NGINX reloaded", "processid", instance.GetInstanceRuntime().GetProcessId()) + slog.InfoContext( + ctx, + "Reloading NGINX PID", + "pid", instance.GetInstanceRuntime().GetProcessId(), + logOrigin, + ) + + slog.InfoContext( + ctx, + "NGINX reloaded", + "processid", instance.GetInstanceRuntime().GetProcessId(), + logOrigin, + ) errorLogs := i.errorLogs(instance) @@ -89,14 +100,14 @@ func (i *NginxInstanceOperator) Reload(ctx context.Context, instance *mpi.Instan for i := 0; i < numberOfExpectedMessages; i++ { logErr := <-logErrorChannel - slog.InfoContext(ctx, "Message received in logErrorChannel", "error", logErr) + slog.InfoContext(ctx, "Message received in logErrorChannel", "error", logErr, logOrigin) if logErr != nil { errorsFound = errors.Join(errorsFound, logErr) - slog.Info("Errors Found", "", errorsFound) + slog.Info("Errors Found", "", errorsFound, logOrigin) } } - slog.InfoContext(ctx, "Finished monitoring post reload") + slog.InfoContext(ctx, "Finished monitoring post reload", logOrigin) if errorsFound != nil { return errorsFound @@ -117,7 +128,7 @@ func (i *NginxInstanceOperator) errorLogs(instance *mpi.Instance) (errorLogs []s func (i *NginxInstanceOperator) monitorLogs(ctx context.Context, errorLogs []string, errorChannel chan error) { if len(errorLogs) == 0 { - slog.InfoContext(ctx, "No NGINX error logs found to monitor") + slog.InfoContext(ctx, "No NGINX error logs found to monitor", logOrigin) return } diff --git a/internal/resource/nginx_log_tailer_operator.go b/internal/resource/nginx_log_tailer_operator.go index 3a62f12ee..e05f05ef3 100644 --- a/internal/resource/nginx_log_tailer_operator.go +++ b/internal/resource/nginx_log_tailer_operator.go @@ -23,6 +23,8 @@ type NginxLogTailerOperator struct { var _ logTailerOperator = (*NginxLogTailerOperator)(nil) +var tailerLogOrigin = slog.String("log_origin", "nginx_log_tailer_operator.go") + var ( // Line is over 120 characters long, regex needs to be on one line so needs to be ignored by linter // nolint: lll @@ -41,7 +43,8 @@ func NewLogTailerOperator(agentConfig *config.Config) *NginxLogTailerOperator { func (l *NginxLogTailerOperator) Tail(ctx context.Context, errorLog string, errorChannel chan error) { t, err := nginx.NewTailer(errorLog) if err != nil { - slog.ErrorContext(ctx, "Unable to tail error log after NGINX reload", "log_file", errorLog, "error", err) + slog.ErrorContext(ctx, "Unable to tail error log after NGINX reload", "log_file", errorLog, + "error", err, tailerLogOrigin) // this is not an error in the logs, ignoring tailing errorChannel <- nil @@ -51,7 +54,8 @@ func (l *NginxLogTailerOperator) Tail(ctx context.Context, errorLog string, erro ctxWithTimeout, cancel := context.WithTimeout(ctx, l.agentConfig.DataPlaneConfig.Nginx.ReloadMonitoringPeriod) defer cancel() - slog.DebugContext(ctxWithTimeout, "Monitoring NGINX error log file for any errors", "file", errorLog) + slog.DebugContext(ctxWithTimeout, "Monitoring NGINX error log file for any errors", + "file", errorLog, tailerLogOrigin) data := make(chan string, logTailerChannelSize) go t.Tail(ctxWithTimeout, data) diff --git a/internal/resource/nginx_plus_actions.go b/internal/resource/nginx_plus_actions.go index e0bcdc819..7cd242067 100644 --- a/internal/resource/nginx_plus_actions.go +++ b/internal/resource/nginx_plus_actions.go @@ -21,6 +21,9 @@ type APIAction struct { ResourceService resourceServiceInterface } +var plusActionsLogOrigin = slog.String("log_origin", "nginx_plus_actions.go") + +// nolint: dupl func (a *APIAction) HandleUpdateStreamServersRequest(ctx context.Context, action *mpi.NGINXPlusAction, instance *mpi.Instance, ) *mpi.DataPlaneResponse { @@ -30,16 +33,27 @@ func (a *APIAction) HandleUpdateStreamServersRequest(ctx context.Context, action add, update, del, err := a.ResourceService.UpdateStreamServers(ctx, instance, action.GetUpdateStreamServers().GetUpstreamStreamName(), action.GetUpdateStreamServers().GetServers()) if err != nil { - slog.ErrorContext(ctx, "Unable to update stream servers of upstream", "request", - action.GetUpdateHttpUpstreamServers(), "error", err) + slog.ErrorContext( + ctx, + "Unable to update stream servers of upstream", + "request", action.GetUpdateHttpUpstreamServers(), + "error", err, + plusActionsLogOrigin, + ) return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, "", instanceID, err.Error()) } - slog.DebugContext(ctx, "Successfully updated stream upstream servers", "http_upstream_name", - action.GetUpdateHttpUpstreamServers().GetHttpUpstreamName(), "add", len(add), "update", len(update), - "delete", len(del)) + slog.DebugContext( + ctx, + "Successfully updated stream upstream servers", + "http_upstream_name", action.GetUpdateHttpUpstreamServers().GetHttpUpstreamName(), + "add", len(add), + "update", len(update), + "delete", len(del), + plusActionsLogOrigin, + ) return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK, "Successfully updated stream upstream servers", instanceID, "") @@ -54,7 +68,7 @@ func (a *APIAction) HandleGetStreamUpstreamsRequest(ctx context.Context, streamUpstreams, err := a.ResourceService.GetStreamUpstreams(ctx, instance) if err != nil { - slog.ErrorContext(ctx, "Unable to get stream upstreams", "error", err) + slog.ErrorContext(ctx, "Unable to get stream upstreams", "error", err, plusActionsLogOrigin) return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, "", instanceID, err.Error()) } @@ -62,7 +76,7 @@ func (a *APIAction) HandleGetStreamUpstreamsRequest(ctx context.Context, if streamUpstreams != nil { streamUpstreamsJSON, jsonErr := json.Marshal(streamUpstreams) if jsonErr != nil { - slog.ErrorContext(ctx, "Unable to marshal stream upstreams", "err", err) + slog.ErrorContext(ctx, "Unable to marshal stream upstreams", "err", err, plusActionsLogOrigin) } streamUpstreamsResponse = string(streamUpstreamsJSON) } @@ -78,7 +92,7 @@ func (a *APIAction) HandleGetUpstreamsRequest(ctx context.Context, instance *mpi upstreams, err := a.ResourceService.GetUpstreams(ctx, instance) if err != nil { - slog.InfoContext(ctx, "Unable to get upstreams", "error", err) + slog.InfoContext(ctx, "Unable to get upstreams", "error", err, plusActionsLogOrigin) return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, "", instanceID, err.Error()) @@ -87,7 +101,7 @@ func (a *APIAction) HandleGetUpstreamsRequest(ctx context.Context, instance *mpi if upstreams != nil { upstreamsJSON, jsonErr := json.Marshal(upstreams) if jsonErr != nil { - slog.ErrorContext(ctx, "Unable to marshal upstreams", "err", err) + slog.ErrorContext(ctx, "Unable to marshal upstreams", "err", err, plusActionsLogOrigin) } upstreamsResponse = string(upstreamsJSON) } @@ -96,6 +110,7 @@ func (a *APIAction) HandleGetUpstreamsRequest(ctx context.Context, instance *mpi upstreamsResponse, instanceID, "") } +// nolint: dupl func (a *APIAction) HandleUpdateHTTPUpstreamsRequest(ctx context.Context, action *mpi.NGINXPlusAction, instance *mpi.Instance, ) *mpi.DataPlaneResponse { @@ -106,16 +121,27 @@ func (a *APIAction) HandleUpdateHTTPUpstreamsRequest(ctx context.Context, action action.GetUpdateHttpUpstreamServers().GetHttpUpstreamName(), action.GetUpdateHttpUpstreamServers().GetServers()) if err != nil { - slog.ErrorContext(ctx, "Unable to update HTTP servers of upstream", "request", - action.GetUpdateHttpUpstreamServers(), "error", err) + slog.ErrorContext( + ctx, + "Unable to update HTTP servers of upstream", + "request", action.GetUpdateHttpUpstreamServers(), + "error", err, + plusActionsLogOrigin, + ) return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, "", instanceID, err.Error()) } - slog.DebugContext(ctx, "Successfully updated http upstream servers", "http_upstream_name", - action.GetUpdateHttpUpstreamServers().GetHttpUpstreamName(), "add", len(add), "update", len(update), - "delete", len(del)) + slog.DebugContext( + ctx, + "Successfully updated http upstream servers", + "http_upstream_name", action.GetUpdateHttpUpstreamServers().GetHttpUpstreamName(), + "add", len(add), + "update", len(update), + "delete", len(del), + plusActionsLogOrigin, + ) return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK, "Successfully updated HTTP Upstreams", instanceID, "") @@ -131,7 +157,7 @@ func (a *APIAction) HandleGetHTTPUpstreamsServersRequest(ctx context.Context, ac upstreams, err := a.ResourceService.GetHTTPUpstreamServers(ctx, instance, action.GetGetHttpUpstreamServers().GetHttpUpstreamName()) if err != nil { - slog.ErrorContext(ctx, "Unable to get HTTP servers of upstream", "error", err) + slog.ErrorContext(ctx, "Unable to get HTTP servers of upstream", "error", err, plusActionsLogOrigin) return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, "", instanceID, err.Error()) } @@ -139,7 +165,7 @@ func (a *APIAction) HandleGetHTTPUpstreamsServersRequest(ctx context.Context, ac if upstreams != nil { upstreamsJSON, jsonErr := json.Marshal(upstreams) if jsonErr != nil { - slog.ErrorContext(ctx, "Unable to marshal http upstreams", "err", err) + slog.ErrorContext(ctx, "Unable to marshal http upstreams", "err", err, plusActionsLogOrigin) } upstreamsResponse = string(upstreamsJSON) } diff --git a/internal/resource/resource_plugin.go b/internal/resource/resource_plugin.go index 5d9015c35..10d78904c 100644 --- a/internal/resource/resource_plugin.go +++ b/internal/resource/resource_plugin.go @@ -44,6 +44,8 @@ type plusAPIErr struct { var _ bus.Plugin = (*Resource)(nil) +var resourcePluginLogOrigin = slog.String("log_origin", "resource_plugin.go") + func NewResource(agentConfig *config.Config) *Resource { return &Resource{ agentConfig: agentConfig, @@ -51,7 +53,7 @@ func NewResource(agentConfig *config.Config) *Resource { } func (r *Resource) Init(ctx context.Context, messagePipe bus.MessagePipeInterface) error { - slog.DebugContext(ctx, "Starting resource plugin") + slog.DebugContext(ctx, "Starting resource plugin", resourcePluginLogOrigin) r.messagePipe = messagePipe r.resourceService = NewResourceService(ctx, r.agentConfig) @@ -60,7 +62,7 @@ func (r *Resource) Init(ctx context.Context, messagePipe bus.MessagePipeInterfac } func (*Resource) Close(ctx context.Context) error { - slog.InfoContext(ctx, "Closing resource plugin") + slog.InfoContext(ctx, "Closing resource plugin", resourcePluginLogOrigin) return nil } @@ -77,7 +79,8 @@ func (r *Resource) Process(ctx context.Context, msg *bus.Message) { case bus.AddInstancesTopic: instanceList, ok := msg.Data.([]*mpi.Instance) if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to []*mpi.Instance", "payload", msg.Data) + slog.ErrorContext(ctx, "Unable to cast message payload to []*mpi.Instance", + "payload", msg.Data, resourcePluginLogOrigin) return } @@ -90,7 +93,8 @@ func (r *Resource) Process(ctx context.Context, msg *bus.Message) { case bus.UpdatedInstancesTopic: instanceList, ok := msg.Data.([]*mpi.Instance) if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to []*mpi.Instance", "payload", msg.Data) + slog.ErrorContext(ctx, "Unable to cast message payload to []*mpi.Instance", + "payload", msg.Data, resourcePluginLogOrigin) return } @@ -103,7 +107,8 @@ func (r *Resource) Process(ctx context.Context, msg *bus.Message) { case bus.DeletedInstancesTopic: instanceList, ok := msg.Data.([]*mpi.Instance) if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to []*mpi.Instance", "payload", msg.Data) + slog.ErrorContext(ctx, "Unable to cast message payload to []*mpi.Instance", + "payload", msg.Data, resourcePluginLogOrigin) return } @@ -119,7 +124,7 @@ func (r *Resource) Process(ctx context.Context, msg *bus.Message) { case bus.APIActionRequestTopic: r.handleAPIActionRequest(ctx, msg) default: - slog.DebugContext(ctx, "Unknown topic", "topic", msg.Topic) + slog.DebugContext(ctx, "Unknown topic", "topic", msg.Topic, resourcePluginLogOrigin) } } @@ -138,8 +143,8 @@ func (r *Resource) handleAPIActionRequest(ctx context.Context, msg *bus.Message) managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest) if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.ManagementPlaneRequest", "payload", - msg.Data) + slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.ManagementPlaneRequest", + "payload", msg.Data, resourcePluginLogOrigin) return } @@ -147,7 +152,7 @@ func (r *Resource) handleAPIActionRequest(ctx context.Context, msg *bus.Message) request, requestOk := managementPlaneRequest.GetRequest().(*mpi.ManagementPlaneRequest_ActionRequest) if !requestOk { slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.ManagementPlaneRequest_ActionRequest", - "payload", msg.Data) + "payload", msg.Data, resourcePluginLogOrigin) } instanceID := request.ActionRequest.GetInstanceId() @@ -156,7 +161,7 @@ func (r *Resource) handleAPIActionRequest(ctx context.Context, msg *bus.Message) case *mpi.APIActionRequest_NginxPlusAction: r.handleNginxPlusActionRequest(ctx, request.ActionRequest.GetNginxPlusAction(), instanceID) default: - slog.DebugContext(ctx, "API action request not implemented yet") + slog.DebugContext(ctx, "API action request not implemented yet", resourcePluginLogOrigin) } } @@ -167,7 +172,7 @@ func (r *Resource) handleNginxPlusActionRequest(ctx context.Context, action *mpi ResourceService: r.resourceService, } if instance == nil { - slog.ErrorContext(ctx, "Unable to find instance with ID", "id", instanceID) + slog.ErrorContext(ctx, "Unable to find instance with ID", "id", instanceID, resourcePluginLogOrigin) resp := response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, "", instanceID, fmt.Sprintf("failed to preform API "+ "action, could not find instance with ID: %s", instanceID)) @@ -178,7 +183,8 @@ func (r *Resource) handleNginxPlusActionRequest(ctx context.Context, action *mpi } if instance.GetInstanceMeta().GetInstanceType() != mpi.InstanceMeta_INSTANCE_TYPE_NGINX_PLUS { - slog.ErrorContext(ctx, "Failed to preform API action", "error", errors.New("instance is not NGINX Plus")) + slog.ErrorContext(ctx, "Failed to preform API action", + "error", errors.New("instance is not NGINX Plus"), resourcePluginLogOrigin) resp := response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, "", instanceID, "failed to preform API action, instance is not NGINX Plus") @@ -189,41 +195,48 @@ func (r *Resource) handleNginxPlusActionRequest(ctx context.Context, action *mpi switch action.GetAction().(type) { case *mpi.NGINXPlusAction_UpdateHttpUpstreamServers: - slog.DebugContext(ctx, "Updating http upstream servers", "request", action.GetUpdateHttpUpstreamServers()) + slog.DebugContext(ctx, "Updating http upstream servers", + "request", action.GetUpdateHttpUpstreamServers(), resourcePluginLogOrigin) resp := apiAction.HandleUpdateHTTPUpstreamsRequest(ctx, action, instance) r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp}) case *mpi.NGINXPlusAction_GetHttpUpstreamServers: - slog.DebugContext(ctx, "Getting http upstream servers", "request", action.GetGetHttpUpstreamServers()) + slog.DebugContext(ctx, "Getting http upstream servers", + "request", action.GetGetHttpUpstreamServers(), resourcePluginLogOrigin) resp := apiAction.HandleGetHTTPUpstreamsServersRequest(ctx, action, instance) r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp}) case *mpi.NGINXPlusAction_UpdateStreamServers: - slog.DebugContext(ctx, "Updating stream servers", "request", action.GetUpdateStreamServers()) + slog.DebugContext(ctx, "Updating stream servers", + "request", action.GetUpdateStreamServers(), resourcePluginLogOrigin) resp := apiAction.HandleUpdateStreamServersRequest(ctx, action, instance) r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp}) case *mpi.NGINXPlusAction_GetStreamUpstreams: - slog.DebugContext(ctx, "Getting stream upstreams", "request", action.GetGetStreamUpstreams()) + slog.DebugContext(ctx, "Getting stream upstreams", + "request", action.GetGetStreamUpstreams(), resourcePluginLogOrigin) resp := apiAction.HandleGetStreamUpstreamsRequest(ctx, instance) r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp}) case *mpi.NGINXPlusAction_GetUpstreams: - slog.DebugContext(ctx, "Getting upstreams", "request", action.GetGetUpstreams()) + slog.DebugContext(ctx, "Getting upstreams", + "request", action.GetGetUpstreams(), resourcePluginLogOrigin) resp := apiAction.HandleGetUpstreamsRequest(ctx, instance) r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp}) default: - slog.DebugContext(ctx, "NGINX Plus action not implemented yet") + slog.DebugContext(ctx, "NGINX Plus action not implemented yet", resourcePluginLogOrigin) } } func (r *Resource) handleWriteConfigSuccessful(ctx context.Context, msg *bus.Message) { data, ok := msg.Data.(*model.ConfigApplyMessage) if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *model.ConfigApplyMessage", "payload", msg.Data) + slog.ErrorContext(ctx, "Unable to cast message payload to *model.ConfigApplyMessage", + "payload", msg.Data, resourcePluginLogOrigin) return } err := r.resourceService.ApplyConfig(ctx, data.InstanceID) if err != nil { data.Error = err - slog.Error("errors found during config apply, sending error status, rolling back config", "err", err) + slog.Error("errors found during config apply, sending error status, rolling back config", + "err", err, resourcePluginLogOrigin) dpResponse := response.CreateDataPlaneResponse(data.CorrelationID, mpi.CommandResponse_COMMAND_STATUS_ERROR, "Config apply failed, rolling back config", data.InstanceID, err.Error()) r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: dpResponse}) @@ -248,13 +261,15 @@ func (r *Resource) handleWriteConfigSuccessful(ctx context.Context, msg *bus.Mes func (r *Resource) handleRollbackWrite(ctx context.Context, msg *bus.Message) { data, ok := msg.Data.(*model.ConfigApplyMessage) if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *model.ConfigApplyMessage", "payload", msg.Data) + slog.ErrorContext(ctx, "Unable to cast message payload to *model.ConfigApplyMessage", + "payload", msg.Data, resourcePluginLogOrigin) return } err := r.resourceService.ApplyConfig(ctx, data.InstanceID) if err != nil { - slog.Error("errors found during rollback, sending failure status", "err", err) + slog.Error("errors found during rollback, sending failure status", + "err", err, resourcePluginLogOrigin) rollbackResponse := response.CreateDataPlaneResponse(data.CorrelationID, mpi.CommandResponse_COMMAND_STATUS_ERROR, "Rollback failed", data.InstanceID, err.Error()) diff --git a/internal/resource/resource_service.go b/internal/resource/resource_service.go index c53ee6d27..939467c04 100644 --- a/internal/resource/resource_service.go +++ b/internal/resource/resource_service.go @@ -33,6 +33,8 @@ const ( unixPlusAPIFormat = "http://nginx-plus-api%s" ) +var resourceServiceLogOrigin = slog.String("log_origin", "resource_service.go") + //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6@v6.8.1 -generate //counterfeiter:generate . resourceServiceInterface @@ -144,8 +146,13 @@ func (r *ResourceService) UpdateInstances(ctx context.Context, instanceList []*m } r.resource = resourceCopy } else { - slog.WarnContext(ctx, "Unable to clone resource while updating instances", "resource", - r.resource, "instances", instanceList) + slog.WarnContext( + ctx, + "Unable to clone resource while updating instances", + "resource", r.resource, + "instances", instanceList, + resourceServiceLogOrigin, + ) } } @@ -165,8 +172,13 @@ func (r *ResourceService) DeleteInstances(ctx context.Context, instanceList []*m } } } else { - slog.WarnContext(ctx, "Unable to clone resource while deleting instances", "resource", - r.resource, "instances", instanceList) + slog.WarnContext( + ctx, + "Unable to clone resource while deleting instances", + "resource", r.resource, + "instances", instanceList, + resourceServiceLogOrigin, + ) } } r.RemoveOperator(instanceList) @@ -206,13 +218,17 @@ func (r *ResourceService) GetHTTPUpstreamServers(ctx context.Context, instance * ) ([]client.UpstreamServer, error) { plusClient, err := r.createPlusClient(instance) if err != nil { - slog.ErrorContext(ctx, "Failed to create plus client ", "error", err) + slog.ErrorContext(ctx, "Failed to create plus client ", "error", err, resourceServiceLogOrigin) return nil, err } servers, getServersErr := plusClient.GetHTTPServers(ctx, upstream) - slog.Warn("Error returned from NGINX Plus client, GetHTTPUpstreamServers", "err", getServersErr) + slog.Warn( + "Error returned from NGINX Plus client, GetHTTPUpstreamServers", + "err", getServersErr, + resourceServiceLogOrigin, + ) return servers, createPlusAPIError(getServersErr) } @@ -221,13 +237,23 @@ func (r *ResourceService) GetUpstreams(ctx context.Context, instance *mpi.Instan ) (*client.Upstreams, error) { plusClient, err := r.createPlusClient(instance) if err != nil { - slog.ErrorContext(ctx, "Failed to create plus client ", "error", err) + slog.ErrorContext( + ctx, + "Failed to create plus client ", + "error", err, + resourceServiceLogOrigin, + ) + return nil, err } servers, getUpstreamsErr := plusClient.GetUpstreams(ctx) - slog.Warn("Error returned from NGINX Plus client, GetUpstreams", "err", getUpstreamsErr) + slog.Warn( + "Error returned from NGINX Plus client, GetUpstreams", + "err", getUpstreamsErr, + resourceServiceLogOrigin, + ) return servers, createPlusAPIError(getUpstreamsErr) } @@ -236,13 +262,23 @@ func (r *ResourceService) GetStreamUpstreams(ctx context.Context, instance *mpi. ) (*client.StreamUpstreams, error) { plusClient, err := r.createPlusClient(instance) if err != nil { - slog.ErrorContext(ctx, "Failed to create plus client ", "error", err) + slog.ErrorContext( + ctx, + "Failed to create plus client ", + "error", err, + resourceServiceLogOrigin, + ) + return nil, err } streamUpstreams, getServersErr := plusClient.GetStreamUpstreams(ctx) - slog.Warn("Error returned from NGINX Plus client, GetStreamUpstreams", "err", getServersErr) + slog.Warn( + "Error returned from NGINX Plus client, GetStreamUpstreams", + "err", getServersErr, + resourceServiceLogOrigin, + ) return streamUpstreams, createPlusAPIError(getServersErr) } @@ -254,7 +290,8 @@ func (r *ResourceService) UpdateStreamServers(ctx context.Context, instance *mpi ) (added, updated, deleted []client.StreamUpstreamServer, err error) { plusClient, err := r.createPlusClient(instance) if err != nil { - slog.ErrorContext(ctx, "Failed to create plus client ", "error", err) + slog.ErrorContext(ctx, "Failed to create plus client ", "error", err, resourceServiceLogOrigin) + return nil, nil, nil, err } @@ -262,7 +299,11 @@ func (r *ResourceService) UpdateStreamServers(ctx context.Context, instance *mpi added, updated, deleted, updateError := plusClient.UpdateStreamServers(ctx, upstream, servers) - slog.Warn("Error returned from NGINX Plus client, UpdateStreamServers", "err", updateError) + slog.Warn( + "Error returned from NGINX Plus client, UpdateStreamServers", + "err", updateError, + resourceServiceLogOrigin, + ) return added, updated, deleted, createPlusAPIError(updateError) } @@ -274,7 +315,7 @@ func (r *ResourceService) UpdateHTTPUpstreamServers(ctx context.Context, instanc ) (added, updated, deleted []client.UpstreamServer, err error) { plusClient, err := r.createPlusClient(instance) if err != nil { - slog.ErrorContext(ctx, "Failed to create plus client ", "error", err) + slog.ErrorContext(ctx, "Failed to create plus client ", "error", err, resourceServiceLogOrigin) return nil, nil, nil, err } @@ -283,7 +324,11 @@ func (r *ResourceService) UpdateHTTPUpstreamServers(ctx context.Context, instanc added, updated, deleted, updateError := plusClient.UpdateHTTPServers(ctx, upstream, servers) if updateError != nil { - slog.Warn("Error returned from NGINX Plus client, UpdateHTTPUpstreamServers", "err", updateError) + slog.Warn( + "Error returned from NGINX Plus client, UpdateHTTPUpstreamServers", + "err", updateError, + resourceServiceLogOrigin, + ) } return added, updated, deleted, createPlusAPIError(updateError) @@ -293,11 +338,21 @@ func convertToUpstreamServer(upstreams []*structpb.Struct) []client.UpstreamServ var servers []client.UpstreamServer res, err := json.Marshal(upstreams) if err != nil { - slog.Error("Failed to marshal upstreams", "error", err, "upstreams", upstreams) + slog.Error( + "Failed to marshal upstreams", + "error", err, + "upstreams", upstreams, + resourceServiceLogOrigin, + ) } err = json.Unmarshal(res, &servers) if err != nil { - slog.Error("Failed to unmarshal upstreams", "error", err, "servers", servers) + slog.Error( + "Failed to unmarshal upstreams", + "error", err, + "servers", servers, + resourceServiceLogOrigin, + ) } return servers @@ -307,11 +362,21 @@ func convertToStreamUpstreamServer(streamUpstreams []*structpb.Struct) []client. var servers []client.StreamUpstreamServer res, err := json.Marshal(streamUpstreams) if err != nil { - slog.Error("Failed to marshal stream upstream server", "error", err, "stream_upstreams", streamUpstreams) + slog.Error( + "Failed to marshal stream upstream server", + "error", err, + "stream_upstreams", streamUpstreams, + resourceServiceLogOrigin, + ) } err = json.Unmarshal(res, &servers) if err != nil { - slog.Error("Failed to unmarshal stream upstream server", "error", err, "stream_upstreams", streamUpstreams) + slog.Error( + "Failed to unmarshal stream upstream server", + "error", err, + "stream_upstreams", streamUpstreams, + resourceServiceLogOrigin, + ) } return servers @@ -325,7 +390,7 @@ func (r *ResourceService) createPlusClient(instance *mpi.Instance) (*client.Ngin return nil, errors.New("failed to preform API action, NGINX Plus API is not configured") } - slog.Info("location", "", plusAPI.GetListen()) + slog.Info("location", "", plusAPI.GetListen(), resourceServiceLogOrigin) if strings.HasPrefix(plusAPI.GetListen(), "unix:") { endpoint = fmt.Sprintf(unixPlusAPIFormat, plusAPI.GetLocation()) } else { @@ -392,7 +457,7 @@ func createPlusAPIError(apiErr error) error { r, err := json.Marshal(plusErr) if err != nil { - slog.Error("Unable to marshal NGINX Plus API error", "error", err) + slog.Error("Unable to marshal NGINX Plus API error", "error", err, resourceServiceLogOrigin) return apiErr } diff --git a/internal/watcher/credentials/credential_watcher_service.go b/internal/watcher/credentials/credential_watcher_service.go index 3dd5dcc9f..454dbc78b 100644 --- a/internal/watcher/credentials/credential_watcher_service.go +++ b/internal/watcher/credentials/credential_watcher_service.go @@ -27,6 +27,8 @@ var emptyEvent = fsnotify.Event{ Op: 0, } +var logOrigin = slog.String("log_origin", "credential_watcher_service.go") + type CredentialUpdateMessage struct { CorrelationID slog.Attr } @@ -55,12 +57,12 @@ func NewCredentialWatcherService(agentConfig *config.Config) *CredentialWatcherS } func (cws *CredentialWatcherService) Watch(ctx context.Context, ch chan<- CredentialUpdateMessage) { - slog.DebugContext(ctx, "Starting credential watcher monitoring") + slog.DebugContext(ctx, "Starting credential watcher monitoring", logOrigin) ticker := time.NewTicker(monitoringInterval) watcher, err := fsnotify.NewWatcher() if err != nil { - slog.ErrorContext(ctx, "Failed to create credential watcher", "error", err) + slog.ErrorContext(ctx, "Failed to create credential watcher", "error", err, logOrigin) return } @@ -73,7 +75,7 @@ func (cws *CredentialWatcherService) Watch(ctx context.Context, ch chan<- Creden case <-ctx.Done(): closeError := cws.watcher.Close() if closeError != nil { - slog.ErrorContext(ctx, "Unable to close credential watcher", "error", closeError) + slog.ErrorContext(ctx, "Unable to close credential watcher", "error", closeError, logOrigin) } return @@ -82,7 +84,12 @@ func (cws *CredentialWatcherService) Watch(ctx context.Context, ch chan<- Creden case <-ticker.C: cws.checkForUpdates(ctx, ch) case watcherError := <-cws.watcher.Errors: - slog.ErrorContext(ctx, "Unexpected error in credential watcher", "error", watcherError) + slog.ErrorContext( + ctx, + "Unexpected error in credential watcher", + "error", watcherError, + logOrigin, + ) } } } @@ -93,29 +100,30 @@ func (cws *CredentialWatcherService) SetEnabled(enabled bool) { func (cws *CredentialWatcherService) addWatcher(ctx context.Context, filePath string) { if !cws.enabled.Load() { - slog.DebugContext(ctx, "Credential watcher is disabled") + slog.DebugContext(ctx, "Credential watcher is disabled", logOrigin) return } if cws.isWatching(filePath) { slog.DebugContext( - ctx, "Credential watcher is already watching ", "path", filePath) + ctx, "Credential watcher is already watching ", "path", filePath, logOrigin) return } if err := cws.watcher.Add(filePath); err != nil { - slog.ErrorContext(ctx, "Failed to add credential watcher", "path", filePath, "error", err) + slog.ErrorContext(ctx, "Failed to add credential watcher", "path", filePath, + "error", err, logOrigin) return } cws.filesBeingWatched.Store(filePath, true) - slog.DebugContext(ctx, "Credential watcher has been added", "path", filePath) + slog.DebugContext(ctx, "Credential watcher has been added", "path", filePath, logOrigin) } func (cws *CredentialWatcherService) watchFiles(ctx context.Context, files []string) { - slog.DebugContext(ctx, "Creating credential watchers") + slog.DebugContext(ctx, "Creating credential watchers", logOrigin) for _, filePath := range files { cws.addWatcher(ctx, filePath) @@ -135,11 +143,11 @@ func (cws *CredentialWatcherService) isWatching(path string) bool { func (cws *CredentialWatcherService) handleEvent(ctx context.Context, event fsnotify.Event) { if cws.enabled.Load() { if isEventSkippable(event) { - slog.DebugContext(ctx, "Skipping FSNotify event", "event", event) + slog.DebugContext(ctx, "Skipping FSNotify event", "event", event, logOrigin) return } - slog.DebugContext(ctx, "Processing FSNotify event", "event", event) + slog.DebugContext(ctx, "Processing FSNotify event", "event", event, logOrigin) switch { case event.Has(fsnotify.Remove): @@ -163,7 +171,7 @@ func (cws *CredentialWatcherService) checkForUpdates(ctx context.Context, ch cha slog.Any(logger.CorrelationIDKey, logger.GenerateCorrelationID()), ) - slog.DebugContext(ctx, "Credential watcher has detected changes") + slog.DebugContext(ctx, "Credential watcher has detected changes", logOrigin) ch <- CredentialUpdateMessage{CorrelationID: logger.GetCorrelationIDAttr(newCtx)} cws.filesChanged.Store(false) } diff --git a/internal/watcher/file/file_watcher_service.go b/internal/watcher/file/file_watcher_service.go index d864414ff..be082ba29 100644 --- a/internal/watcher/file/file_watcher_service.go +++ b/internal/watcher/file/file_watcher_service.go @@ -36,6 +36,8 @@ var emptyEvent = fsnotify.Event{ Op: 0, } +var logOrigin = slog.String("log_origin", "file_watcher_service.go") + type FileUpdateMessage struct { CorrelationID slog.Attr } @@ -65,14 +67,25 @@ func NewFileWatcherService(agentConfig *config.Config) *FileWatcherService { func (fws *FileWatcherService) Watch(ctx context.Context, ch chan<- FileUpdateMessage) { monitoringFrequency := fws.agentConfig.Watchers.FileWatcher.MonitoringFrequency - slog.DebugContext(ctx, "Starting file watcher monitoring", "monitoring_frequency", monitoringFrequency) + slog.DebugContext( + ctx, + "Starting file watcher monitoring", + "monitoring_frequency", monitoringFrequency, + logOrigin, + ) instanceWatcherTicker := time.NewTicker(monitoringFrequency) defer instanceWatcherTicker.Stop() watcher, err := fsnotify.NewWatcher() if err != nil { - slog.ErrorContext(ctx, "Failed to create file watcher", "error", err) + slog.ErrorContext( + ctx, + "Failed to create file watcher", + "error", err, + logOrigin, + ) + return } @@ -85,7 +98,7 @@ func (fws *FileWatcherService) Watch(ctx context.Context, ch chan<- FileUpdateMe case <-ctx.Done(): closeError := fws.watcher.Close() if closeError != nil { - slog.ErrorContext(ctx, "Unable to close file watcher", "error", closeError) + slog.ErrorContext(ctx, "Unable to close file watcher", "error", closeError, logOrigin) } return @@ -94,7 +107,7 @@ func (fws *FileWatcherService) Watch(ctx context.Context, ch chan<- FileUpdateMe case <-instanceWatcherTicker.C: fws.checkForUpdates(ctx, ch) case watcherError := <-fws.watcher.Errors: - slog.ErrorContext(ctx, "Unexpected error in file watcher", "error", watcherError) + slog.ErrorContext(ctx, "Unexpected error in file watcher", "error", watcherError, logOrigin) } } } @@ -106,15 +119,28 @@ func (fws *FileWatcherService) SetEnabled(enabled bool) { func (fws *FileWatcherService) watchDirectories(ctx context.Context) { for _, dir := range fws.agentConfig.AllowedDirectories { if _, err := os.Stat(dir); errors.Is(err, os.ErrNotExist) { - slog.DebugContext(ctx, "Unable to watch directory that does not exist", "directory", dir, "error", err) + slog.DebugContext( + ctx, + "Unable to watch directory that does not exist", + "directory", dir, + "error", err, + logOrigin, + ) + continue } - slog.DebugContext(ctx, "Creating file watchers", "directory", dir) + slog.DebugContext(ctx, "Creating file watchers", "directory", dir, logOrigin) err := fws.walkDir(ctx, dir) if err != nil { - slog.ErrorContext(ctx, "Failed to create file watchers", "directory", dir, "error", err) + slog.ErrorContext( + ctx, + "Failed to create file watchers", + "directory", dir, + "error", err, + logOrigin, + ) } } } @@ -127,7 +153,13 @@ func (fws *FileWatcherService) walkDir(ctx context.Context, dir string) error { info, infoErr := d.Info() if infoErr != nil { - slog.ErrorContext(ctx, "Error getting info for file", "error", infoErr) + slog.ErrorContext( + ctx, + "Error getting info for file", + "error", infoErr, + logOrigin, + ) + return infoErr } @@ -142,10 +174,22 @@ func (fws *FileWatcherService) walkDir(ctx context.Context, dir string) error { func (fws *FileWatcherService) addWatcher(ctx context.Context, path string, info os.FileInfo) { if info.IsDir() && !fws.isWatching(path) { if err := fws.watcher.Add(path); err != nil { - slog.ErrorContext(ctx, "Failed to add file watcher", "directory_path", path, "error", err) + slog.ErrorContext( + ctx, + "Failed to add file watcher", + "directory_path", path, + "error", err, + logOrigin, + ) removeError := fws.watcher.Remove(path) if removeError != nil { - slog.ErrorContext(ctx, "Failed to remove file watcher", "directory_path", path, "error", removeError) + slog.ErrorContext( + ctx, + "Failed to remove file watcher", + "directory_path", path, + "error", removeError, + logOrigin, + ) } return @@ -159,7 +203,14 @@ func (fws *FileWatcherService) removeWatcher(ctx context.Context, path string) { if _, ok := fws.directoriesBeingWatched.Load(path); ok { err := fws.watcher.Remove(path) if err != nil { - slog.ErrorContext(ctx, "Failed to remove file watcher", "directory_path", path, "error", err) + slog.ErrorContext( + ctx, + "Failed to remove file watcher", + "directory_path", path, + "error", err, + logOrigin, + ) + return } @@ -190,7 +241,7 @@ func (fws *FileWatcherService) handleEvent(ctx context.Context, event fsnotify.E case event.Op&Create == Create: info, err := os.Stat(event.Name) if err != nil { - slog.DebugContext(ctx, "Unable to add watcher", "path", event.Name, "error", err) + slog.DebugContext(ctx, "Unable to add watcher", "path", event.Name, "error", err, logOrigin) return } fws.addWatcher(ctx, event.Name, info) @@ -198,7 +249,7 @@ func (fws *FileWatcherService) handleEvent(ctx context.Context, event fsnotify.E fws.removeWatcher(ctx, event.Name) } - slog.DebugContext(ctx, "Processing FSNotify event", "event", event) + slog.DebugContext(ctx, "Processing FSNotify event", "event", event, logOrigin) fws.filesChanged.Store(true) } @@ -212,7 +263,7 @@ func (fws *FileWatcherService) checkForUpdates(ctx context.Context, ch chan<- Fi slog.Any(logger.CorrelationIDKey, logger.GenerateCorrelationID()), ) - slog.DebugContext(newCtx, "File watcher detected a file change") + slog.DebugContext(newCtx, "File watcher detected a file change", logOrigin) ch <- FileUpdateMessage{CorrelationID: logger.GetCorrelationIDAttr(newCtx)} fws.filesChanged.Store(false) } @@ -228,13 +279,13 @@ func isExcludedFile(path string, excludeFiles []string) bool { for _, pattern := range excludeFiles { _, compileErr := regexp.Compile(pattern) if compileErr != nil { - slog.Error("Invalid path for excluding file", "file_path", pattern) + slog.Error("Invalid path for excluding file", "file_path", pattern, logOrigin) continue } ok, err := regexp.MatchString(pattern, path) if err != nil { - slog.Error("Invalid path for excluding file", "file_path", pattern) + slog.Error("Invalid path for excluding file", "file_path", pattern, logOrigin) continue } else if ok { return true diff --git a/internal/watcher/health/health_watcher_service.go b/internal/watcher/health/health_watcher_service.go index 77ae72ea5..e49cb3eae 100644 --- a/internal/watcher/health/health_watcher_service.go +++ b/internal/watcher/health/health_watcher_service.go @@ -42,6 +42,8 @@ type ( } ) +var logOrigin = slog.String("log_origin", "health_watcher_service.go") + func NewHealthWatcherService(agentConfig *config.Config) *HealthWatcherService { return &HealthWatcherService{ watchers: make(map[string]healthWatcherOperator), @@ -65,8 +67,8 @@ func (hw *HealthWatcherService) AddHealthWatcher(instances []*mpi.Instance) { mpi.InstanceMeta_INSTANCE_TYPE_NGINX_APP_PROTECT: fallthrough default: - slog.Warn("Health watcher not implemented", "instance_type", - instance.GetInstanceMeta().GetInstanceType()) + slog.Warn("Health watcher not implemented", + "instance_type", instance.GetInstanceMeta().GetInstanceType(), logOrigin) } hw.instances[instance.GetInstanceMeta().GetInstanceId()] = instance } @@ -100,7 +102,12 @@ func (hw *HealthWatcherService) GetInstancesHealth() []*mpi.InstanceHealth { func (hw *HealthWatcherService) Watch(ctx context.Context, ch chan<- InstanceHealthMessage) { monitoringFrequency := hw.agentConfig.Watchers.InstanceHealthWatcher.MonitoringFrequency - slog.DebugContext(ctx, "Starting health watcher monitoring", "monitoring_frequency", monitoringFrequency) + slog.DebugContext( + ctx, + "Starting health watcher monitoring", + "monitoring_frequency", monitoringFrequency, + logOrigin, + ) instanceHealthTicker := time.NewTicker(monitoringFrequency) defer instanceHealthTicker.Stop() @@ -116,7 +123,7 @@ func (hw *HealthWatcherService) Watch(ctx context.Context, ch chan<- InstanceHea healthStatuses, isHealthDiff := hw.health(ctx) if isHealthDiff && len(healthStatuses) > 0 { - slog.DebugContext(newCtx, "Instance health watcher found health updates") + slog.DebugContext(newCtx, "Instance health watcher found health updates", logOrigin) ch <- InstanceHealthMessage{ CorrelationID: correlationID, InstanceHealth: healthStatuses, @@ -171,7 +178,7 @@ func (hw *HealthWatcherService) updateCache(currentHealth map[string]*mpi.Instan } } - slog.Debug("Updating health watcher cache", "cache", hw.cache) + slog.Debug("Updating health watcher cache", "cache", hw.cache, logOrigin) } // compare the cache with the current list of instances to check if an instance has been deleted diff --git a/internal/watcher/instance/instance_watcher_service.go b/internal/watcher/instance/instance_watcher_service.go index e163ee0c9..d00611195 100644 --- a/internal/watcher/instance/instance_watcher_service.go +++ b/internal/watcher/instance/instance_watcher_service.go @@ -32,6 +32,8 @@ const defaultAgentPath = "/run/nginx-agent" //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6@v6.8.1 -generate //counterfeiter:generate . nginxConfigParser +var logOrigin = slog.String("log_origin", "instance_watcher_service.go") + type ( processParser interface { Parse(ctx context.Context, processes []*nginxprocess.Process) map[string]*mpi.Instance @@ -92,7 +94,12 @@ func (iw *InstanceWatcherService) Watch( nginxConfigContextChannel chan<- NginxConfigContextMessage, ) { monitoringFrequency := iw.agentConfig.Watchers.InstanceWatcher.MonitoringFrequency - slog.DebugContext(ctx, "Starting instance watcher monitoring", "monitoring_frequency", monitoringFrequency) + slog.DebugContext( + ctx, + "Starting instance watcher monitoring", + "monitoring_frequency", monitoringFrequency, + logOrigin, + ) iw.instancesChannel = instancesChannel iw.nginxConfigContextChannel = nginxConfigContextChannel @@ -114,11 +121,11 @@ func (iw *InstanceWatcherService) Watch( } func (iw *InstanceWatcherService) ReparseConfigs(ctx context.Context) { - slog.DebugContext(ctx, "Reparsing all instance configurations") + slog.DebugContext(ctx, "Reparsing all instance configurations", logOrigin) for _, instance := range iw.instanceCache { iw.ReparseConfig(ctx, instance.GetInstanceMeta().GetInstanceId()) } - slog.DebugContext(ctx, "Finished reparsing all instance configurations") + slog.DebugContext(ctx, "Finished reparsing all instance configurations", logOrigin) } func (iw *InstanceWatcherService) ReparseConfig(ctx context.Context, instanceID string) { @@ -136,6 +143,7 @@ func (iw *InstanceWatcherService) ReparseConfig(ctx context.Context, instanceID ctx, "Reparsing NGINX instance config", "instance_id", instanceID, + logOrigin, ) nginxConfigContext, parseErr := iw.nginxConfigParser.Parse(ctx, instance) @@ -146,6 +154,7 @@ func (iw *InstanceWatcherService) ReparseConfig(ctx context.Context, instanceID "config_path", instance.GetInstanceRuntime().GetConfigPath(), "instance_id", instanceID, "error", parseErr, + logOrigin, ) return @@ -175,7 +184,7 @@ func (iw *InstanceWatcherService) checkForUpdates( instanceUpdates, err := iw.instanceUpdates(newCtx) if err != nil { - slog.ErrorContext(newCtx, "Instance watcher updates", "error", err) + slog.ErrorContext(newCtx, "Instance watcher updates", "error", err, logOrigin) } instancesToParse = append(instancesToParse, instanceUpdates.UpdatedInstances...) @@ -188,6 +197,7 @@ func (iw *InstanceWatcherService) checkForUpdates( "Parsing instance config", "instance_id", newInstance.GetInstanceMeta().GetInstanceId(), "instance_type", instanceType, + logOrigin, ) if instanceType == mpi.InstanceMeta_INSTANCE_TYPE_NGINX || @@ -201,6 +211,7 @@ func (iw *InstanceWatcherService) checkForUpdates( "instance_id", newInstance.GetInstanceMeta().GetInstanceId(), "instance_type", instanceType, "error", parseErr, + logOrigin, ) } else { iw.sendNginxConfigContextUpdate(newCtx, nginxConfigContext) @@ -231,6 +242,7 @@ func (iw *InstanceWatcherService) sendNginxConfigContextUpdate( "New NGINX config context", "instance_id", nginxConfigContext.InstanceID, "nginx_config_context", nginxConfigContext, + logOrigin, ) iw.nginxConfigContextChannel <- NginxConfigContextMessage{ @@ -286,12 +298,22 @@ func (iw *InstanceWatcherService) agentInstance(ctx context.Context) *mpi.Instan processPath, err := iw.executer.Executable() if err != nil { processPath = defaultAgentPath - slog.WarnContext(ctx, "Unable to read process location, defaulting to /var/run/nginx-agent", "error", err) + slog.WarnContext( + ctx, + "Unable to read process location, defaulting to /var/run/nginx-agent", + "error", err, + logOrigin, + ) } labels, convertErr := mpi.ConvertToStructs(iw.agentConfig.Labels) if convertErr != nil { - slog.WarnContext(ctx, "Unable to convert config to labels structure", "error", convertErr) + slog.WarnContext( + ctx, + "Unable to convert config to labels structure", + "error", convertErr, + logOrigin, + ) } return &mpi.Instance{ diff --git a/internal/watcher/instance/nginx_app_protect_process_parser.go b/internal/watcher/instance/nginx_app_protect_process_parser.go index 93001e86e..8b09a7f72 100644 --- a/internal/watcher/instance/nginx_app_protect_process_parser.go +++ b/internal/watcher/instance/nginx_app_protect_process_parser.go @@ -36,6 +36,8 @@ type ( var _ processParser = (*NginxAppProtectProcessParser)(nil) +var napProcessParserLogOrigin = slog.String("log_origin", "nginx_app_protect_process_parser.go") + func NewNginxAppProtectProcessParser() *NginxAppProtectProcessParser { return &NginxAppProtectProcessParser{ versionFilePath: versionFilePath, @@ -94,7 +96,14 @@ func (n NginxAppProtectProcessParser) instanceID(process *nginxprocess.Process) func (n NginxAppProtectProcessParser) instanceVersion(ctx context.Context) string { version, err := os.ReadFile(n.versionFilePath) if err != nil { - slog.WarnContext(ctx, "Unable to read NAP version file", "file_path", n.versionFilePath, "error", err) + slog.WarnContext( + ctx, + "Unable to read NAP version file", + "file_path", n.versionFilePath, + "error", err, + napProcessParserLogOrigin, + ) + return "" } @@ -104,7 +113,14 @@ func (n NginxAppProtectProcessParser) instanceVersion(ctx context.Context) strin func (n NginxAppProtectProcessParser) release(ctx context.Context) string { release, err := os.ReadFile(n.releaseFilePath) if err != nil { - slog.WarnContext(ctx, "Unable to read NAP release file", "file_path", n.releaseFilePath, "error", err) + slog.WarnContext( + ctx, + "Unable to read NAP release file", + "file_path", n.releaseFilePath, + "error", err, + napProcessParserLogOrigin, + ) + return "" } @@ -119,6 +135,7 @@ func (n NginxAppProtectProcessParser) attackSignatureVersion(ctx context.Context "Unable to read NAP attack signature version file", "file_path", n.attackSignatureVersionFilePath, "error", err, + napProcessParserLogOrigin, ) return "" @@ -135,6 +152,7 @@ func (n NginxAppProtectProcessParser) threatCampaignVersion(ctx context.Context) "Unable to read NAP threat campaign version file", "file_path", n.threatCampaignVersionFilePath, "error", err, + napProcessParserLogOrigin, ) return "" diff --git a/internal/watcher/instance/nginx_config_parser.go b/internal/watcher/instance/nginx_config_parser.go index af9a8e3b9..9da642f55 100644 --- a/internal/watcher/instance/nginx_config_parser.go +++ b/internal/watcher/instance/nginx_config_parser.go @@ -50,6 +50,8 @@ type ( var _ nginxConfigParser = (*NginxConfigParser)(nil) +var configParserLogOrigin = slog.String("log_origin", "nginx_config_parser.go") + type ( crossplaneTraverseCallback = func(ctx context.Context, parent, current *crossplane.Directive) error crossplaneTraverseCallbackAPIDetails = func(ctx context.Context, parent, @@ -74,6 +76,7 @@ func (ncp *NginxConfigParser) Parse(ctx context.Context, instance *mpi.Instance) "Parsing NGINX config", "file_path", configPath, "instance_id", instance.GetInstanceMeta().GetInstanceId(), + configParserLogOrigin, ) lua := crossplane.Lua{} @@ -118,11 +121,14 @@ func (ncp *NginxConfigParser) createNginxConfigContext( rootDir := filepath.Dir(instance.GetInstanceRuntime().GetConfigPath()) for _, conf := range payload.Config { - slog.DebugContext(ctx, "Traversing NGINX config file", "config", conf) + slog.DebugContext(ctx, "Traversing NGINX config file", "config", conf, configParserLogOrigin) if !ncp.agentConfig.IsDirectoryAllowed(conf.File) { - slog.WarnContext(ctx, "File included in NGINX config is outside of allowed directories, "+ - "excluding from config", - "file", conf.File) + slog.WarnContext( + ctx, + "File included in NGINX config is outside of allowed directories, excluding from config", + "file", conf.File, + configParserLogOrigin, + ) continue } @@ -144,21 +150,35 @@ func (ncp *NginxConfigParser) createNginxConfigContext( errorLog := ncp.errorLog(directive.Args[0], ncp.errorLogDirectiveLevel(directive)) nginxConfigContext.ErrorLogs = append(nginxConfigContext.ErrorLogs, errorLog) } else { - slog.WarnContext(ctx, fmt.Sprintf("Currently error log outputs to %s. Log monitoring "+ - "is disabled while applying a config; "+"log errors to file to enable error monitoring", - directive.Args[0]), "error_log", directive.Args[0]) + slog.WarnContext( + ctx, + fmt.Sprintf("Currently error log outputs to %s. Log monitoring "+ + "is disabled while applying a config; "+"log errors to file to enable error monitoring", + directive.Args[0]), + "error_log", directive.Args[0], + configParserLogOrigin, + ) } case "ssl_certificate", "proxy_ssl_certificate", "ssl_client_certificate", "ssl_trusted_certificate": if ncp.agentConfig.IsFeatureEnabled(pkg.FeatureCertificates) { sslCertFile := ncp.sslCert(ctx, directive.Args[0], rootDir) if sslCertFile != nil && !ncp.isDuplicateFile(nginxConfigContext.Files, sslCertFile) { - slog.DebugContext(ctx, "Adding SSL certificate file", "ssl_cert", sslCertFile) + slog.DebugContext( + ctx, + "Adding SSL certificate file", + "ssl_cert", sslCertFile, + configParserLogOrigin, + ) nginxConfigContext.Files = append(nginxConfigContext.Files, sslCertFile) } } else { - slog.DebugContext(ctx, "Certificate feature is disabled, skipping cert", - "enabled_features", ncp.agentConfig.Features) + slog.DebugContext( + ctx, + "Certificate feature is disabled, skipping cert", + "enabled_features", ncp.agentConfig.Features, + configParserLogOrigin, + ) } case "app_protect_security_log": if len(directive.Args) > 1 { @@ -173,7 +193,12 @@ func (ncp *NginxConfigParser) createNginxConfigContext( syslogServer, ) napSyslogServersFound[syslogServer] = true - slog.DebugContext(ctx, "Found NAP syslog server", "address", syslogServer) + slog.DebugContext( + ctx, + "Found NAP syslog server", + "address", syslogServer, + configParserLogOrigin, + ) } } } @@ -198,7 +223,13 @@ func (ncp *NginxConfigParser) createNginxConfigContext( fileMeta, err := files.FileMeta(conf.File) if err != nil { - slog.WarnContext(ctx, "Unable to get file metadata", "file_name", conf.File, "error", err) + slog.WarnContext( + ctx, + "Unable to get file metadata", + "file_name", conf.File, + "error", err, + configParserLogOrigin, + ) } else { nginxConfigContext.Files = append(nginxConfigContext.Files, &mpi.File{FileMeta: fileMeta}) } @@ -219,7 +250,11 @@ func (ncp *NginxConfigParser) ignoreLog(logPath string) bool { } if !ncp.agentConfig.IsDirectoryAllowed(logPath) { - slog.Warn("Log being read is outside of allowed directories", "log_path", logPath) + slog.Warn( + "Log being read is outside of allowed directories", + "log_path", logPath, + configParserLogOrigin, + ) } return false @@ -229,16 +264,16 @@ func (ncp *NginxConfigParser) isExcludeLog(path string) bool { for _, pattern := range ncp.agentConfig.DataPlaneConfig.Nginx.ExcludeLogs { _, compileErr := regexp.Compile(pattern) if compileErr != nil { - slog.Error("Invalid path for excluding log", "log_path", pattern) + slog.Error("Invalid path for excluding log", "log_path", pattern, configParserLogOrigin) continue } ok, err := regexp.MatchString(pattern, path) if err != nil { - slog.Error("Invalid path for excluding log", "file_path", pattern) + slog.Error("Invalid path for excluding log", "file_path", pattern, configParserLogOrigin) continue } else if ok { - slog.Info("Excluding log as specified in config", "log_path", path) + slog.Info("Excluding log as specified in config", "log_path", path, configParserLogOrigin) return true } @@ -329,7 +364,13 @@ func (ncp *NginxConfigParser) errorLogDirectiveLevel(directive *crossplane.Direc func (ncp *NginxConfigParser) sslCert(ctx context.Context, file, rootDir string) (sslCertFile *mpi.File) { if strings.Contains(file, "$") { - slog.DebugContext(ctx, "Cannot process SSL certificate file path with variables", "file", file) + slog.DebugContext( + ctx, + "Cannot process SSL certificate file path with variables", + "file", file, + configParserLogOrigin, + ) + return nil } @@ -338,11 +379,17 @@ func (ncp *NginxConfigParser) sslCert(ctx context.Context, file, rootDir string) } if !ncp.agentConfig.IsDirectoryAllowed(file) { - slog.DebugContext(ctx, "File not in allowed directories", "file", file) + slog.DebugContext(ctx, "File not in allowed directories", "file", file, configParserLogOrigin) } else { sslCertFileMeta, fileMetaErr := files.FileMetaWithCertificate(file) if fileMetaErr != nil { - slog.ErrorContext(ctx, "Unable to get file metadata", "file", file, "error", fileMetaErr) + slog.ErrorContext( + ctx, + "Unable to get file metadata", + "file", file, + "error", fileMetaErr, + configParserLogOrigin, + ) } else { sslCertFile = &mpi.File{FileMeta: sslCertFileMeta} } @@ -470,15 +517,31 @@ func (ncp *NginxConfigParser) apiCallback(ctx context.Context, parent, ) *model.APIDetails { urls := ncp.urlsForLocationDirectiveAPIDetails(parent, current, apiType) if len(urls) > 0 { - slog.DebugContext(ctx, fmt.Sprintf("%d potential %s urls", len(urls), apiType), "urls", urls) + slog.DebugContext( + ctx, + fmt.Sprintf("%d potential %s urls", len(urls), apiType), + "urls", urls, + configParserLogOrigin, + ) } for _, url := range urls { if ncp.pingAPIEndpoint(ctx, url, apiType) { - slog.DebugContext(ctx, fmt.Sprintf("%s found", apiType), "url", url) + slog.DebugContext( + ctx, + fmt.Sprintf("%s found", apiType), + "url", url, + configParserLogOrigin, + ) + return url } - slog.DebugContext(ctx, fmt.Sprintf("%s is not reachable", apiType), "url", url) + slog.DebugContext( + ctx, + fmt.Sprintf("%s is not reachable", apiType), + "url", url, + configParserLogOrigin, + ) } return &model.APIDetails{ @@ -502,26 +565,48 @@ func (ncp *NginxConfigParser) pingAPIEndpoint(ctx context.Context, statusAPIDeta } req, err := http.NewRequestWithContext(ctx, http.MethodGet, statusAPI, nil) if err != nil { - slog.WarnContext(ctx, fmt.Sprintf("Unable to create %s API GET request", apiType), "error", err) + slog.WarnContext( + ctx, + fmt.Sprintf("Unable to create %s API GET request", apiType), + "error", err, + configParserLogOrigin, + ) + return false } resp, err := httpClient.Do(req) if err != nil { - slog.WarnContext(ctx, fmt.Sprintf("Unable to GET %s from API request", apiType), "error", err) + slog.WarnContext( + ctx, + fmt.Sprintf("Unable to GET %s from API request", apiType), + "error", err, + configParserLogOrigin, + ) + return false } if resp.StatusCode != http.StatusOK { - slog.DebugContext(ctx, fmt.Sprintf("%s API responded with unexpected status code", apiType), "status_code", - resp.StatusCode, "expected", http.StatusOK) + slog.DebugContext(ctx, + fmt.Sprintf("%s API responded with unexpected status code", apiType), + "status_code", resp.StatusCode, + "expected", http.StatusOK, + configParserLogOrigin, + ) return false } bodyBytes, err := io.ReadAll(resp.Body) if err != nil { - slog.WarnContext(ctx, fmt.Sprintf("Unable to read %s API response body", apiType), "error", err) + slog.WarnContext( + ctx, + fmt.Sprintf("Unable to read %s API response body", apiType), + "error", err, + configParserLogOrigin, + ) + return false } @@ -544,7 +629,13 @@ func (ncp *NginxConfigParser) pingAPIEndpoint(ctx context.Context, statusAPIDeta err = json.Unmarshal(bodyBytes, &responseBody) defer resp.Body.Close() if err != nil { - slog.DebugContext(ctx, "Unable to unmarshal NGINX Plus API response body", "error", err) + slog.DebugContext( + ctx, + "Unable to unmarshal NGINX Plus API response body", + "error", err, + configParserLogOrigin, + ) + return false } diff --git a/internal/watcher/instance/nginx_process_parser.go b/internal/watcher/instance/nginx_process_parser.go index 84cd737ac..9ac5ed850 100644 --- a/internal/watcher/instance/nginx_process_parser.go +++ b/internal/watcher/instance/nginx_process_parser.go @@ -53,6 +53,8 @@ var ( versionRegex = regexp.MustCompile(`(?P\S+)\/(?P.*)`) ) +var processParserLogOrigin = slog.String("log_origin", "nginx_process_parser.go") + func NewNginxProcessParser() *NginxProcessParser { return &NginxProcessParser{ executer: &exec.Exec{}, @@ -79,7 +81,13 @@ func (npp *NginxProcessParser) Parse(ctx context.Context, processes []*nginxproc } nginxInfo, err := npp.getInfo(ctx, proc) if err != nil { - slog.DebugContext(ctx, "Unable to get NGINX info", "pid", proc.PID, "error", err) + slog.DebugContext( + ctx, + "Unable to get NGINX info", + "pid", proc.PID, + "error", err, + processParserLogOrigin, + ) continue } @@ -107,7 +115,13 @@ func (npp *NginxProcessParser) Parse(ctx context.Context, processes []*nginxproc if proc.IsMaster() { nginxInfo, err := npp.getInfo(ctx, proc) if err != nil { - slog.DebugContext(ctx, "Unable to get NGINX info", "pid", proc.PID, "error", err) + slog.DebugContext( + ctx, + "Unable to get NGINX info", + "pid", proc.PID, + "error", err, + processParserLogOrigin, + ) continue } @@ -301,7 +315,7 @@ func getNginxPrefix(ctx context.Context, nginxInfo *Info) string { var ok bool prefix, ok = nginxInfo.ConfigureArgs["prefix"].(string) if !ok { - slog.DebugContext(ctx, "Failed to cast nginxInfo prefix to string") + slog.DebugContext(ctx, "Failed to cast nginxInfo prefix to string", processParserLogOrigin) } } else { prefix = "/usr/local/nginx" @@ -317,7 +331,7 @@ func getNginxConfPath(ctx context.Context, nginxInfo *Info) string { var ok bool confPath, ok = nginxInfo.ConfigureArgs["conf-path"].(string) if !ok { - slog.DebugContext(ctx, "failed to cast nginxInfo conf-path to string") + slog.DebugContext(ctx, "failed to cast nginxInfo conf-path to string", processParserLogOrigin) } } else { confPath = path.Join(nginxInfo.Prefix, "/conf/nginx.conf") @@ -339,12 +353,12 @@ func getLoadableModules(nginxInfo *Info) (modules []string) { if mp, ok := nginxInfo.ConfigureArgs["modules-path"]; ok { modulePath, pathOK := mp.(string) if !pathOK { - slog.Debug("Error parsing modules-path") + slog.Debug("Error parsing modules-path", processParserLogOrigin) return modules } modules, err = readDirectory(modulePath, ".so") if err != nil { - slog.Debug("Error reading module dir", "dir", modulePath, "error", err) + slog.Debug("Error reading module dir", "dir", modulePath, "error", err, processParserLogOrigin) return modules } diff --git a/internal/watcher/watcher_plugin.go b/internal/watcher/watcher_plugin.go index 3fb7ec544..0d2b7c5d7 100644 --- a/internal/watcher/watcher_plugin.go +++ b/internal/watcher/watcher_plugin.go @@ -70,6 +70,8 @@ type ( var _ bus.Plugin = (*Watcher)(nil) +var logOrigin = slog.String("log_origin", "watcher_plugin.go") + func NewWatcher(agentConfig *config.Config) *Watcher { return &Watcher{ agentConfig: agentConfig, @@ -90,7 +92,7 @@ func NewWatcher(agentConfig *config.Config) *Watcher { // nolint: unparam // error is always nil func (w *Watcher) Init(ctx context.Context, messagePipe bus.MessagePipeInterface) error { - slog.DebugContext(ctx, "Starting watcher plugin") + slog.DebugContext(ctx, "Starting watcher plugin", logOrigin) w.messagePipe = messagePipe watcherContext, cancel := context.WithCancel(ctx) @@ -115,7 +117,7 @@ func (w *Watcher) Init(ctx context.Context, messagePipe bus.MessagePipeInterface // nolint: unparam // error is always nil func (w *Watcher) Close(ctx context.Context) error { - slog.InfoContext(ctx, "Closing watcher plugin") + slog.InfoContext(ctx, "Closing watcher plugin", logOrigin) w.cancel() @@ -141,7 +143,7 @@ func (w *Watcher) Process(ctx context.Context, msg *bus.Message) { case bus.DataPlaneHealthRequestTopic: w.handleHealthRequest(ctx) default: - slog.DebugContext(ctx, "Watcher plugin unknown topic", "topic", msg.Topic) + slog.DebugContext(ctx, "Watcher plugin unknown topic", "topic", msg.Topic, logOrigin) } } @@ -158,16 +160,26 @@ func (*Watcher) Subscriptions() []string { func (w *Watcher) handleConfigApplyRequest(ctx context.Context, msg *bus.Message) { managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest) if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.ManagementPlaneRequest", - "payload", msg.Data, "topic", msg.Topic) + slog.ErrorContext( + ctx, + "Unable to cast message payload to *mpi.ManagementPlaneRequest", + "payload", msg.Data, + "topic", msg.Topic, + logOrigin, + ) return } request, requestOk := managementPlaneRequest.GetRequest().(*mpi.ManagementPlaneRequest_ConfigApplyRequest) if !requestOk { - slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.ManagementPlaneRequest_ConfigApplyRequest", - "payload", msg.Data, "topic", msg.Topic) + slog.ErrorContext( + ctx, + "Unable to cast message payload to *mpi.ManagementPlaneRequest_ConfigApplyRequest", + "payload", msg.Data, + "topic", msg.Topic, + logOrigin, + ) return } @@ -184,8 +196,13 @@ func (w *Watcher) handleConfigApplyRequest(ctx context.Context, msg *bus.Message func (w *Watcher) handleConfigApplySuccess(ctx context.Context, msg *bus.Message) { response, ok := msg.Data.(*mpi.DataPlaneResponse) if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.DataPlaneResponse", "payload", - msg.Data, "topic", msg.Topic) + slog.ErrorContext( + ctx, + "Unable to cast message payload to *mpi.DataPlaneResponse", + "payload", msg.Data, + "topic", msg.Topic, + logOrigin, + ) return } @@ -215,8 +232,13 @@ func (w *Watcher) handleHealthRequest(ctx context.Context) { func (w *Watcher) handleConfigApplyComplete(ctx context.Context, msg *bus.Message) { response, ok := msg.Data.(*mpi.DataPlaneResponse) if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.DataPlaneResponse", "payload", - msg.Data, "topic", msg.Topic) + slog.ErrorContext( + ctx, + "Unable to cast message payload to *mpi.DataPlaneResponse", + "payload", msg.Data, + "topic", msg.Topic, + logOrigin, + ) return } @@ -236,12 +258,12 @@ func (w *Watcher) handleConfigApplyComplete(ctx context.Context, msg *bus.Messag } func (w *Watcher) handleCredentialUpdate(ctx context.Context) { - slog.DebugContext(ctx, "Received credential update topic") + slog.DebugContext(ctx, "Received credential update topic", logOrigin) w.watcherMutex.Lock() conn, err := grpc.NewGrpcConnection(ctx, w.agentConfig) if err != nil { - slog.ErrorContext(ctx, "Unable to create new grpc connection", "error", err) + slog.ErrorContext(ctx, "Unable to create new grpc connection", "error", err, logOrigin) w.watcherMutex.Unlock() return @@ -258,7 +280,7 @@ func (w *Watcher) monitorWatchers(ctx context.Context) { case <-ctx.Done(): return case message := <-w.credentialUpdatesChannel: - slog.DebugContext(ctx, "Received credential update event") + slog.DebugContext(ctx, "Received credential update event", logOrigin) newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID) w.messagePipe.Process(newCtx, &bus.Message{ Topic: bus.CredentialUpdatedTopic, Data: nil, @@ -274,6 +296,7 @@ func (w *Watcher) monitorWatchers(ctx context.Context) { newCtx, "Updated NGINX config context", "nginx_config_context", message.NginxConfigContext, + logOrigin, ) w.messagePipe.Process( newCtx, @@ -285,6 +308,7 @@ func (w *Watcher) monitorWatchers(ctx context.Context) { newCtx, "Not sending updated NGINX config context since config apply is in progress", "nginx_config_context", message.NginxConfigContext, + logOrigin, ) } w.watcherMutex.Unlock() @@ -304,7 +328,12 @@ func (w *Watcher) monitorWatchers(ctx context.Context) { func (w *Watcher) handleInstanceUpdates(newCtx context.Context, message instance.InstanceUpdatesMessage) { if len(message.InstanceUpdates.NewInstances) > 0 { - slog.DebugContext(newCtx, "New instances found", "instances", message.InstanceUpdates.NewInstances) + slog.DebugContext( + newCtx, + "New instances found", + "instances", message.InstanceUpdates.NewInstances, + logOrigin, + ) w.healthWatcherService.AddHealthWatcher(message.InstanceUpdates.NewInstances) w.messagePipe.Process( newCtx, @@ -312,14 +341,24 @@ func (w *Watcher) handleInstanceUpdates(newCtx context.Context, message instance ) } if len(message.InstanceUpdates.UpdatedInstances) > 0 { - slog.DebugContext(newCtx, "Instances updated", "instances", message.InstanceUpdates.UpdatedInstances) + slog.DebugContext( + newCtx, + "Instances updated", + "instances", message.InstanceUpdates.UpdatedInstances, + logOrigin, + ) w.messagePipe.Process( newCtx, &bus.Message{Topic: bus.UpdatedInstancesTopic, Data: message.InstanceUpdates.UpdatedInstances}, ) } if len(message.InstanceUpdates.DeletedInstances) > 0 { - slog.DebugContext(newCtx, "Instances deleted", "instances", message.InstanceUpdates.DeletedInstances) + slog.DebugContext( + newCtx, + "Instances deleted", + "instances", message.InstanceUpdates.DeletedInstances, + logOrigin, + ) w.healthWatcherService.DeleteHealthWatcher(message.InstanceUpdates. DeletedInstances) w.messagePipe.Process(