From 130c23140337865b59abe1a2b659ca407d6ad33f Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Mon, 19 Dec 2022 15:35:27 -0500 Subject: [PATCH 1/6] Only track Mean and Count from GCP Distribution Signed-off-by: Kyle Eckhart --- collectors/delta_distribution.go | 8 ++++---- collectors/monitoring_metrics.go | 22 ++++++++++++---------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/collectors/delta_distribution.go b/collectors/delta_distribution.go index 83bc96ae..6580fdac 100644 --- a/collectors/delta_distribution.go +++ b/collectors/delta_distribution.go @@ -120,8 +120,8 @@ func mergeHistograms(existing *HistogramMetric, current *HistogramMetric) *Histo } // Calculate a new mean and overall count - mean := existing.dist.Mean - mean += current.dist.Mean + mean := existing.mean + mean += current.mean mean /= 2 var count uint64 @@ -129,8 +129,8 @@ func mergeHistograms(existing *HistogramMetric, current *HistogramMetric) *Histo count += v } - current.dist.Mean = mean - current.dist.Count = int64(count) + current.mean = mean + current.count = count return current } diff --git a/collectors/monitoring_metrics.go b/collectors/monitoring_metrics.go index d6bbced7..52bf6af5 100644 --- a/collectors/monitoring_metrics.go +++ b/collectors/monitoring_metrics.go @@ -88,7 +88,8 @@ type ConstMetric struct { type HistogramMetric struct { fqName string labelKeys []string - dist *monitoring.Distribution + mean float64 + count uint64 buckets map[float64]uint64 labelValues []string reportTime time.Time @@ -104,12 +105,12 @@ func (t *timeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.Time v = HistogramMetric{ fqName: fqName, labelKeys: labelKeys, - dist: dist, + mean: dist.Mean, + count: uint64(dist.Count), buckets: buckets, labelValues: labelValues, reportTime: reportTime, - - keysHash: hashLabelKeys(labelKeys), + keysHash: hashLabelKeys(labelKeys), } } @@ -127,16 +128,16 @@ func (t *timeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.Time return } - t.ch <- t.newConstHistogram(fqName, reportTime, labelKeys, dist, buckets, labelValues) + t.ch <- t.newConstHistogram(fqName, reportTime, labelKeys, dist.Mean, uint64(dist.Count), buckets, labelValues) } -func (t *timeSeriesMetrics) newConstHistogram(fqName string, reportTime time.Time, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string) prometheus.Metric { +func (t *timeSeriesMetrics) newConstHistogram(fqName string, reportTime time.Time, labelKeys []string, mean float64, count uint64, buckets map[float64]uint64, labelValues []string) prometheus.Metric { return prometheus.NewMetricWithTimestamp( reportTime, prometheus.MustNewConstHistogram( t.newMetricDesc(fqName, labelKeys), - uint64(dist.Count), - dist.Mean*float64(dist.Count), // Stackdriver does not provide the sum, but we can fake it + count, + mean*float64(count), // Stackdriver does not provide the sum, but we can fake it buckets, labelValues..., ), @@ -242,7 +243,7 @@ func (t *timeSeriesMetrics) completeHistogramMetrics(histograms map[string][]*Hi } } for _, v := range vs { - t.ch <- t.newConstHistogram(v.fqName, v.reportTime, v.labelKeys, v.dist, v.buckets, v.labelValues) + t.ch <- t.newConstHistogram(v.fqName, v.reportTime, v.labelKeys, v.mean, v.count, v.buckets, v.labelValues) } } } @@ -310,7 +311,8 @@ func (t *timeSeriesMetrics) completeDeltaHistogramMetrics(reportingStartTime tim collected.histogram.fqName, collected.histogram.reportTime, collected.histogram.labelKeys, - collected.histogram.dist, + collected.histogram.mean, + collected.histogram.count, collected.histogram.buckets, collected.histogram.labelValues, ) From f1b7a5e4b01926fe8abadbb427e70c5d611e64e9 Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Mon, 19 Dec 2022 15:38:36 -0500 Subject: [PATCH 2/6] Export fields of ConstMetric and HistogramMetric Signed-off-by: Kyle Eckhart --- collectors/delta_counter.go | 24 +++--- collectors/delta_distribution.go | 36 ++++---- collectors/monitoring_metrics.go | 140 +++++++++++++++---------------- 3 files changed, 100 insertions(+), 100 deletions(-) diff --git a/collectors/delta_counter.go b/collectors/delta_counter.go index c3798ab3..0ab07615 100644 --- a/collectors/delta_counter.go +++ b/collectors/delta_counter.go @@ -80,27 +80,27 @@ func (s *inMemoryDeltaCounterStore) Increment(metricDescriptor *monitoring.Metri existing := entry.collected[key] if existing == nil { - level.Debug(s.logger).Log("msg", "Tracking new counter", "fqName", currentValue.fqName, "key", key, "current_value", currentValue.value, "incoming_time", currentValue.reportTime) + level.Debug(s.logger).Log("msg", "Tracking new counter", "fqName", currentValue.FqName, "key", key, "current_value", currentValue.Value, "incoming_time", currentValue.ReportTime) entry.collected[key] = &CollectedMetric{currentValue, time.Now()} return } - if existing.metric.reportTime.Before(currentValue.reportTime) { - level.Debug(s.logger).Log("msg", "Incrementing existing counter", "fqName", currentValue.fqName, "key", key, "current_value", existing.metric.value, "adding", currentValue.value, "last_reported_time", entry.collected[key].metric.reportTime, "incoming_time", currentValue.reportTime) - currentValue.value = currentValue.value + existing.metric.value + if existing.metric.ReportTime.Before(currentValue.ReportTime) { + level.Debug(s.logger).Log("msg", "Incrementing existing counter", "fqName", currentValue.FqName, "key", key, "current_value", existing.metric.Value, "adding", currentValue.Value, "last_reported_time", entry.collected[key].metric.ReportTime, "incoming_time", currentValue.ReportTime) + currentValue.Value = currentValue.Value + existing.metric.Value existing.metric = currentValue existing.lastCollectedAt = time.Now() return } - level.Debug(s.logger).Log("msg", "Ignoring old sample for counter", "fqName", currentValue.fqName, "key", key, "last_reported_time", existing.metric.reportTime, "incoming_time", currentValue.reportTime) + level.Debug(s.logger).Log("msg", "Ignoring old sample for counter", "fqName", currentValue.FqName, "key", key, "last_reported_time", existing.metric.ReportTime, "incoming_time", currentValue.ReportTime) } func toCounterKey(c *ConstMetric) uint64 { labels := make(map[string]string) - keysCopy := append([]string{}, c.labelKeys...) - for i := range c.labelKeys { - labels[c.labelKeys[i]] = c.labelValues[i] + keysCopy := append([]string{}, c.LabelKeys...) + for i := range c.LabelKeys { + labels[c.LabelKeys[i]] = c.LabelValues[i] } sort.Strings(keysCopy) @@ -108,7 +108,7 @@ func toCounterKey(c *ConstMetric) uint64 { for _, k := range keysCopy { keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) } - hashText := fmt.Sprintf("%s|%s", c.fqName, strings.Join(keyParts, "|")) + hashText := fmt.Sprintf("%s|%s", c.FqName, strings.Join(keyParts, "|")) h := hashNew() h = hashAdd(h, hashText) @@ -131,12 +131,12 @@ func (s *inMemoryDeltaCounterStore) ListMetrics(metricDescriptorName string) map for key, collected := range entry.collected { //Scan and remove metrics which are outside the TTL if ttlWindowStart.After(collected.lastCollectedAt) { - level.Debug(s.logger).Log("msg", "Deleting counter entry outside of TTL", "key", key, "fqName", collected.metric.fqName) + level.Debug(s.logger).Log("msg", "Deleting counter entry outside of TTL", "key", key, "fqName", collected.metric.FqName) delete(entry.collected, key) continue } - metrics, exists := output[collected.metric.fqName] + metrics, exists := output[collected.metric.FqName] if !exists { metrics = make([]*CollectedMetric, 0) } @@ -145,7 +145,7 @@ func (s *inMemoryDeltaCounterStore) ListMetrics(metricDescriptorName string) map metric: &metricCopy, lastCollectedAt: collected.lastCollectedAt, } - output[collected.metric.fqName] = append(metrics, &outputEntry) + output[collected.metric.FqName] = append(metrics, &outputEntry) } return output diff --git a/collectors/delta_distribution.go b/collectors/delta_distribution.go index 6580fdac..e7c5ce86 100644 --- a/collectors/delta_distribution.go +++ b/collectors/delta_distribution.go @@ -80,26 +80,26 @@ func (s *inMemoryDeltaDistributionStore) Increment(metricDescriptor *monitoring. existing := entry.collected[key] if existing == nil { - level.Debug(s.logger).Log("msg", "Tracking new histogram", "fqName", currentValue.fqName, "key", key, "incoming_time", currentValue.reportTime) + level.Debug(s.logger).Log("msg", "Tracking new histogram", "fqName", currentValue.FqName, "key", key, "incoming_time", currentValue.ReportTime) entry.collected[key] = &CollectedHistogram{histogram: currentValue, lastCollectedAt: time.Now()} return } - if existing.histogram.reportTime.Before(currentValue.reportTime) { - level.Debug(s.logger).Log("msg", "Incrementing existing histogram", "fqName", currentValue.fqName, "key", key, "last_reported_time", existing.histogram.reportTime, "incoming_time", currentValue.reportTime) + if existing.histogram.ReportTime.Before(currentValue.ReportTime) { + level.Debug(s.logger).Log("msg", "Incrementing existing histogram", "fqName", currentValue.FqName, "key", key, "last_reported_time", existing.histogram.ReportTime, "incoming_time", currentValue.ReportTime) existing.histogram = mergeHistograms(existing.histogram, currentValue) existing.lastCollectedAt = time.Now() return } - level.Debug(s.logger).Log("msg", "Ignoring old sample for histogram", "fqName", currentValue.fqName, "key", key, "last_reported_time", existing.histogram.reportTime, "incoming_time", currentValue.reportTime) + level.Debug(s.logger).Log("msg", "Ignoring old sample for histogram", "fqName", currentValue.FqName, "key", key, "last_reported_time", existing.histogram.ReportTime, "incoming_time", currentValue.ReportTime) } func toHistogramKey(hist *HistogramMetric) uint64 { labels := make(map[string]string) - keysCopy := append([]string{}, hist.labelKeys...) - for i := range hist.labelKeys { - labels[hist.labelKeys[i]] = hist.labelValues[i] + keysCopy := append([]string{}, hist.LabelKeys...) + for i := range hist.LabelKeys { + labels[hist.LabelKeys[i]] = hist.LabelValues[i] } sort.Strings(keysCopy) @@ -107,7 +107,7 @@ func toHistogramKey(hist *HistogramMetric) uint64 { for _, k := range keysCopy { keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) } - hashText := fmt.Sprintf("%s|%s", hist.fqName, strings.Join(keyParts, "|")) + hashText := fmt.Sprintf("%s|%s", hist.FqName, strings.Join(keyParts, "|")) h := hashNew() h = hashAdd(h, hashText) @@ -115,22 +115,22 @@ func toHistogramKey(hist *HistogramMetric) uint64 { } func mergeHistograms(existing *HistogramMetric, current *HistogramMetric) *HistogramMetric { - for key, value := range existing.buckets { - current.buckets[key] += value + for key, value := range existing.Buckets { + current.Buckets[key] += value } // Calculate a new mean and overall count - mean := existing.mean - mean += current.mean + mean := existing.Mean + mean += current.Mean mean /= 2 var count uint64 - for _, v := range current.buckets { + for _, v := range current.Buckets { count += v } - current.mean = mean - current.count = count + current.Mean = mean + current.Count = count return current } @@ -151,12 +151,12 @@ func (s *inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string for key, collected := range entry.collected { //Scan and remove metrics which are outside the TTL if ttlWindowStart.After(collected.lastCollectedAt) { - level.Debug(s.logger).Log("msg", "Deleting histogram entry outside of TTL", "key", key, "fqName", collected.histogram.fqName) + level.Debug(s.logger).Log("msg", "Deleting histogram entry outside of TTL", "key", key, "fqName", collected.histogram.FqName) delete(entry.collected, key) continue } - metrics, exists := output[collected.histogram.fqName] + metrics, exists := output[collected.histogram.FqName] if !exists { metrics = make([]*CollectedHistogram, 0) } @@ -165,7 +165,7 @@ func (s *inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string histogram: &histCopy, lastCollectedAt: collected.lastCollectedAt, } - output[collected.histogram.fqName] = append(metrics, &outputEntry) + output[collected.histogram.FqName] = append(metrics, &outputEntry) } return output diff --git a/collectors/monitoring_metrics.go b/collectors/monitoring_metrics.go index 52bf6af5..3bc8dc3d 100644 --- a/collectors/monitoring_metrics.go +++ b/collectors/monitoring_metrics.go @@ -75,26 +75,26 @@ func (t *timeSeriesMetrics) newMetricDesc(fqName string, labelKeys []string) *pr } type ConstMetric struct { - fqName string - labelKeys []string - valueType prometheus.ValueType - value float64 - labelValues []string - reportTime time.Time - - keysHash uint64 + FqName string + LabelKeys []string + ValueType prometheus.ValueType + Value float64 + LabelValues []string + ReportTime time.Time + + KeysHash uint64 } type HistogramMetric struct { - fqName string - labelKeys []string - mean float64 - count uint64 - buckets map[float64]uint64 - labelValues []string - reportTime time.Time - - keysHash uint64 + FqName string + LabelKeys []string + Mean float64 + Count uint64 + Buckets map[float64]uint64 + LabelValues []string + ReportTime time.Time + + KeysHash uint64 } func (t *timeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string, metricKind string) { @@ -103,14 +103,14 @@ func (t *timeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.Time var v HistogramMetric if t.fillMissingLabels || (metricKind == "DELTA" && t.aggregateDeltas) { v = HistogramMetric{ - fqName: fqName, - labelKeys: labelKeys, - mean: dist.Mean, - count: uint64(dist.Count), - buckets: buckets, - labelValues: labelValues, - reportTime: reportTime, - keysHash: hashLabelKeys(labelKeys), + FqName: fqName, + LabelKeys: labelKeys, + Mean: dist.Mean, + Count: uint64(dist.Count), + Buckets: buckets, + LabelValues: labelValues, + ReportTime: reportTime, + KeysHash: hashLabelKeys(labelKeys), } } @@ -150,14 +150,14 @@ func (t *timeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSer var v ConstMetric if t.fillMissingLabels || (metricKind == "DELTA" && t.aggregateDeltas) { v = ConstMetric{ - fqName: fqName, - labelKeys: labelKeys, - valueType: metricValueType, - value: metricValue, - labelValues: labelValues, - reportTime: reportTime, - - keysHash: hashLabelKeys(labelKeys), + FqName: fqName, + LabelKeys: labelKeys, + ValueType: metricValueType, + Value: metricValue, + LabelValues: labelValues, + ReportTime: reportTime, + + KeysHash: hashLabelKeys(labelKeys), } } @@ -214,7 +214,7 @@ func (t *timeSeriesMetrics) completeConstMetrics(constMetrics map[string][]*Cons if len(vs) > 1 { var needFill bool for i := 1; i < len(vs); i++ { - if vs[0].keysHash != vs[i].keysHash { + if vs[0].KeysHash != vs[i].KeysHash { needFill = true } } @@ -224,7 +224,7 @@ func (t *timeSeriesMetrics) completeConstMetrics(constMetrics map[string][]*Cons } for _, v := range vs { - t.ch <- t.newConstMetric(v.fqName, v.reportTime, v.labelKeys, v.valueType, v.value, v.labelValues) + t.ch <- t.newConstMetric(v.FqName, v.ReportTime, v.LabelKeys, v.ValueType, v.Value, v.LabelValues) } } } @@ -234,7 +234,7 @@ func (t *timeSeriesMetrics) completeHistogramMetrics(histograms map[string][]*Hi if len(vs) > 1 { var needFill bool for i := 1; i < len(vs); i++ { - if vs[0].keysHash != vs[i].keysHash { + if vs[0].KeysHash != vs[i].KeysHash { needFill = true } } @@ -243,7 +243,7 @@ func (t *timeSeriesMetrics) completeHistogramMetrics(histograms map[string][]*Hi } } for _, v := range vs { - t.ch <- t.newConstHistogram(v.fqName, v.reportTime, v.labelKeys, v.mean, v.count, v.buckets, v.labelValues) + t.ch <- t.newConstHistogram(v.FqName, v.ReportTime, v.LabelKeys, v.Mean, v.Count, v.Buckets, v.LabelValues) } } } @@ -260,22 +260,22 @@ func (t *timeSeriesMetrics) completeDeltaConstMetrics(reportingStartTime time.Ti // Ideally we could use monitoring.MetricDescriptorMetadata.SamplePeriod to determine how many // samples were missed to adjust this but monitoring.MetricDescriptorMetadata is viewed as optional // for a monitoring.MetricDescriptor - reportingLag := collected.lastCollectedAt.Sub(collected.metric.reportTime).Truncate(time.Minute) - collected.metric.reportTime = now.Add(-reportingLag) + reportingLag := collected.lastCollectedAt.Sub(collected.metric.ReportTime).Truncate(time.Minute) + collected.metric.ReportTime = now.Add(-reportingLag) } if t.fillMissingLabels { - if _, exists := constMetrics[collected.metric.fqName]; !exists { - constMetrics[collected.metric.fqName] = []*ConstMetric{} + if _, exists := constMetrics[collected.metric.FqName]; !exists { + constMetrics[collected.metric.FqName] = []*ConstMetric{} } - constMetrics[collected.metric.fqName] = append(constMetrics[collected.metric.fqName], collected.metric) + constMetrics[collected.metric.FqName] = append(constMetrics[collected.metric.FqName], collected.metric) } else { t.ch <- t.newConstMetric( - collected.metric.fqName, - collected.metric.reportTime, - collected.metric.labelKeys, - collected.metric.valueType, - collected.metric.value, - collected.metric.labelValues, + collected.metric.FqName, + collected.metric.ReportTime, + collected.metric.LabelKeys, + collected.metric.ValueType, + collected.metric.Value, + collected.metric.LabelValues, ) } } @@ -298,23 +298,23 @@ func (t *timeSeriesMetrics) completeDeltaHistogramMetrics(reportingStartTime tim // Ideally we could use monitoring.MetricDescriptorMetadata.SamplePeriod to determine how many // samples were missed to adjust this but monitoring.MetricDescriptorMetadata is viewed as optional // for a monitoring.MetricDescriptor - reportingLag := collected.lastCollectedAt.Sub(collected.histogram.reportTime).Truncate(time.Minute) - collected.histogram.reportTime = now.Add(-reportingLag) + reportingLag := collected.lastCollectedAt.Sub(collected.histogram.ReportTime).Truncate(time.Minute) + collected.histogram.ReportTime = now.Add(-reportingLag) } if t.fillMissingLabels { - if _, exists := histograms[collected.histogram.fqName]; !exists { - histograms[collected.histogram.fqName] = []*HistogramMetric{} + if _, exists := histograms[collected.histogram.FqName]; !exists { + histograms[collected.histogram.FqName] = []*HistogramMetric{} } - histograms[collected.histogram.fqName] = append(histograms[collected.histogram.fqName], collected.histogram) + histograms[collected.histogram.FqName] = append(histograms[collected.histogram.FqName], collected.histogram) } else { t.ch <- t.newConstHistogram( - collected.histogram.fqName, - collected.histogram.reportTime, - collected.histogram.labelKeys, - collected.histogram.mean, - collected.histogram.count, - collected.histogram.buckets, - collected.histogram.labelValues, + collected.histogram.FqName, + collected.histogram.ReportTime, + collected.histogram.LabelKeys, + collected.histogram.Mean, + collected.histogram.Count, + collected.histogram.Buckets, + collected.histogram.LabelValues, ) } } @@ -328,21 +328,21 @@ func (t *timeSeriesMetrics) completeDeltaHistogramMetrics(reportingStartTime tim func fillConstMetricsLabels(metrics []*ConstMetric) []*ConstMetric { allKeys := make(map[string]struct{}) for _, metric := range metrics { - for _, key := range metric.labelKeys { + for _, key := range metric.LabelKeys { allKeys[key] = struct{}{} } } for _, metric := range metrics { - if len(metric.labelKeys) != len(allKeys) { + if len(metric.LabelKeys) != len(allKeys) { metricKeys := make(map[string]struct{}) - for _, key := range metric.labelKeys { + for _, key := range metric.LabelKeys { metricKeys[key] = struct{}{} } for key := range allKeys { if _, ok := metricKeys[key]; !ok { - metric.labelKeys = append(metric.labelKeys, key) - metric.labelValues = append(metric.labelValues, "") + metric.LabelKeys = append(metric.LabelKeys, key) + metric.LabelValues = append(metric.LabelValues, "") } } } @@ -354,21 +354,21 @@ func fillConstMetricsLabels(metrics []*ConstMetric) []*ConstMetric { func fillHistogramMetricsLabels(metrics []*HistogramMetric) []*HistogramMetric { allKeys := make(map[string]struct{}) for _, metric := range metrics { - for _, key := range metric.labelKeys { + for _, key := range metric.LabelKeys { allKeys[key] = struct{}{} } } for _, metric := range metrics { - if len(metric.labelKeys) != len(allKeys) { + if len(metric.LabelKeys) != len(allKeys) { metricKeys := make(map[string]struct{}) - for _, key := range metric.labelKeys { + for _, key := range metric.LabelKeys { metricKeys[key] = struct{}{} } for key := range allKeys { if _, ok := metricKeys[key]; !ok { - metric.labelKeys = append(metric.labelKeys, key) - metric.labelValues = append(metric.labelValues, "") + metric.LabelKeys = append(metric.LabelKeys, key) + metric.LabelValues = append(metric.LabelValues, "") } } } From 5597ed86a7d0af34ce165b511407d4ffbf20e433 Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Tue, 20 Dec 2022 16:03:33 -0500 Subject: [PATCH 3/6] Simplify Delta ListMetrics implementation Signed-off-by: Kyle Eckhart --- collectors/delta_counter.go | 13 ++--- collectors/delta_distribution.go | 12 ++-- collectors/monitoring_metrics.go | 95 ++++++++++++++++---------------- 3 files changed, 54 insertions(+), 66 deletions(-) diff --git a/collectors/delta_counter.go b/collectors/delta_counter.go index 0ab07615..0528bbcd 100644 --- a/collectors/delta_counter.go +++ b/collectors/delta_counter.go @@ -39,7 +39,7 @@ type DeltaCounterStore interface { Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) // ListMetrics will return all known entries in the store for a metricDescriptorName - ListMetrics(metricDescriptorName string) map[string][]*CollectedMetric + ListMetrics(metricDescriptorName string) []*CollectedMetric } type metricEntry struct { @@ -115,8 +115,8 @@ func toCounterKey(c *ConstMetric) uint64 { return h } -func (s *inMemoryDeltaCounterStore) ListMetrics(metricDescriptorName string) map[string][]*CollectedMetric { - output := map[string][]*CollectedMetric{} +func (s *inMemoryDeltaCounterStore) ListMetrics(metricDescriptorName string) []*CollectedMetric { + var output []*CollectedMetric now := time.Now() ttlWindowStart := now.Add(-s.ttl) @@ -135,17 +135,12 @@ func (s *inMemoryDeltaCounterStore) ListMetrics(metricDescriptorName string) map delete(entry.collected, key) continue } - - metrics, exists := output[collected.metric.FqName] - if !exists { - metrics = make([]*CollectedMetric, 0) - } metricCopy := *collected.metric outputEntry := CollectedMetric{ metric: &metricCopy, lastCollectedAt: collected.lastCollectedAt, } - output[collected.metric.FqName] = append(metrics, &outputEntry) + output = append(output, &outputEntry) } return output diff --git a/collectors/delta_distribution.go b/collectors/delta_distribution.go index e7c5ce86..657db59d 100644 --- a/collectors/delta_distribution.go +++ b/collectors/delta_distribution.go @@ -39,7 +39,7 @@ type DeltaDistributionStore interface { Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) // ListMetrics will return all known entries in the store for a metricDescriptorName - ListMetrics(metricDescriptorName string) map[string][]*CollectedHistogram + ListMetrics(metricDescriptorName string) []*CollectedHistogram } type histogramEntry struct { @@ -135,8 +135,8 @@ func mergeHistograms(existing *HistogramMetric, current *HistogramMetric) *Histo return current } -func (s *inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string) map[string][]*CollectedHistogram { - output := map[string][]*CollectedHistogram{} +func (s *inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string) []*CollectedHistogram { + var output []*CollectedHistogram now := time.Now() ttlWindowStart := now.Add(-s.ttl) @@ -156,16 +156,12 @@ func (s *inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string continue } - metrics, exists := output[collected.histogram.FqName] - if !exists { - metrics = make([]*CollectedHistogram, 0) - } histCopy := *collected.histogram outputEntry := CollectedHistogram{ histogram: &histCopy, lastCollectedAt: collected.lastCollectedAt, } - output[collected.histogram.FqName] = append(metrics, &outputEntry) + output = append(output, &outputEntry) } return output diff --git a/collectors/monitoring_metrics.go b/collectors/monitoring_metrics.go index 3bc8dc3d..9d57d297 100644 --- a/collectors/monitoring_metrics.go +++ b/collectors/monitoring_metrics.go @@ -253,31 +253,29 @@ func (t *timeSeriesMetrics) completeDeltaConstMetrics(reportingStartTime time.Ti now := time.Now().Truncate(time.Minute) constMetrics := map[string][]*ConstMetric{} - for _, metrics := range descriptorMetrics { - for _, collected := range metrics { - // If the metric wasn't collected we should still export it at the next sample time to avoid staleness - if reportingStartTime.After(collected.lastCollectedAt) { - // Ideally we could use monitoring.MetricDescriptorMetadata.SamplePeriod to determine how many - // samples were missed to adjust this but monitoring.MetricDescriptorMetadata is viewed as optional - // for a monitoring.MetricDescriptor - reportingLag := collected.lastCollectedAt.Sub(collected.metric.ReportTime).Truncate(time.Minute) - collected.metric.ReportTime = now.Add(-reportingLag) - } - if t.fillMissingLabels { - if _, exists := constMetrics[collected.metric.FqName]; !exists { - constMetrics[collected.metric.FqName] = []*ConstMetric{} - } - constMetrics[collected.metric.FqName] = append(constMetrics[collected.metric.FqName], collected.metric) - } else { - t.ch <- t.newConstMetric( - collected.metric.FqName, - collected.metric.ReportTime, - collected.metric.LabelKeys, - collected.metric.ValueType, - collected.metric.Value, - collected.metric.LabelValues, - ) + for _, collected := range descriptorMetrics { + // If the metric wasn't collected we should still export it at the next sample time to avoid staleness + if reportingStartTime.After(collected.lastCollectedAt) { + // Ideally we could use monitoring.MetricDescriptorMetadata.SamplePeriod to determine how many + // samples were missed to adjust this but monitoring.MetricDescriptorMetadata is viewed as optional + // for a monitoring.MetricDescriptor + reportingLag := collected.lastCollectedAt.Sub(collected.metric.ReportTime).Truncate(time.Minute) + collected.metric.ReportTime = now.Add(-reportingLag) + } + if t.fillMissingLabels { + if _, exists := constMetrics[collected.metric.FqName]; !exists { + constMetrics[collected.metric.FqName] = []*ConstMetric{} } + constMetrics[collected.metric.FqName] = append(constMetrics[collected.metric.FqName], collected.metric) + } else { + t.ch <- t.newConstMetric( + collected.metric.FqName, + collected.metric.ReportTime, + collected.metric.LabelKeys, + collected.metric.ValueType, + collected.metric.Value, + collected.metric.LabelValues, + ) } } @@ -291,32 +289,31 @@ func (t *timeSeriesMetrics) completeDeltaHistogramMetrics(reportingStartTime tim now := time.Now().Truncate(time.Minute) histograms := map[string][]*HistogramMetric{} - for _, metrics := range descriptorMetrics { - for _, collected := range metrics { - // If the histogram wasn't collected we should still export it at the next sample time to avoid staleness - if reportingStartTime.After(collected.lastCollectedAt) { - // Ideally we could use monitoring.MetricDescriptorMetadata.SamplePeriod to determine how many - // samples were missed to adjust this but monitoring.MetricDescriptorMetadata is viewed as optional - // for a monitoring.MetricDescriptor - reportingLag := collected.lastCollectedAt.Sub(collected.histogram.ReportTime).Truncate(time.Minute) - collected.histogram.ReportTime = now.Add(-reportingLag) - } - if t.fillMissingLabels { - if _, exists := histograms[collected.histogram.FqName]; !exists { - histograms[collected.histogram.FqName] = []*HistogramMetric{} - } - histograms[collected.histogram.FqName] = append(histograms[collected.histogram.FqName], collected.histogram) - } else { - t.ch <- t.newConstHistogram( - collected.histogram.FqName, - collected.histogram.ReportTime, - collected.histogram.LabelKeys, - collected.histogram.Mean, - collected.histogram.Count, - collected.histogram.Buckets, - collected.histogram.LabelValues, - ) + for _, collected := range descriptorMetrics { + + // If the histogram wasn't collected we should still export it at the next sample time to avoid staleness + if reportingStartTime.After(collected.lastCollectedAt) { + // Ideally we could use monitoring.MetricDescriptorMetadata.SamplePeriod to determine how many + // samples were missed to adjust this but monitoring.MetricDescriptorMetadata is viewed as optional + // for a monitoring.MetricDescriptor + reportingLag := collected.lastCollectedAt.Sub(collected.histogram.ReportTime).Truncate(time.Minute) + collected.histogram.ReportTime = now.Add(-reportingLag) + } + if t.fillMissingLabels { + if _, exists := histograms[collected.histogram.FqName]; !exists { + histograms[collected.histogram.FqName] = []*HistogramMetric{} } + histograms[collected.histogram.FqName] = append(histograms[collected.histogram.FqName], collected.histogram) + } else { + t.ch <- t.newConstHistogram( + collected.histogram.FqName, + collected.histogram.ReportTime, + collected.histogram.LabelKeys, + collected.histogram.Mean, + collected.histogram.Count, + collected.histogram.Buckets, + collected.histogram.LabelValues, + ) } } From c79a108b4dcc5d61b3cf16c6d0d434c5152e8def Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Tue, 17 Jan 2023 21:28:35 -0500 Subject: [PATCH 4/6] Create dedicated delta package, add tests, and rename distribution -> histogram Signed-off-by: Kyle Eckhart --- collectors/delta_counter.go | 147 ------------------------- collectors/delta_distribution.go | 168 ----------------------------- collectors/fnv.go | 8 +- collectors/monitoring_collector.go | 24 +++-- collectors/monitoring_metrics.go | 148 ++++++++++++------------- delta/counter.go | 131 ++++++++++++++++++++++ delta/counter_test.go | 84 +++++++++++++++ delta/delta_suite_test.go | 26 +++++ delta/histogram.go | 150 ++++++++++++++++++++++++++ delta/histogram_test.go | 64 +++++++++++ stackdriver_exporter.go | 3 +- 11 files changed, 554 insertions(+), 399 deletions(-) delete mode 100644 collectors/delta_counter.go delete mode 100644 collectors/delta_distribution.go create mode 100644 delta/counter.go create mode 100644 delta/counter_test.go create mode 100644 delta/delta_suite_test.go create mode 100644 delta/histogram.go create mode 100644 delta/histogram_test.go diff --git a/collectors/delta_counter.go b/collectors/delta_counter.go deleted file mode 100644 index 0528bbcd..00000000 --- a/collectors/delta_counter.go +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright 2022 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package collectors - -import ( - "fmt" - "sort" - "strings" - "sync" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "google.golang.org/api/monitoring/v3" -) - -type CollectedMetric struct { - metric *ConstMetric - lastCollectedAt time.Time -} - -// DeltaCounterStore defines a set of functions which must be implemented in order to be used as a DeltaCounterStore -// which accumulates DELTA Counter metrics over time -type DeltaCounterStore interface { - - // Increment will use the incoming metricDescriptor and currentValue to either create a new entry or add the incoming - // value to an existing entry in the underlying store - Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) - - // ListMetrics will return all known entries in the store for a metricDescriptorName - ListMetrics(metricDescriptorName string) []*CollectedMetric -} - -type metricEntry struct { - collected map[uint64]*CollectedMetric - mutex *sync.RWMutex -} - -type inMemoryDeltaCounterStore struct { - store *sync.Map - ttl time.Duration - logger log.Logger -} - -// NewInMemoryDeltaCounterStore returns an implementation of DeltaCounterStore which is persisted in-memory -func NewInMemoryDeltaCounterStore(logger log.Logger, ttl time.Duration) DeltaCounterStore { - return &inMemoryDeltaCounterStore{ - store: &sync.Map{}, - logger: logger, - ttl: ttl, - } -} - -func (s *inMemoryDeltaCounterStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) { - if currentValue == nil { - return - } - - tmp, _ := s.store.LoadOrStore(metricDescriptor.Name, &metricEntry{ - collected: map[uint64]*CollectedMetric{}, - mutex: &sync.RWMutex{}, - }) - entry := tmp.(*metricEntry) - - key := toCounterKey(currentValue) - - entry.mutex.Lock() - defer entry.mutex.Unlock() - existing := entry.collected[key] - - if existing == nil { - level.Debug(s.logger).Log("msg", "Tracking new counter", "fqName", currentValue.FqName, "key", key, "current_value", currentValue.Value, "incoming_time", currentValue.ReportTime) - entry.collected[key] = &CollectedMetric{currentValue, time.Now()} - return - } - - if existing.metric.ReportTime.Before(currentValue.ReportTime) { - level.Debug(s.logger).Log("msg", "Incrementing existing counter", "fqName", currentValue.FqName, "key", key, "current_value", existing.metric.Value, "adding", currentValue.Value, "last_reported_time", entry.collected[key].metric.ReportTime, "incoming_time", currentValue.ReportTime) - currentValue.Value = currentValue.Value + existing.metric.Value - existing.metric = currentValue - existing.lastCollectedAt = time.Now() - return - } - - level.Debug(s.logger).Log("msg", "Ignoring old sample for counter", "fqName", currentValue.FqName, "key", key, "last_reported_time", existing.metric.ReportTime, "incoming_time", currentValue.ReportTime) -} - -func toCounterKey(c *ConstMetric) uint64 { - labels := make(map[string]string) - keysCopy := append([]string{}, c.LabelKeys...) - for i := range c.LabelKeys { - labels[c.LabelKeys[i]] = c.LabelValues[i] - } - sort.Strings(keysCopy) - - var keyParts []string - for _, k := range keysCopy { - keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) - } - hashText := fmt.Sprintf("%s|%s", c.FqName, strings.Join(keyParts, "|")) - h := hashNew() - h = hashAdd(h, hashText) - - return h -} - -func (s *inMemoryDeltaCounterStore) ListMetrics(metricDescriptorName string) []*CollectedMetric { - var output []*CollectedMetric - now := time.Now() - ttlWindowStart := now.Add(-s.ttl) - - tmp, exists := s.store.Load(metricDescriptorName) - if !exists { - return output - } - entry := tmp.(*metricEntry) - - entry.mutex.Lock() - defer entry.mutex.Unlock() - for key, collected := range entry.collected { - //Scan and remove metrics which are outside the TTL - if ttlWindowStart.After(collected.lastCollectedAt) { - level.Debug(s.logger).Log("msg", "Deleting counter entry outside of TTL", "key", key, "fqName", collected.metric.FqName) - delete(entry.collected, key) - continue - } - metricCopy := *collected.metric - outputEntry := CollectedMetric{ - metric: &metricCopy, - lastCollectedAt: collected.lastCollectedAt, - } - output = append(output, &outputEntry) - } - - return output -} diff --git a/collectors/delta_distribution.go b/collectors/delta_distribution.go deleted file mode 100644 index 657db59d..00000000 --- a/collectors/delta_distribution.go +++ /dev/null @@ -1,168 +0,0 @@ -// Copyright 2022 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package collectors - -import ( - "fmt" - "sort" - "strings" - "sync" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "google.golang.org/api/monitoring/v3" -) - -type CollectedHistogram struct { - histogram *HistogramMetric - lastCollectedAt time.Time -} - -// DeltaDistributionStore defines a set of functions which must be implemented in order to be used as a DeltaDistributionStore -// which accumulates DELTA histogram metrics over time -type DeltaDistributionStore interface { - - // Increment will use the incoming metricDescriptor and currentValue to either create a new entry or add the incoming - // value to an existing entry in the underlying store - Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) - - // ListMetrics will return all known entries in the store for a metricDescriptorName - ListMetrics(metricDescriptorName string) []*CollectedHistogram -} - -type histogramEntry struct { - collected map[uint64]*CollectedHistogram - mutex *sync.RWMutex -} - -type inMemoryDeltaDistributionStore struct { - store *sync.Map - ttl time.Duration - logger log.Logger -} - -// NewInMemoryDeltaDistributionStore returns an implementation of DeltaDistributionStore which is persisted in-memory -func NewInMemoryDeltaDistributionStore(logger log.Logger, ttl time.Duration) DeltaDistributionStore { - return &inMemoryDeltaDistributionStore{ - store: &sync.Map{}, - logger: logger, - ttl: ttl, - } -} - -func (s *inMemoryDeltaDistributionStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) { - if currentValue == nil { - return - } - - tmp, _ := s.store.LoadOrStore(metricDescriptor.Name, &histogramEntry{ - collected: map[uint64]*CollectedHistogram{}, - mutex: &sync.RWMutex{}, - }) - entry := tmp.(*histogramEntry) - - key := toHistogramKey(currentValue) - - entry.mutex.Lock() - defer entry.mutex.Unlock() - existing := entry.collected[key] - - if existing == nil { - level.Debug(s.logger).Log("msg", "Tracking new histogram", "fqName", currentValue.FqName, "key", key, "incoming_time", currentValue.ReportTime) - entry.collected[key] = &CollectedHistogram{histogram: currentValue, lastCollectedAt: time.Now()} - return - } - - if existing.histogram.ReportTime.Before(currentValue.ReportTime) { - level.Debug(s.logger).Log("msg", "Incrementing existing histogram", "fqName", currentValue.FqName, "key", key, "last_reported_time", existing.histogram.ReportTime, "incoming_time", currentValue.ReportTime) - existing.histogram = mergeHistograms(existing.histogram, currentValue) - existing.lastCollectedAt = time.Now() - return - } - - level.Debug(s.logger).Log("msg", "Ignoring old sample for histogram", "fqName", currentValue.FqName, "key", key, "last_reported_time", existing.histogram.ReportTime, "incoming_time", currentValue.ReportTime) -} - -func toHistogramKey(hist *HistogramMetric) uint64 { - labels := make(map[string]string) - keysCopy := append([]string{}, hist.LabelKeys...) - for i := range hist.LabelKeys { - labels[hist.LabelKeys[i]] = hist.LabelValues[i] - } - sort.Strings(keysCopy) - - var keyParts []string - for _, k := range keysCopy { - keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) - } - hashText := fmt.Sprintf("%s|%s", hist.FqName, strings.Join(keyParts, "|")) - h := hashNew() - h = hashAdd(h, hashText) - - return h -} - -func mergeHistograms(existing *HistogramMetric, current *HistogramMetric) *HistogramMetric { - for key, value := range existing.Buckets { - current.Buckets[key] += value - } - - // Calculate a new mean and overall count - mean := existing.Mean - mean += current.Mean - mean /= 2 - - var count uint64 - for _, v := range current.Buckets { - count += v - } - - current.Mean = mean - current.Count = count - - return current -} - -func (s *inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string) []*CollectedHistogram { - var output []*CollectedHistogram - now := time.Now() - ttlWindowStart := now.Add(-s.ttl) - - tmp, exists := s.store.Load(metricDescriptorName) - if !exists { - return output - } - entry := tmp.(*histogramEntry) - - entry.mutex.Lock() - defer entry.mutex.Unlock() - for key, collected := range entry.collected { - //Scan and remove metrics which are outside the TTL - if ttlWindowStart.After(collected.lastCollectedAt) { - level.Debug(s.logger).Log("msg", "Deleting histogram entry outside of TTL", "key", key, "fqName", collected.histogram.FqName) - delete(entry.collected, key) - continue - } - - histCopy := *collected.histogram - outputEntry := CollectedHistogram{ - histogram: &histCopy, - lastCollectedAt: collected.lastCollectedAt, - } - output = append(output, &outputEntry) - } - - return output -} diff --git a/collectors/fnv.go b/collectors/fnv.go index 8891c287..62c66087 100644 --- a/collectors/fnv.go +++ b/collectors/fnv.go @@ -23,13 +23,13 @@ const ( prime64 = 1099511628211 ) -// hashNew initializies a new fnv64a hash value. -func hashNew() uint64 { +// HashNew initializies a new fnv64a hash value. +func HashNew() uint64 { return offset64 } -// hashAdd adds a string to a fnv64a hash value, returning the updated hash. -func hashAdd(h uint64, s string) uint64 { +// HashAdd adds a string to a fnv64a hash value, returning the updated hash. +func HashAdd(h uint64, s string) uint64 { for i := 0; i < len(s); i++ { h ^= uint64(s[i]) h *= prime64 diff --git a/collectors/monitoring_collector.go b/collectors/monitoring_collector.go index 124c8bbf..eabac955 100644 --- a/collectors/monitoring_collector.go +++ b/collectors/monitoring_collector.go @@ -54,8 +54,8 @@ type MonitoringCollector struct { collectorFillMissingLabels bool monitoringDropDelegatedProjects bool logger log.Logger - deltaCounterStore DeltaCounterStore - deltaDistributionStore DeltaDistributionStore + counterStore DeltaCounterStore + histogramStore DeltaHistogramStore aggregateDeltas bool descriptorCache DescriptorCache } @@ -110,7 +110,17 @@ func (d *googleDescriptorCache) Store(prefix string, data []*monitoring.MetricDe d.inner.Store(prefix, data) } -func NewMonitoringCollector(projectID string, monitoringService *monitoring.Service, opts MonitoringCollectorOptions, logger log.Logger, counterStore DeltaCounterStore, distributionStore DeltaDistributionStore) (*MonitoringCollector, error) { +type DeltaCounterStore interface { + Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) + ListMetrics(metricDescriptorName string) []*ConstMetric +} + +type DeltaHistogramStore interface { + Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) + ListMetrics(metricDescriptorName string) []*HistogramMetric +} + +func NewMonitoringCollector(projectID string, monitoringService *monitoring.Service, opts MonitoringCollectorOptions, logger log.Logger, counterStore DeltaCounterStore, histogramStore DeltaHistogramStore) (*MonitoringCollector, error) { const subsystem = "monitoring" apiCallsTotalMetric := prometheus.NewCounter( @@ -200,8 +210,8 @@ func NewMonitoringCollector(projectID string, monitoringService *monitoring.Serv collectorFillMissingLabels: opts.FillMissingLabels, monitoringDropDelegatedProjects: opts.DropDelegatedProjects, logger: logger, - deltaCounterStore: counterStore, - deltaDistributionStore: distributionStore, + counterStore: counterStore, + histogramStore: histogramStore, aggregateDeltas: opts.AggregateDeltas, descriptorCache: descriptorCache, } @@ -401,8 +411,8 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( timeSeriesMetrics, err := NewTimeSeriesMetrics(metricDescriptor, ch, c.collectorFillMissingLabels, - c.deltaCounterStore, - c.deltaDistributionStore, + c.counterStore, + c.histogramStore, c.aggregateDeltas, ) if err != nil { diff --git a/collectors/monitoring_metrics.go b/collectors/monitoring_metrics.go index 9d57d297..af0cea7a 100644 --- a/collectors/monitoring_metrics.go +++ b/collectors/monitoring_metrics.go @@ -41,27 +41,27 @@ type timeSeriesMetrics struct { constMetrics map[string][]*ConstMetric histogramMetrics map[string][]*HistogramMetric - deltaCounterStore DeltaCounterStore - deltaDistributionStore DeltaDistributionStore - aggregateDeltas bool + counterStore DeltaCounterStore + histogramStore DeltaHistogramStore + aggregateDeltas bool } func NewTimeSeriesMetrics(descriptor *monitoring.MetricDescriptor, ch chan<- prometheus.Metric, fillMissingLabels bool, - deltaCounterStore DeltaCounterStore, - deltaDistributionStore DeltaDistributionStore, + counterStore DeltaCounterStore, + histogramStore DeltaHistogramStore, aggregateDeltas bool) (*timeSeriesMetrics, error) { return &timeSeriesMetrics{ - metricDescriptor: descriptor, - ch: ch, - fillMissingLabels: fillMissingLabels, - constMetrics: make(map[string][]*ConstMetric), - histogramMetrics: make(map[string][]*HistogramMetric), - deltaCounterStore: deltaCounterStore, - deltaDistributionStore: deltaDistributionStore, - aggregateDeltas: aggregateDeltas, + metricDescriptor: descriptor, + ch: ch, + fillMissingLabels: fillMissingLabels, + constMetrics: make(map[string][]*ConstMetric), + histogramMetrics: make(map[string][]*HistogramMetric), + counterStore: counterStore, + histogramStore: histogramStore, + aggregateDeltas: aggregateDeltas, }, nil } @@ -75,24 +75,26 @@ func (t *timeSeriesMetrics) newMetricDesc(fqName string, labelKeys []string) *pr } type ConstMetric struct { - FqName string - LabelKeys []string - ValueType prometheus.ValueType - Value float64 - LabelValues []string - ReportTime time.Time + FqName string + LabelKeys []string + ValueType prometheus.ValueType + Value float64 + LabelValues []string + ReportTime time.Time + CollectionTime time.Time KeysHash uint64 } type HistogramMetric struct { - FqName string - LabelKeys []string - Mean float64 - Count uint64 - Buckets map[float64]uint64 - LabelValues []string - ReportTime time.Time + FqName string + LabelKeys []string + Mean float64 + Count uint64 + Buckets map[float64]uint64 + LabelValues []string + ReportTime time.Time + CollectionTime time.Time KeysHash uint64 } @@ -103,19 +105,21 @@ func (t *timeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.Time var v HistogramMetric if t.fillMissingLabels || (metricKind == "DELTA" && t.aggregateDeltas) { v = HistogramMetric{ - FqName: fqName, - LabelKeys: labelKeys, - Mean: dist.Mean, - Count: uint64(dist.Count), - Buckets: buckets, - LabelValues: labelValues, - ReportTime: reportTime, - KeysHash: hashLabelKeys(labelKeys), + FqName: fqName, + LabelKeys: labelKeys, + Mean: dist.Mean, + Count: uint64(dist.Count), + Buckets: buckets, + LabelValues: labelValues, + ReportTime: reportTime, + CollectionTime: time.Now(), + + KeysHash: hashLabelKeys(labelKeys), } } if metricKind == "DELTA" && t.aggregateDeltas { - t.deltaDistributionStore.Increment(t.metricDescriptor, &v) + t.histogramStore.Increment(t.metricDescriptor, &v) return } @@ -150,19 +154,20 @@ func (t *timeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSer var v ConstMetric if t.fillMissingLabels || (metricKind == "DELTA" && t.aggregateDeltas) { v = ConstMetric{ - FqName: fqName, - LabelKeys: labelKeys, - ValueType: metricValueType, - Value: metricValue, - LabelValues: labelValues, - ReportTime: reportTime, + FqName: fqName, + LabelKeys: labelKeys, + ValueType: metricValueType, + Value: metricValue, + LabelValues: labelValues, + ReportTime: reportTime, + CollectionTime: time.Now(), KeysHash: hashLabelKeys(labelKeys), } } if metricKind == "DELTA" && t.aggregateDeltas { - t.deltaCounterStore.Increment(t.metricDescriptor, &v) + t.counterStore.Increment(t.metricDescriptor, &v) return } @@ -191,12 +196,12 @@ func (t *timeSeriesMetrics) newConstMetric(fqName string, reportTime time.Time, } func hashLabelKeys(labelKeys []string) uint64 { - dh := hashNew() + dh := HashNew() sortedKeys := make([]string, len(labelKeys)) copy(sortedKeys, labelKeys) sort.Strings(sortedKeys) for _, key := range sortedKeys { - dh = hashAdd(dh, key) + dh = HashAdd(dh, key) dh = hashAddByte(dh, separatorByte) } return dh @@ -249,32 +254,32 @@ func (t *timeSeriesMetrics) completeHistogramMetrics(histograms map[string][]*Hi } func (t *timeSeriesMetrics) completeDeltaConstMetrics(reportingStartTime time.Time) { - descriptorMetrics := t.deltaCounterStore.ListMetrics(t.metricDescriptor.Name) + descriptorMetrics := t.counterStore.ListMetrics(t.metricDescriptor.Name) now := time.Now().Truncate(time.Minute) constMetrics := map[string][]*ConstMetric{} for _, collected := range descriptorMetrics { // If the metric wasn't collected we should still export it at the next sample time to avoid staleness - if reportingStartTime.After(collected.lastCollectedAt) { + if reportingStartTime.After(collected.CollectionTime) { // Ideally we could use monitoring.MetricDescriptorMetadata.SamplePeriod to determine how many // samples were missed to adjust this but monitoring.MetricDescriptorMetadata is viewed as optional // for a monitoring.MetricDescriptor - reportingLag := collected.lastCollectedAt.Sub(collected.metric.ReportTime).Truncate(time.Minute) - collected.metric.ReportTime = now.Add(-reportingLag) + reportingLag := collected.CollectionTime.Sub(collected.ReportTime).Truncate(time.Minute) + collected.ReportTime = now.Add(-reportingLag) } if t.fillMissingLabels { - if _, exists := constMetrics[collected.metric.FqName]; !exists { - constMetrics[collected.metric.FqName] = []*ConstMetric{} + if _, exists := constMetrics[collected.FqName]; !exists { + constMetrics[collected.FqName] = []*ConstMetric{} } - constMetrics[collected.metric.FqName] = append(constMetrics[collected.metric.FqName], collected.metric) + constMetrics[collected.FqName] = append(constMetrics[collected.FqName], collected) } else { t.ch <- t.newConstMetric( - collected.metric.FqName, - collected.metric.ReportTime, - collected.metric.LabelKeys, - collected.metric.ValueType, - collected.metric.Value, - collected.metric.LabelValues, + collected.FqName, + collected.ReportTime, + collected.LabelKeys, + collected.ValueType, + collected.Value, + collected.LabelValues, ) } } @@ -285,34 +290,33 @@ func (t *timeSeriesMetrics) completeDeltaConstMetrics(reportingStartTime time.Ti } func (t *timeSeriesMetrics) completeDeltaHistogramMetrics(reportingStartTime time.Time) { - descriptorMetrics := t.deltaDistributionStore.ListMetrics(t.metricDescriptor.Name) + descriptorMetrics := t.histogramStore.ListMetrics(t.metricDescriptor.Name) now := time.Now().Truncate(time.Minute) histograms := map[string][]*HistogramMetric{} for _, collected := range descriptorMetrics { - // If the histogram wasn't collected we should still export it at the next sample time to avoid staleness - if reportingStartTime.After(collected.lastCollectedAt) { + if reportingStartTime.After(collected.CollectionTime) { // Ideally we could use monitoring.MetricDescriptorMetadata.SamplePeriod to determine how many // samples were missed to adjust this but monitoring.MetricDescriptorMetadata is viewed as optional // for a monitoring.MetricDescriptor - reportingLag := collected.lastCollectedAt.Sub(collected.histogram.ReportTime).Truncate(time.Minute) - collected.histogram.ReportTime = now.Add(-reportingLag) + reportingLag := collected.CollectionTime.Sub(collected.ReportTime).Truncate(time.Minute) + collected.ReportTime = now.Add(-reportingLag) } if t.fillMissingLabels { - if _, exists := histograms[collected.histogram.FqName]; !exists { - histograms[collected.histogram.FqName] = []*HistogramMetric{} + if _, exists := histograms[collected.FqName]; !exists { + histograms[collected.FqName] = []*HistogramMetric{} } - histograms[collected.histogram.FqName] = append(histograms[collected.histogram.FqName], collected.histogram) + histograms[collected.FqName] = append(histograms[collected.FqName], collected) } else { t.ch <- t.newConstHistogram( - collected.histogram.FqName, - collected.histogram.ReportTime, - collected.histogram.LabelKeys, - collected.histogram.Mean, - collected.histogram.Count, - collected.histogram.Buckets, - collected.histogram.LabelValues, + collected.FqName, + collected.ReportTime, + collected.LabelKeys, + collected.Mean, + collected.Count, + collected.Buckets, + collected.LabelValues, ) } } diff --git a/delta/counter.go b/delta/counter.go new file mode 100644 index 00000000..188fc525 --- /dev/null +++ b/delta/counter.go @@ -0,0 +1,131 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package delta + +import ( + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "google.golang.org/api/monitoring/v3" + + "github.com/prometheus-community/stackdriver_exporter/collectors" +) + +type MetricEntry struct { + Collected map[uint64]*collectors.ConstMetric + mutex *sync.RWMutex +} + +type InMemoryCounterStore struct { + store *sync.Map + ttl time.Duration + logger log.Logger +} + +// NewInMemoryCounterStore returns an implementation of CounterStore which is persisted in-memory +func NewInMemoryCounterStore(logger log.Logger, ttl time.Duration) *InMemoryCounterStore { + store := &InMemoryCounterStore{ + store: &sync.Map{}, + logger: logger, + ttl: ttl, + } + + return store +} + +func (s *InMemoryCounterStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *collectors.ConstMetric) { + if currentValue == nil { + return + } + + tmp, _ := s.store.LoadOrStore(metricDescriptor.Name, &MetricEntry{ + Collected: map[uint64]*collectors.ConstMetric{}, + mutex: &sync.RWMutex{}, + }) + entry := tmp.(*MetricEntry) + + key := toCounterKey(currentValue) + + entry.mutex.Lock() + defer entry.mutex.Unlock() + existing := entry.Collected[key] + + if existing == nil { + level.Debug(s.logger).Log("msg", "Tracking new counter", "fqName", currentValue.FqName, "key", key, "current_value", currentValue.Value, "incoming_time", currentValue.ReportTime) + entry.Collected[key] = currentValue + return + } + + if existing.ReportTime.Before(currentValue.ReportTime) { + level.Debug(s.logger).Log("msg", "Incrementing existing counter", "fqName", currentValue.FqName, "key", key, "current_value", existing.Value, "adding", currentValue.Value, "last_reported_time", existing.ReportTime, "incoming_time", currentValue.ReportTime) + currentValue.Value = currentValue.Value + existing.Value + entry.Collected[key] = currentValue + return + } + + level.Debug(s.logger).Log("msg", "Ignoring old sample for counter", "fqName", currentValue.FqName, "key", key, "last_reported_time", existing.ReportTime, "incoming_time", currentValue.ReportTime) +} + +func toCounterKey(c *collectors.ConstMetric) uint64 { + labels := make(map[string]string) + keysCopy := append([]string{}, c.LabelKeys...) + for i := range c.LabelKeys { + labels[c.LabelKeys[i]] = c.LabelValues[i] + } + sort.Strings(keysCopy) + + var keyParts []string + for _, k := range keysCopy { + keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) + } + hashText := fmt.Sprintf("%s|%s", c.FqName, strings.Join(keyParts, "|")) + h := collectors.HashNew() + h = collectors.HashAdd(h, hashText) + + return h +} + +func (s *InMemoryCounterStore) ListMetrics(metricDescriptorName string) []*collectors.ConstMetric { + var output []*collectors.ConstMetric + now := time.Now() + ttlWindowStart := now.Add(-s.ttl) + + tmp, exists := s.store.Load(metricDescriptorName) + if !exists { + return output + } + entry := tmp.(*MetricEntry) + + entry.mutex.Lock() + defer entry.mutex.Unlock() + for key, collected := range entry.Collected { + //Scan and remove metrics which are outside the TTL + if ttlWindowStart.After(collected.CollectionTime) { + level.Debug(s.logger).Log("msg", "Deleting counter entry outside of TTL", "key", key, "fqName", collected.FqName) + delete(entry.Collected, key) + continue + } + + //Dereference to create shallow copy + metricCopy := *collected + output = append(output, &metricCopy) + } + + return output +} diff --git a/delta/counter_test.go b/delta/counter_test.go new file mode 100644 index 00000000..31fceceb --- /dev/null +++ b/delta/counter_test.go @@ -0,0 +1,84 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package delta_test + +import ( + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/prometheus/common/promlog" + "google.golang.org/api/monitoring/v3" + + "github.com/prometheus-community/stackdriver_exporter/collectors" + "github.com/prometheus-community/stackdriver_exporter/delta" +) + +var _ = Describe("Counter", func() { + var store *delta.InMemoryCounterStore + var metric *collectors.ConstMetric + descriptor := &monitoring.MetricDescriptor{Name: "This is a metric"} + + BeforeEach(func() { + store = delta.NewInMemoryCounterStore(promlog.New(&promlog.Config{}), time.Minute) + metric = &collectors.ConstMetric{ + FqName: "counter_name", + LabelKeys: []string{"labelKey"}, + ValueType: 1, + Value: 10, + LabelValues: []string{"labelValue"}, + ReportTime: time.Now().Truncate(time.Second), + CollectionTime: time.Now().Truncate(time.Second), + KeysHash: 4321, + } + }) + + It("can return tracked counters", func() { + store.Increment(descriptor, metric) + metrics := store.ListMetrics(descriptor.Name) + + Expect(len(metrics)).To(Equal(1)) + Expect(metrics[0]).To(Equal(metric)) + }) + + It("can increment counters multiple times", func() { + store.Increment(descriptor, metric) + + metric2 := &collectors.ConstMetric{ + FqName: "counter_name", + LabelKeys: []string{"labelKey"}, + ValueType: 1, + Value: 20, + LabelValues: []string{"labelValue"}, + ReportTime: time.Now().Truncate(time.Second).Add(time.Second), + CollectionTime: time.Now().Truncate(time.Second), + KeysHash: 4321, + } + + store.Increment(descriptor, metric2) + + metrics := store.ListMetrics(descriptor.Name) + Expect(len(metrics)).To(Equal(1)) + Expect(metrics[0].Value).To(Equal(float64(30))) + }) + + It("will remove counters outside of TTL", func() { + metric.CollectionTime = metric.CollectionTime.Add(-time.Hour) + + store.Increment(descriptor, metric) + + metrics := store.ListMetrics(descriptor.Name) + Expect(len(metrics)).To(Equal(0)) + }) +}) diff --git a/delta/delta_suite_test.go b/delta/delta_suite_test.go new file mode 100644 index 00000000..f0e625f2 --- /dev/null +++ b/delta/delta_suite_test.go @@ -0,0 +1,26 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package delta_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestDelta(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Delta Suite") +} diff --git a/delta/histogram.go b/delta/histogram.go new file mode 100644 index 00000000..30146eed --- /dev/null +++ b/delta/histogram.go @@ -0,0 +1,150 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package delta + +import ( + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "google.golang.org/api/monitoring/v3" + + "github.com/prometheus-community/stackdriver_exporter/collectors" +) + +type HistogramEntry struct { + Collected map[uint64]*collectors.HistogramMetric + mutex *sync.RWMutex +} + +type InMemoryHistogramStore struct { + store *sync.Map + ttl time.Duration + logger log.Logger +} + +// NewInMemoryHistogramStore returns an implementation of HistogramStore which is persisted in-memory +func NewInMemoryHistogramStore(logger log.Logger, ttl time.Duration) *InMemoryHistogramStore { + store := &InMemoryHistogramStore{ + store: &sync.Map{}, + logger: logger, + ttl: ttl, + } + + return store +} + +func (s *InMemoryHistogramStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *collectors.HistogramMetric) { + if currentValue == nil { + return + } + + tmp, _ := s.store.LoadOrStore(metricDescriptor.Name, &HistogramEntry{ + Collected: map[uint64]*collectors.HistogramMetric{}, + mutex: &sync.RWMutex{}, + }) + entry := tmp.(*HistogramEntry) + + key := toHistogramKey(currentValue) + + entry.mutex.Lock() + defer entry.mutex.Unlock() + existing := entry.Collected[key] + + if existing == nil { + level.Debug(s.logger).Log("msg", "Tracking new histogram", "fqName", currentValue.FqName, "key", key, "incoming_time", currentValue.ReportTime) + entry.Collected[key] = currentValue + return + } + + if existing.ReportTime.Before(currentValue.ReportTime) { + level.Debug(s.logger).Log("msg", "Incrementing existing histogram", "fqName", currentValue.FqName, "key", key, "last_reported_time", existing.ReportTime, "incoming_time", currentValue.ReportTime) + entry.Collected[key] = mergeHistograms(existing, currentValue) + return + } + + level.Debug(s.logger).Log("msg", "Ignoring old sample for histogram", "fqName", currentValue.FqName, "key", key, "last_reported_time", existing.ReportTime, "incoming_time", currentValue.ReportTime) +} + +func toHistogramKey(hist *collectors.HistogramMetric) uint64 { + labels := make(map[string]string) + keysCopy := append([]string{}, hist.LabelKeys...) + for i := range hist.LabelKeys { + labels[hist.LabelKeys[i]] = hist.LabelValues[i] + } + sort.Strings(keysCopy) + + var keyParts []string + for _, k := range keysCopy { + keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) + } + hashText := fmt.Sprintf("%s|%s", hist.FqName, strings.Join(keyParts, "|")) + h := collectors.HashNew() + h = collectors.HashAdd(h, hashText) + + return h +} + +func mergeHistograms(existing *collectors.HistogramMetric, current *collectors.HistogramMetric) *collectors.HistogramMetric { + for key, value := range existing.Buckets { + current.Buckets[key] += value + } + + // Calculate a new mean and overall count + mean := existing.Mean + mean += current.Mean + mean /= 2 + + var count uint64 + for _, v := range current.Buckets { + count += v + } + + current.Mean = mean + current.Count = count + + return current +} + +func (s *InMemoryHistogramStore) ListMetrics(metricDescriptorName string) []*collectors.HistogramMetric { + var output []*collectors.HistogramMetric + now := time.Now() + ttlWindowStart := now.Add(-s.ttl) + + tmp, exists := s.store.Load(metricDescriptorName) + if !exists { + return output + } + entry := tmp.(*HistogramEntry) + + entry.mutex.Lock() + defer entry.mutex.Unlock() + for key, collected := range entry.Collected { + //Scan and remove metrics which are outside the TTL + if ttlWindowStart.After(collected.CollectionTime) { + level.Debug(s.logger).Log("msg", "Deleting histogram entry outside of TTL", "key", key, "fqName", collected.FqName) + delete(entry.Collected, key) + continue + } + + copy := *collected + output = append(output, ©) + } + + return output +} diff --git a/delta/histogram_test.go b/delta/histogram_test.go new file mode 100644 index 00000000..ac0e27f4 --- /dev/null +++ b/delta/histogram_test.go @@ -0,0 +1,64 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package delta_test + +import ( + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/prometheus/common/promlog" + "google.golang.org/api/monitoring/v3" + + "github.com/prometheus-community/stackdriver_exporter/collectors" + "github.com/prometheus-community/stackdriver_exporter/delta" +) + +var _ = Describe("HistogramStore", func() { + var store *delta.InMemoryHistogramStore + var histogram *collectors.HistogramMetric + descriptor := &monitoring.MetricDescriptor{Name: "This is a metric"} + + BeforeEach(func() { + store = delta.NewInMemoryHistogramStore(promlog.New(&promlog.Config{}), time.Minute) + histogram = &collectors.HistogramMetric{ + FqName: "histogram_name", + LabelKeys: []string{"labelKey"}, + Mean: 10, + Count: 100, + Buckets: map[float64]uint64{1.00000000000000000001: 1000}, + LabelValues: []string{"labelValue"}, + ReportTime: time.Now().Truncate(time.Second), + CollectionTime: time.Now().Truncate(time.Second), + KeysHash: 8765, + } + }) + + It("can return tracked histograms", func() { + store.Increment(descriptor, histogram) + metrics := store.ListMetrics(descriptor.Name) + + Expect(len(metrics)).To(Equal(1)) + Expect(metrics[0]).To(Equal(histogram)) + }) + + It("will remove histograms outside of TTL", func() { + histogram.CollectionTime = histogram.CollectionTime.Add(-time.Hour) + + store.Increment(descriptor, histogram) + + metrics := store.ListMetrics(descriptor.Name) + Expect(len(metrics)).To(Equal(0)) + }) +}) diff --git a/stackdriver_exporter.go b/stackdriver_exporter.go index 760cb71b..6296626e 100644 --- a/stackdriver_exporter.go +++ b/stackdriver_exporter.go @@ -37,6 +37,7 @@ import ( "google.golang.org/api/option" "github.com/prometheus-community/stackdriver_exporter/collectors" + "github.com/prometheus-community/stackdriver_exporter/delta" "github.com/prometheus-community/stackdriver_exporter/utils" ) @@ -216,7 +217,7 @@ func (h *handler) innerHandler(filters map[string]bool) http.Handler { AggregateDeltas: *monitoringMetricsAggregateDeltas, DescriptorCacheTTL: *monitoringDescriptorCacheTTL, DescriptorCacheOnlyGoogle: *monitoringDescriptorCacheOnlyGoogle, - }, h.logger, collectors.NewInMemoryDeltaCounterStore(h.logger, *monitoringMetricsDeltasTTL), collectors.NewInMemoryDeltaDistributionStore(h.logger, *monitoringMetricsDeltasTTL)) + }, h.logger, delta.NewInMemoryCounterStore(h.logger, *monitoringMetricsDeltasTTL), delta.NewInMemoryHistogramStore(h.logger, *monitoringMetricsDeltasTTL)) if err != nil { level.Error(h.logger).Log("err", err) os.Exit(1) From 105b38c695d7e03d3ad1786e77f28ae0c349f9e1 Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Fri, 23 Jun 2023 11:44:07 -0400 Subject: [PATCH 5/6] Create a hash package and move fnv hashing Signed-off-by: Kyle Eckhart --- collectors/monitoring_metrics.go | 7 ++++--- delta/counter.go | 5 +++-- delta/histogram.go | 5 +++-- {collectors => hash}/fnv.go | 16 ++++++++-------- 4 files changed, 18 insertions(+), 15 deletions(-) rename {collectors => hash}/fnv.go (72%) diff --git a/collectors/monitoring_metrics.go b/collectors/monitoring_metrics.go index af0cea7a..dcc04e62 100644 --- a/collectors/monitoring_metrics.go +++ b/collectors/monitoring_metrics.go @@ -21,6 +21,7 @@ import ( "sort" + "github.com/prometheus-community/stackdriver_exporter/hash" "github.com/prometheus-community/stackdriver_exporter/utils" ) @@ -196,13 +197,13 @@ func (t *timeSeriesMetrics) newConstMetric(fqName string, reportTime time.Time, } func hashLabelKeys(labelKeys []string) uint64 { - dh := HashNew() + dh := hash.New() sortedKeys := make([]string, len(labelKeys)) copy(sortedKeys, labelKeys) sort.Strings(sortedKeys) for _, key := range sortedKeys { - dh = HashAdd(dh, key) - dh = hashAddByte(dh, separatorByte) + dh = hash.Add(dh, key) + dh = hash.AddByte(dh, hash.SeparatorByte) } return dh } diff --git a/delta/counter.go b/delta/counter.go index 188fc525..068b7f5c 100644 --- a/delta/counter.go +++ b/delta/counter.go @@ -25,6 +25,7 @@ import ( "google.golang.org/api/monitoring/v3" "github.com/prometheus-community/stackdriver_exporter/collectors" + "github.com/prometheus-community/stackdriver_exporter/hash" ) type MetricEntry struct { @@ -95,8 +96,8 @@ func toCounterKey(c *collectors.ConstMetric) uint64 { keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) } hashText := fmt.Sprintf("%s|%s", c.FqName, strings.Join(keyParts, "|")) - h := collectors.HashNew() - h = collectors.HashAdd(h, hashText) + h := hash.New() + h = hash.Add(h, hashText) return h } diff --git a/delta/histogram.go b/delta/histogram.go index 30146eed..9672ebcc 100644 --- a/delta/histogram.go +++ b/delta/histogram.go @@ -25,6 +25,7 @@ import ( "google.golang.org/api/monitoring/v3" "github.com/prometheus-community/stackdriver_exporter/collectors" + "github.com/prometheus-community/stackdriver_exporter/hash" ) type HistogramEntry struct { @@ -94,8 +95,8 @@ func toHistogramKey(hist *collectors.HistogramMetric) uint64 { keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) } hashText := fmt.Sprintf("%s|%s", hist.FqName, strings.Join(keyParts, "|")) - h := collectors.HashNew() - h = collectors.HashAdd(h, hashText) + h := hash.New() + h = hash.Add(h, hashText) return h } diff --git a/collectors/fnv.go b/hash/fnv.go similarity index 72% rename from collectors/fnv.go rename to hash/fnv.go index 62c66087..07648036 100644 --- a/collectors/fnv.go +++ b/hash/fnv.go @@ -11,9 +11,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -package collectors +package hash -const separatorByte = 255 +const SeparatorByte = 255 // https://github.com/prometheus/client_golang/blob/master/prometheus/fnv.go // Inline and byte-free variant of hash/fnv's fnv64a. @@ -23,13 +23,13 @@ const ( prime64 = 1099511628211 ) -// HashNew initializies a new fnv64a hash value. -func HashNew() uint64 { +// New initializies a new fnv64a hash value. +func New() uint64 { return offset64 } -// HashAdd adds a string to a fnv64a hash value, returning the updated hash. -func HashAdd(h uint64, s string) uint64 { +// Add adds a string to a fnv64a hash value, returning the updated hash. +func Add(h uint64, s string) uint64 { for i := 0; i < len(s); i++ { h ^= uint64(s[i]) h *= prime64 @@ -37,8 +37,8 @@ func HashAdd(h uint64, s string) uint64 { return h } -// hashAddByte adds a byte to a fnv64a hash value, returning the updated hash. -func hashAddByte(h uint64, b byte) uint64 { +// AddByte adds a byte to a fnv64a hash value, returning the updated hash. +func AddByte(h uint64, b byte) uint64 { h ^= uint64(b) h *= prime64 return h From dabdb3d83eae898961224de25102eaa0f0ebbda5 Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Fri, 23 Jun 2023 11:44:58 -0400 Subject: [PATCH 6/6] Fix issue with Exported function with the unexported return type on timeSeriesMetrics Signed-off-by: Kyle Eckhart --- collectors/monitoring_collector.go | 2 +- collectors/monitoring_metrics.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/collectors/monitoring_collector.go b/collectors/monitoring_collector.go index eabac955..4f9ddb30 100644 --- a/collectors/monitoring_collector.go +++ b/collectors/monitoring_collector.go @@ -408,7 +408,7 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( var metricValueType prometheus.ValueType var newestTSPoint *monitoring.Point - timeSeriesMetrics, err := NewTimeSeriesMetrics(metricDescriptor, + timeSeriesMetrics, err := newTimeSeriesMetrics(metricDescriptor, ch, c.collectorFillMissingLabels, c.counterStore, diff --git a/collectors/monitoring_metrics.go b/collectors/monitoring_metrics.go index dcc04e62..a1b08046 100644 --- a/collectors/monitoring_metrics.go +++ b/collectors/monitoring_metrics.go @@ -47,7 +47,7 @@ type timeSeriesMetrics struct { aggregateDeltas bool } -func NewTimeSeriesMetrics(descriptor *monitoring.MetricDescriptor, +func newTimeSeriesMetrics(descriptor *monitoring.MetricDescriptor, ch chan<- prometheus.Metric, fillMissingLabels bool, counterStore DeltaCounterStore,