Skip to content

Commit

Permalink
tsdb: add Appender GetRefFunc interface
Browse files Browse the repository at this point in the history
So we can look up series references from a Builder.

Signed-off-by: Bryan Boreham <[email protected]>
  • Loading branch information
bboreham committed Apr 8, 2024
1 parent db70680 commit d9b746c
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 1 deletion.
8 changes: 8 additions & 0 deletions storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,14 @@ type GetRef interface {
GetRef(lset labels.Labels, hash uint64) (SeriesRef, labels.Labels)
}

type GetRefFunc interface {
// Returns reference number that can be used to pass to Appender.Append(),
// and a set of labels that will not cause another copy when passed to Appender.Append().
// 0 means the appender does not have a reference to this series.
// hash should be a hash of lset. cmp should return true if labels match.
GetRefFunc(hash uint64, cmp func(labels.Labels) bool) (SeriesRef, labels.Labels)
}

// ExemplarAppender provides an interface for adding samples to exemplar storage, which
// within Prometheus is in-memory only.
type ExemplarAppender interface {
Expand Down
12 changes: 11 additions & 1 deletion tsdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,10 @@ type dbAppender struct {
db *DB
}

var _ storage.GetRef = dbAppender{}
var (
_ storage.GetRef = dbAppender{}
_ storage.GetRefFunc = dbAppender{}
)

func (a dbAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef, labels.Labels) {
if g, ok := a.Appender.(storage.GetRef); ok {
Expand All @@ -1181,6 +1184,13 @@ func (a dbAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef,
return 0, labels.EmptyLabels()
}

func (a dbAppender) GetRefFunc(hash uint64, cmp func(labels.Labels) bool) (storage.SeriesRef, labels.Labels) {
if g, ok := a.Appender.(storage.GetRefFunc); ok {
return g.GetRefFunc(hash, cmp)
}
return 0, labels.EmptyLabels()
}

func (a dbAppender) Commit() error {
err := a.Appender.Commit()

Expand Down
25 changes: 25 additions & 0 deletions tsdb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -1812,6 +1812,21 @@ func (m *seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
return nil
}

// Fetch a series from the map, given a function which says whether it's the right Labels.
func (m *seriesHashmap) getByFunc(hash uint64, cmp func(labels.Labels) bool) *memSeries {
if s, found := m.unique[hash]; found {
if cmp(s.lset) {
return s
}
}
for _, s := range m.conflicts[hash] {
if cmp(s.lset) {
return s
}
}
return nil
}

func (m *seriesHashmap) set(hash uint64, s *memSeries) {
if existing, found := m.unique[hash]; !found || labels.Equal(existing.lset, s.lset) {
m.unique[hash] = s
Expand Down Expand Up @@ -2023,6 +2038,16 @@ func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries {
return series
}

func (s *stripeSeries) getByHashFunc(hash uint64, cmp func(labels.Labels) bool) *memSeries {
i := hash & uint64(s.size-1)

s.locks[i].RLock()
series := s.hashes[i].getByFunc(hash, cmp)
s.locks[i].RUnlock()

return series
}

func (s *stripeSeries) getOrSet(hash uint64, lset labels.Labels, createSeries func() *memSeries) (*memSeries, bool, error) {
// PreCreation is called here to avoid calling it inside the lock.
// It is not necessary to call it just before creating a series,
Expand Down
16 changes: 16 additions & 0 deletions tsdb/head_append.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ func (a *initAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRe
return 0, labels.EmptyLabels()
}

func (a *initAppender) GetRefFunc(hash uint64, cmp func(labels.Labels) bool) (storage.SeriesRef, labels.Labels) {
if g, ok := a.app.(storage.GetRefFunc); ok {
return g.GetRefFunc(hash, cmp)
}
return 0, labels.EmptyLabels()
}

func (a *initAppender) Commit() error {
if a.app == nil {
a.head.metrics.activeAppenders.Dec()
Expand Down Expand Up @@ -725,6 +732,15 @@ func (a *headAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRe
return storage.SeriesRef(s.ref), s.lset
}

func (a *headAppender) GetRefFunc(hash uint64, cmp func(labels.Labels) bool) (storage.SeriesRef, labels.Labels) {
s := a.head.series.getByHashFunc(hash, cmp)
if s == nil {
return 0, labels.EmptyLabels()
}
// returned labels must be suitable to pass to Append()
return storage.SeriesRef(s.ref), s.lset
}

// log writes all headAppender's data to the WAL.
func (a *headAppender) log() error {
if a.head.wal == nil {
Expand Down

0 comments on commit d9b746c

Please sign in to comment.