From 8968b4e620298944c9c77c6a90e0e4a3803e5d10 Mon Sep 17 00:00:00 2001 From: Zach Leslie Date: Wed, 27 Nov 2024 17:30:27 +0000 Subject: [PATCH] Wip --- tempodb/blocklist/poller.go | 93 ++++++++++++++++---------------- tempodb/blocklist/poller_test.go | 18 ++++--- 2 files changed, 58 insertions(+), 53 deletions(-) diff --git a/tempodb/blocklist/poller.go b/tempodb/blocklist/poller.go index e8571497510..4303c7f69b2 100644 --- a/tempodb/blocklist/poller.go +++ b/tempodb/blocklist/poller.go @@ -181,13 +181,13 @@ func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) { var ( consecutiveErrorsRemaining = p.cfg.TolerateConsecutiveErrors - newBlockList = make([]*backend.BlockMeta, 0) - newCompactedBlockList = make([]*backend.CompactedBlockMeta, 0) + newBlockList = make([]*backend.BlockMeta, 0, 1000) + newCompactedBlockList = make([]*backend.CompactedBlockMeta, 0, 1000) err error ) for consecutiveErrorsRemaining >= 0 { - newBlockList, newCompactedBlockList, err = p.pollTenantAndCreateIndex(ctx, tenantID, previous) + err = p.pollTenantAndCreateIndex(ctx, tenantID, previous, &newBlockList, &newCompactedBlockList) if err == nil { break } @@ -241,7 +241,9 @@ func (p *Poller) pollTenantAndCreateIndex( ctx context.Context, tenantID string, previous *List, -) ([]*backend.BlockMeta, []*backend.CompactedBlockMeta, error) { + newBlockList *[]*backend.BlockMeta, + newCompactedBlockList *[]*backend.CompactedBlockMeta, +) error { derivedCtx, span := tracer.Start(ctx, "Poller.pollTenantAndCreateIndex", trace.WithAttributes(attribute.String("tenant", tenantID))) defer span.End() @@ -260,7 +262,9 @@ func (p *Poller) pollTenantAndCreateIndex( span.SetAttributes(attribute.Int("metas", len(i.Meta))) span.SetAttributes(attribute.Int("compactedMetas", len(i.CompactedMeta))) - return i.Meta, i.CompactedMeta, nil + *newBlockList = append(*newBlockList, i.Meta...) + *newCompactedBlockList = append(*newCompactedBlockList, i.CompactedMeta...) + return nil } metricTenantIndexErrors.WithLabelValues(tenantID).Inc() @@ -268,7 +272,7 @@ func (p *Poller) pollTenantAndCreateIndex( // there was an error, return the error if we're not supposed to fallback to polling if !p.cfg.PollFallback { - return nil, nil, fmt.Errorf("failed to pull tenant index and no fallback configured: %w", err) + return fmt.Errorf("failed to pull tenant index and no fallback configured: %w", err) } // polling fallback is true, log the error and continue in this method to completely poll the backend @@ -279,52 +283,54 @@ func (p *Poller) pollTenantAndCreateIndex( // there was a failure to pull the tenant index and we are configured to fall // back to polling. metricTenantIndexBuilder.WithLabelValues(tenantID).Set(1) - blocklist, compactedBlocklist, err := p.pollTenantBlocks(derivedCtx, tenantID, previous) + err := p.pollTenantBlocks(derivedCtx, tenantID, previous, newBlockList, newCompactedBlockList) if err != nil { - return nil, nil, fmt.Errorf("failed to poll tenant blocks: %w", err) + return fmt.Errorf("failed to poll tenant blocks: %w", err) } // everything is happy, write this tenant index - level.Info(p.logger).Log("msg", "writing tenant index", "tenant", tenantID, "metas", len(blocklist), "compactedMetas", len(compactedBlocklist)) - err = p.writer.WriteTenantIndex(derivedCtx, tenantID, blocklist, compactedBlocklist) + level.Info(p.logger).Log("msg", "writing tenant index", "tenant", tenantID, "metas", len(*newBlockList), "compactedMetas", len(*newCompactedBlockList)) + err = p.writer.WriteTenantIndex(derivedCtx, tenantID, *newBlockList, *newCompactedBlockList) if err != nil { metricTenantIndexErrors.WithLabelValues(tenantID).Inc() level.Error(p.logger).Log("msg", "failed to write tenant index", "tenant", tenantID, "err", err) } - if len(blocklist) == 0 && len(compactedBlocklist) == 0 { + if len(*newBlockList) == 0 && len(*newCompactedBlockList) == 0 { err := p.deleteTenant(ctx, tenantID) if err != nil { - return nil, nil, fmt.Errorf("failed to delete tenant: %w", err) + return fmt.Errorf("failed to delete tenant: %w", err) } } metricTenantIndexAgeSeconds.WithLabelValues(tenantID).Set(0) - return blocklist, compactedBlocklist, nil + return nil } func (p *Poller) pollTenantBlocks( ctx context.Context, tenantID string, previous *List, -) ([]*backend.BlockMeta, []*backend.CompactedBlockMeta, error) { + newBlockList *[]*backend.BlockMeta, + newCompactedBlockList *[]*backend.CompactedBlockMeta, +) error { derivedCtx, span := tracer.Start(ctx, "Poller.pollTenantBlocks") defer span.End() currentBlockIDs, currentCompactedBlockIDs, err := p.reader.Blocks(derivedCtx, tenantID) if err != nil { - return nil, nil, fmt.Errorf("failed listing tenant blocks: %w", err) + return fmt.Errorf("failed listing tenant blocks: %w", err) } var ( - metas = previous.Metas(tenantID) - compactedMetas = previous.CompactedMetas(tenantID) - mm = make(map[backend.UUID]*backend.BlockMeta, len(metas)) - cm = make(map[backend.UUID]*backend.CompactedBlockMeta, len(compactedMetas)) - newBlockList = make([]*backend.BlockMeta, 0, len(currentBlockIDs)) - newCompactedBlocklist = make([]*backend.CompactedBlockMeta, 0, len(currentCompactedBlockIDs)) - unknownBlockIDs = make(map[uuid.UUID]bool, 1000) + metas = previous.Metas(tenantID) + compactedMetas = previous.CompactedMetas(tenantID) + mm = make(map[backend.UUID]*backend.BlockMeta, len(metas)) + cm = make(map[backend.UUID]*backend.CompactedBlockMeta, len(compactedMetas)) + // newBlockList = make([]*backend.BlockMeta, 0, len(currentBlockIDs)) + // newCompactedBlocklist = make([]*backend.CompactedBlockMeta, 0, len(currentCompactedBlockIDs)) + unknownBlockIDs = make(map[uuid.UUID]bool, 1000) ) span.SetAttributes(attribute.Int("metas", len(metas))) @@ -342,7 +348,7 @@ func (p *Poller) pollTenantBlocks( for _, blockID := range currentBlockIDs { // if we already have this block id in our previous list, use the existing data. if v, ok := mm[backend.UUID(blockID)]; ok { - newBlockList = append(newBlockList, v) + *newBlockList = append(*newBlockList, v) continue } unknownBlockIDs[blockID] = false @@ -352,7 +358,7 @@ func (p *Poller) pollTenantBlocks( for _, blockID := range currentCompactedBlockIDs { // if we already have this block id in our previous list, use the existing data. if v, ok := cm[backend.UUID(blockID)]; ok { - newCompactedBlocklist = append(newCompactedBlocklist, v) + *newCompactedBlockList = append(*newCompactedBlockList, v) continue } @@ -364,42 +370,39 @@ func (p *Poller) pollTenantBlocks( } - newM, newCm, err := p.pollUnknown(derivedCtx, unknownBlockIDs, tenantID) + err = p.pollUnknown(derivedCtx, unknownBlockIDs, tenantID, newBlockList, newCompactedBlockList) if err != nil { - return nil, nil, fmt.Errorf("failed reading unknown blocks: %w", err) + return fmt.Errorf("failed reading unknown blocks: %w", err) } - newBlockList = append(newBlockList, newM...) - newCompactedBlocklist = append(newCompactedBlocklist, newCm...) - - sort.Slice(newBlockList, func(i, j int) bool { - return newBlockList[i].StartTime.Before(newBlockList[j].StartTime) + sort.Slice(*newBlockList, func(i, j int) bool { + return (*newBlockList)[i].StartTime.Before((*newBlockList)[j].StartTime) }) - sort.Slice(newCompactedBlocklist, func(i, j int) bool { - return newCompactedBlocklist[i].StartTime.Before(newCompactedBlocklist[j].StartTime) + sort.Slice(*newCompactedBlockList, func(i, j int) bool { + return (*newCompactedBlockList)[i].StartTime.Before((*newCompactedBlockList)[j].StartTime) }) - return newBlockList, newCompactedBlocklist, nil + return nil } func (p *Poller) pollUnknown( ctx context.Context, unknownBlocks map[uuid.UUID]bool, tenantID string, -) ([]*backend.BlockMeta, []*backend.CompactedBlockMeta, error) { + newBlockList *[]*backend.BlockMeta, + newCompactedBlockList *[]*backend.CompactedBlockMeta, +) error { derivedCtx, span := tracer.Start(ctx, "pollUnknown", trace.WithAttributes( attribute.Int("unknownBlockIDs", len(unknownBlocks)), )) defer span.End() var ( - err error - errs []error - mtx sync.Mutex - bg = boundedwaitgroup.New(p.cfg.PollConcurrency) - newBlockList = make([]*backend.BlockMeta, 0, len(unknownBlocks)) - newCompactedBlocklist = make([]*backend.CompactedBlockMeta, 0, len(unknownBlocks)) + err error + errs []error + mtx sync.Mutex + bg = boundedwaitgroup.New(p.cfg.PollConcurrency) ) for blockID, compacted := range unknownBlocks { @@ -423,12 +426,12 @@ func (p *Poller) pollUnknown( mtx.Lock() defer mtx.Unlock() if m != nil { - newBlockList = append(newBlockList, m) + *newBlockList = append(*newBlockList, m) return } if cm != nil { - newCompactedBlocklist = append(newCompactedBlocklist, cm) + *newCompactedBlockList = append(*newCompactedBlockList, cm) return } @@ -446,10 +449,10 @@ func (p *Poller) pollUnknown( span.SetStatus(codes.Error, "") span.RecordError(err) - return nil, nil, err + return err } - return newBlockList, newCompactedBlocklist, nil + return nil } func (p *Poller) pollBlock( diff --git a/tempodb/blocklist/poller_test.go b/tempodb/blocklist/poller_test.go index f0b27b1467b..9b7fad79317 100644 --- a/tempodb/blocklist/poller_test.go +++ b/tempodb/blocklist/poller_test.go @@ -1009,11 +1009,13 @@ func BenchmarkPoller10k(b *testing.B) { currentPerTenantCompacted := maps.Clone(previousPerTenantCompacted) var ( - c = newMockCompactor(currentPerTenantCompacted, false) - w = &backend.MockWriter{} - s = &mockJobSharder{owns: true} - r = newMockReader(currentPerTenant, currentPerTenantCompacted, false) - previous = newBlocklist(previousPerTenant, previousPerTenantCompacted) + c = newMockCompactor(currentPerTenantCompacted, false) + w = &backend.MockWriter{} + s = &mockJobSharder{owns: true} + r = newMockReader(currentPerTenant, currentPerTenantCompacted, false) + previous = newBlocklist(previousPerTenant, previousPerTenantCompacted) + newBlockList = make([]*backend.BlockMeta, 0, tc.tenantCount*tc.blocksPerTenant) + newCompactedBlockList = make([]*backend.CompactedBlockMeta, 0, tc.tenantCount*tc.blocksPerTenant) ) // This mock reader returns error or nil based on the tenant ID @@ -1027,7 +1029,7 @@ func BenchmarkPoller10k(b *testing.B) { runName := fmt.Sprintf("%d-%d", tc.tenantCount, tc.blocksPerTenant) b.Run(runName, func(b *testing.B) { for tenant := range previousPerTenant { - benchmarkPollTenant(b, poller, tenant, previous) + benchmarkPollTenant(b, poller, tenant, previous, &newBlockList, &newCompactedBlockList) } }) } @@ -1162,10 +1164,10 @@ func BenchmarkFullPoller(b *testing.B) { } } -func benchmarkPollTenant(b *testing.B, poller *Poller, tenant string, previous *List) { +func benchmarkPollTenant(b *testing.B, poller *Poller, tenant string, previous *List, newBlockList *[]*backend.BlockMeta, newCompactedBlockList *[]*backend.CompactedBlockMeta) { b.ResetTimer() for n := 0; n < b.N; n++ { - _, _, err := poller.pollTenantBlocks(context.Background(), tenant, previous) + err := poller.pollTenantBlocks(context.Background(), tenant, previous, newBlockList, newCompactedBlockList) require.NoError(b, err) } }