diff --git a/pkg/settings/collection/hub.go b/pkg/settings/collection/hub.go index f8d07fa3b6..9c66dbdc2f 100644 --- a/pkg/settings/collection/hub.go +++ b/pkg/settings/collection/hub.go @@ -235,7 +235,7 @@ func (h *hub) run(stopCh <-chan struct{}) { select { case client := <-h.registerCh: h.clients[client] = struct{}{} - for _, topic := range client.subscribedTopics { + for _, topic := range client.SubscribedTopics() { t, ok := h.topics[topic] if !ok { continue diff --git a/pkg/settings/collection/store.go b/pkg/settings/collection/store.go index cd06ee4838..fa497f72d8 100644 --- a/pkg/settings/collection/store.go +++ b/pkg/settings/collection/store.go @@ -141,7 +141,7 @@ func (b *bucketStore) list(ctx context.Context) ([]*settingsv1.CollectionRule, e // serve from cache if available b.cacheLock.RLock() if b.cache != nil { - b.cacheLock.RUnlock() + defer b.cacheLock.RUnlock() return b.cache.Rules, nil } b.cacheLock.RUnlock() diff --git a/pkg/settings/collection/websocket.go b/pkg/settings/collection/websocket.go index 37df049ed4..9177057085 100644 --- a/pkg/settings/collection/websocket.go +++ b/pkg/settings/collection/websocket.go @@ -23,8 +23,10 @@ type client struct { // The websocket connection. conn *websocket.Conn - role Role - subscribedTopics []string + role Role + + subscribedTopics []string + subscribedTopicsMtx sync.Mutex // Buffered channel of outbound messages. send chan []byte @@ -37,6 +39,12 @@ func (c *client) close() { }) } +func (c *client) SubscribedTopics() []string { + c.subscribedTopicsMtx.Lock() + defer c.subscribedTopicsMtx.Unlock() + return c.subscribedTopics +} + func (c *client) isRuleManager() bool { return c.role&RuleManager == RuleManager } @@ -91,7 +99,11 @@ func (c *client) readPump() { } if p := msg.PayloadSubscribe; p != nil { - c.subscribedTopics = p.Topics + c.subscribedTopicsMtx.Lock() + c.subscribedTopics = make([]string, len(p.Topics)) + copy(c.subscribedTopics, p.Topics) + c.subscribedTopicsMtx.Unlock() + level.Debug(c.logger).Log("msg", "client subscribing", "topics", fmt.Sprintf("%v", p.Topics)) c.hub.registerCh <- c } else if p := msg.PayloadData; p != nil { @@ -111,9 +123,10 @@ func (c *client) readPump() { continue } level.Info(c.logger).Log("msg", "received rule delete", "id", p.Id) - id := p.Id + ruleID := p.Id + msgID := msg.Id c.hub.rulesCh <- func(h *hub) { - handleResult(msg.Id, h.deleteRule(ctx, id)) + handleResult(msgID, h.deleteRule(ctx, ruleID)) } } else if p := msg.PayloadRuleInsert; p != nil { if !c.isRuleManager() { @@ -121,8 +134,10 @@ func (c *client) readPump() { continue } level.Info(c.logger).Log("msg", "received rule insert", "rule", p.Rule) + id := msg.Id + settings := p.CloneVT() c.hub.rulesCh <- func(h *hub) { - handleResult(msg.Id, h.insertRule(ctx, p)) + handleResult(id, h.insertRule(ctx, settings)) } } else { level.Warn(c.logger).Log("msg", "no known message type used")