Skip to content

Fix panic when updating NGINX config and fix remotely enabling/disabling the metrics feature #1013

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 37 commits into from
Closed
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
6e3dc83
fix: fixing panic when updating config concurrently
bejjrajesh Mar 19, 2025
a097938
fix: adding nil check
bejjrajesh Mar 19, 2025
7d743bd
fix: adding nil check
bejjrajesh Mar 20, 2025
cfd35a2
fix: making toggle true once agent connected
bejjrajesh Apr 9, 2025
d455735
fix: adding debug log for metricsSender readyToSend Status
bejjrajesh Apr 9, 2025
c7dbfb5
fix: updating with main
bejjrajesh Apr 11, 2025
4b8fd54
fix: debugging
bejjrajesh Apr 11, 2025
0928fc0
fix: debugging
bejjrajesh Apr 11, 2025
779ef6c
fix: updating config reader features when on disk config changed
bejjrajesh Apr 11, 2025
fd17a92
fix: updating readyToSend when config changed
bejjrajesh Apr 12, 2025
dbee456
fix: updating readyToSend when config changed
bejjrajesh Apr 14, 2025
92f037b
fix: updating readyToSend when config changed
bejjrajesh Apr 15, 2025
f65680f
fix: adding conf to metrics sender
bejjrajesh Apr 16, 2025
5adfeac
fix: removing debug logs
bejjrajesh Apr 17, 2025
9c9f838
fix: fixing import issue
bejjrajesh Apr 17, 2025
a8dcf7b
Add nil pointer check to GenerateMetricsReportBundle (#1047)
dhurley Apr 16, 2025
fcb499e
Update net and nats dependencies (#1070)
dhurley May 1, 2025
1ec3554
fix: updating with main
bejjrajesh May 7, 2025
68eca9f
fix: running go deps
bejjrajesh May 7, 2025
468e6cc
fix: updating go.sum
bejjrajesh May 7, 2025
1dd35f0
fix: fixing test failures
bejjrajesh May 7, 2025
c299b17
fix: updating vendor folders
bejjrajesh May 7, 2025
2b35964
fix: updating go.sum
bejjrajesh May 7, 2025
16bb285
fix: fixing conflict
bejjrajesh May 7, 2025
106bc87
fix: fixing minor bug
bejjrajesh May 8, 2025
a9725c7
fix: fixing imports
bejjrajesh May 8, 2025
7892d7e
fix: updating default features
bejjrajesh May 9, 2025
c922b0f
fix: updating default features
bejjrajesh May 10, 2025
21d2a0c
fix: adding debug log
bejjrajesh May 10, 2025
9a8dfac
fix: removing lock when de-registering
bejjrajesh May 10, 2025
8573d86
fix: removing lock when de-registering
bejjrajesh May 10, 2025
4832ba4
fix: fixing compile issue
bejjrajesh May 10, 2025
49b2173
fix: fixing compile issue
bejjrajesh May 10, 2025
0837ab5
fix: fixing metrics-sender not initializing issue
bejjrajesh May 11, 2025
691d54b
fix: adding debug log
bejjrajesh May 11, 2025
f4fb5d0
fix: adding debug logs
bejjrajesh May 11, 2025
e06e490
fix: adding metrics sender issue
bejjrajesh May 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
317 changes: 249 additions & 68 deletions go.work.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/plugins/common.go
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@ func LoadPlugins(commander client.Commander, binary core.NginxBinary, env core.E

if (loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsSender)) && reporter != nil {
corePlugins = append(corePlugins,
NewMetricsSender(reporter),
NewMetricsSender(reporter, loadedConfig),
)
}

3 changes: 3 additions & 0 deletions src/plugins/config_reader.go
Original file line number Diff line number Diff line change
@@ -152,7 +152,9 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig)
}

if synchronizeFeatures {
log.Debugf("agent config features changed, synchronizing features")
r.synchronizeFeatures(payloadAgentConfig)
r.config.Features = payloadAgentConfig.Details.Features
}

