Skip to content

Update credential watcher to allow second credential watcher #1132

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: add-read-only-file-plugin
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 61 additions & 21 deletions internal/watcher/credentials/credential_watcher_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
"sync/atomic"
"time"

"github.com/nginx/agent/v3/internal/command"
"github.com/nginx/agent/v3/internal/grpc"

"github.com/fsnotify/fsnotify"
"github.com/nginx/agent/v3/internal/config"
"github.com/nginx/agent/v3/internal/logger"
Expand All @@ -29,55 +32,74 @@ var emptyEvent = fsnotify.Event{

type CredentialUpdateMessage struct {
CorrelationID slog.Attr
Conn *grpc.GrpcConnection
SeverType command.ServerType
}

type CredentialWatcherService struct {
agentConfig *config.Config
watcher *fsnotify.Watcher
filesBeingWatched *sync.Map
filesChanged *atomic.Bool
serverType command.ServerType
watcherMutex sync.Mutex
}

func NewCredentialWatcherService(agentConfig *config.Config) *CredentialWatcherService {
func NewCredentialWatcherService(agentConfig *config.Config, serverType command.ServerType) *CredentialWatcherService {
filesChanged := &atomic.Bool{}
filesChanged.Store(false)

return &CredentialWatcherService{
agentConfig: agentConfig,
filesBeingWatched: &sync.Map{},
filesChanged: filesChanged,
serverType: serverType,
watcherMutex: sync.Mutex{},
}
}

func (cws *CredentialWatcherService) Watch(ctx context.Context, ch chan<- CredentialUpdateMessage) {
slog.DebugContext(ctx, "Starting credential watcher monitoring")
newCtx := context.WithValue(
ctx,
logger.ServerTypeContextKey,
slog.Any(logger.ServerTypeKey, cws.serverType.String()),
)
slog.DebugContext(newCtx, "Starting credential watcher monitoring")

ticker := time.NewTicker(monitoringInterval)
watcher, err := fsnotify.NewWatcher()
if err != nil {
slog.ErrorContext(ctx, "Failed to create credential watcher", "error", err)
slog.ErrorContext(newCtx, "Failed to create credential watcher", "error", err)
return
}

cws.watcher = watcher

cws.watchFiles(ctx, credentialPaths(cws.agentConfig))
cws.watcherMutex.Lock()
commandSever := cws.agentConfig.Command

if cws.serverType == command.Auxiliary {
commandSever = cws.agentConfig.AuxiliaryCommand
}

cws.watchFiles(newCtx, credentialPaths(commandSever))
cws.watcherMutex.Unlock()

for {
select {
case <-ctx.Done():
case <-newCtx.Done():
closeError := cws.watcher.Close()
if closeError != nil {
slog.ErrorContext(ctx, "Unable to close credential watcher", "error", closeError)
slog.ErrorContext(newCtx, "Unable to close credential watcher", "error", closeError)
}

return
case event := <-cws.watcher.Events:
cws.handleEvent(ctx, event)
cws.handleEvent(newCtx, event)
case <-ticker.C:
cws.checkForUpdates(ctx, ch)
cws.checkForUpdates(newCtx, ch)
case watcherError := <-cws.watcher.Errors:
slog.ErrorContext(ctx, "Unexpected error in credential watcher", "error", watcherError)
slog.ErrorContext(newCtx, "Unexpected error in credential watcher", "error", watcherError)
}
}
}
Expand Down Expand Up @@ -146,31 +168,49 @@ func (cws *CredentialWatcherService) checkForUpdates(ctx context.Context, ch cha
slog.Any(logger.CorrelationIDKey, logger.GenerateCorrelationID()),
)

cws.watcherMutex.Lock()
defer cws.watcherMutex.Unlock()

commandSever := cws.agentConfig.Command
if cws.serverType == command.Auxiliary {
commandSever = cws.agentConfig.AuxiliaryCommand
}

conn, err := grpc.NewGrpcConnection(newCtx, cws.agentConfig, commandSever)
if err != nil {
slog.ErrorContext(newCtx, "Unable to create new grpc connection", "error", err)
cws.filesChanged.Store(false)

return
}
slog.DebugContext(ctx, "Credential watcher has detected changes")
ch <- CredentialUpdateMessage{CorrelationID: logger.CorrelationIDAttr(newCtx)}
ch <- CredentialUpdateMessage{
CorrelationID: logger.CorrelationIDAttr(newCtx),
SeverType: cws.serverType, Conn: conn,
}
cws.filesChanged.Store(false)
}
}

