Skip to content

Commit

Permalink
Merge pull request #378 from grafana/allow-forcing-usage-of-postings-…
Browse files Browse the repository at this point in the history
…for-matchers-cache

Allow forcing usage of PostingsForMatchersCache
  • Loading branch information
colega authored Dec 29, 2022
2 parents 0db77b4 + d23859d commit fa868fd
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 100 deletions.
2 changes: 1 addition & 1 deletion tsdb/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func OpenBlockWithCache(logger log.Logger, dir string, pool chunkenc.Pool, cache
if err != nil {
return nil, err
}
pfmc := NewPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize)
pfmc := NewPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize, false)
ir := indexReaderWithPostingsForMatchers{indexReader, pfmc}
closers = append(closers, ir)

Expand Down
43 changes: 29 additions & 14 deletions tsdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,23 @@ var ErrNotReady = errors.New("TSDB not ready")
// millisecond precision timestamps.
func DefaultOptions() *Options {
return &Options{
WALSegmentSize: wlog.DefaultSegmentSize,
MaxBlockChunkSegmentSize: chunks.DefaultChunkSegmentSize,
RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond),
MinBlockDuration: DefaultBlockDuration,
MaxBlockDuration: DefaultBlockDuration,
NoLockfile: false,
AllowOverlappingCompaction: true,
WALCompression: false,
StripeSize: DefaultStripeSize,
HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize,
IsolationDisabled: defaultIsolationDisabled,
HeadChunksEndTimeVariance: 0,
HeadChunksWriteQueueSize: chunks.DefaultWriteQueueSize,
OutOfOrderCapMax: DefaultOutOfOrderCapMax,
WALSegmentSize: wlog.DefaultSegmentSize,
MaxBlockChunkSegmentSize: chunks.DefaultChunkSegmentSize,
RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond),
MinBlockDuration: DefaultBlockDuration,
MaxBlockDuration: DefaultBlockDuration,
NoLockfile: false,
AllowOverlappingCompaction: true,
WALCompression: false,
StripeSize: DefaultStripeSize,
HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize,
IsolationDisabled: defaultIsolationDisabled,
HeadChunksEndTimeVariance: 0,
HeadChunksWriteQueueSize: chunks.DefaultWriteQueueSize,
OutOfOrderCapMax: DefaultOutOfOrderCapMax,
HeadPostingsForMatchersCacheTTL: defaultPostingsForMatchersCacheTTL,
HeadPostingsForMatchersCacheSize: defaultPostingsForMatchersCacheSize,
HeadPostingsForMatchersCacheForce: false,
}
}

Expand Down Expand Up @@ -189,6 +192,15 @@ type Options struct {
// OutOfOrderCapMax is maximum capacity for OOO chunks (in samples).
// If it is <=0, the default value is assumed.
OutOfOrderCapMax int64

// HeadPostingsForMatchersCacheTTL is the TTL of the postings for matchers cache in the Head.
// If it's 0, the cache will only deduplicate in-flight requests, deleting the results once the first request has finished.
HeadPostingsForMatchersCacheTTL time.Duration
// HeadPostingsForMatchersCacheSize is the maximum size of cached postings for matchers elements in the Head.
// It's ignored used when HeadPostingsForMatchersCacheTTL is 0.
HeadPostingsForMatchersCacheSize int
// HeadPostingsForMatchersCacheForce forces the usage of postings for matchers cache for all calls on Head and OOOHead regardless of the `concurrent` param.
HeadPostingsForMatchersCacheForce bool
}

