Skip to content

Commit

Permalink
Locking at the entry level just in case external implementation changes
Browse files Browse the repository at this point in the history
Signed-off-by: Kyle Eckhart <[email protected]>
  • Loading branch information
kgeckhart committed Aug 29, 2022
1 parent a28d78e commit ffb190b
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 27 deletions.
41 changes: 26 additions & 15 deletions collectors/delta_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,49 +42,58 @@ type DeltaCounterStore interface {
ListMetrics(metricDescriptorName string) map[string][]*CollectedMetric
}

type metricEntry = map[uint64]*CollectedMetric
type metricEntry struct {
collected map[uint64]*CollectedMetric
mutex *sync.RWMutex
}

type inMemoryDeltaCounterStore struct {
store map[string]metricEntry
store map[string]*metricEntry
ttl time.Duration
storeMutex *sync.RWMutex
logger log.Logger
}

// NewInMemoryDeltaCounterStore returns an implementation of DeltaCounterStore which is persisted in-memory
func NewInMemoryDeltaCounterStore(logger log.Logger, ttl time.Duration) DeltaCounterStore {
return inMemoryDeltaCounterStore{
store: map[string]metricEntry{},
return &inMemoryDeltaCounterStore{
store: map[string]*metricEntry{},
storeMutex: &sync.RWMutex{},
logger: logger,
ttl: ttl,
}
}

func (s inMemoryDeltaCounterStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) {
func (s *inMemoryDeltaCounterStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) {
if currentValue == nil {
return
}

var entry metricEntry
var entry *metricEntry
s.storeMutex.Lock()
if _, exists := s.store[metricDescriptor.Name]; !exists {
s.store[metricDescriptor.Name] = metricEntry{}
s.store[metricDescriptor.Name] = &metricEntry{
collected: map[uint64]*CollectedMetric{},
mutex: &sync.RWMutex{},
}
}
entry = s.store[metricDescriptor.Name]
s.storeMutex.Unlock()

key := toCounterKey(currentValue)
existing := entry[key]

entry.mutex.Lock()
defer entry.mutex.Unlock()
existing := entry.collected[key]

if existing == nil {
level.Debug(s.logger).Log("msg", "Tracking new counter", "fqName", currentValue.fqName, "key", key, "current_value", currentValue.value, "incoming_time", currentValue.reportTime)
entry[key] = &CollectedMetric{currentValue, time.Now()}
entry.collected[key] = &CollectedMetric{currentValue, time.Now()}
return
}

if existing.metric.reportTime.Before(currentValue.reportTime) {
level.Debug(s.logger).Log("msg", "Incrementing existing counter", "fqName", currentValue.fqName, "key", key, "current_value", existing.metric.value, "adding", currentValue.value, "last_reported_time", entry[key].metric.reportTime, "incoming_time", currentValue.reportTime)
level.Debug(s.logger).Log("msg", "Incrementing existing counter", "fqName", currentValue.fqName, "key", key, "current_value", existing.metric.value, "adding", currentValue.value, "last_reported_time", entry.collected[key].metric.reportTime, "incoming_time", currentValue.reportTime)
currentValue.value = currentValue.value + existing.metric.value
existing.metric = currentValue
existing.lastCollectedAt = time.Now()
Expand Down Expand Up @@ -113,24 +122,26 @@ func toCounterKey(c *ConstMetric) uint64 {
return h
}

func (s inMemoryDeltaCounterStore) ListMetrics(metricDescriptorName string) map[string][]*CollectedMetric {
func (s *inMemoryDeltaCounterStore) ListMetrics(metricDescriptorName string) map[string][]*CollectedMetric {
output := map[string][]*CollectedMetric{}
now := time.Now()
ttlWindowStart := now.Add(-s.ttl)

s.storeMutex.Lock()
metric := s.store[metricDescriptorName]
if metric == nil {
entry := s.store[metricDescriptorName]
if entry == nil {
s.storeMutex.Unlock()
return output
}
s.storeMutex.Unlock()

for key, collected := range metric {
entry.mutex.Lock()
defer entry.mutex.Unlock()
for key, collected := range entry.collected {
//Scan and remove metrics which are outside the TTL
if ttlWindowStart.After(collected.lastCollectedAt) {
level.Debug(s.logger).Log("msg", "Deleting counter entry outside of TTL", "key", key, "fqName", collected.metric.fqName)
delete(metric, key)
delete(entry.collected, key)
continue
}

Expand Down
35 changes: 23 additions & 12 deletions collectors/delta_distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,44 +42,53 @@ type DeltaDistributionStore interface {
ListMetrics(metricDescriptorName string) map[string][]*CollectedHistogram
}

type histogramEntry = map[uint64]*CollectedHistogram
type histogramEntry struct {
collected map[uint64]*CollectedHistogram
mutex *sync.RWMutex
}

type inMemoryDeltaDistributionStore struct {
store map[string]histogramEntry
store map[string]*histogramEntry
ttl time.Duration
storeMutex *sync.RWMutex
logger log.Logger
}

// NewInMemoryDeltaDistributionStore returns an implementation of DeltaDistributionStore which is persisted in-memory
func NewInMemoryDeltaDistributionStore(logger log.Logger, ttl time.Duration) DeltaDistributionStore {
return inMemoryDeltaDistributionStore{
store: map[string]histogramEntry{},
return &inMemoryDeltaDistributionStore{
store: map[string]*histogramEntry{},
storeMutex: &sync.RWMutex{},
logger: logger,
ttl: ttl,
}
}

func (s inMemoryDeltaDistributionStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) {
func (s *inMemoryDeltaDistributionStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) {
if currentValue == nil {
return
}

var entry histogramEntry
var entry *histogramEntry
s.storeMutex.Lock()
if _, exists := s.store[metricDescriptor.Name]; !exists {
s.store[metricDescriptor.Name] = histogramEntry{}
s.store[metricDescriptor.Name] = &histogramEntry{
collected: map[uint64]*CollectedHistogram{},
mutex: &sync.RWMutex{},
}
}
entry = s.store[metricDescriptor.Name]
s.storeMutex.Unlock()

key := toHistogramKey(currentValue)
existing := entry[key]

entry.mutex.Lock()
defer entry.mutex.Unlock()
existing := entry.collected[key]

if existing == nil {
level.Debug(s.logger).Log("msg", "Tracking new histogram", "fqName", currentValue.fqName, "key", key, "incoming_time", currentValue.reportTime)
entry[key] = &CollectedHistogram{histogram: currentValue, lastCollectedAt: time.Now()}
entry.collected[key] = &CollectedHistogram{histogram: currentValue, lastCollectedAt: time.Now()}
return
}

Expand Down Expand Up @@ -133,7 +142,7 @@ func mergeHistograms(existing *HistogramMetric, current *HistogramMetric) *Histo
return current
}

func (s inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string) map[string][]*CollectedHistogram {
func (s *inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string) map[string][]*CollectedHistogram {
output := map[string][]*CollectedHistogram{}
now := time.Now()
ttlWindowStart := now.Add(-s.ttl)
Expand All @@ -146,11 +155,13 @@ func (s inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string)
}
s.storeMutex.Unlock()

for key, collected := range entry {
entry.mutex.Lock()
defer entry.mutex.Unlock()
for key, collected := range entry.collected {
//Scan and remove metrics which are outside the TTL
if ttlWindowStart.After(collected.lastCollectedAt) {
level.Debug(s.logger).Log("msg", "Deleting histogram entry outside of TTL", "key", key, "fqName", collected.histogram.fqName)
delete(entry, key)
delete(entry.collected, key)
continue
}

Expand Down

0 comments on commit ffb190b

Please sign in to comment.