Skip to content

Commit

Permalink
Use sync.Map to fix concurrent read/write issue
Browse files Browse the repository at this point in the history
  • Loading branch information
kgeckhart committed Sep 9, 2022
1 parent ffb190b commit b26820c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 46 deletions.
37 changes: 14 additions & 23 deletions collectors/delta_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,17 @@ type metricEntry struct {
}

type inMemoryDeltaCounterStore struct {
store map[string]*metricEntry
ttl time.Duration
storeMutex *sync.RWMutex
logger log.Logger
store *sync.Map
ttl time.Duration
logger log.Logger
}

// NewInMemoryDeltaCounterStore returns an implementation of DeltaCounterStore which is persisted in-memory
func NewInMemoryDeltaCounterStore(logger log.Logger, ttl time.Duration) DeltaCounterStore {
return &inMemoryDeltaCounterStore{
store: map[string]*metricEntry{},
storeMutex: &sync.RWMutex{},
logger: logger,
ttl: ttl,
store: &sync.Map{},
logger: logger,
ttl: ttl,
}
}

Expand All @@ -69,16 +67,11 @@ func (s *inMemoryDeltaCounterStore) Increment(metricDescriptor *monitoring.Metri
return
}

var entry *metricEntry
s.storeMutex.Lock()
if _, exists := s.store[metricDescriptor.Name]; !exists {
s.store[metricDescriptor.Name] = &metricEntry{
collected: map[uint64]*CollectedMetric{},
mutex: &sync.RWMutex{},
}
}
entry = s.store[metricDescriptor.Name]
s.storeMutex.Unlock()
tmp, _ := s.store.LoadOrStore(metricDescriptor.Name, &metricEntry{
collected: map[uint64]*CollectedMetric{},
mutex: &sync.RWMutex{},
})
entry := tmp.(*metricEntry)

key := toCounterKey(currentValue)

Expand Down Expand Up @@ -127,13 +120,11 @@ func (s *inMemoryDeltaCounterStore) ListMetrics(metricDescriptorName string) map
now := time.Now()
ttlWindowStart := now.Add(-s.ttl)

s.storeMutex.Lock()
entry := s.store[metricDescriptorName]
if entry == nil {
s.storeMutex.Unlock()
tmp, exists := s.store.Load(metricDescriptorName)
if !exists {
return output
}
s.storeMutex.Unlock()
entry := tmp.(*metricEntry)

entry.mutex.Lock()
defer entry.mutex.Unlock()
Expand Down
37 changes: 14 additions & 23 deletions collectors/delta_distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,17 @@ type histogramEntry struct {
}

type inMemoryDeltaDistributionStore struct {
store map[string]*histogramEntry
ttl time.Duration
storeMutex *sync.RWMutex
logger log.Logger
store *sync.Map
ttl time.Duration
logger log.Logger
}

// NewInMemoryDeltaDistributionStore returns an implementation of DeltaDistributionStore which is persisted in-memory
func NewInMemoryDeltaDistributionStore(logger log.Logger, ttl time.Duration) DeltaDistributionStore {
return &inMemoryDeltaDistributionStore{
store: map[string]*histogramEntry{},
storeMutex: &sync.RWMutex{},
logger: logger,
ttl: ttl,
store: &sync.Map{},
logger: logger,
ttl: ttl,
}
}

Expand All @@ -69,16 +67,11 @@ func (s *inMemoryDeltaDistributionStore) Increment(metricDescriptor *monitoring.
return
}

var entry *histogramEntry
s.storeMutex.Lock()
if _, exists := s.store[metricDescriptor.Name]; !exists {
s.store[metricDescriptor.Name] = &histogramEntry{
collected: map[uint64]*CollectedHistogram{},
mutex: &sync.RWMutex{},
}
}
entry = s.store[metricDescriptor.Name]
s.storeMutex.Unlock()
tmp, _ := s.store.LoadOrStore(metricDescriptor.Name, &histogramEntry{
collected: map[uint64]*CollectedHistogram{},
mutex: &sync.RWMutex{},
})
entry := tmp.(*histogramEntry)

key := toHistogramKey(currentValue)

Expand Down Expand Up @@ -147,13 +140,11 @@ func (s *inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string
now := time.Now()
ttlWindowStart := now.Add(-s.ttl)

s.storeMutex.Lock()
entry := s.store[metricDescriptorName]
if entry == nil {
s.storeMutex.Unlock()
tmp, exists := s.store.Load(metricDescriptorName)
if !exists {
return output
}
s.storeMutex.Unlock()
entry := tmp.(*histogramEntry)

entry.mutex.Lock()
defer entry.mutex.Unlock()
Expand Down

0 comments on commit b26820c

Please sign in to comment.