type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}
Expand Down Expand Up @@ -798,6 +810,9 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
headOpts.EnableNativeHistograms.Store(opts.EnableNativeHistograms)
headOpts.OutOfOrderTimeWindow.Store(opts.OutOfOrderTimeWindow)
headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax)
headOpts.PostingsForMatchersCacheTTL = opts.HeadPostingsForMatchersCacheTTL
headOpts.PostingsForMatchersCacheSize = opts.HeadPostingsForMatchersCacheSize
headOpts.PostingsForMatchersCacheForce = opts.HeadPostingsForMatchersCacheForce
if opts.IsolationDisabled {
// We only override this flag if isolation is disabled at DB level. We use the default otherwise.
headOpts.IsolationDisabled = opts.IsolationDisabled
Expand Down
27 changes: 17 additions & 10 deletions tsdb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ type HeadOptions struct {
EnableMemorySnapshotOnShutdown bool

IsolationDisabled bool

PostingsForMatchersCacheTTL time.Duration
PostingsForMatchersCacheSize int
PostingsForMatchersCacheForce bool
}

const (
Expand All @@ -179,15 +183,18 @@ const (

func DefaultHeadOptions() *HeadOptions {
ho := &HeadOptions{
ChunkRange: DefaultBlockDuration,
ChunkDirRoot: "",
ChunkPool: chunkenc.NewPool(),
ChunkWriteBufferSize: chunks.DefaultWriteBufferSize,
ChunkEndTimeVariance: 0,
ChunkWriteQueueSize: chunks.DefaultWriteQueueSize,
StripeSize: DefaultStripeSize,
SeriesCallback: &noopSeriesLifecycleCallback{},
IsolationDisabled: defaultIsolationDisabled,
ChunkRange: DefaultBlockDuration,
ChunkDirRoot: "",
ChunkPool: chunkenc.NewPool(),
ChunkWriteBufferSize: chunks.DefaultWriteBufferSize,
ChunkEndTimeVariance: 0,
ChunkWriteQueueSize: chunks.DefaultWriteQueueSize,
StripeSize: DefaultStripeSize,
SeriesCallback: &noopSeriesLifecycleCallback{},
IsolationDisabled: defaultIsolationDisabled,
PostingsForMatchersCacheTTL: defaultPostingsForMatchersCacheTTL,
PostingsForMatchersCacheSize: defaultPostingsForMatchersCacheSize,
PostingsForMatchersCacheForce: false,
}
ho.OutOfOrderCapMax.Store(DefaultOutOfOrderCapMax)
return ho
Expand Down Expand Up @@ -254,7 +261,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wlog.WL, opts *Hea
stats: stats,
reg: r,

pfmc: NewPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, defaultPostingsForMatchersCacheSize),
pfmc: NewPostingsForMatchersCache(opts.PostingsForMatchersCacheTTL, opts.PostingsForMatchersCacheSize, opts.PostingsForMatchersCacheForce),
}
if err := h.resetInMemoryState(); err != nil {
return nil, err
Expand Down
12 changes: 8 additions & 4 deletions tsdb/postings_for_matchers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@ type IndexPostingsReader interface {
Postings(name string, values ...string) (index.Postings, error)
}

// NewPostingsForMatchersCache creates a new PostingsForMatchersCache.
func NewPostingsForMatchersCache(ttl time.Duration, cacheSize int) *PostingsForMatchersCache {
// NewPostingsForMatchersCache creates a new PostingsForMatchersCache.
// If `ttl` is 0, then it only deduplicates in-flight requests.
// If `force` is true, then all requests go through cache, regardless of the `concurrent` param provided.
func NewPostingsForMatchersCache(ttl time.Duration, cacheSize int, force bool) *PostingsForMatchersCache {
b := &PostingsForMatchersCache{
calls: &sync.Map{},
cached: list.New(),

ttl: ttl,
cacheSize: cacheSize,
force: force,

timeNow: time.Now,
postingsForMatchers: PostingsForMatchers,
Expand All @@ -43,7 +46,7 @@ func NewPostingsForMatchersCache(ttl time.Duration, cacheSize int) *PostingsForM
return b
}

// PostingsForMatchersCache caches PostingsForMatchers call results when the concurrent hint is passed in.
// PostingsForMatchersCache caches PostingsForMatchers call results when the concurrent hint is passed in or force is true.
type PostingsForMatchersCache struct {
calls *sync.Map

Expand All @@ -52,6 +55,7 @@ type PostingsForMatchersCache struct {

ttl time.Duration
cacheSize int
force bool

// timeNow is the time.Now that can be replaced for testing purposes
timeNow func() time.Time
Expand All @@ -60,7 +64,7 @@ type PostingsForMatchersCache struct {
}

func (c *PostingsForMatchersCache) PostingsForMatchers(ix IndexPostingsReader, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
if !concurrent {
if !concurrent && !c.force {
return c.postingsForMatchers(ix, ms...)
}
c.expire()
Expand Down
147 changes: 76 additions & 71 deletions tsdb/postings_for_matchers_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
func TestPostingsForMatchersCache(t *testing.T) {
const testCacheSize = 5
// newPostingsForMatchersCache tests the NewPostingsForMatcherCache constructor, but overrides the postingsForMatchers func
newPostingsForMatchersCache := func(ttl time.Duration, pfm func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error), timeMock *timeNowMock) *PostingsForMatchersCache {
c := NewPostingsForMatchersCache(ttl, testCacheSize)
newPostingsForMatchersCache := func(ttl time.Duration, pfm func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error), timeMock *timeNowMock, force bool) *PostingsForMatchersCache {
c := NewPostingsForMatchersCache(ttl, testCacheSize, force)
if c.postingsForMatchers == nil {
t.Fatalf("NewPostingsForMatchersCache() didn't assign postingsForMatchers func")
}
Expand All @@ -36,7 +36,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
require.IsType(t, indexForPostingsMock{}, ix, "Incorrect IndexPostingsReader was provided to PostingsForMatchers, expected the mock, was given %v (%T)", ix, ix)
require.Equal(t, expectedMatchers, ms, "Wrong label matchers provided, expected %v, got %v", expectedMatchers, ms)
return index.ErrPostings(expectedPostingsErr), nil
}, &timeNowMock{})
}, &timeNowMock{}, false)

p, err := c.PostingsForMatchers(indexForPostingsMock{}, concurrent, expectedMatchers...)
require.NoError(t, err)
Expand All @@ -52,7 +52,7 @@ func TestPostingsForMatchersCache(t *testing.T) {

c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
return nil, expectedErr
}, &timeNowMock{})
}, &timeNowMock{}, false)

_, err := c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...)
require.Equal(t, expectedErr, err)
Expand All @@ -61,76 +61,81 @@ func TestPostingsForMatchersCache(t *testing.T) {
t.Run("happy case multiple concurrent calls: two same one different", func(t *testing.T) {
for _, cacheEnabled := range []bool{true, false} {
t.Run(fmt.Sprintf("cacheEnabled=%t", cacheEnabled), func(t *testing.T) {
calls := [][]*labels.Matcher{
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, // 1
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, // 1 same
{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar")}, // 2: different match type
{labels.MustNewMatcher(labels.MatchEqual, "diff", "bar")}, // 3: different name
{labels.MustNewMatcher(labels.MatchEqual, "foo", "diff")}, // 4: different value
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchEqual, "boo", "bam")}, // 5
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchEqual, "boo", "bam")}, // 5 same
}
for _, forced := range []bool{true, false} {
concurrent := !forced
t.Run(fmt.Sprintf("forced=%t", forced), func(t *testing.T) {
calls := [][]*labels.Matcher{
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, // 1
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, // 1 same
{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar")}, // 2: different match type
{labels.MustNewMatcher(labels.MatchEqual, "diff", "bar")}, // 3: different name
{labels.MustNewMatcher(labels.MatchEqual, "foo", "diff")}, // 4: different value
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchEqual, "boo", "bam")}, // 5
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchEqual, "boo", "bam")}, // 5 same
}

// we'll identify results by each call's error, and the error will be the string value of the first matcher
matchersString := func(ms []*labels.Matcher) string {
s := strings.Builder{}
for i, m := range ms {
if i > 0 {
s.WriteByte(',')
// we'll identify results by each call's error, and the error will be the string value of the first matcher
matchersString := func(ms []*labels.Matcher) string {
s := strings.Builder{}
for i, m := range ms {
if i > 0 {
s.WriteByte(',')
}
s.WriteString(m.String())
}
return s.String()
}
expectedResults := make([]string, len(calls))
for i, c := range calls {
expectedResults[i] = c[0].String()
}
s.WriteString(m.String())
}
return s.String()
}
expectedResults := make([]string, len(calls))
for i, c := range calls {
expectedResults[i] = c[0].String()
}

expectedPostingsForMatchersCalls := 5
// we'll block all the calls until we receive the exact amount. if we receive more, WaitGroup will panic
called := make(chan struct{}, expectedPostingsForMatchersCalls)
release := make(chan struct{})
var ttl time.Duration
if cacheEnabled {
ttl = defaultPostingsForMatchersCacheTTL
}
c := newPostingsForMatchersCache(ttl, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
select {
case called <- struct{}{}:
default:
}
<-release
return nil, fmt.Errorf(matchersString(ms))
}, &timeNowMock{})

results := make([]string, len(calls))
resultsWg := sync.WaitGroup{}
resultsWg.Add(len(calls))

// perform all calls
for i := 0; i < len(calls); i++ {
go func(i int) {
_, err := c.PostingsForMatchers(indexForPostingsMock{}, true, calls[i]...)
results[i] = err.Error()
resultsWg.Done()
}(i)
}
expectedPostingsForMatchersCalls := 5
// we'll block all the calls until we receive the exact amount. if we receive more, WaitGroup will panic
called := make(chan struct{}, expectedPostingsForMatchersCalls)
release := make(chan struct{})
var ttl time.Duration
if cacheEnabled {
ttl = defaultPostingsForMatchersCacheTTL
}
c := newPostingsForMatchersCache(ttl, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
select {
case called <- struct{}{}:
default:
}
<-release
return nil, fmt.Errorf(matchersString(ms))
}, &timeNowMock{}, forced)

results := make([]string, len(calls))
resultsWg := sync.WaitGroup{}
resultsWg.Add(len(calls))

// perform all calls
for i := 0; i < len(calls); i++ {
go func(i int) {
_, err := c.PostingsForMatchers(indexForPostingsMock{}, concurrent, calls[i]...)
results[i] = err.Error()
resultsWg.Done()
}(i)
}

// wait until all calls arrive to the mocked function
for i := 0; i < expectedPostingsForMatchersCalls; i++ {
<-called
}
// wait until all calls arrive to the mocked function
for i := 0; i < expectedPostingsForMatchersCalls; i++ {
<-called
}

// let them all return
close(release)
// let them all return
close(release)

// wait for the results
resultsWg.Wait()
// wait for the results
resultsWg.Wait()

// check that we got correct results
for i, c := range calls {
require.Equal(t, matchersString(c), results[i], "Call %d should have returned error %q, but got %q instead", i, matchersString(c), results[i])
// check that we got correct results
for i, c := range calls {
require.Equal(t, matchersString(c), results[i], "Call %d should have returned error %q, but got %q instead", i, matchersString(c), results[i])
}
})
}
})
}
Expand All @@ -143,7 +148,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
call++
return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil
}, &timeNowMock{})
}, &timeNowMock{}, false)

// first call, fills the cache
p, err := c.PostingsForMatchers(indexForPostingsMock{}, false, expectedMatchers...)
Expand All @@ -163,7 +168,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
c := newPostingsForMatchersCache(0, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
call++
return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil
}, &timeNowMock{})
}, &timeNowMock{}, false)

// first call, fills the cache
p, err := c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...)
Expand All @@ -186,7 +191,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
call++
return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil
}, timeNow)
}, timeNow, false)

// first call, fills the cache
p, err := c.PostingsForMatchers(indexForPostingsMock{}, true, expectedMatchers...)
Expand Down Expand Up @@ -220,7 +225,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
k := matchersKey(ms)
callsPerMatchers[k]++
return index.ErrPostings(fmt.Errorf("result from call %d", callsPerMatchers[k])), nil
}, timeNow)
}, timeNow, false)

// each one of the first testCacheSize calls is cached properly
for _, matchers := range calls {
Expand Down

0 comments on commit fa868fd

Please sign in to comment.