From c60e1f550f2d1777b11b1fffda91ea6fdbfc7900 Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Tue, 17 Jan 2023 21:27:04 -0500 Subject: [PATCH] Create dedicated delta package, add tests, and rename distribution -> histogram Signed-off-by: Kyle Eckhart --- collectors/delta_counter.go | 147 ------------------------- collectors/delta_distribution.go | 168 ----------------------------- collectors/fnv.go | 8 +- collectors/monitoring_collector.go | 24 +++-- collectors/monitoring_metrics.go | 148 ++++++++++++------------- delta/counter.go | 131 ++++++++++++++++++++++ delta/counter_test.go | 84 +++++++++++++++ delta/delta_suite_test.go | 26 +++++ delta/histogram.go | 150 ++++++++++++++++++++++++++ delta/histogram_test.go | 64 +++++++++++ delta/inmemory_test.go | 19 ++++ stackdriver_exporter.go | 3 +- 12 files changed, 573 insertions(+), 399 deletions(-) delete mode 100644 collectors/delta_counter.go delete mode 100644 collectors/delta_distribution.go create mode 100644 delta/counter.go create mode 100644 delta/counter_test.go create mode 100644 delta/delta_suite_test.go create mode 100644 delta/histogram.go create mode 100644 delta/histogram_test.go create mode 100644 delta/inmemory_test.go diff --git a/collectors/delta_counter.go b/collectors/delta_counter.go deleted file mode 100644 index 0528bbcd..00000000 --- a/collectors/delta_counter.go +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright 2022 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package collectors - -import ( - "fmt" - "sort" - "strings" - "sync" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "google.golang.org/api/monitoring/v3" -) - -type CollectedMetric struct { - metric *ConstMetric - lastCollectedAt time.Time -} - -// DeltaCounterStore defines a set of functions which must be implemented in order to be used as a DeltaCounterStore -// which accumulates DELTA Counter metrics over time -type DeltaCounterStore interface { - - // Increment will use the incoming metricDescriptor and currentValue to either create a new entry or add the incoming - // value to an existing entry in the underlying store - Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) - - // ListMetrics will return all known entries in the store for a metricDescriptorName - ListMetrics(metricDescriptorName string) []*CollectedMetric -} - -type metricEntry struct { - collected map[uint64]*CollectedMetric - mutex *sync.RWMutex -} - -type inMemoryDeltaCounterStore struct { - store *sync.Map - ttl time.Duration - logger log.Logger -} - -// NewInMemoryDeltaCounterStore returns an implementation of DeltaCounterStore which is persisted in-memory -func NewInMemoryDeltaCounterStore(logger log.Logger, ttl time.Duration) DeltaCounterStore { - return &inMemoryDeltaCounterStore{ - store: &sync.Map{}, - logger: logger, - ttl: ttl, - } -} - -func (s *inMemoryDeltaCounterStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) { - if currentValue == nil { - return - } - - tmp, _ := s.store.LoadOrStore(metricDescriptor.Name, &metricEntry{ - collected: map[uint64]*CollectedMetric{}, - mutex: &sync.RWMutex{}, - }) - entry := tmp.(*metricEntry) - - key := toCounterKey(currentValue) - - entry.mutex.Lock() - defer entry.mutex.Unlock() - existing := entry.collected[key] - - if existing == nil { - level.Debug(s.logger).Log("msg", "Tracking new counter", "fqName", currentValue.FqName, "key", key, "current_value", currentValue.Value, "incoming_time", currentValue.ReportTime) - entry.collected[key] = &CollectedMetric{currentValue, time.Now()} - return - } - - if existing.metric.ReportTime.Before(currentValue.ReportTime) { - level.Debug(s.logger).Log("msg", "Incrementing existing counter", "fqName", currentValue.FqName, "key", key, "current_value", existing.metric.Value, "adding", currentValue.Value, "last_reported_time", entry.collected[key].metric.ReportTime, "incoming_time", currentValue.ReportTime) - currentValue.Value = currentValue.Value + existing.metric.Value - existing.metric = currentValue - existing.lastCollectedAt = time.Now() - return - } - - level.Debug(s.logger).Log("msg", "Ignoring old sample for counter", "fqName", currentValue.FqName, "key", key, "last_reported_time", existing.metric.ReportTime, "incoming_time", currentValue.ReportTime) -} - -func toCounterKey(c *ConstMetric) uint64 { - labels := make(map[string]string) - keysCopy := append([]string{}, c.LabelKeys...) - for i := range c.LabelKeys { - labels[c.LabelKeys[i]] = c.LabelValues[i] - } - sort.Strings(keysCopy) - - var keyParts []string - for _, k := range keysCopy { - keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) - } - hashText := fmt.Sprintf("%s|%s", c.FqName, strings.Join(keyParts, "|")) - h := hashNew() - h = hashAdd(h, hashText) - - return h -} - -func (s *inMemoryDeltaCounterStore) ListMetrics(metricDescriptorName string) []*CollectedMetric { - var output []*CollectedMetric - now := time.Now() - ttlWindowStart := now.Add(-s.ttl) - - tmp, exists := s.store.Load(metricDescriptorName) - if !exists { - return output - } - entry := tmp.(*metricEntry) - - entry.mutex.Lock() - defer entry.mutex.Unlock() - for key, collected := range entry.collected { - //Scan and remove metrics which are outside the TTL - if ttlWindowStart.After(collected.lastCollectedAt) { - level.Debug(s.logger).Log("msg", "Deleting counter entry outside of TTL", "key", key, "fqName", collected.metric.FqName) - delete(entry.collected, key) - continue - } - metricCopy := *collected.metric - outputEntry := CollectedMetric{ - metric: &metricCopy, - lastCollectedAt: collected.lastCollectedAt, - } - output = append(output, &outputEntry) - } - - return output -} diff --git a/collectors/delta_distribution.go b/collectors/delta_distribution.go deleted file mode 100644 index 657db59d..00000000 --- a/collectors/delta_distribution.go +++ /dev/null @@ -1,168 +0,0 @@ -// Copyright 2022 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package collectors - -import ( - "fmt" - "sort" - "strings" - "sync" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "google.golang.org/api/monitoring/v3" -) - -type CollectedHistogram struct { - histogram *HistogramMetric - lastCollectedAt time.Time -} - -// DeltaDistributionStore defines a set of functions which must be implemented in order to be used as a DeltaDistributionStore -// which accumulates DELTA histogram metrics over time -type DeltaDistributionStore interface { - - // Increment will use the incoming metricDescriptor and currentValue to either create a new entry or add the incoming - // value to an existing entry in the underlying store - Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) - - // ListMetrics will return all known entries in the store for a metricDescriptorName - ListMetrics(metricDescriptorName string) []*CollectedHistogram -} - -type histogramEntry struct { - collected map[uint64]*CollectedHistogram - mutex *sync.RWMutex -} - -type inMemoryDeltaDistributionStore struct { - store *sync.Map - ttl time.Duration - logger log.Logger -} - -// NewInMemoryDeltaDistributionStore returns an implementation of DeltaDistributionStore which is persisted in-memory -func NewInMemoryDeltaDistributionStore(logger log.Logger, ttl time.Duration) DeltaDistributionStore { - return &inMemoryDeltaDistributionStore{ - store: &sync.Map{}, - logger: logger, - ttl: ttl, - } -} - -func (s *inMemoryDeltaDistributionStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) { - if currentValue == nil { - return - } - - tmp, _ := s.store.LoadOrStore(metricDescriptor.Name, &histogramEntry{ - collected: map[uint64]*CollectedHistogram{}, - mutex: &sync.RWMutex{}, - }) - entry := tmp.(*histogramEntry) - - key := toHistogramKey(currentValue) - - entry.mutex.Lock() - defer entry.mutex.Unlock() - existing := entry.collected[key] - - if existing == nil { - level.Debug(s.logger).Log("msg", "Tracking new histogram", "fqName", currentValue.FqName, "key", key, "incoming_time", currentValue.ReportTime) - entry.collected[key] = &CollectedHistogram{histogram: currentValue, lastCollectedAt: time.Now()} - return - } - - if existing.histogram.ReportTime.Before(currentValue.ReportTime) { - level.Debug(s.logger).Log("msg", "Incrementing existing histogram", "fqName", currentValue.FqName, "key", key, "last_reported_time", existing.histogram.ReportTime, "incoming_time", currentValue.ReportTime) - existing.histogram = mergeHistograms(existing.histogram, currentValue) - existing.lastCollectedAt = time.Now() - return - } - - level.Debug(s.logger).Log("msg", "Ignoring old sample for histogram", "fqName", currentValue.FqName, "key", key, "last_reported_time", existing.histogram.ReportTime, "incoming_time", currentValue.ReportTime) -} - -func toHistogramKey(hist *HistogramMetric) uint64 { - labels := make(map[string]string) - keysCopy := append([]string{}, hist.LabelKeys...) - for i := range hist.LabelKeys { - labels[hist.LabelKeys[i]] = hist.LabelValues[i] - } - sort.Strings(keysCopy) - - var keyParts []string - for _, k := range keysCopy { - keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) - } - hashText := fmt.Sprintf("%s|%s", hist.FqName, strings.Join(keyParts, "|")) - h := hashNew() - h = hashAdd(h, hashText) - - return h -} - -func mergeHistograms(existing *HistogramMetric, current *HistogramMetric) *HistogramMetric { - for key, value := range existing.Buckets { - current.Buckets[key] += value - } - - // Calculate a new mean and overall count - mean := existing.Mean - mean += current.Mean - mean /= 2 - - var count uint64 - for _, v := range current.Buckets { - count += v - } - - current.Mean = mean - current.Count = count - - return current -} - -func (s *inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string) []*CollectedHistogram { - var output []*CollectedHistogram - now := time.Now() - ttlWindowStart := now.Add(-s.ttl) - - tmp, exists := s.store.Load(metricDescriptorName) - if !exists { - return output - } - entry := tmp.(*histogramEntry) - - entry.mutex.Lock() - defer entry.mutex.Unlock() - for key, collected := range entry.collected { - //Scan and remove metrics which are outside the TTL - if ttlWindowStart.After(collected.lastCollectedAt) { - level.Debug(s.logger).Log("msg", "Deleting histogram entry outside of TTL", "key", key, "fqName", collected.histogram.FqName) - delete(entry.collected, key) - continue - } - - histCopy := *collected.histogram - outputEntry := CollectedHistogram{ - histogram: &histCopy, - lastCollectedAt: collected.lastCollectedAt, - } - output = append(output, &outputEntry) - } - - return output -} diff --git a/collectors/fnv.go b/collectors/fnv.go index 8891c287..62c66087 100644 --- a/collectors/fnv.go +++ b/collectors/fnv.go @@ -23,13 +23,13 @@ const ( prime64 = 1099511628211 ) -// hashNew initializies a new fnv64a hash value. -func hashNew() uint64 { +// HashNew initializies a new fnv64a hash value. +func HashNew() uint64 { return offset64 } -// hashAdd adds a string to a fnv64a hash value, returning the updated hash. -func hashAdd(h uint64, s string) uint64 { +// HashAdd adds a string to a fnv64a hash value, returning the updated hash. +func HashAdd(h uint64, s string) uint64 { for i := 0; i < len(s); i++ { h ^= uint64(s[i]) h *= prime64 diff --git a/collectors/monitoring_collector.go b/collectors/monitoring_collector.go index b01367e8..76591bc2 100644 --- a/collectors/monitoring_collector.go +++ b/collectors/monitoring_collector.go @@ -54,8 +54,8 @@ type MonitoringCollector struct { collectorFillMissingLabels bool monitoringDropDelegatedProjects bool logger log.Logger - deltaCounterStore DeltaCounterStore - deltaDistributionStore DeltaDistributionStore + counterStore DeltaCounterStore + histogramStore DeltaHistogramStore aggregateDeltas bool } @@ -82,7 +82,17 @@ type MonitoringCollectorOptions struct { AggregateDeltas bool } -func NewMonitoringCollector(projectID string, monitoringService *monitoring.Service, opts MonitoringCollectorOptions, logger log.Logger, counterStore DeltaCounterStore, distributionStore DeltaDistributionStore) (*MonitoringCollector, error) { +type DeltaCounterStore interface { + Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) + ListMetrics(metricDescriptorName string) []*ConstMetric +} + +type DeltaHistogramStore interface { + Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) + ListMetrics(metricDescriptorName string) []*HistogramMetric +} + +func NewMonitoringCollector(projectID string, monitoringService *monitoring.Service, opts MonitoringCollectorOptions, logger log.Logger, counterStore DeltaCounterStore, histogramStore DeltaHistogramStore) (*MonitoringCollector, error) { const subsystem = "monitoring" apiCallsTotalMetric := prometheus.NewCounter( @@ -162,8 +172,8 @@ func NewMonitoringCollector(projectID string, monitoringService *monitoring.Serv collectorFillMissingLabels: opts.FillMissingLabels, monitoringDropDelegatedProjects: opts.DropDelegatedProjects, logger: logger, - deltaCounterStore: counterStore, - deltaDistributionStore: distributionStore, + counterStore: counterStore, + histogramStore: histogramStore, aggregateDeltas: opts.AggregateDeltas, } @@ -346,8 +356,8 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( timeSeriesMetrics, err := NewTimeSeriesMetrics(metricDescriptor, ch, c.collectorFillMissingLabels, - c.deltaCounterStore, - c.deltaDistributionStore, + c.counterStore, + c.histogramStore, c.aggregateDeltas, ) if err != nil { diff --git a/collectors/monitoring_metrics.go b/collectors/monitoring_metrics.go index 9d57d297..af0cea7a 100644 --- a/collectors/monitoring_metrics.go +++ b/collectors/monitoring_metrics.go @@ -41,27 +41,27 @@ type timeSeriesMetrics struct { constMetrics map[string][]*ConstMetric histogramMetrics map[string][]*HistogramMetric - deltaCounterStore DeltaCounterStore - deltaDistributionStore DeltaDistributionStore - aggregateDeltas bool + counterStore DeltaCounterStore + histogramStore DeltaHistogramStore + aggregateDeltas bool } func NewTimeSeriesMetrics(descriptor *monitoring.MetricDescriptor, ch chan<- prometheus.Metric, fillMissingLabels bool, - deltaCounterStore DeltaCounterStore, - deltaDistributionStore DeltaDistributionStore, + counterStore DeltaCounterStore, + histogramStore DeltaHistogramStore, aggregateDeltas bool) (*timeSeriesMetrics, error) { return &timeSeriesMetrics{ - metricDescriptor: descriptor, - ch: ch, - fillMissingLabels: fillMissingLabels, - constMetrics: make(map[string][]*ConstMetric), - histogramMetrics: make(map[string][]*HistogramMetric), - deltaCounterStore: deltaCounterStore, - deltaDistributionStore: deltaDistributionStore, - aggregateDeltas: aggregateDeltas, + metricDescriptor: descriptor, + ch: ch, + fillMissingLabels: fillMissingLabels, + constMetrics: make(map[string][]*ConstMetric), + histogramMetrics: make(map[string][]*HistogramMetric), + counterStore: counterStore, + histogramStore: histogramStore, + aggregateDeltas: aggregateDeltas, }, nil } @@ -75,24 +75,26 @@ func (t *timeSeriesMetrics) newMetricDesc(fqName string, labelKeys []string) *pr } type ConstMetric struct { - FqName string - LabelKeys []string - ValueType prometheus.ValueType - Value float64 - LabelValues []string - ReportTime time.Time + FqName string + LabelKeys []string + ValueType prometheus.ValueType + Value float64 + LabelValues []string + ReportTime time.Time + CollectionTime time.Time KeysHash uint64 } type HistogramMetric struct { - FqName string - LabelKeys []string - Mean float64 - Count uint64 - Buckets map[float64]uint64 - LabelValues []string - ReportTime time.Time + FqName string + LabelKeys []string + Mean float64 + Count uint64 + Buckets map[float64]uint64 + LabelValues []string + ReportTime time.Time + CollectionTime time.Time KeysHash uint64 } @@ -103,19 +105,21 @@ func (t *timeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.Time var v HistogramMetric if t.fillMissingLabels || (metricKind == "DELTA" && t.aggregateDeltas) { v = HistogramMetric{ - FqName: fqName, - LabelKeys: labelKeys, - Mean: dist.Mean, - Count: uint64(dist.Count), - Buckets: buckets, - LabelValues: labelValues, - ReportTime: reportTime, - KeysHash: hashLabelKeys(labelKeys), + FqName: fqName, + LabelKeys: labelKeys, + Mean: dist.Mean, + Count: uint64(dist.Count), + Buckets: buckets, + LabelValues: labelValues, + ReportTime: reportTime, + CollectionTime: time.Now(), + + KeysHash: hashLabelKeys(labelKeys), } } if metricKind == "DELTA" && t.aggregateDeltas { - t.deltaDistributionStore.Increment(t.metricDescriptor, &v) + t.histogramStore.Increment(t.metricDescriptor, &v) return } @@ -150,19 +154,20 @@ func (t *timeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSer var v ConstMetric if t.fillMissingLabels || (metricKind == "DELTA" && t.aggregateDeltas) { v = ConstMetric{ - FqName: fqName, - LabelKeys: labelKeys, - ValueType: metricValueType, - Value: metricValue, - LabelValues: labelValues, - ReportTime: reportTime, + FqName: fqName, + LabelKeys: labelKeys, + ValueType: metricValueType, + Value: metricValue, + LabelValues: labelValues, + ReportTime: reportTime, + CollectionTime: time.Now(), KeysHash: hashLabelKeys(labelKeys), } } if metricKind == "DELTA" && t.aggregateDeltas { - t.deltaCounterStore.Increment(t.metricDescriptor, &v) + t.counterStore.Increment(t.metricDescriptor, &v) return } @@ -191,12 +196,12 @@ func (t *timeSeriesMetrics) newConstMetric(fqName string, reportTime time.Time, } func hashLabelKeys(labelKeys []string) uint64 { - dh := hashNew() + dh := HashNew() sortedKeys := make([]string, len(labelKeys)) copy(sortedKeys, labelKeys) sort.Strings(sortedKeys) for _, key := range sortedKeys { - dh = hashAdd(dh, key) + dh = HashAdd(dh, key) dh = hashAddByte(dh, separatorByte) } return dh @@ -249,32 +254,32 @@ func (t *timeSeriesMetrics) completeHistogramMetrics(histograms map[string][]*Hi } func (t *timeSeriesMetrics) completeDeltaConstMetrics(reportingStartTime time.Time) { - descriptorMetrics := t.deltaCounterStore.ListMetrics(t.metricDescriptor.Name) + descriptorMetrics := t.counterStore.ListMetrics(t.metricDescriptor.Name) now := time.Now().Truncate(time.Minute) constMetrics := map[string][]*ConstMetric{} for _, collected := range descriptorMetrics { // If the metric wasn't collected we should still export it at the next sample time to avoid staleness - if reportingStartTime.After(collected.lastCollectedAt) { + if reportingStartTime.After(collected.CollectionTime) { // Ideally we could use monitoring.MetricDescriptorMetadata.SamplePeriod to determine how many // samples were missed to adjust this but monitoring.MetricDescriptorMetadata is viewed as optional // for a monitoring.MetricDescriptor - reportingLag := collected.lastCollectedAt.Sub(collected.metric.ReportTime).Truncate(time.Minute) - collected.metric.ReportTime = now.Add(-reportingLag) + reportingLag := collected.CollectionTime.Sub(collected.ReportTime).Truncate(time.Minute) + collected.ReportTime = now.Add(-reportingLag) } if t.fillMissingLabels { - if _, exists := constMetrics[collected.metric.FqName]; !exists { - constMetrics[collected.metric.FqName] = []*ConstMetric{} + if _, exists := constMetrics[collected.FqName]; !exists { + constMetrics[collected.FqName] = []*ConstMetric{} } - constMetrics[collected.metric.FqName] = append(constMetrics[collected.metric.FqName], collected.metric) + constMetrics[collected.FqName] = append(constMetrics[collected.FqName], collected) } else { t.ch <- t.newConstMetric( - collected.metric.FqName, - collected.metric.ReportTime, - collected.metric.LabelKeys, - collected.metric.ValueType, - collected.metric.Value, - collected.metric.LabelValues, + collected.FqName, + collected.ReportTime, + collected.LabelKeys, + collected.ValueType, + collected.Value, + collected.LabelValues, ) } } @@ -285,34 +290,33 @@ func (t *timeSeriesMetrics) completeDeltaConstMetrics(reportingStartTime time.Ti } func (t *timeSeriesMetrics) completeDeltaHistogramMetrics(reportingStartTime time.Time) { - descriptorMetrics := t.deltaDistributionStore.ListMetrics(t.metricDescriptor.Name) + descriptorMetrics := t.histogramStore.ListMetrics(t.metricDescriptor.Name) now := time.Now().Truncate(time.Minute) histograms := map[string][]*HistogramMetric{} for _, collected := range descriptorMetrics { - // If the histogram wasn't collected we should still export it at the next sample time to avoid staleness - if reportingStartTime.After(collected.lastCollectedAt) { + if reportingStartTime.After(collected.CollectionTime) { // Ideally we could use monitoring.MetricDescriptorMetadata.SamplePeriod to determine how many // samples were missed to adjust this but monitoring.MetricDescriptorMetadata is viewed as optional // for a monitoring.MetricDescriptor - reportingLag := collected.lastCollectedAt.Sub(collected.histogram.ReportTime).Truncate(time.Minute) - collected.histogram.ReportTime = now.Add(-reportingLag) + reportingLag := collected.CollectionTime.Sub(collected.ReportTime).Truncate(time.Minute) + collected.ReportTime = now.Add(-reportingLag) } if t.fillMissingLabels { - if _, exists := histograms[collected.histogram.FqName]; !exists { - histograms[collected.histogram.FqName] = []*HistogramMetric{} + if _, exists := histograms[collected.FqName]; !exists { + histograms[collected.FqName] = []*HistogramMetric{} } - histograms[collected.histogram.FqName] = append(histograms[collected.histogram.FqName], collected.histogram) + histograms[collected.FqName] = append(histograms[collected.FqName], collected) } else { t.ch <- t.newConstHistogram( - collected.histogram.FqName, - collected.histogram.ReportTime, - collected.histogram.LabelKeys, - collected.histogram.Mean, - collected.histogram.Count, - collected.histogram.Buckets, - collected.histogram.LabelValues, + collected.FqName, + collected.ReportTime, + collected.LabelKeys, + collected.Mean, + collected.Count, + collected.Buckets, + collected.LabelValues, ) } } diff --git a/delta/counter.go b/delta/counter.go new file mode 100644 index 00000000..188fc525 --- /dev/null +++ b/delta/counter.go @@ -0,0 +1,131 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package delta + +import ( + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "google.golang.org/api/monitoring/v3" + + "github.com/prometheus-community/stackdriver_exporter/collectors" +) + +type MetricEntry struct { + Collected map[uint64]*collectors.ConstMetric + mutex *sync.RWMutex +} + +type InMemoryCounterStore struct { + store *sync.Map + ttl time.Duration + logger log.Logger +} + +// NewInMemoryCounterStore returns an implementation of CounterStore which is persisted in-memory +func NewInMemoryCounterStore(logger log.Logger, ttl time.Duration) *InMemoryCounterStore { + store := &InMemoryCounterStore{ + store: &sync.Map{}, + logger: logger, + ttl: ttl, + } + + return store +} + +func (s *InMemoryCounterStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *collectors.ConstMetric) { + if currentValue == nil { + return + } + + tmp, _ := s.store.LoadOrStore(metricDescriptor.Name, &MetricEntry{ + Collected: map[uint64]*collectors.ConstMetric{}, + mutex: &sync.RWMutex{}, + }) + entry := tmp.(*MetricEntry) + + key := toCounterKey(currentValue) + + entry.mutex.Lock() + defer entry.mutex.Unlock() + existing := entry.Collected[key] + + if existing == nil { + level.Debug(s.logger).Log("msg", "Tracking new counter", "fqName", currentValue.FqName, "key", key, "current_value", currentValue.Value, "incoming_time", currentValue.ReportTime) + entry.Collected[key] = currentValue + return + } + + if existing.ReportTime.Before(currentValue.ReportTime) { + level.Debug(s.logger).Log("msg", "Incrementing existing counter", "fqName", currentValue.FqName, "key", key, "current_value", existing.Value, "adding", currentValue.Value, "last_reported_time", existing.ReportTime, "incoming_time", currentValue.ReportTime) + currentValue.Value = currentValue.Value + existing.Value + entry.Collected[key] = currentValue + return + } + + level.Debug(s.logger).Log("msg", "Ignoring old sample for counter", "fqName", currentValue.FqName, "key", key, "last_reported_time", existing.ReportTime, "incoming_time", currentValue.ReportTime) +} + +func toCounterKey(c *collectors.ConstMetric) uint64 { + labels := make(map[string]string) + keysCopy := append([]string{}, c.LabelKeys...) + for i := range c.LabelKeys { + labels[c.LabelKeys[i]] = c.LabelValues[i] + } + sort.Strings(keysCopy) + + var keyParts []string + for _, k := range keysCopy { + keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) + } + hashText := fmt.Sprintf("%s|%s", c.FqName, strings.Join(keyParts, "|")) + h := collectors.HashNew() + h = collectors.HashAdd(h, hashText) + + return h +} + +func (s *InMemoryCounterStore) ListMetrics(metricDescriptorName string) []*collectors.ConstMetric { + var output []*collectors.ConstMetric + now := time.Now() + ttlWindowStart := now.Add(-s.ttl) + + tmp, exists := s.store.Load(metricDescriptorName) + if !exists { + return output + } + entry := tmp.(*MetricEntry) + + entry.mutex.Lock() + defer entry.mutex.Unlock() + for key, collected := range entry.Collected { + //Scan and remove metrics which are outside the TTL + if ttlWindowStart.After(collected.CollectionTime) { + level.Debug(s.logger).Log("msg", "Deleting counter entry outside of TTL", "key", key, "fqName", collected.FqName) + delete(entry.Collected, key) + continue + } + + //Dereference to create shallow copy + metricCopy := *collected + output = append(output, &metricCopy) + } + + return output +} diff --git a/delta/counter_test.go b/delta/counter_test.go new file mode 100644 index 00000000..31fceceb --- /dev/null +++ b/delta/counter_test.go @@ -0,0 +1,84 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package delta_test + +import ( + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/prometheus/common/promlog" + "google.golang.org/api/monitoring/v3" + + "github.com/prometheus-community/stackdriver_exporter/collectors" + "github.com/prometheus-community/stackdriver_exporter/delta" +) + +var _ = Describe("Counter", func() { + var store *delta.InMemoryCounterStore + var metric *collectors.ConstMetric + descriptor := &monitoring.MetricDescriptor{Name: "This is a metric"} + + BeforeEach(func() { + store = delta.NewInMemoryCounterStore(promlog.New(&promlog.Config{}), time.Minute) + metric = &collectors.ConstMetric{ + FqName: "counter_name", + LabelKeys: []string{"labelKey"}, + ValueType: 1, + Value: 10, + LabelValues: []string{"labelValue"}, + ReportTime: time.Now().Truncate(time.Second), + CollectionTime: time.Now().Truncate(time.Second), + KeysHash: 4321, + } + }) + + It("can return tracked counters", func() { + store.Increment(descriptor, metric) + metrics := store.ListMetrics(descriptor.Name) + + Expect(len(metrics)).To(Equal(1)) + Expect(metrics[0]).To(Equal(metric)) + }) + + It("can increment counters multiple times", func() { + store.Increment(descriptor, metric) + + metric2 := &collectors.ConstMetric{ + FqName: "counter_name", + LabelKeys: []string{"labelKey"}, + ValueType: 1, + Value: 20, + LabelValues: []string{"labelValue"}, + ReportTime: time.Now().Truncate(time.Second).Add(time.Second), + CollectionTime: time.Now().Truncate(time.Second), + KeysHash: 4321, + } + + store.Increment(descriptor, metric2) + + metrics := store.ListMetrics(descriptor.Name) + Expect(len(metrics)).To(Equal(1)) + Expect(metrics[0].Value).To(Equal(float64(30))) + }) + + It("will remove counters outside of TTL", func() { + metric.CollectionTime = metric.CollectionTime.Add(-time.Hour) + + store.Increment(descriptor, metric) + + metrics := store.ListMetrics(descriptor.Name) + Expect(len(metrics)).To(Equal(0)) + }) +}) diff --git a/delta/delta_suite_test.go b/delta/delta_suite_test.go new file mode 100644 index 00000000..f0e625f2 --- /dev/null +++ b/delta/delta_suite_test.go @@ -0,0 +1,26 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package delta_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestDelta(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Delta Suite") +} diff --git a/delta/histogram.go b/delta/histogram.go new file mode 100644 index 00000000..30146eed --- /dev/null +++ b/delta/histogram.go @@ -0,0 +1,150 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package delta + +import ( + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "google.golang.org/api/monitoring/v3" + + "github.com/prometheus-community/stackdriver_exporter/collectors" +) + +type HistogramEntry struct { + Collected map[uint64]*collectors.HistogramMetric + mutex *sync.RWMutex +} + +type InMemoryHistogramStore struct { + store *sync.Map + ttl time.Duration + logger log.Logger +} + +// NewInMemoryHistogramStore returns an implementation of HistogramStore which is persisted in-memory +func NewInMemoryHistogramStore(logger log.Logger, ttl time.Duration) *InMemoryHistogramStore { + store := &InMemoryHistogramStore{ + store: &sync.Map{}, + logger: logger, + ttl: ttl, + } + + return store +} + +func (s *InMemoryHistogramStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *collectors.HistogramMetric) { + if currentValue == nil { + return + } + + tmp, _ := s.store.LoadOrStore(metricDescriptor.Name, &HistogramEntry{ + Collected: map[uint64]*collectors.HistogramMetric{}, + mutex: &sync.RWMutex{}, + }) + entry := tmp.(*HistogramEntry) + + key := toHistogramKey(currentValue) + + entry.mutex.Lock() + defer entry.mutex.Unlock() + existing := entry.Collected[key] + + if existing == nil { + level.Debug(s.logger).Log("msg", "Tracking new histogram", "fqName", currentValue.FqName, "key", key, "incoming_time", currentValue.ReportTime) + entry.Collected[key] = currentValue + return + } + + if existing.ReportTime.Before(currentValue.ReportTime) { + level.Debug(s.logger).Log("msg", "Incrementing existing histogram", "fqName", currentValue.FqName, "key", key, "last_reported_time", existing.ReportTime, "incoming_time", currentValue.ReportTime) + entry.Collected[key] = mergeHistograms(existing, currentValue) + return + } + + level.Debug(s.logger).Log("msg", "Ignoring old sample for histogram", "fqName", currentValue.FqName, "key", key, "last_reported_time", existing.ReportTime, "incoming_time", currentValue.ReportTime) +} + +func toHistogramKey(hist *collectors.HistogramMetric) uint64 { + labels := make(map[string]string) + keysCopy := append([]string{}, hist.LabelKeys...) + for i := range hist.LabelKeys { + labels[hist.LabelKeys[i]] = hist.LabelValues[i] + } + sort.Strings(keysCopy) + + var keyParts []string + for _, k := range keysCopy { + keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) + } + hashText := fmt.Sprintf("%s|%s", hist.FqName, strings.Join(keyParts, "|")) + h := collectors.HashNew() + h = collectors.HashAdd(h, hashText) + + return h +} + +func mergeHistograms(existing *collectors.HistogramMetric, current *collectors.HistogramMetric) *collectors.HistogramMetric { + for key, value := range existing.Buckets { + current.Buckets[key] += value + } + + // Calculate a new mean and overall count + mean := existing.Mean + mean += current.Mean + mean /= 2 + + var count uint64 + for _, v := range current.Buckets { + count += v + } + + current.Mean = mean + current.Count = count + + return current +} + +func (s *InMemoryHistogramStore) ListMetrics(metricDescriptorName string) []*collectors.HistogramMetric { + var output []*collectors.HistogramMetric + now := time.Now() + ttlWindowStart := now.Add(-s.ttl) + + tmp, exists := s.store.Load(metricDescriptorName) + if !exists { + return output + } + entry := tmp.(*HistogramEntry) + + entry.mutex.Lock() + defer entry.mutex.Unlock() + for key, collected := range entry.Collected { + //Scan and remove metrics which are outside the TTL + if ttlWindowStart.After(collected.CollectionTime) { + level.Debug(s.logger).Log("msg", "Deleting histogram entry outside of TTL", "key", key, "fqName", collected.FqName) + delete(entry.Collected, key) + continue + } + + copy := *collected + output = append(output, ©) + } + + return output +} diff --git a/delta/histogram_test.go b/delta/histogram_test.go new file mode 100644 index 00000000..ac0e27f4 --- /dev/null +++ b/delta/histogram_test.go @@ -0,0 +1,64 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package delta_test + +import ( + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/prometheus/common/promlog" + "google.golang.org/api/monitoring/v3" + + "github.com/prometheus-community/stackdriver_exporter/collectors" + "github.com/prometheus-community/stackdriver_exporter/delta" +) + +var _ = Describe("HistogramStore", func() { + var store *delta.InMemoryHistogramStore + var histogram *collectors.HistogramMetric + descriptor := &monitoring.MetricDescriptor{Name: "This is a metric"} + + BeforeEach(func() { + store = delta.NewInMemoryHistogramStore(promlog.New(&promlog.Config{}), time.Minute) + histogram = &collectors.HistogramMetric{ + FqName: "histogram_name", + LabelKeys: []string{"labelKey"}, + Mean: 10, + Count: 100, + Buckets: map[float64]uint64{1.00000000000000000001: 1000}, + LabelValues: []string{"labelValue"}, + ReportTime: time.Now().Truncate(time.Second), + CollectionTime: time.Now().Truncate(time.Second), + KeysHash: 8765, + } + }) + + It("can return tracked histograms", func() { + store.Increment(descriptor, histogram) + metrics := store.ListMetrics(descriptor.Name) + + Expect(len(metrics)).To(Equal(1)) + Expect(metrics[0]).To(Equal(histogram)) + }) + + It("will remove histograms outside of TTL", func() { + histogram.CollectionTime = histogram.CollectionTime.Add(-time.Hour) + + store.Increment(descriptor, histogram) + + metrics := store.ListMetrics(descriptor.Name) + Expect(len(metrics)).To(Equal(0)) + }) +}) diff --git a/delta/inmemory_test.go b/delta/inmemory_test.go new file mode 100644 index 00000000..3f726ca6 --- /dev/null +++ b/delta/inmemory_test.go @@ -0,0 +1,19 @@ +package delta + +import ( + "time" + + . "github.com/onsi/ginkgo" +) + +type testMetric struct { + lastCollectedAt time.Time +} + +func (m testMetric) LastCollectedAt() time.Time { + return m.lastCollectedAt +} + +var _ = Describe("InMemoryStorage", func() { + +}) diff --git a/stackdriver_exporter.go b/stackdriver_exporter.go index 35b7c373..21cd3931 100644 --- a/stackdriver_exporter.go +++ b/stackdriver_exporter.go @@ -35,6 +35,7 @@ import ( "gopkg.in/alecthomas/kingpin.v2" "github.com/prometheus-community/stackdriver_exporter/collectors" + "github.com/prometheus-community/stackdriver_exporter/delta" "github.com/prometheus-community/stackdriver_exporter/utils" ) @@ -206,7 +207,7 @@ func (h *handler) innerHandler(filters map[string]bool) http.Handler { FillMissingLabels: *collectorFillMissingLabels, DropDelegatedProjects: *monitoringDropDelegatedProjects, AggregateDeltas: *monitoringMetricsAggregateDeltas, - }, h.logger, collectors.NewInMemoryDeltaCounterStore(h.logger, *monitoringMetricsDeltasTTL), collectors.NewInMemoryDeltaDistributionStore(h.logger, *monitoringMetricsDeltasTTL)) + }, h.logger, delta.NewInMemoryCounterStore(h.logger, *monitoringMetricsDeltasTTL), delta.NewInMemoryHistogramStore(h.logger, *monitoringMetricsDeltasTTL)) if err != nil { level.Error(h.logger).Log("err", err) os.Exit(1)