Skip to content

Commit

Permalink
Merge pull request #1421 from rancher/rs-fix
Browse files Browse the repository at this point in the history
Metrics otel collector: endpoint ordering edge cases and watch improvements
  • Loading branch information
dbason authored May 24, 2023
2 parents 58c01ac + 08b6624 commit 60dc799
Show file tree
Hide file tree
Showing 15 changed files with 159 additions and 61 deletions.
5 changes: 5 additions & 0 deletions apis/core/v1beta1/collector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ type CollectorSpec struct {
SystemNamespace string `json:"systemNamespace,omitempty"`
LoggingConfig *corev1.LocalObjectReference `json:"loggingConfig,omitempty"`
MetricsConfig *corev1.LocalObjectReference `json:"metricsConfig,omitempty"`
ConfigReloader *ConfigReloaderSpec `json:"configReloader,omitempty"`
}

// ConfigReloaderSpec configures the collector's config-reloader container.
// It only carries an inlined image spec — presumably used to override the
// default config-reloader image/pull settings; confirm against the consumer.
type ConfigReloaderSpec struct {
opnimeta.ImageSpec `json:",inline,omitempty"`
}

// CollectorStatus defines the observed state of Collector
Expand Down
21 changes: 21 additions & 0 deletions apis/core/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions config/crd/bases/core.opni.io_collectors.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions controllers/core_collector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,16 @@ func (r *CoreCollectorReconciler) SetupWithManager(mgr ctrl.Manager) error {
})
r.Client = mgr.GetClient()
r.scheme = mgr.GetScheme()

return ctrl.NewControllerManagedBy(mgr).
For(&corev1beta1.Collector{}).
Watches(&source.Kind{Type: &opnimonitoringv1beta1.CollectorConfig{}}, requestMapper).
Watches(&source.Kind{Type: &opniloggingv1beta1.CollectorConfig{}}, requestMapper).
Watches(&source.Kind{Type: &promoperatorv1.ServiceMonitor{}}, requestMapper).
Watches(&source.Kind{Type: &promoperatorv1.PodMonitor{}}, requestMapper).
Watches(&source.Kind{Type: &corev1.Pod{}}, requestMapper).
Watches(&source.Kind{Type: &corev1.Service{}}, requestMapper).
// for metrics, we want to watch changes to the spec of objects that drive discovery
Watches(&source.Kind{Type: &promoperatorv1.ServiceMonitor{}}, &handler.EnqueueRequestForObject{}).
Watches(&source.Kind{Type: &promoperatorv1.PodMonitor{}}, &handler.EnqueueRequestForObject{}).
Watches(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForObject{}).
Watches(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}).
Owns(&corev1.ConfigMap{}).
Owns(&corev1.Service{}).
Owns(&appsv1.DaemonSet{}).
Expand Down
11 changes: 1 addition & 10 deletions pkg/otel/templates/metrics.tmpl
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
{{- define "metrics-node-receivers" -}}
{{- if .Metrics.Enabled -}}
prometheus/self:
config:
scrape_configs:
- job_name: 'otel-collector'
scrape_interval: 15s
static_configs:
- targets: ['127.0.0.1:{{ .Metrics.ListenPort }}']
{{ template "metrics-nodehost-receiver" . }}
{{- end -}}
{{- end -}}
Expand Down Expand Up @@ -81,10 +74,8 @@ kubeletstats:
{{- define "metrics-self-telemetry" -}}
{{- if .Metrics.Enabled -}}
telemetry:
logs:
level : {{ or .Metrics.LogLevel "debug" }}
metrics:
address : "127.0.0.1:{{ .Metrics.ListenPort }}"
level: none
{{- end -}}
{{- end -}}