func credentialPaths(agentConfig *config.Config) []string {
func credentialPaths(agentConfig *config.Command) []string {
var paths []string

if agentConfig.Command.Auth != nil {
if agentConfig.Command.Auth.TokenPath != "" {
paths = append(paths, agentConfig.Command.Auth.TokenPath)
if agentConfig.Auth != nil {
if agentConfig.Auth.TokenPath != "" {
paths = append(paths, agentConfig.Auth.TokenPath)
}
}

// agent's tls certs
if agentConfig.Command.TLS != nil {
if agentConfig.Command.TLS.Ca != "" {
paths = append(paths, agentConfig.Command.TLS.Ca)
if agentConfig.TLS != nil {
if agentConfig.TLS.Ca != "" {
paths = append(paths, agentConfig.TLS.Ca)
}
if agentConfig.Command.TLS.Cert != "" {
paths = append(paths, agentConfig.Command.TLS.Cert)
if agentConfig.TLS.Cert != "" {
paths = append(paths, agentConfig.TLS.Cert)
}
if agentConfig.Command.TLS.Key != "" {
paths = append(paths, agentConfig.Command.TLS.Key)
if agentConfig.TLS.Key != "" {
paths = append(paths, agentConfig.TLS.Key)
}
}

Expand Down
18 changes: 10 additions & 8 deletions internal/watcher/credentials/credential_watcher_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"testing"
"time"

"github.com/nginx/agent/v3/internal/command"

"github.com/nginx/agent/v3/internal/config"

"github.com/fsnotify/fsnotify"
Expand All @@ -22,15 +24,15 @@ import (
)

func TestCredentialWatcherService_TestNewCredentialWatcherService(t *testing.T) {
credentialWatcherService := NewCredentialWatcherService(types.AgentConfig())
credentialWatcherService := NewCredentialWatcherService(types.AgentConfig(), command.Command)

assert.Empty(t, credentialWatcherService.filesBeingWatched)
assert.False(t, credentialWatcherService.filesChanged.Load())
}

func TestCredentialWatcherService_Watch(t *testing.T) {
ctx := context.Background()
cws := NewCredentialWatcherService(types.AgentConfig())
cws := NewCredentialWatcherService(types.AgentConfig(), command.Command)
watcher, err := fsnotify.NewWatcher()
require.NoError(t, err)
cws.watcher = watcher
Expand Down Expand Up @@ -61,7 +63,7 @@ func TestCredentialWatcherService_Watch(t *testing.T) {
}

func TestCredentialWatcherService_isWatching(t *testing.T) {
cws := NewCredentialWatcherService(types.AgentConfig())
cws := NewCredentialWatcherService(types.AgentConfig(), command.Command)
assert.False(t, cws.isWatching("test-file"))
cws.filesBeingWatched.Store("test-file", true)
assert.True(t, cws.isWatching("test-file"))
Expand All @@ -80,7 +82,7 @@ func TestCredentialWatcherService_isEventSkippable(t *testing.T) {

func TestCredentialWatcherService_addWatcher(t *testing.T) {
ctx := context.Background()
cws := NewCredentialWatcherService(types.AgentConfig())
cws := NewCredentialWatcherService(types.AgentConfig(), command.Command)
watcher, err := fsnotify.NewWatcher()
require.NoError(t, err)
cws.watcher = watcher
Expand All @@ -105,7 +107,7 @@ func TestCredentialWatcherService_watchFiles(t *testing.T) {
var files []string

ctx := context.Background()
cws := NewCredentialWatcherService(types.AgentConfig())
cws := NewCredentialWatcherService(types.AgentConfig(), command.Command)
watcher, err := fsnotify.NewWatcher()
require.NoError(t, err)
cws.watcher = watcher
Expand Down Expand Up @@ -137,7 +139,7 @@ func TestCredentialWatcherService_watchFiles(t *testing.T) {

func TestCredentialWatcherService_checkForUpdates(t *testing.T) {
ctx := context.Background()
cws := NewCredentialWatcherService(types.AgentConfig())
cws := NewCredentialWatcherService(types.AgentConfig(), command.Command)
watcher, err := fsnotify.NewWatcher()
require.NoError(t, err)
cws.watcher = watcher
Expand All @@ -164,7 +166,7 @@ func TestCredentialWatcherService_checkForUpdates(t *testing.T) {

func TestCredentialWatcherService_handleEvent(t *testing.T) {
ctx := context.Background()
cws := NewCredentialWatcherService(types.AgentConfig())
cws := NewCredentialWatcherService(types.AgentConfig(), command.Command)
watcher, err := fsnotify.NewWatcher()
require.NoError(t, err)
cws.watcher = watcher
Expand Down Expand Up @@ -232,7 +234,7 @@ func Test_credentialPaths(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, credentialPaths(tt.agentConfig), "credentialPaths(%v)", tt.agentConfig)
assert.Equalf(t, tt.want, credentialPaths(tt.agentConfig.Command), "credentialPaths(%v)", tt.agentConfig)
})
}
}
64 changes: 32 additions & 32 deletions internal/watcher/watcher_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"slices"
"sync"

"github.com/nginx/agent/v3/internal/model"
"github.com/nginx/agent/v3/internal/command"

"github.com/nginx/agent/v3/internal/grpc"
"github.com/nginx/agent/v3/internal/model"

"github.com/nginx/agent/v3/internal/watcher/credentials"

Expand All @@ -40,12 +40,14 @@ type (
instanceWatcherService instanceWatcherServiceInterface
healthWatcherService *health.HealthWatcherService
fileWatcherService *file.FileWatcherService
credentialWatcherService credentialWatcherServiceInterface
commandCredentialWatcherService credentialWatcherServiceInterface
auxiliaryCredentialWatcherService credentialWatcherServiceInterface
instanceUpdatesChannel chan instance.InstanceUpdatesMessage
nginxConfigContextChannel chan instance.NginxConfigContextMessage
instanceHealthChannel chan health.InstanceHealthMessage
fileUpdatesChannel chan file.FileUpdateMessage
credentialUpdatesChannel chan credentials.CredentialUpdateMessage
commandCredentialUpdatesChannel chan credentials.CredentialUpdateMessage
auxiliaryCredentialUpdatesChannel chan credentials.CredentialUpdateMessage
cancel context.CancelFunc
instancesWithConfigApplyInProgress []string
watcherMutex sync.Mutex
Expand Down Expand Up @@ -78,12 +80,14 @@ func NewWatcher(agentConfig *config.Config) *Watcher {
instanceWatcherService: instance.NewInstanceWatcherService(agentConfig),
healthWatcherService: health.NewHealthWatcherService(agentConfig),
fileWatcherService: file.NewFileWatcherService(agentConfig),
credentialWatcherService: credentials.NewCredentialWatcherService(agentConfig),
commandCredentialWatcherService: credentials.NewCredentialWatcherService(agentConfig, command.Command),
auxiliaryCredentialWatcherService: credentials.NewCredentialWatcherService(agentConfig, command.Auxiliary),
instanceUpdatesChannel: make(chan instance.InstanceUpdatesMessage),
nginxConfigContextChannel: make(chan instance.NginxConfigContextMessage),
instanceHealthChannel: make(chan health.InstanceHealthMessage),
fileUpdatesChannel: make(chan file.FileUpdateMessage),
credentialUpdatesChannel: make(chan credentials.CredentialUpdateMessage),
commandCredentialUpdatesChannel: make(chan credentials.CredentialUpdateMessage),
auxiliaryCredentialUpdatesChannel: make(chan credentials.CredentialUpdateMessage),
instancesWithConfigApplyInProgress: []string{},
watcherMutex: sync.Mutex{},
}
Expand All @@ -100,7 +104,11 @@ func (w *Watcher) Init(ctx context.Context, messagePipe bus.MessagePipeInterface

go w.instanceWatcherService.Watch(watcherContext, w.instanceUpdatesChannel, w.nginxConfigContextChannel)
go w.healthWatcherService.Watch(watcherContext, w.instanceHealthChannel)
go w.credentialWatcherService.Watch(watcherContext, w.credentialUpdatesChannel)
go w.commandCredentialWatcherService.Watch(watcherContext, w.commandCredentialUpdatesChannel)

if w.agentConfig.AuxiliaryCommand != nil {
go w.auxiliaryCredentialWatcherService.Watch(watcherContext, w.auxiliaryCredentialUpdatesChannel)
}

if w.agentConfig.IsFeatureEnabled(pkgConfig.FeatureFileWatcher) {
go w.fileWatcherService.Watch(watcherContext, w.fileUpdatesChannel)
Expand Down Expand Up @@ -132,8 +140,6 @@ func (*Watcher) Info() *bus.Info {

func (w *Watcher) Process(ctx context.Context, msg *bus.Message) {
switch msg.Topic {
case bus.CredentialUpdatedTopic:
w.handleCredentialUpdate(ctx)
case bus.ConfigApplyRequestTopic:
w.handleConfigApplyRequest(ctx, msg)
case bus.ConfigApplySuccessfulTopic:
Expand All @@ -149,7 +155,6 @@ func (w *Watcher) Process(ctx context.Context, msg *bus.Message) {

func (*Watcher) Subscriptions() []string {
return []string{
bus.CredentialUpdatedTopic,
bus.ConfigApplyRequestTopic,
bus.ConfigApplySuccessfulTopic,
bus.ConfigApplyCompleteTopic,
Expand Down Expand Up @@ -248,34 +253,29 @@ func (w *Watcher) handleConfigApplyComplete(ctx context.Context, msg *bus.Messag
w.fileWatcherService.SetEnabled(true)
}

func (w *Watcher) handleCredentialUpdate(ctx context.Context) {
slog.DebugContext(ctx, "Watcher plugin received credential update message")

w.watcherMutex.Lock()
// This will be changed/moved during the credential watcher PR
conn, err := grpc.NewGrpcConnection(ctx, w.agentConfig, w.agentConfig.Command)
if err != nil {
slog.ErrorContext(ctx, "Unable to create new grpc connection", "error", err)
w.watcherMutex.Unlock()

return
}
w.watcherMutex.Unlock()
w.messagePipe.Process(ctx, &bus.Message{
Topic: bus.ConnectionResetTopic, Data: conn,
})
}

func (w *Watcher) monitorWatchers(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case message := <-w.credentialUpdatesChannel:
slog.DebugContext(ctx, "Received credential update event")
newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID)
case message := <-w.commandCredentialUpdatesChannel:
slog.DebugContext(ctx, "Received credential update event for command server")
newCtx := context.WithValue(context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID),
logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey,
message.SeverType.String()))

w.messagePipe.Process(newCtx, &bus.Message{
Topic: bus.ConnectionResetTopic, Data: message.Conn,
})

case message := <-w.auxiliaryCredentialUpdatesChannel:
slog.DebugContext(ctx, "Received credential update event for auxiliary command server")
newCtx := context.WithValue(context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID),
logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey,
message.SeverType.String()))

w.messagePipe.Process(newCtx, &bus.Message{
Topic: bus.CredentialUpdatedTopic, Data: nil,
Topic: bus.ConnectionResetTopic, Data: message.Conn,
})
case message := <-w.instanceUpdatesChannel:
newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID)
Expand Down
Loading