Skip to content

Commit

Permalink
TSDB: Lock around access to series labels
Browse files Browse the repository at this point in the history
So we can modify them to reset the symbol-table.

Signed-off-by: Bryan Boreham <[email protected]>
  • Loading branch information
bboreham committed May 9, 2024
1 parent 9c45470 commit 392e99d
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 19 deletions.
21 changes: 14 additions & 7 deletions tsdb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -1800,12 +1800,12 @@ type seriesHashmap struct {

func (m *seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
if s, found := m.unique[hash]; found {
if labels.Equal(s.lset, lset) {
if labels.Equal(s.labels(), lset) {
return s
}
}
for _, s := range m.conflicts[hash] {
if labels.Equal(s.lset, lset) {
if labels.Equal(s.labels(), lset) {
return s
}
}
Expand All @@ -1828,7 +1828,7 @@ func (m *seriesHashmap) getByFunc(hash uint64, cmp func(labels.Labels) bool) *me
}

func (m *seriesHashmap) set(hash uint64, s *memSeries) {
if existing, found := m.unique[hash]; !found || labels.Equal(existing.lset, s.lset) {
if existing, found := m.unique[hash]; !found || labels.Equal(existing.labels(), s.labels()) {
m.unique[hash] = s
return
}
Expand All @@ -1837,7 +1837,7 @@ func (m *seriesHashmap) set(hash uint64, s *memSeries) {
}
l := m.conflicts[hash]
for i, prev := range l {
if labels.Equal(prev.lset, s.lset) {
if labels.Equal(prev.labels(), s.labels()) {
l[i] = s
return
}
Expand Down Expand Up @@ -1986,7 +1986,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
deleted[storage.SeriesRef(series.ref)] = struct{}{}
s.hashes[hashShard].del(hash, series.ref)
delete(s.series[refShard], series.ref)
deletedForCallback[series.ref] = series.lset
deletedForCallback[series.ref] = series.lset // OK to access lset; series is locked at the top of this function.
}

// Run through all series shard by shard, checking which should be deleted.
Expand Down Expand Up @@ -2079,7 +2079,7 @@ func (s *stripeSeries) getOrSet(hash uint64, lset labels.Labels, createSeries fu
}
// Setting the series in the s.hashes marks the creation of series
// as any further calls to this methods would return that series.
s.seriesLifecycleCallback.PostCreation(series.lset)
s.seriesLifecycleCallback.PostCreation(series.labels())

i = uint64(series.ref) & uint64(s.size-1)

Expand Down Expand Up @@ -2122,7 +2122,6 @@ func (s sample) Type() chunkenc.ValueType {
type memSeries struct {
// Members up to the Mutex are not changed after construction, so can be accessed without a lock.
ref chunks.HeadSeriesRef
lset labels.Labels
meta *metadata.Metadata

// Series labels hash to use for sharding purposes. The value is always 0 when sharding has not
Expand All @@ -2135,6 +2134,7 @@ type memSeries struct {
sync.Mutex

// Everything after here should only be accessed with the lock.
lset labels.Labels

// Immutable chunks on disk that have not yet gone into a block, in order of ascending time stamps.
// When compaction runs, chunks get moved into a block and all pointers are shifted like so:
Expand Down Expand Up @@ -2204,6 +2204,13 @@ func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64,
return s
}

// Helper method to access labels under lock.
func (s *memSeries) labels() labels.Labels {
s.Lock()
defer s.Unlock()
return s.lset
}

func (s *memSeries) minTime() int64 {
if len(s.mmappedChunks) > 0 {
return s.mmappedChunks[0].minTime
Expand Down
6 changes: 3 additions & 3 deletions tsdb/head_append.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels,
// Ensure no empty labels have gotten through.
e.Labels = e.Labels.WithoutEmpty()

err := a.head.exemplars.ValidateExemplar(s.lset, e)
err := a.head.exemplars.ValidateExemplar(s.labels(), e)
if err != nil {
if errors.Is(err, storage.ErrDuplicateExemplar) || errors.Is(err, storage.ErrExemplarsDisabled) {
// Duplicate, don't return an error but don't accept the exemplar.
Expand Down Expand Up @@ -715,7 +715,7 @@ func (a *headAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRe
return 0, labels.EmptyLabels()
}
// returned labels must be suitable to pass to Append()
return storage.SeriesRef(s.ref), s.lset
return storage.SeriesRef(s.ref), s.labels()
}

func (a *headAppender) GetRefFunc(hash uint64, cmp func(labels.Labels) bool) (storage.SeriesRef, labels.Labels) {
Expand Down Expand Up @@ -832,7 +832,7 @@ func (a *headAppender) Commit() (err error) {
continue
}
// We don't instrument exemplar appends here, all is instrumented by storage.
if err := a.head.exemplars.AddExemplar(s.lset, e.exemplar); err != nil {
if err := a.head.exemplars.AddExemplar(s.labels(), e.exemplar); err != nil {
if errors.Is(err, storage.ErrOutOfOrderExemplar) {
continue
}
Expand Down
8 changes: 4 additions & 4 deletions tsdb/head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
}

slices.SortFunc(series, func(a, b *memSeries) int {
return labels.Compare(a.lset, b.lset)
return labels.Compare(a.labels(), b.labels())
})

// Convert back to list.
Expand Down Expand Up @@ -200,7 +200,7 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB
h.head.metrics.seriesNotFound.Inc()
return storage.ErrNotFound
}
builder.Assign(s.lset)
builder.Assign(s.labels())

if chks == nil {
return nil
Expand Down Expand Up @@ -270,7 +270,7 @@ func (h *headIndexReader) LabelValueFor(_ context.Context, id storage.SeriesRef,
return "", storage.ErrNotFound
}

value := memSeries.lset.Get(label)
value := memSeries.labels().Get(label)
if value == "" {
return "", storage.ErrNotFound
}
Expand All @@ -290,7 +290,7 @@ func (h *headIndexReader) LabelNamesFor(ctx context.Context, ids ...storage.Seri
if memSeries == nil {
return nil, storage.ErrNotFound
}
memSeries.lset.Range(func(lbl labels.Label) {
memSeries.labels().Range(func(lbl labels.Label) {
namesMap[lbl.Name] = struct{}{}
})
}
Expand Down
8 changes: 4 additions & 4 deletions tsdb/head_wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
// At the moment the only possible error here is out of order exemplars, which we shouldn't see when
// replaying the WAL, so lets just log the error if it's not that type.
err = h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{Ts: e.T, Value: e.V, Labels: e.Labels})
err = h.exemplars.AddExemplar(ms.labels(), exemplar.Exemplar{Ts: e.T, Value: e.V, Labels: e.Labels})
if err != nil && errors.Is(err, storage.ErrOutOfOrderExemplar) {
level.Warn(h.logger).Log("msg", "Unexpected error when replaying WAL on exemplar record", "err", err)
}
Expand Down Expand Up @@ -448,7 +448,7 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*m
) {
level.Debug(h.logger).Log(
"msg", "M-mapped chunks overlap on a duplicate series record",
"series", mSeries.lset.String(),
"series", mSeries.labels().String(),
"oldref", mSeries.ref,
"oldmint", mSeries.mmappedChunks[0].minTime,
"oldmaxt", mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime,
Expand Down Expand Up @@ -934,7 +934,7 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {

buf.PutByte(chunkSnapshotRecordTypeSeries)
buf.PutBE64(uint64(s.ref))
record.EncodeLabels(&buf, s.lset)
record.EncodeLabels(&buf, s.labels())
buf.PutBE64int64(0) // Backwards-compatibility; was chunkRange but now unused.

s.Lock()
Expand Down Expand Up @@ -1487,7 +1487,7 @@ Outer:
continue
}

if err := h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{
if err := h.exemplars.AddExemplar(ms.labels(), exemplar.Exemplar{
Labels: e.Labels,
Value: e.V,
Ts: e.T,
Expand Down
2 changes: 1 addition & 1 deletion tsdb/ooo_head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
oh.head.metrics.seriesNotFound.Inc()
return storage.ErrNotFound
}
builder.Assign(s.lset)
builder.Assign(s.labels())

if chks == nil {
return nil
Expand Down

0 comments on commit 392e99d

Please sign in to comment.