Skip to content

Commit

Permalink
Properly protect client and hub state
Browse files Browse the repository at this point in the history
  • Loading branch information
simonswine committed Aug 21, 2024
1 parent f663c3e commit 76c6da6
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pkg/settings/collection/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (h *hub) run(stopCh <-chan struct{}) {
select {
case client := <-h.registerCh:
h.clients[client] = struct{}{}
for _, topic := range client.subscribedTopics {
for _, topic := range client.SubscribedTopics() {
t, ok := h.topics[topic]
if !ok {
continue
Expand Down
2 changes: 1 addition & 1 deletion pkg/settings/collection/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (b *bucketStore) list(ctx context.Context) ([]*settingsv1.CollectionRule, e
// serve from cache if available
b.cacheLock.RLock()
if b.cache != nil {
b.cacheLock.RUnlock()
defer b.cacheLock.RUnlock()
return b.cache.Rules, nil
}
b.cacheLock.RUnlock()
Expand Down
27 changes: 21 additions & 6 deletions pkg/settings/collection/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ type client struct {
// The websocket connection.
conn *websocket.Conn

role Role
subscribedTopics []string
role Role

subscribedTopics []string
subscribedTopicsMtx sync.Mutex

// Buffered channel of outbound messages.
send chan []byte
Expand All @@ -37,6 +39,12 @@ func (c *client) close() {
})
}

func (c *client) SubscribedTopics() []string {
c.subscribedTopicsMtx.Lock()
defer c.subscribedTopicsMtx.Unlock()
return c.subscribedTopics
}

func (c *client) isRuleManager() bool {
return c.role&RuleManager == RuleManager
}
Expand Down Expand Up @@ -91,7 +99,11 @@ func (c *client) readPump() {
}

if p := msg.PayloadSubscribe; p != nil {
c.subscribedTopics = p.Topics
c.subscribedTopicsMtx.Lock()
c.subscribedTopics = make([]string, len(p.Topics))
copy(c.subscribedTopics, p.Topics)
c.subscribedTopicsMtx.Unlock()

level.Debug(c.logger).Log("msg", "client subscribing", "topics", fmt.Sprintf("%v", p.Topics))
c.hub.registerCh <- c
} else if p := msg.PayloadData; p != nil {
Expand All @@ -111,18 +123,21 @@ func (c *client) readPump() {
continue
}
level.Info(c.logger).Log("msg", "received rule delete", "id", p.Id)
id := p.Id
ruleID := p.Id
msgID := msg.Id
c.hub.rulesCh <- func(h *hub) {
handleResult(msg.Id, h.deleteRule(ctx, id))
handleResult(msgID, h.deleteRule(ctx, ruleID))
}
} else if p := msg.PayloadRuleInsert; p != nil {
if !c.isRuleManager() {
level.Warn(c.logger).Log("msg", "not allowed without rule manager role")
continue
}
level.Info(c.logger).Log("msg", "received rule insert", "rule", p.Rule)
id := msg.Id
settings := p.CloneVT()
c.hub.rulesCh <- func(h *hub) {
handleResult(msg.Id, h.insertRule(ctx, p))
handleResult(id, h.insertRule(ctx, settings))
}
} else {
level.Warn(c.logger).Log("msg", "no known message type used")
Expand Down

0 comments on commit 76c6da6

Please sign in to comment.