Skip to content

Commit

Permalink
prometheus.operator.* - Fix issue with missing targets when one monit…
Browse files Browse the repository at this point in the history
…or's name is a prefix of another (#5862)

Co-authored-by: Paul Bormans <[email protected]>
  • Loading branch information
captncraig and Paul424 authored Nov 27, 2023
1 parent 84344fb commit f232fb4
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 13 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ Main (unreleased)

- Fix agent crash when process null OTel's fan out consumers. (@hainenber)

- Fix issue in `prometheus.operator.*` where targets would be dropped if two crds share a common prefix in their names. (@Paul424, @captncraig)

v0.38.0 (2023-11-21)
--------------------

Expand Down
44 changes: 31 additions & 13 deletions component/prometheus/operator/common/crdmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,19 @@ const informerSyncTimeout = 10 * time.Second
// crdManager is all of the fields required to run a crd based component.
// on update, this entire thing should be recreated and restarted
type crdManager struct {
mut sync.Mutex
discoveryConfigs map[string]discovery.Configs
scrapeConfigs map[string]*config.ScrapeConfig
debugInfo map[string]*operator.DiscoveredResource
discoveryManager *discovery.Manager
scrapeManager *scrape.Manager
mut sync.Mutex

// these maps are keyed by job name
discoveryConfigs map[string]discovery.Configs
scrapeConfigs map[string]*config.ScrapeConfig

// list of keys to the above maps for a given resource by `ns/name`
crdsToMapKeys map[string][]string
// debug info by `kind/ns/name`
debugInfo map[string]*operator.DiscoveredResource

discoveryManager discoveryManager
scrapeManager scrapeManager
clusteringUpdated chan struct{}
ls labelstore.LabelStore

Expand Down Expand Up @@ -80,6 +87,7 @@ func newCrdManager(opts component.Options, cluster cluster.Cluster, logger log.L
cluster: cluster,
discoveryConfigs: map[string]discovery.Configs{},
scrapeConfigs: map[string]*config.ScrapeConfig{},
crdsToMapKeys: map[string][]string{},
debugInfo: map[string]*operator.DiscoveredResource{},
kind: kind,
clusteringUpdated: make(chan struct{}, 1),
Expand Down Expand Up @@ -392,6 +400,7 @@ func (c *crdManager) addPodMonitor(pm *promopv1.PodMonitor) {
AdditionalRelabelConfigs: c.args.RelabelConfigs,
ScrapeOptions: c.args.Scrape,
}
mapKeys := []string{}
for i, ep := range pm.Spec.PodMetricsEndpoints {
var scrapeConfig *config.ScrapeConfig
scrapeConfig, err = gen.GeneratePodMonitorConfig(pm, ep, i)
Expand All @@ -400,6 +409,7 @@ func (c *crdManager) addPodMonitor(pm *promopv1.PodMonitor) {
level.Error(c.logger).Log("name", pm.Name, "err", err, "msg", "error generating scrapeconfig from podmonitor")
break
}
mapKeys = append(mapKeys, scrapeConfig.JobName)
c.mut.Lock()
c.discoveryConfigs[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
c.scrapeConfigs[scrapeConfig.JobName] = scrapeConfig
Expand All @@ -409,6 +419,9 @@ func (c *crdManager) addPodMonitor(pm *promopv1.PodMonitor) {
c.addDebugInfo(pm.Namespace, pm.Name, err)
return
}
c.mut.Lock()
c.crdsToMapKeys[fmt.Sprintf("%s/%s", pm.Namespace, pm.Name)] = mapKeys
c.mut.Unlock()
if err = c.apply(); err != nil {
level.Error(c.logger).Log("name", pm.Name, "err", err, "msg", "error applying scrape configs from "+c.kind)
}
Expand Down Expand Up @@ -442,6 +455,8 @@ func (c *crdManager) addServiceMonitor(sm *promopv1.ServiceMonitor) {
AdditionalRelabelConfigs: c.args.RelabelConfigs,
ScrapeOptions: c.args.Scrape,
}

mapKeys := []string{}
for i, ep := range sm.Spec.Endpoints {
var scrapeConfig *config.ScrapeConfig
scrapeConfig, err = gen.GenerateServiceMonitorConfig(sm, ep, i)
Expand All @@ -450,6 +465,7 @@ func (c *crdManager) addServiceMonitor(sm *promopv1.ServiceMonitor) {
level.Error(c.logger).Log("name", sm.Name, "err", err, "msg", "error generating scrapeconfig from serviceMonitor")
break
}
mapKeys = append(mapKeys, scrapeConfig.JobName)
c.mut.Lock()
c.discoveryConfigs[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
c.scrapeConfigs[scrapeConfig.JobName] = scrapeConfig
Expand All @@ -459,6 +475,9 @@ func (c *crdManager) addServiceMonitor(sm *promopv1.ServiceMonitor) {
c.addDebugInfo(sm.Namespace, sm.Name, err)
return
}
c.mut.Lock()
c.crdsToMapKeys[fmt.Sprintf("%s/%s", sm.Namespace, sm.Name)] = mapKeys
c.mut.Unlock()
if err = c.apply(); err != nil {
level.Error(c.logger).Log("name", sm.Name, "err", err, "msg", "error applying scrape configs from "+c.kind)
}
Expand Down Expand Up @@ -503,6 +522,7 @@ func (c *crdManager) addProbe(p *promopv1.Probe) {
c.mut.Lock()
c.discoveryConfigs[pmc.JobName] = pmc.ServiceDiscoveryConfigs
c.scrapeConfigs[pmc.JobName] = pmc
c.crdsToMapKeys[fmt.Sprintf("%s/%s", p.Namespace, p.Name)] = []string{pmc.JobName}
c.mut.Unlock()

if err = c.apply(); err != nil {
Expand Down Expand Up @@ -533,12 +553,10 @@ func (c *crdManager) onDeleteProbe(obj interface{}) {
func (c *crdManager) clearConfigs(ns, name string) {
c.mut.Lock()
defer c.mut.Unlock()
prefix := fmt.Sprintf("%s/%s/%s", c.kind, ns, name)
for k := range c.discoveryConfigs {
if strings.HasPrefix(k, prefix) {
delete(c.discoveryConfigs, k)
delete(c.scrapeConfigs, k)
}

for _, k := range c.crdsToMapKeys[fmt.Sprintf("%s/%s", ns, name)] {
delete(c.discoveryConfigs, k)
delete(c.scrapeConfigs, k)
}
delete(c.debugInfo, prefix)
delete(c.debugInfo, fmt.Sprintf("%s/%s/%s", c.kind, ns, name))
}
168 changes: 168 additions & 0 deletions component/prometheus/operator/common/crdmanager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package common

import (
"testing"

"golang.org/x/exp/maps"

"github.com/go-kit/log"
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/prometheus/operator"
"github.com/grafana/agent/service/cluster"
"github.com/grafana/agent/service/labelstore"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/scrape"
"k8s.io/apimachinery/pkg/util/intstr"

promopv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/stretchr/testify/require"
)

func TestClearConfigsSameNsSamePrefix(t *testing.T) {
logger := log.NewNopLogger()
m := newCrdManager(
component.Options{
Logger: logger,
GetServiceData: func(name string) (interface{}, error) { return nil, nil },
},
cluster.Mock(),
logger,
&operator.DefaultArguments,
KindServiceMonitor,
labelstore.New(logger),
)

m.discoveryManager = newMockDiscoveryManager()
m.scrapeManager = newMockScrapeManager()

targetPort := intstr.FromInt(9090)
m.onAddServiceMonitor(&promopv1.ServiceMonitor{
ObjectMeta: metav1.ObjectMeta{
Namespace: "monitoring",
Name: "svcmonitor",
},
Spec: promopv1.ServiceMonitorSpec{
Selector: metav1.LabelSelector{
MatchLabels: map[string]string{
"group": "my-group",
},
},
Endpoints: []promopv1.Endpoint{
{
TargetPort: &targetPort,
ScrapeTimeout: "5s",
Interval: "10s",
},
},
},
})
m.onAddServiceMonitor(&promopv1.ServiceMonitor{
ObjectMeta: metav1.ObjectMeta{
Namespace: "monitoring",
Name: "svcmonitor-another",
},
Spec: promopv1.ServiceMonitorSpec{
Selector: metav1.LabelSelector{
MatchLabels: map[string]string{
"group": "my-group",
},
},
Endpoints: []promopv1.Endpoint{
{
TargetPort: &targetPort,
ScrapeTimeout: "5s",
Interval: "10s",
},
},
}})

require.ElementsMatch(t, []string{"serviceMonitor/monitoring/svcmonitor-another/0", "serviceMonitor/monitoring/svcmonitor/0"}, maps.Keys(m.discoveryConfigs))
m.clearConfigs("monitoring", "svcmonitor")
require.ElementsMatch(t, []string{"monitoring/svcmonitor", "monitoring/svcmonitor-another"}, maps.Keys(m.crdsToMapKeys))
require.ElementsMatch(t, []string{"serviceMonitor/monitoring/svcmonitor-another/0"}, maps.Keys(m.discoveryConfigs))
require.ElementsMatch(t, []string{"serviceMonitor/monitoring/svcmonitor-another"}, maps.Keys(m.debugInfo))
}

func TestClearConfigsProbe(t *testing.T) {
logger := log.NewNopLogger()
m := newCrdManager(
component.Options{
Logger: logger,
GetServiceData: func(name string) (interface{}, error) { return nil, nil },
},
cluster.Mock(),
logger,
&operator.DefaultArguments,
KindProbe,
labelstore.New(logger),
)

m.discoveryManager = newMockDiscoveryManager()
m.scrapeManager = newMockScrapeManager()

m.onAddProbe(&promopv1.Probe{
ObjectMeta: metav1.ObjectMeta{
Namespace: "monitoring",
Name: "probe",
},
Spec: promopv1.ProbeSpec{},
})
m.onAddProbe(&promopv1.Probe{
ObjectMeta: metav1.ObjectMeta{
Namespace: "monitoring",
Name: "probe-another",
},
Spec: promopv1.ProbeSpec{}})

require.ElementsMatch(t, []string{"probe/monitoring/probe-another", "probe/monitoring/probe"}, maps.Keys(m.discoveryConfigs))
m.clearConfigs("monitoring", "probe")
require.ElementsMatch(t, []string{"monitoring/probe", "monitoring/probe-another"}, maps.Keys(m.crdsToMapKeys))
require.ElementsMatch(t, []string{"probe/monitoring/probe-another"}, maps.Keys(m.discoveryConfigs))
require.ElementsMatch(t, []string{"probe/monitoring/probe-another"}, maps.Keys(m.debugInfo))
}

type mockDiscoveryManager struct {
}

func newMockDiscoveryManager() *mockDiscoveryManager {
return &mockDiscoveryManager{}
}

func (m *mockDiscoveryManager) Run() error {
return nil
}

func (m *mockDiscoveryManager) SyncCh() <-chan map[string][]*targetgroup.Group {
return nil
}

func (m *mockDiscoveryManager) ApplyConfig(cfg map[string]discovery.Configs) error {
return nil
}

type mockScrapeManager struct {
}

func newMockScrapeManager() *mockScrapeManager {
return &mockScrapeManager{}
}

func (m *mockScrapeManager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
return nil
}

func (m *mockScrapeManager) Stop() {

}

func (m *mockScrapeManager) TargetsActive() map[string][]*scrape.Target {
return nil
}

func (m *mockScrapeManager) ApplyConfig(cfg *config.Config) error {
return nil
}
23 changes: 23 additions & 0 deletions component/prometheus/operator/common/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package common

import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/scrape"
)

// discoveryManager is an interface around discovery.Manager
type discoveryManager interface {
Run() error
SyncCh() <-chan map[string][]*targetgroup.Group
ApplyConfig(cfg map[string]discovery.Configs) error
}

// scrapeManager is an interface around scrape.Manager
type scrapeManager interface {
Run(tsets <-chan map[string][]*targetgroup.Group) error
Stop()
TargetsActive() map[string][]*scrape.Target
ApplyConfig(cfg *config.Config) error
}

0 comments on commit f232fb4

Please sign in to comment.