Skip to content

Commit

Permalink
unmarshal issue
Browse files Browse the repository at this point in the history
Signed-off-by: SudhanshuBawane <[email protected]>
  • Loading branch information
SudhanshuBawane committed Mar 5, 2024
1 parent 02d1133 commit e758c97
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 61 deletions.
7 changes: 5 additions & 2 deletions backend/agentd/agentd.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ func (a *Agentd) Start() error {
func (a *Agentd) runWatcher() {
defer func() {
logger.Warn("shutting down entity config watcher")
logger.Warn("shutting down user config watcher")
}()
for {
select {
Expand Down Expand Up @@ -324,8 +325,10 @@ func (a *Agentd) handleUserEvent(event store.WatchEventUserConfig) error {
if event.User == nil {
return errors.New("nil entry received from the user config watcher")
}
topic := messaging.UserConfigTopic(event.User.GetMetadata().Namespace, event.User.GetMetadata().Name)
if err := a.bus.Publish(topic, &event); err != nil {
topic := messaging.UserConfigTopic(event.User.GetMetadata().Namespace, event.User.Username)
if err := a.bus.Publish(topic, event.User.Username); err != nil {
logger.WithField("topic", topic).WithError(err).
Error("unable to publish a user config update to the bus")
return err
}
//a.bus.Publish("userChanges", event.User.Username)
Expand Down
99 changes: 55 additions & 44 deletions backend/agentd/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,59 +366,72 @@ func (s *Session) sender() {
case u := <-s.userReceiver.ch:
user, ok := u.(*corev2.User)
if !ok {
logger.WithField("key", ok)
logger.WithField("unexpected user struct", ok)
}

configUser, err := s.marshal(user)
if err != nil {
logger.WithError(err).Error("session failed to serialize the user config")
}

if user.Disabled && user.Username == s.user {
return
}

msg = transport.NewMessage(user.Username, configUser)

//case u := <-s.userConfig.updatesChannel:
// watchEvent, ok := u.(*store.WatchEventUserConfig)
// fmt.Println("========== usrConfig Updates ========", watchEvent)
// if !ok {
// logger.Errorf("session received unexoected struct : %T", u)
// logger.Errorf("session received unexoected user struct : %T", u)
// continue
// }
//
// if watchEvent.User.Disabled && watchEvent.User.Username == s.user {
// fmt.Println("--------action --------", store.WatchCreate, store.WatchDelete, store.WatchUnknown)
// //if watchEvent.User.Disabled && watchEvent.User.Username == s.user {
// // return
// //}
// fmt.Println("========== usrConfig Updates ========", watchEvent)
// //// Handle the delete/disable event
// switch watchEvent.Action {
// case store.WatchDelete:
// fmt.Println(" ======= delete =======", store.WatchDelete)
// return
// case store.WatchCreate:
// fmt.Println("======= create user ====", store.WatchCreate)
// case store.WatchUpdate:
// fmt.Println("==== user update ======", store.WatchUpdate)
//
// }
// //fmt.Println("========== usrConfig Updates ========", watchEvent)
////// Handle the delete/disable event
////switch watchEvent.Action {
////case store.WatchDelete:
//// return
////}
//
//if watchEvent.User == nil {
// logger.Error("session received nil user in watch event")
//}
////
//lagger := logger.WithFields(logrus.Fields{
// "action": watchEvent.Action.String(),
// "user": watchEvent.User.GetMetadata().Name,
// "namespace": watchEvent.User.GetMetadata().Namespace,
//})
//lagger.Debug("user update received")
// if watchEvent.User == nil {
// logger.Error("session received nil user in watch event")
// }
// //
// lagger := logger.WithFields(logrus.Fields{
// "action": watchEvent.Action.String(),
// "user": watchEvent.User.Username,
// "namespace": watchEvent.User.GetMetadata().Namespace,
// })
// lagger.Debug("user update received")
//
//configReq := storev2.NewResourceRequestFromV2Resource(s.ctx, watchEvent.User)
//wrapper, err := storev2.WrapResource(watchEvent.User)
//if err != nil {
// lagger.WithError(err).Error("could not warp the user config")
// continue
//}
// configReq := storev2.NewResourceRequestFromV2Resource(s.ctx, watchEvent.User)
// wrapper, err := storev2.WrapResource(watchEvent.User)
// if err != nil {
// lagger.WithError(err).Error("could not warp the user config")
// continue
// }
//
//if err := s.storev2.CreateOrUpdate(configReq, wrapper); err != nil {
// sessionErrorCounter.WithLabelValues(err.Error()).Inc()
// lagger.WithError(err).Error("could not update the user config")
//}

//bytes, err := s.marshal(watchEvent.User)
//if err != nil {
// lagger.WithError(err).Error("session failed to serialize user config")
//}
//msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes)
// if err := s.storev2.CreateOrUpdate(configReq, wrapper); err != nil {
// sessionErrorCounter.WithLabelValues(err.Error()).Inc()
// lagger.WithError(err).Error("could not update the user config")
// }
//
// bytes, err := s.marshal(watchEvent.User)
// if err != nil {
// lagger.WithError(err).Error("session failed to serialize user config")
// }
// msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes)

// ---- entity ----//
case e := <-s.entityConfig.updatesChannel:
Expand Down Expand Up @@ -548,8 +561,8 @@ func (s *Session) sender() {
// 2. Start receiver
// 3. Start goroutine that waits for context cancellation, and shuts down service.
func (s *Session) Start() (err error) {
defer close(s.entityConfig.subscriptions)
defer close(s.userConfig.subscription)
defer close(s.entityConfig.subscriptions)

sessionCounter.WithLabelValues(s.cfg.Namespace).Inc()
s.wg = &sync.WaitGroup{}
Expand Down Expand Up @@ -603,13 +616,11 @@ func (s *Session) Start() (err error) {
lager.Debug("no user config found")

// Indicate to the agent that this user does not exist
meta := corev2.NewObjectMeta(UserNotFound, s.cfg.Namespace)
//meta := corev2.NewObjectMeta(UserNotFound, s.cfg.Namespace)
watchEvent := &store.WatchEventUserConfig{
User: &corev2.User{
Username: s.user,
},
Action: store.WatchCreate,
Metadata: &meta,
User: &corev2.User{},
Action: store.WatchCreate,
//Metadata: &meta,
}
err = s.bus.Publish(messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName), watchEvent)
if err != nil {
Expand Down Expand Up @@ -742,8 +753,8 @@ func (s *Session) stop() {
logger.WithError(err).Error("error closing session")
}
}()
defer close(s.entityConfig.updatesChannel)
defer close(s.userConfig.updatesChannel)
defer close(s.entityConfig.updatesChannel)
defer close(s.checkChannel)

sessionCounter.WithLabelValues(s.cfg.Namespace).Dec()
Expand Down
31 changes: 21 additions & 10 deletions backend/agentd/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package agentd
import (
"context"
"errors"
"fmt"
"github.com/gogo/protobuf/proto"
corev2 "github.com/sensu/core/v2"
corev3 "github.com/sensu/core/v3"
Expand Down Expand Up @@ -73,19 +74,21 @@ func GetEntityConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan
// GetUserConfigWatcher watches changes to the UserConfig in etcd and publish them -- git#2806
// over the bus as store.WatchEventUserConfig
func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan store.WatchEventUserConfig {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
//ctx, cancel := context.WithCancel(context.Background())
//defer cancel()

key := etcdstorev2.StoreKey(storev2.ResourceRequest{
Context: ctx,
StoreName: new(corev2.User).StoreName(),
})
w := etcdstore.Watch(ctx, client, key, true, clientv3.WithFilterPut())
w := etcdstore.Watch(ctx, client, key, true)
fmt.Sprintf("======= user metadata ========= w : %v, ctx : %v, client: %v, key: %v", w, ctx, client, key)

Check failure on line 85 in backend/agentd/watcher.go

View workflow job for this annotation

GitHub Actions / staticcheck (project)

Sprintf doesn't have side effects and its return value is ignored (SA4017)
ch := make(chan store.WatchEventUserConfig, 1)

go func() {
defer close(ch)
for response := range w.Result() {
fmt.Println("======= user config response ========", response)
if response.Type == store.WatchError {
logger.
WithError(errors.New(string(response.Object))).
Expand All @@ -100,18 +103,26 @@ func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan s
userConfig corev2.User
)

// Decode and unwrap the entity config
// Decode and unwrap the user config

if err := proto.Unmarshal(response.Object, &configWrapper); err != nil {
//if err := proto.Unmarshal(response.Object, &configWrapper); err != nil {
// fmt.Println("====== unmarshaled user =======", configWrapper)
// logger.WithField("key", response.Key).WithError(err).
// Error("unable to unmarshal user config from key")
// continue
//}

if err := configWrapper.UnwrapInto(&userConfig); err != nil {
fmt.Println("====== unmarshaled user =======", userConfig)
logger.WithField("key", response.Key).WithError(err).
Error("unable to unmarshal user config from key")
Error("unable to unwrap entity config from key")
continue
}

// Remove the managed_by label if the value is sensu-agent, in case the user is disabled
if userConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" {
delete(userConfig.GetMetadata().Labels, corev2.ManagedByLabel)
}
//if userConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" {
// delete(userConfig.GetMetadata().Labels, corev2.ManagedByLabel)
//}

ch <- store.WatchEventUserConfig{
User: &userConfig,
Expand All @@ -120,6 +131,6 @@ func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan s
}
}()

logger.Println("----watch metadata----", w)
logger.Println("========= watch metadata ========", w)
return ch
}
10 changes: 10 additions & 0 deletions backend/agentd/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,13 @@ func TestGetEntityConfigWatcher(t *testing.T) {
ch := GetEntityConfigWatcher(context.Background(), client)
assert.NotNil(t, ch)
}

func TestGetUserConfigWatcher(t *testing.T) {
e, cleanup := etcd.NewTestEtcd(t)
defer cleanup()
client := e.NewEmbeddedClient()
defer client.Close()

ch := GetUserConfigWatcher(context.Background(), client)
assert.NotNil(t, ch)
}
9 changes: 4 additions & 5 deletions backend/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"

corev2 "github.com/sensu/core/v2"
v2 "github.com/sensu/core/v2"
corev3 "github.com/sensu/core/v3"
"github.com/sensu/sensu-go/backend/store/patch"
"github.com/sensu/sensu-go/types"
Expand Down Expand Up @@ -160,10 +159,10 @@ type WatchEventEntityConfig struct {
// WatchEventUserConfig contains and updated entity config and the action that
// occurred during this modification
type WatchEventUserConfig struct {
User *corev2.User
Action WatchActionType
Metadata *v2.ObjectMeta `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata"`
//Subscriptions []string `protobuf:"bytes,4,rep,name=subscriptions,proto3" json:"subscriptions"`
User *corev2.User
Action WatchActionType
//Metadata *v2.ObjectMeta `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata"`
////Subscriptions []string `protobuf:"bytes,4,rep,name=subscriptions,proto3" json:"subscriptions"`
}

// Store is used to abstract the durable storage used by the Sensu backend
Expand Down

0 comments on commit e758c97

Please sign in to comment.