r.messagePipeline.Process(core.NewMessage(core.AgentConfigChanged, payloadAgentConfig))
@@ -164,6 +166,7 @@ func (r *ConfigReader) synchronizeFeatures(agtCfg *proto.AgentConfig) {
r.detailsMu.RLock()
for _, feature := range r.config.Features {
if feature != agent_config.FeatureRegistration && feature != agent_config.FeatureNginxConfigAsync {
log.Debugf("config_reader: deregistering the feature %s", feature)
r.deRegisterPlugin(feature)
}
}
47 changes: 27 additions & 20 deletions src/plugins/dataplane_status.go
Original file line number Diff line number Diff line change
@@ -23,25 +23,26 @@ import (
)

type DataPlaneStatus struct {
messagePipeline core.MessagePipeInterface
ctx context.Context
sendStatus chan bool
healthTicker *time.Ticker
interval time.Duration
meta *proto.Metadata
binary core.NginxBinary
env core.Environment
version string
tags *[]string
configDirs string
lastSendDetails time.Time
envHostInfo *proto.HostInfo
reportInterval time.Duration
softwareDetails map[string]*proto.DataplaneSoftwareDetails
nginxConfigActivityStatuses map[string]*proto.AgentActivityStatus
softwareDetailsMutex sync.RWMutex
structMu sync.RWMutex
processes []*core.Process
messagePipeline core.MessagePipeInterface
ctx context.Context
sendStatus chan bool
healthTicker *time.Ticker
interval time.Duration
meta *proto.Metadata
binary core.NginxBinary
env core.Environment
version string
tags *[]string
configDirs string
lastSendDetails time.Time
envHostInfo *proto.HostInfo
reportInterval time.Duration
softwareDetails map[string]*proto.DataplaneSoftwareDetails
nginxConfigActivityStatuses map[string]*proto.AgentActivityStatus
nginxConfigActivityStatusesMutex sync.RWMutex
softwareDetailsMutex sync.RWMutex
structMu sync.RWMutex
processes []*core.Process
}

const (
@@ -81,7 +82,9 @@ func (dps *DataPlaneStatus) Init(pipeline core.MessagePipeInterface) {

func (dps *DataPlaneStatus) Close() {
log.Info("DataPlaneStatus is wrapping up")
dps.nginxConfigActivityStatusesMutex.Lock()
dps.nginxConfigActivityStatuses = nil
dps.nginxConfigActivityStatusesMutex.Unlock()
dps.softwareDetailsMutex.Lock()
dps.softwareDetails = nil
dps.softwareDetailsMutex.Unlock()
@@ -144,8 +147,10 @@ func (dps *DataPlaneStatus) Subscriptions() []string {

func (dps *DataPlaneStatus) updateNginxConfigActivityStatuses(newAgentActivityStatus *proto.AgentActivityStatus) {
log.Tracef("DataplaneStatus: Updating nginxConfigActivityStatuses with %v", newAgentActivityStatus)
if _, ok := newAgentActivityStatus.GetStatus().(*proto.AgentActivityStatus_NginxConfigStatus); ok {
if _, ok := newAgentActivityStatus.GetStatus().(*proto.AgentActivityStatus_NginxConfigStatus); dps.nginxConfigActivityStatuses != nil && ok {
dps.nginxConfigActivityStatusesMutex.Lock()
dps.nginxConfigActivityStatuses[newAgentActivityStatus.GetNginxConfigStatus().GetNginxId()] = newAgentActivityStatus
dps.nginxConfigActivityStatusesMutex.Unlock()
}
}

@@ -184,6 +189,8 @@ func (dps *DataPlaneStatus) healthGoRoutine(pipeline core.MessagePipeInterface)
func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneStatus {
forceDetails = forceDetails || time.Now().UTC().Add(-dps.reportInterval).After(dps.lastSendDetails)

dps.nginxConfigActivityStatusesMutex.Lock()
defer dps.nginxConfigActivityStatusesMutex.Unlock()
agentActivityStatuses := []*proto.AgentActivityStatus{}
for _, nginxConfigActivityStatus := range dps.nginxConfigActivityStatuses {
agentActivityStatuses = append(agentActivityStatuses, nginxConfigActivityStatus)
19 changes: 13 additions & 6 deletions src/plugins/features.go
Original file line number Diff line number Diff line change
@@ -135,7 +135,7 @@ func (f *Features) Process(msg *core.Message) {

func (f *Features) enableMetricsFeature(_ string) []core.Plugin {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) {

log.Debugf("features.go: enabling metrics feature")
conf, err := config.GetConfig(f.conf.ClientID)
if err != nil {
log.Warnf("Unable to get agent config, %v", err)
@@ -144,7 +144,7 @@ func (f *Features) enableMetricsFeature(_ string) []core.Plugin {

metrics := NewMetrics(f.conf, f.env, f.binary, f.processes)
metricsThrottle := NewMetricsThrottle(f.conf, f.env)
metricsSender := NewMetricsSender(f.commander)
metricsSender := NewMetricsSender(f.commander, conf)

return []core.Plugin{metrics, metricsThrottle, metricsSender}
}
@@ -154,7 +154,7 @@ func (f *Features) enableMetricsFeature(_ string) []core.Plugin {
func (f *Features) enableMetricsCollectionFeature(_ string) []core.Plugin {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) &&
!f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsCollection) {

log.Debugf("features.go: enabling metrics-collection feature")
conf, err := config.GetConfig(f.conf.ClientID)
if err != nil {
log.Warnf("Unable to get agent config, %v", err)
@@ -171,7 +171,7 @@ func (f *Features) enableMetricsCollectionFeature(_ string) []core.Plugin {
func (f *Features) enableMetricsThrottleFeature(_ string) []core.Plugin {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) &&
!f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) {

log.Debugf("features.go: enabling metrics-throttle feature")
conf, err := config.GetConfig(f.conf.ClientID)
if err != nil {
log.Warnf("Unable to get agent config, %v", err)
@@ -188,14 +188,14 @@ func (f *Features) enableMetricsThrottleFeature(_ string) []core.Plugin {
func (f *Features) enableMetricsSenderFeature(_ string) []core.Plugin {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) &&
!f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsSender) {

log.Debugf("features.go: enabling metrics-sender feature")
conf, err := config.GetConfig(f.conf.ClientID)
if err != nil {
log.Warnf("Unable to get agent config, %v", err)
}
f.conf = conf

metricsSender := NewMetricsSender(f.commander)
metricsSender := NewMetricsSender(f.commander, conf)

return []core.Plugin{metricsSender}
}
@@ -205,6 +205,7 @@ func (f *Features) enableMetricsSenderFeature(_ string) []core.Plugin {
func (f *Features) enableAgentAPIFeature(_ string) []core.Plugin {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureAgentAPI) {
conf, err := config.GetConfig(f.conf.ClientID)
log.Debugf("features.go: enabling agent-api feature")
if err != nil {
log.Warnf("Unable to get agent config, %v", err)
}
@@ -219,6 +220,7 @@ func (f *Features) enableAgentAPIFeature(_ string) []core.Plugin {

func (f *Features) enableRegistrationFeature(_ string) []core.Plugin {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureRegistration) {
log.Debugf("features.go: enabling registration feature")
conf, err := config.GetConfig(f.conf.ClientID)
if err != nil {
log.Warnf("Unable to get agent config, %v", err)
@@ -234,6 +236,7 @@ func (f *Features) enableRegistrationFeature(_ string) []core.Plugin {

func (f *Features) enableDataPlaneStatusFeature(_ string) []core.Plugin {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureDataPlaneStatus) {
log.Debugf("features.go: enabling dataplane-status feature")
conf, err := config.GetConfig(f.conf.ClientID)
if err != nil {
log.Warnf("Unable to get agent config, %v", err)
@@ -249,6 +252,7 @@ func (f *Features) enableDataPlaneStatusFeature(_ string) []core.Plugin {

func (f *Features) enableProcessWatcherFeature(_ string) []core.Plugin {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureProcessWatcher) {
log.Debugf("features.go: enabling process-watcher feature")
conf, err := config.GetConfig(f.conf.ClientID)
if err != nil {
log.Warnf("Unable to get agent config, %v", err)
@@ -264,6 +268,7 @@ func (f *Features) enableProcessWatcherFeature(_ string) []core.Plugin {

func (f *Features) enableActivityEventsFeature(_ string) []core.Plugin {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureActivityEvents) {
log.Debugf("features.go: enabling activity-events feature")
conf, err := config.GetConfig(f.conf.ClientID)
if err != nil {
log.Warnf("Unable to get agent config, %v", err)
@@ -279,6 +284,7 @@ func (f *Features) enableActivityEventsFeature(_ string) []core.Plugin {

func (f *Features) enableFileWatcherFeature(_ string) []core.Plugin {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureFileWatcher) {
log.Debugf("features.go: enabling file-watcher feature")
conf, err := config.GetConfig(f.conf.ClientID)
if err != nil {
log.Warnf("Unable to get agent config, %v", err)
@@ -297,6 +303,7 @@ func (f *Features) enableNginxCountingFeature(_ string) []core.Plugin {
countingPlugins := []core.Plugin{}
if len(f.conf.Nginx.NginxCountingSocket) > 0 {
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureNginxCounting) {
log.Debugf("features.go: enabling nginx-counting feature")
conf, err := config.GetConfig(f.conf.ClientID)
if err != nil {
log.Warnf("Unable to get agent config, %v", err)
68 changes: 54 additions & 14 deletions src/plugins/metrics_sender.go
Original file line number Diff line number Diff line change
@@ -9,32 +9,36 @@ package plugins

import (
"context"
"strings"

"github.com/nginx/agent/sdk/v2"
agent_config "github.com/nginx/agent/sdk/v2/agent/config"
"github.com/nginx/agent/sdk/v2/client"
"github.com/nginx/agent/sdk/v2/proto"
models "github.com/nginx/agent/sdk/v2/proto/events"
"github.com/nginx/agent/v2/src/core"
"github.com/nginx/agent/v2/src/core/config"
"strings"
"sync"

log "github.com/sirupsen/logrus"
"go.uber.org/atomic"
)

type MetricsSender struct {
reporter client.MetricReporter
pipeline core.MessagePipeInterface
ctx context.Context
started *atomic.Bool
readyToSend *atomic.Bool
reporter client.MetricReporter
pipeline core.MessagePipeInterface
ctx context.Context
started *atomic.Bool
readyToSend *atomic.Bool
readyToSendMu sync.RWMutex
conf *config.Config
}

func NewMetricsSender(reporter client.MetricReporter) *MetricsSender {
func NewMetricsSender(reporter client.MetricReporter, config *config.Config) *MetricsSender {
return &MetricsSender{
reporter: reporter,
started: atomic.NewBool(false),
readyToSend: atomic.NewBool(false),
conf: config,
}
}

@@ -50,31 +54,44 @@ func (r *MetricsSender) Init(pipeline core.MessagePipeInterface) {

func (r *MetricsSender) Close() {
log.Info("MetricsSender is wrapping up")
r.readyToSendMu.Lock()
r.started.Store(false)
r.readyToSend.Store(false)
defer r.readyToSendMu.Unlock()
}

func (r *MetricsSender) Info() *core.Info {
return core.NewInfo(agent_config.FeatureMetricsSender, "v0.0.1")
}

func (r *MetricsSender) Process(msg *core.Message) {

if msg.Exact(core.AgentConnected) {
r.readyToSend.Toggle()
log.Debugf("metrics_sender: agent connected %s", strings.Join(r.conf.Features, ","))
if r.conf.Features != nil && r.isFeatureEnabled(r.conf.Features) {
r.readyToSendMu.Lock()
r.readyToSend.Store(true)
r.readyToSendMu.Unlock()
} else {
r.readyToSendMu.Lock()
r.readyToSend.Store(false)
r.readyToSendMu.Unlock()
}
return
}

if msg.Exact(core.CommMetrics) {
payloads, ok := msg.Data().([]core.Payload)
if !ok {
log.Warnf("Failed to coerce Message to []Payload: %v", msg.Data())
return
}
r.readyToSendMu.RLock()
defer r.readyToSendMu.RUnlock()
for _, p := range payloads {
if !r.readyToSend.Load() {
log.Debugf("metrics_sender is not ready to send the metrics")
continue
}

switch report := p.(type) {
case *proto.MetricsReport:
message := client.MessageFromMetrics(report)
@@ -99,9 +116,9 @@ func (r *MetricsSender) Process(msg *core.Message) {
}
}
} else if msg.Exact(core.AgentConfigChanged) {
switch config := msg.Data().(type) {
switch agentConfig := msg.Data().(type) {
case *proto.AgentConfig:
r.metricSenderBackoff(config)
r.metricSenderBackoff(agentConfig)
default:
log.Warnf("metrics sender expected %T type, but got: %T", &proto.AgentConfig{}, msg.Data())
}
@@ -110,7 +127,17 @@ func (r *MetricsSender) Process(msg *core.Message) {

func (r *MetricsSender) metricSenderBackoff(agentConfig *proto.AgentConfig) {
log.Debugf("update metric reporter client configuration to %+v", agentConfig)

if agentConfig.Details.Features != nil {
if r.isFeatureEnabled(agentConfig.Details.Features) {
r.readyToSendMu.Lock()
r.readyToSend.Store(true)
r.readyToSendMu.Unlock()
} else {
r.readyToSendMu.Lock()
r.readyToSend.Store(false)
r.readyToSendMu.Unlock()
}
}
if agentConfig.GetDetails() == nil || agentConfig.GetDetails().GetServer() == nil || agentConfig.GetDetails().GetServer().GetBackoff() == nil {
log.Debug("not updating metric reporter client configuration as new Agent backoff settings is nil")
return
@@ -123,3 +150,16 @@ func (r *MetricsSender) metricSenderBackoff(agentConfig *proto.AgentConfig) {
func (r *MetricsSender) Subscriptions() []string {
return []string{core.CommMetrics, core.AgentConnected, core.AgentConfigChanged}
}

func (r *MetricsSender) isFeatureEnabled(features []string) bool {
var isFeatureEnabled bool
if features != nil {
for _, feature := range features {
if feature == agent_config.FeatureMetricsSender {
isFeatureEnabled = true
break
}
}
}
return isFeatureEnabled
}
8 changes: 5 additions & 3 deletions src/plugins/metrics_sender_test.go
Original file line number Diff line number Diff line change
@@ -14,6 +14,8 @@ import (
"testing"
"time"

"github.com/nginx/agent/v2/src/core/config"

"github.com/nginx/agent/sdk/v2/backoff"
"github.com/nginx/agent/sdk/v2/proto"
"github.com/nginx/agent/v2/src/core"
@@ -43,7 +45,7 @@ func TestMetricsSenderSendMetrics(t *testing.T) {
ctx := context.TODO()
mockMetricsReportClient := tutils.NewMockMetricsReportClient()
mockMetricsReportClient.Mock.On("Send", ctx, mock.Anything).Return(test.err)
pluginUnderTest := NewMetricsSender(mockMetricsReportClient)
pluginUnderTest := NewMetricsSender(mockMetricsReportClient, &config.Config{ClientID: "12345", Features: []string{"metrics-sender"}})

assert.False(t, pluginUnderTest.started.Load())
assert.False(t, pluginUnderTest.readyToSend.Load())
@@ -110,7 +112,7 @@ func TestMetricsSenderBackoff(t *testing.T) {
t.Run(test.name, func(_ *testing.T) {
ctx := context.TODO()
mockMetricsReportClient := tutils.NewMockMetricsReportClient()
pluginUnderTest := NewMetricsSender(mockMetricsReportClient)
pluginUnderTest := NewMetricsSender(mockMetricsReportClient, &config.Config{ClientID: "12345", Features: []string{"metrics-sender"}})

pluginUnderTest.Init(core.NewMockMessagePipe(ctx))
pluginUnderTest.Process(core.NewMessage(core.AgentConnected, nil))
@@ -130,6 +132,6 @@ func TestMetricsSenderBackoff(t *testing.T) {
}

func TestMetricsSenderSubscriptions(t *testing.T) {
pluginUnderTest := NewMetricsSender(tutils.NewMockMetricsReportClient())
pluginUnderTest := NewMetricsSender(tutils.NewMockMetricsReportClient(), &config.Config{ClientID: "12345"})
assert.Equal(t, []string{core.CommMetrics, core.AgentConnected, core.AgentConfigChanged}, pluginUnderTest.Subscriptions())
}
Original file line number Diff line number Diff line change
@@ -130,7 +130,7 @@ func TestNAPMonitoring(t *testing.T) {
return
}

metricsSender := plugins.NewMetricsSender(reporter)
metricsSender := plugins.NewMetricsSender(reporter, &config.Config{ClientID: "12345", Features: []string{"metrics-sender"}})

env := tutils.NewMockEnvironment()
env.On("NewHostInfo", testifyMock.Anything, testifyMock.Anything, testifyMock.Anything).Return(&sdkPb.HostInfo{