Expand Down
2 changes: 0 additions & 2 deletions pkg/otel/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ func (w *WALConfig) DeepCopy() *WALConfig {
func (d NodeConfig) MetricReceivers() []string {
res := []string{}
if d.Metrics.Enabled {
res = append(res, "prometheus/self")
if lo.FromPtrOr(d.Metrics.Spec.HostMetrics, false) {
res = append(res, "hostmetrics")
if d.Containerized {
Expand All @@ -168,7 +167,6 @@ func (d NodeConfig) MetricReceivers() []string {
func (o AggregatorConfig) MetricReceivers() []string {
res := []string{}
if o.Metrics.Enabled {
res = append(res, "prometheus/self")
if len(o.Metrics.Spec.AdditionalScrapeConfigs) > 0 {
res = append(res, "prometheus/additional")
}
Expand Down
10 changes: 4 additions & 6 deletions pkg/resources/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ func NewReconciler(
func (r *Reconciler) Reconcile() (retResult *reconcile.Result, retErr error) {
lg := log.FromContext(r.ctx)
conditions := []string{}

defer func() {
// When the reconciler is done, figure out what the state of the opnicluster
// is and set it in the state field accordingly.
Expand Down Expand Up @@ -86,19 +85,18 @@ func (r *Reconciler) Reconcile() (retResult *reconcile.Result, retErr error) {

var resourceList []resources.Resource

config, configHash := r.agentConfigMap()
config, _ := r.agentConfigMap()
resourceList = append(resourceList, config)
resourceList = append(resourceList, r.daemonSet(configHash))
resourceList = append(resourceList, r.daemonSet())

// check metrics service discovery
metricsConfig, tlsSecrets := r.withPrometheusCrdDiscovery(r.getMetricsConfig()) // check metrics SD
r.logger.Debugf("metrics config : %v", metricsConfig.Spec)
aggCfg := r.getAggregatorConfig(lo.FromPtr(metricsConfig)) // generate aggregator struct
config, configHash = r.aggregatorConfigMap(aggCfg) // generate aggregator configmap

config, _ = r.aggregatorConfigMap(aggCfg) // generate aggregator configmap
resourceList = append(resourceList, r.metricsTlsAssets(tlsSecrets))
resourceList = append(resourceList, config)
resourceList = append(resourceList, r.deployment(configHash))
resourceList = append(resourceList, r.deployment())
resourceList = append(resourceList, r.service())

for _, factory := range resourceList {
Expand Down
8 changes: 8 additions & 0 deletions pkg/resources/collector/discovery/discoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ type target struct {
friendlyName string
}

// Less reports whether t should sort before other. Targets are ordered
// primarily by static address; when two targets share the same address
// (e.g. services managing different ports on the same set of pods), the
// friendly name is used as a deterministic tie-breaker.
func (t target) Less(other target) bool {
	if t.staticAddress != other.staticAddress {
		return t.staticAddress < other.staticAddress
	}
	return t.friendlyName < other.friendlyName
}

type PrometheusDiscovery struct {
logger *zap.SugaredLogger
retrievers []ScrapeConfigRetriever
Expand Down
4 changes: 2 additions & 2 deletions pkg/resources/collector/discovery/podmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (p *podMonitorScrapeConfigRetriever) Yield() (cfg *promCRDOperatorConfig, r
}
err = p.client.List(context.TODO(), pList, listOptions)
if err != nil {
p.logger.Warnf("failed to select pods for pod monitor %s", selectorMap)
lg.Warnf("failed to select pods for pod monitor %s", selectorMap)
continue
}
podList.Items = append(podList.Items, pList.Items...)
Expand Down Expand Up @@ -179,7 +179,7 @@ func (p *podMonitorScrapeConfigRetriever) Yield() (cfg *promCRDOperatorConfig, r
}
numTargets += len(targets)
slices.SortFunc(targets, func(i, j target) bool {
return i.staticAddress < j.staticAddress
return i.Less(j)
})
if len(targets) > 0 {
//de-dupe discovered targets
Expand Down
34 changes: 24 additions & 10 deletions pkg/resources/collector/discovery/servicemonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,6 @@ func (s *serviceMonitorScrapeConfigRetriever) resolveServiceTargets(
lg.Warnf("failed to find pods for service %s", svc.Namespace+"-"+svc.Name)
return
}
lg.Debugf("found %d pods matching service", len(podList.Items))
// deref pods
for _, pod := range podList.Items {
for _, container := range pod.Spec.Containers {
Expand Down Expand Up @@ -381,23 +380,38 @@ func (s serviceMonitorScrapeConfigRetriever) Yield() (cfg *promCRDOperatorConfig
lg.Warnf("failed to select services for service monitor %s: %s", svcMon.Spec.Selector, err)
continue
}
for _, svc := range svcList.Items {
for i, ep := range svcMon.Spec.Endpoints {
// in the case of multiple services here, we need to dedupe static targets that are pointed
// to by the multiple services
for i, ep := range svcMon.Spec.Endpoints {
dedupedTargets := map[string]target{}
for _, svc := range svcList.Items {
targets := s.resolveServiceTargets(*svcMon, svc, ep)
numTargets += len(targets)
slices.SortFunc(targets, func(i, j target) bool {
return i.staticAddress < j.staticAddress
return i.Less(j)
})
if len(targets) > 0 {
job, sCfg, secrets := s.generateStaticServiceConfig(svcMon, ep, i, targets)
if _, ok := cfgMap[job]; !ok {
cfgMap[job] = sCfg
for _, t := range targets {
if target, ok := dedupedTargets[t.staticAddress]; !ok {
dedupedTargets[t.staticAddress] = t
} else {
if t.Less(target) {
dedupedTargets[t.staticAddress] = t
}
}
secretRes = append(secretRes, secrets...)
}
} // end of target discovery
if len(dedupedTargets) > 0 {
generatedTargets := lo.Values(dedupedTargets)
slices.SortFunc(generatedTargets, func(i, j target) bool {
return i.Less(j)
})
job, sCfg, secrets := s.generateStaticServiceConfig(svcMon, ep, i, generatedTargets)
if _, ok := cfgMap[job]; !ok {
cfgMap[job] = sCfg
}
secretRes = append(secretRes, secrets...)
}
}

if numTargets == 0 {
lg.Warn("no scrape targets found for service monitor")
}
Expand Down
13 changes: 6 additions & 7 deletions pkg/resources/collector/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
monitoringv1beta1 "github.com/rancher/opni/apis/monitoring/v1beta1"
"github.com/rancher/opni/pkg/otel"
"github.com/rancher/opni/pkg/resources"
"github.com/rancher/opni/pkg/resources/collector/discovery"
promdiscover "github.com/rancher/opni/pkg/resources/collector/discovery"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -81,10 +80,10 @@ func (r *Reconciler) getMetricsConfig() (config *otel.MetricsConfig) {
func (r *Reconciler) withPrometheusCrdDiscovery(
config *otel.MetricsConfig) (
*otel.MetricsConfig,
[]discovery.SecretResolutionConfig,
[]promdiscover.SecretResolutionConfig,
) {
if r.PrometheusDiscovery == nil {
return config, []discovery.SecretResolutionConfig{}
return config, []promdiscover.SecretResolutionConfig{}
}
discStr, secrets, err := r.discoveredScrapeCfg(config)
if err != nil {
Expand All @@ -98,17 +97,17 @@ func (r *Reconciler) discoveredScrapeCfg(
cfg *otel.MetricsConfig, // TODO : eventually this config will drive selector config for SD
) (
retCfg string,
secrets []discovery.SecretResolutionConfig,
secrets []promdiscover.SecretResolutionConfig,
retErr error,
) {
cfgs, secrets, err := r.PrometheusDiscovery.YieldScrapeConfigs()
if err != nil || len(cfgs) == 0 {
return "", []discovery.SecretResolutionConfig{}, err
return "", []promdiscover.SecretResolutionConfig{}, err
}
return otel.PromCfgToString(cfgs), secrets, nil
}

func (r *Reconciler) metricsTlsAssets(sec []discovery.SecretResolutionConfig) resources.Resource {
func (r *Reconciler) metricsTlsAssets(sec []promdiscover.SecretResolutionConfig) resources.Resource {
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: tlsSecretName,
Expand Down Expand Up @@ -157,7 +156,7 @@ func (r *Reconciler) aggregatorMetricVolumes() (retVolumeMounts []corev1.VolumeM
},
corev1.VolumeMount{
Name: fmt.Sprintf("%s-volume", tlsSecretName),
MountPath: fmt.Sprintf(discovery.TlsAssetMountPath),
MountPath: fmt.Sprintf(promdiscover.TlsAssetMountPath),
},
)

Expand Down
1 change: 0 additions & 1 deletion pkg/resources/collector/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ receivers:
protocols:
grpc: {}
http: {}
{{ template "metrics-self-receiver" . }}
{{ template "metrics-prometheus-receiver" . }}
{{ template "metrics-prometheus-discoverer" . }}
{{- if .LogsEnabled }}
Expand Down
Loading

0 comments on commit 60dc799

Please sign in to comment.