Skip to content

Commit

Permalink
Sample code to use the wizard bus to tie the user watcher with the
Browse files Browse the repository at this point in the history
session shutdown
  • Loading branch information
fguimond committed Feb 27, 2024
1 parent 0311466 commit 6238877
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 39 deletions.
1 change: 1 addition & 0 deletions backend/agentd/agentd.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ func (a *Agentd) handleUserEvent(event store.WatchEventUserConfig) error {
if event.User == nil {
return errors.New("nil entry received from the user config watcher")
}
a.bus.Publish("userChanges", event.User.Username)

return nil
}
Expand Down
65 changes: 27 additions & 38 deletions backend/agentd/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@ type Session struct {
unmarshal agent.UnmarshalFunc
entityConfig *entityConfig
userConfig *userConfig

Check failure on line 98 in backend/agentd/session.go

View workflow job for this annotation

GitHub Actions / staticcheck (project)

field userConfig is unused (U1000)
user string
mu sync.Mutex
subscriptionsMap map[string]subscription
userReceiver *UserReceiver
}

// subscription is used to abstract a message.Subscription and therefore allow
Expand Down Expand Up @@ -177,6 +179,20 @@ func (b *BurialReceiver) Receiver() chan<- interface{} {
return b.ch
}

type UserReceiver struct {
ch chan interface{}
}

func NewUserReceiver() *UserReceiver {
return &UserReceiver{
ch: make(chan interface{}, 1),
}
}

func (b *UserReceiver) Receiver() chan<- interface{} {
return b.ch
}

// NewSession creates a new Session object given the triple of a transport
// connection, message bus, and store.
// The Session is responsible for stopping itself, and does so when it
Expand Down Expand Up @@ -205,6 +221,8 @@ func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error) {
ringPool: cfg.RingPool,
unmarshal: cfg.Unmarshal,
marshal: cfg.Marshal,
user: cfg.User,
userReceiver: NewUserReceiver(),
entityConfig: &entityConfig{
subscriptions: make(chan messaging.Subscription, 1),
updatesChannel: make(chan interface{}, 10),
Expand All @@ -223,6 +241,11 @@ func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error) {
}()
}

_, err := s.bus.Subscribe("userUpdates", cfg.AgentName, s.userReceiver)
if err != nil {
return nil, err
}

if err := s.bus.Publish(messaging.TopicKeepalive, makeEntitySwitchBurialEvent(cfg)); err != nil {
return nil, err
}
Expand Down Expand Up @@ -334,48 +357,14 @@ func (s *Session) sender() {
var msg *transport.Message
select {
//2608 ---- user -----
case u := <-s.userConfig.updatesChannel:
var usr *corev2.User
watchEvent, ok := u.(*store.WatchEventUserConfig)
case u := <-s.userReceiver.ch:
user, ok := u.(corev2.User)

Check failure on line 361 in backend/agentd/session.go

View workflow job for this annotation

GitHub Actions / staticcheck (project)

this value of ok is never used (SA4006)
if !ok {

Check failure on line 362 in backend/agentd/session.go

View workflow job for this annotation

GitHub Actions / staticcheck (project)

empty branch (SA9003)
logger.Errorf("Session received unexpected struct: %T", u)
continue
}

// Handle the delete and unknown watch events
switch watchEvent.Action {
case store.WatchDelete:
//stop session
return
case store.WatchUnknown:
logger.Error("session received unknown watch event")
continue
}

if watchEvent.User == nil {
logger.Error("session received nil user in watch event")
continue
}

lager := logger.WithFields(logrus.Fields{
"action": watchEvent.Action.String(),
"user": watchEvent.User.GetMetadata().Name,
"namespace": watchEvent.User.GetMetadata().Namespace,
})
logger.Debug("User update received")

//bytes, err := s.marshal(watchEvent.User)
//if err != nil {
// lager.WithError(err).Error("session failed to serialize entity config")
// continue
//}

// determine if user was disabled
if err := usr.Disabled; err {
lager.Debug("The user is now disabled ", err)
if user.Disabled && user.Username == s.user {
return
}
//msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes)

// -----entity -------

case e := <-s.entityConfig.updatesChannel:
Expand Down
2 changes: 1 addition & 1 deletion backend/agentd/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan s
Context: ctx,
StoreName: new(corev2.User).StoreName(),
})
w := etcdstore.Watch(ctx, client, key, true)
w := etcdstore.Watch(ctx, client, key, true, clientv3.WithFilterPut())
ch := make(chan store.WatchEventUserConfig, 1)

go func() {
Expand Down

0 comments on commit 6238877

Please sign in to comment.