From db9fb73d0afa95a23e9b07a1066a81383cc7c7aa Mon Sep 17 00:00:00 2001 From: javiermolinar Date: Mon, 16 Dec 2024 17:15:38 +0100 Subject: [PATCH 1/7] rythm: fix ingestion slack time range --- modules/blockbuilder/blockbuilder.go | 10 +-- modules/blockbuilder/blockbuilder_test.go | 6 +- modules/blockbuilder/partition_writer.go | 24 +------ modules/blockbuilder/partition_writer_test.go | 59 +++++++++++++++ modules/blockbuilder/tenant_store.go | 48 ++++++++++++- modules/blockbuilder/tenant_store_test.go | 46 ++++++++++++ .../processor/localblocks/processor.go | 25 +++++++ modules/ingester/instance_test.go | 2 +- pkg/util/test/req.go | 72 ++++++++++++++++--- tempodb/encoding/common/interfaces.go | 3 + tempodb/encoding/v2/wal_block.go | 4 ++ tempodb/encoding/vparquet2/wal_block.go | 4 ++ tempodb/encoding/vparquet3/wal_block.go | 4 ++ tempodb/encoding/vparquet4/wal_block.go | 8 ++- 14 files changed, 273 insertions(+), 42 deletions(-) create mode 100644 modules/blockbuilder/partition_writer_test.go create mode 100644 modules/blockbuilder/tenant_store_test.go diff --git a/modules/blockbuilder/blockbuilder.go b/modules/blockbuilder/blockbuilder.go index 29ca0895c3c..ce1ba729324 100644 --- a/modules/blockbuilder/blockbuilder.go +++ b/modules/blockbuilder/blockbuilder.go @@ -267,7 +267,7 @@ func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, pa // Continue consuming in sections until we're caught up. for !sectionEndTime.After(cycleEndTime) { - newCommitAt, err := b.consumePartitionSection(ctx, partition, sectionEndTime, partitionLag) + newCommitAt, err := b.consumePartitionSection(ctx, partition, commitRecTs, sectionEndTime, partitionLag) if err != nil { return fmt.Errorf("failed to consume partition section: %w", err) } @@ -281,7 +281,7 @@ func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, pa return nil } -func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition int32, sectionEndTime time.Time, lag kadm.GroupMemberLag) (int64, error) { +func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition int32, sectionStartTime time.Time, sectionEndTime time.Time, lag kadm.GroupMemberLag) (int64, error) { level.Info(b.logger).Log( "msg", "consuming partition section", "partition", partition, @@ -353,7 +353,7 @@ consumerLoop: break consumerLoop } - err := b.pushTraces(rec.Key, rec.Value, writer) // TODO - Batch pushes by tenant + err := b.pushTraces(rec.Key, rec.Value, writer, sectionStartTime) // TODO - Batch pushes by tenant if err != nil { // All "non-terminal" errors are handled by the TSDBBuilder. return lag.Commit.At, fmt.Errorf("process record in partition %d at offset %d: %w", rec.Partition, rec.Offset, err) @@ -403,14 +403,14 @@ func (b *BlockBuilder) commitState(ctx context.Context, commit kadm.Offset) erro return nil } -func (b *BlockBuilder) pushTraces(tenantBytes, reqBytes []byte, p partitionSectionWriter) error { +func (b *BlockBuilder) pushTraces(tenantBytes, reqBytes []byte, p partitionSectionWriter, sectionStartTime time.Time) error { req, err := b.decoder.Decode(reqBytes) if err != nil { return fmt.Errorf("failed to decode trace: %w", err) } defer b.decoder.Reset() - return p.pushBytes(string(tenantBytes), req) + return p.pushBytes(string(tenantBytes), req, sectionStartTime) } func (b *BlockBuilder) getAssignedActivePartitions() []int32 { diff --git a/modules/blockbuilder/blockbuilder_test.go b/modules/blockbuilder/blockbuilder_test.go index bef82771aaf..2b15793ac63 100644 --- a/modules/blockbuilder/blockbuilder_test.go +++ b/modules/blockbuilder/blockbuilder_test.go @@ -493,8 +493,10 @@ func countFlushedTraces(store storage.Store) int { func sendReq(t *testing.T, ctx context.Context, client *kgo.Client) []*kgo.Record { traceID := generateTraceID(t) - - req := test.MakePushBytesRequest(t, 10, traceID) + now := time.Now() + startTime := uint64(now.UnixNano()) + endTime := uint64(now.Add(time.Second).UnixNano()) + req := test.MakePushBytesRequest(t, 10, traceID, startTime, endTime) records, err := ingest.Encode(0, util.FakeTenantID, req, 1_000_000) require.NoError(t, err) diff --git a/modules/blockbuilder/partition_writer.go b/modules/blockbuilder/partition_writer.go index 9a9830e02ea..6aa509107dc 100644 --- a/modules/blockbuilder/partition_writer.go +++ b/modules/blockbuilder/partition_writer.go @@ -18,7 +18,7 @@ import ( ) type partitionSectionWriter interface { - pushBytes(tenant string, req *tempopb.PushBytesRequest) error + pushBytes(tenant string, req *tempopb.PushBytesRequest, startTime time.Time) error flush(ctx context.Context, store tempodb.Writer) error } @@ -50,7 +50,7 @@ func newPartitionSectionWriter(logger log.Logger, partition, cycleEndTs int64, b } } -func (p *writer) pushBytes(tenant string, req *tempopb.PushBytesRequest) error { +func (p *writer) pushBytes(tenant string, req *tempopb.PushBytesRequest, startTime time.Time) error { level.Debug(p.logger).Log( "msg", "pushing bytes", "tenant", tenant, @@ -68,25 +68,7 @@ func (p *writer) pushBytes(tenant string, req *tempopb.PushBytesRequest) error { if err := proto.Unmarshal(trace.Slice, tr); err != nil { return fmt.Errorf("failed to unmarshal trace: %w", err) } - - var start, end uint64 - for _, b := range tr.ResourceSpans { - for _, ss := range b.ScopeSpans { - for _, s := range ss.Spans { - if start == 0 || s.StartTimeUnixNano < start { - start = s.StartTimeUnixNano - } - if s.EndTimeUnixNano > end { - end = s.EndTimeUnixNano - } - } - } - } - - startSeconds := uint32(start / uint64(time.Second)) - endSeconds := uint32(end / uint64(time.Second)) - - if err := i.AppendTrace(req.Ids[j], tr, startSeconds, endSeconds); err != nil { + if err := i.AppendTrace(req.Ids[j], tr, startTime); err != nil { return err } } diff --git a/modules/blockbuilder/partition_writer_test.go b/modules/blockbuilder/partition_writer_test.go new file mode 100644 index 00000000000..368560fb72a --- /dev/null +++ b/modules/blockbuilder/partition_writer_test.go @@ -0,0 +1,59 @@ +package blockbuilder + +import ( + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/tempo/pkg/tempopb" + "github.com/grafana/tempo/pkg/util/test" + "github.com/grafana/tempo/tempodb/backend" + "github.com/grafana/tempo/tempodb/encoding" + "github.com/grafana/tempo/tempodb/wal" + "github.com/stretchr/testify/require" +) + +func getPartitionWriter(t *testing.T) *writer { + logger := log.NewNopLogger() + cycleEndTs := time.Now().Unix() + blockCfg := BlockConfig{} + tmpDir := t.TempDir() + w, err := wal.New(&wal.Config{ + Filepath: tmpDir, + Encoding: backend.EncNone, + IngestionSlack: 3 * time.Minute, + Version: encoding.DefaultEncoding().Version(), + }) + require.NoError(t, err) + + return newPartitionSectionWriter(logger, 1, cycleEndTs, blockCfg, &mockOverrides{}, w, encoding.DefaultEncoding()) +} + +func TestPushBytes(t *testing.T) { + pw := getPartitionWriter(t) + + tenant := "test-tenant" + traceID := generateTraceID(t) + now := time.Now() + startTime := uint64(now.UnixNano()) + endTime := uint64(now.Add(time.Second).UnixNano()) + req := test.MakePushBytesRequest(t, 1, traceID, startTime, endTime) + + err := pw.pushBytes(tenant, req, now) + require.NoError(t, err) +} + +func TestPushBytes_UnmarshalError(t *testing.T) { + pw := getPartitionWriter(t) + + tenant := "test-tenant" + traceID := []byte{1, 2, 3, 4} + req := &tempopb.PushBytesRequest{ + Ids: [][]byte{traceID}, + Traces: []tempopb.PreallocBytes{{Slice: []byte{1, 2}}}, + } + + err := pw.pushBytes(tenant, req, time.Now()) + require.Error(t, err) + require.Contains(t, err.Error(), "failed to unmarshal trace") +} diff --git a/modules/blockbuilder/tenant_store.go b/modules/blockbuilder/tenant_store.go index 58fceffcc05..15b7a2e66b8 100644 --- a/modules/blockbuilder/tenant_store.go +++ b/modules/blockbuilder/tenant_store.go @@ -3,11 +3,13 @@ package blockbuilder import ( "context" "sync" + "time" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/google/uuid" "github.com/grafana/tempo/modules/blockbuilder/util" + "github.com/grafana/tempo/pkg/dataquality" "github.com/grafana/tempo/pkg/model" "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/tempodb" @@ -104,13 +106,55 @@ func (s *tenantStore) resetHeadBlock() error { return nil } -func (s *tenantStore) AppendTrace(traceID []byte, tr *tempopb.Trace, start, end uint32) error { +func (s *tenantStore) AppendTrace(traceID []byte, tr *tempopb.Trace, startTime time.Time) error { // TODO - Do this async, it slows down consumption if err := s.cutHeadBlock(false); err != nil { return err } + start, end := getTraceTimeRange(tr) + start, end = s.adjustTimeRangeForSlack(startTime, start, end) - return s.headBlock.AppendTrace(traceID, tr, start, end) + return s.headBlock.AppendTrace(traceID, tr, uint32(start), uint32(end)) +} + +func getTraceTimeRange(tr *tempopb.Trace) (startSeconds uint64, endSeconds uint64) { + for _, b := range tr.ResourceSpans { + for _, ss := range b.ScopeSpans { + for _, s := range ss.Spans { + if startSeconds == 0 || s.StartTimeUnixNano < startSeconds { + startSeconds = s.StartTimeUnixNano + } + if s.EndTimeUnixNano > endSeconds { + endSeconds = s.EndTimeUnixNano + } + } + } + } + startSeconds = startSeconds / uint64(time.Second) + endSeconds = endSeconds / uint64(time.Second) + + return +} + +func (s *tenantStore) adjustTimeRangeForSlack(startTime time.Time, start, end uint64) (uint64, uint64) { + startOfRange := uint64(startTime.Add(-s.headBlock.IngestionSlack()).Unix()) + endOfRange := uint64(startTime.Add(s.headBlock.IngestionSlack()).Unix()) + + warn := false + if start < startOfRange { + warn = true + start = uint64(startTime.Unix()) + } + if end > endOfRange || end < start { + warn = true + end = uint64(startTime.Unix()) + } + + if warn { + dataquality.WarnOutsideIngestionSlack(s.headBlock.BlockMeta().TenantID) + } + + return start, end } func (s *tenantStore) Flush(ctx context.Context, store tempodb.Writer) error { diff --git a/modules/blockbuilder/tenant_store_test.go b/modules/blockbuilder/tenant_store_test.go new file mode 100644 index 00000000000..2e0ba5f1d5f --- /dev/null +++ b/modules/blockbuilder/tenant_store_test.go @@ -0,0 +1,46 @@ +package blockbuilder + +import ( + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/tempo/pkg/util/test" + "github.com/grafana/tempo/tempodb/backend" + "github.com/grafana/tempo/tempodb/encoding" + "github.com/grafana/tempo/tempodb/wal" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func getTenantStore(t *testing.T) (*tenantStore, error) { + logger := log.NewNopLogger() + cycleEndTs := time.Now().Unix() + blockCfg := BlockConfig{} + tmpDir := t.TempDir() + w, err := wal.New(&wal.Config{ + Filepath: tmpDir, + Encoding: backend.EncNone, + IngestionSlack: 3 * time.Minute, + Version: encoding.DefaultEncoding().Version(), + }) + require.NoError(t, err) + return newTenantStore("test-tenant", 1, cycleEndTs, blockCfg, logger, w, encoding.DefaultEncoding(), &mockOverrides{}) +} + +func TestAppendTraceHonorCycleTime(t *testing.T) { + store, err := getTenantStore(t) + require.NoError(t, err) + + traceID := []byte("test-trace-id") + start := time.Now().Add(-1 * time.Hour) + end := time.Now().Add(-1 * time.Hour) + trace := test.MakeTraceWithTimeRange(1, traceID, uint64(start.UnixNano()), uint64(end.UnixNano())) + startTime := time.Now().Add(-1 * time.Hour) + + err = store.AppendTrace(traceID, trace, startTime) + require.NoError(t, err) + + assert.Equal(t, start.Unix(), store.headBlock.BlockMeta().StartTime.Unix()) + assert.Equal(t, end.Unix(), store.headBlock.BlockMeta().EndTime.Unix()) +} diff --git a/modules/generator/processor/localblocks/processor.go b/modules/generator/processor/localblocks/processor.go index 0454cd08819..ea6366ac841 100644 --- a/modules/generator/processor/localblocks/processor.go +++ b/modules/generator/processor/localblocks/processor.go @@ -15,6 +15,7 @@ import ( "github.com/golang/groupcache/lru" "github.com/google/uuid" "github.com/grafana/tempo/modules/ingester" + "github.com/grafana/tempo/pkg/dataquality" "github.com/grafana/tempo/pkg/flushqueues" "github.com/grafana/tempo/pkg/tracesizes" "github.com/grafana/tempo/tempodb" @@ -662,6 +663,8 @@ func (p *Processor) writeHeadBlock(id common.ID, tr *tempopb.Trace) error { startSeconds := uint32(start / uint64(time.Second)) endSeconds := uint32(end / uint64(time.Second)) + startSeconds, endSeconds = p.adjustTimeRangeForSlack(startSeconds, endSeconds) + err := p.headBlock.AppendTrace(id, tr, startSeconds, endSeconds) if err != nil { return err @@ -670,6 +673,28 @@ func (p *Processor) writeHeadBlock(id common.ID, tr *tempopb.Trace) error { return nil } +func (p *Processor) adjustTimeRangeForSlack(start, end uint32) (uint32, uint32) { + now := time.Now() + startOfRange := uint32(-now.Add(p.headBlock.IngestionSlack()).Unix()) + endOfRange := uint32(now.Add(p.headBlock.IngestionSlack()).Unix()) + + warn := false + if start < startOfRange { + warn = true + start = uint32(now.Unix()) + } + if end > endOfRange || end < start { + warn = true + end = uint32(now.Unix()) + } + + if warn { + dataquality.WarnOutsideIngestionSlack(p.headBlock.BlockMeta().TenantID) + } + + return start, end +} + func (p *Processor) resetHeadBlock() error { meta := &backend.BlockMeta{ BlockID: backend.NewUUID(), diff --git a/modules/ingester/instance_test.go b/modules/ingester/instance_test.go index fe1d1bc956f..dfab0754a91 100644 --- a/modules/ingester/instance_test.go +++ b/modules/ingester/instance_test.go @@ -961,7 +961,7 @@ func makeBatchWithMaxBytes(maxBytes int, traceID []byte) *v1_trace.ResourceSpans batch := test.MakeBatch(1, traceID) for batch.Size() < maxBytes { - batch.ScopeSpans[0].Spans = append(batch.ScopeSpans[0].Spans, test.MakeSpanWithAttributeCount(traceID, 0)) + batch.ScopeSpans[0].Spans = append(batch.ScopeSpans[0].Spans, test.MakeSpan(traceID)) } return batch diff --git a/pkg/util/test/req.go b/pkg/util/test/req.go index cc6ecc9e4b4..5d85645adbb 100644 --- a/pkg/util/test/req.go +++ b/pkg/util/test/req.go @@ -18,10 +18,17 @@ import ( ) func MakeSpan(traceID []byte) *v1_trace.Span { - return MakeSpanWithAttributeCount(traceID, rand.Int()%10+1) + now := time.Now() + startTime := uint64(now.UnixNano()) + endTime := uint64(now.Add(time.Second).UnixNano()) + return makeSpanWithAttributeCount(traceID, rand.Int()%10+1, startTime, endTime) +} + +func MakeSpanWithTimeWindow(traceID []byte, startTime uint64, endTime uint64) *v1_trace.Span { + return makeSpanWithAttributeCount(traceID, rand.Int()%10+1, startTime, endTime) } -func MakeSpanWithAttributeCount(traceID []byte, count int) *v1_trace.Span { +func makeSpanWithAttributeCount(traceID []byte, count int, startTime uint64, endTime uint64) *v1_trace.Span { attributes := make([]*v1_common.KeyValue, 0, count) for i := 0; i < count; i++ { attributes = append(attributes, &v1_common.KeyValue{ @@ -29,8 +36,6 @@ func MakeSpanWithAttributeCount(traceID []byte, count int) *v1_trace.Span { Value: &v1_common.AnyValue{Value: &v1_common.AnyValue_StringValue{StringValue: RandomString()}}, }) } - - now := time.Now() s := &v1_trace.Span{ Name: "test", TraceId: traceID, @@ -41,8 +46,8 @@ func MakeSpanWithAttributeCount(traceID []byte, count int) *v1_trace.Span { Code: 1, Message: "OK", }, - StartTimeUnixNano: uint64(now.UnixNano()), - EndTimeUnixNano: uint64(now.Add(time.Second).UnixNano()), + StartTimeUnixNano: startTime, + EndTimeUnixNano: endTime, Attributes: attributes, DroppedLinksCount: rand.Uint32(), DroppedAttributesCount: rand.Uint32(), @@ -142,6 +147,43 @@ func MakeBatch(spans int, traceID []byte) *v1_trace.ResourceSpans { return batch } +func makeBatchWithTimeRange(spans int, traceID []byte, startTime, endTime uint64) *v1_trace.ResourceSpans { + traceID = ValidTraceID(traceID) + + batch := &v1_trace.ResourceSpans{ + Resource: &v1_resource.Resource{ + Attributes: []*v1_common.KeyValue{ + { + Key: "service.name", + Value: &v1_common.AnyValue{ + Value: &v1_common.AnyValue_StringValue{ + StringValue: "test-service", + }, + }, + }, + }, + }, + } + var ss *v1_trace.ScopeSpans + + for i := 0; i < spans; i++ { + // occasionally make a new ss + if ss == nil || rand.Int()%3 == 0 { + ss = &v1_trace.ScopeSpans{ + Scope: &v1_common.InstrumentationScope{ + Name: "super library", + Version: "0.0.1", + }, + } + + batch.ScopeSpans = append(batch.ScopeSpans, ss) + } + + ss.Spans = append(ss.Spans, MakeSpanWithTimeWindow(traceID, startTime, endTime)) + } + return batch +} + func MakeTrace(requests int, traceID []byte) *tempopb.Trace { traceID = ValidTraceID(traceID) @@ -156,6 +198,20 @@ func MakeTrace(requests int, traceID []byte) *tempopb.Trace { return trace } +func MakeTraceWithTimeRange(requests int, traceID []byte, startTime, endTime uint64) *tempopb.Trace { + traceID = ValidTraceID(traceID) + + trace := &tempopb.Trace{ + ResourceSpans: make([]*v1_trace.ResourceSpans, 0), + } + + for i := 0; i < requests; i++ { + trace.ResourceSpans = append(trace.ResourceSpans, makeBatchWithTimeRange(rand.Int()%20+1, traceID, startTime, endTime)) + } + + return trace +} + func MakeTraceWithSpanCount(requests int, spansEach int, traceID []byte) *tempopb.Trace { trace := &tempopb.Trace{ ResourceSpans: make([]*v1_trace.ResourceSpans, 0), @@ -355,8 +411,8 @@ func MakeTraceWithTags(traceID []byte, service string, intValue int64) *tempopb. return trace } -func MakePushBytesRequest(t *testing.T, requests int, traceID []byte) *tempopb.PushBytesRequest { - trace := MakeTrace(requests, traceID) +func MakePushBytesRequest(t *testing.T, requests int, traceID []byte, startTime, endTime uint64) *tempopb.PushBytesRequest { + trace := MakeTraceWithTimeRange(requests, traceID, startTime, endTime) b, err := proto.Marshal(trace) require.NoError(t, err) diff --git a/tempodb/encoding/common/interfaces.go b/tempodb/encoding/common/interfaces.go index ef3cd6b9969..c4ddb422c8b 100644 --- a/tempodb/encoding/common/interfaces.go +++ b/tempodb/encoding/common/interfaces.go @@ -2,6 +2,7 @@ package common import ( "context" + "time" "github.com/go-kit/log" @@ -112,6 +113,8 @@ type WALBlock interface { AppendTrace(id ID, tr *tempopb.Trace, start, end uint32) error + IngestionSlack() time.Duration + // Flush any unbuffered data to disk. Must be safe for concurrent use with read operations. Flush() error diff --git a/tempodb/encoding/v2/wal_block.go b/tempodb/encoding/v2/wal_block.go index 5d264821b1a..86935deefea 100644 --- a/tempodb/encoding/v2/wal_block.go +++ b/tempodb/encoding/v2/wal_block.go @@ -241,6 +241,10 @@ func (a *walBlock) Clear() error { return os.Remove(name) } +func (a *walBlock) IngestionSlack() time.Duration { + return a.ingestionSlack +} + // FindTraceByID Find implements common.Finder func (a *walBlock) FindTraceByID(ctx context.Context, id common.ID, _ common.SearchOptions) (*tempopb.Trace, error) { _, span := tracer.Start(ctx, "v2WalBlock.FindTraceByID") diff --git a/tempodb/encoding/vparquet2/wal_block.go b/tempodb/encoding/vparquet2/wal_block.go index 5f089e0fbb4..532094d4d60 100644 --- a/tempodb/encoding/vparquet2/wal_block.go +++ b/tempodb/encoding/vparquet2/wal_block.go @@ -344,6 +344,10 @@ func (b *walBlock) AppendTrace(id common.ID, trace *tempopb.Trace, start, end ui return nil } +func (b *walBlock) IngestionSlack() time.Duration { + return b.ingestionSlack +} + func (b *walBlock) Validate(context.Context) error { return common.ErrUnsupported } diff --git a/tempodb/encoding/vparquet3/wal_block.go b/tempodb/encoding/vparquet3/wal_block.go index 69c89d75867..2168ae218d2 100644 --- a/tempodb/encoding/vparquet3/wal_block.go +++ b/tempodb/encoding/vparquet3/wal_block.go @@ -410,6 +410,10 @@ func (b *walBlock) openWriter() (err error) { return nil } +func (b *walBlock) IngestionSlack() time.Duration { + return b.ingestionSlack +} + func (b *walBlock) Flush() (err error) { if b.ids.Len() == 0 { return nil diff --git a/tempodb/encoding/vparquet4/wal_block.go b/tempodb/encoding/vparquet4/wal_block.go index 627d1180c2a..7d4cf1240cc 100644 --- a/tempodb/encoding/vparquet4/wal_block.go +++ b/tempodb/encoding/vparquet4/wal_block.go @@ -315,6 +315,10 @@ func (b *walBlock) BlockMeta() *backend.BlockMeta { return b.meta } +func (b *walBlock) IngestionSlack() time.Duration { + return b.ingestionSlack +} + func (b *walBlock) Append(id common.ID, buff []byte, start, end uint32) error { // if decoder = nil we were created with OpenWALBlock and will not accept writes if b.decoder == nil { @@ -325,7 +329,7 @@ func (b *walBlock) Append(id common.ID, buff []byte, start, end uint32) error { if err != nil { return fmt.Errorf("error preparing trace for read: %w", err) } - + start, end = b.adjustTimeRangeForSlack(start, end) return b.AppendTrace(id, trace, start, end) } @@ -339,8 +343,6 @@ func (b *walBlock) AppendTrace(id common.ID, trace *tempopb.Trace, start, end ui dataquality.WarnRootlessTrace(b.meta.TenantID, dataquality.PhaseTraceFlushedToWal) } - start, end = b.adjustTimeRangeForSlack(start, end) - // add to current _, err := b.writer.Write([]*Trace{b.buffer}) if err != nil { From e2b06542fe61c881d4b70d742dbb3c7beb7dbcf6 Mon Sep 17 00:00:00 2001 From: javiermolinar Date: Thu, 19 Dec 2024 11:23:57 +0100 Subject: [PATCH 2/7] fix linting --- modules/blockbuilder/partition_writer_test.go | 2 +- modules/blockbuilder/tenant_store_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/blockbuilder/partition_writer_test.go b/modules/blockbuilder/partition_writer_test.go index 368560fb72a..e8c01205aa7 100644 --- a/modules/blockbuilder/partition_writer_test.go +++ b/modules/blockbuilder/partition_writer_test.go @@ -15,7 +15,7 @@ import ( func getPartitionWriter(t *testing.T) *writer { logger := log.NewNopLogger() - cycleEndTs := time.Now().Unix() + cycleEndTs := uint64(time.Now().Unix()) blockCfg := BlockConfig{} tmpDir := t.TempDir() w, err := wal.New(&wal.Config{ diff --git a/modules/blockbuilder/tenant_store_test.go b/modules/blockbuilder/tenant_store_test.go index 2e0ba5f1d5f..70d2282a3a9 100644 --- a/modules/blockbuilder/tenant_store_test.go +++ b/modules/blockbuilder/tenant_store_test.go @@ -15,7 +15,7 @@ import ( func getTenantStore(t *testing.T) (*tenantStore, error) { logger := log.NewNopLogger() - cycleEndTs := time.Now().Unix() + cycleEndTs := uint64(time.Now().Unix()) blockCfg := BlockConfig{} tmpDir := t.TempDir() w, err := wal.New(&wal.Config{ From fe8253b79a60e240f3627e512d9c92ba5b249d24 Mon Sep 17 00:00:00 2001 From: javiermolinar Date: Thu, 19 Dec 2024 12:21:54 +0100 Subject: [PATCH 3/7] fix startTime for partition writer --- modules/blockbuilder/blockbuilder.go | 8 ++++---- modules/blockbuilder/partition_writer.go | 10 ++++++---- modules/blockbuilder/partition_writer_test.go | 9 +++++---- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/modules/blockbuilder/blockbuilder.go b/modules/blockbuilder/blockbuilder.go index 0f404c8984b..3ed1eab1782 100644 --- a/modules/blockbuilder/blockbuilder.go +++ b/modules/blockbuilder/blockbuilder.go @@ -297,7 +297,7 @@ func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition in }(time.Now()) // TODO - Review what endTimestamp is used here - writer := newPartitionSectionWriter(b.logger, uint64(partition), uint64(sectionEndTime.UnixMilli()), b.cfg.BlockConfig, b.overrides, b.wal, b.enc) + writer := newPartitionSectionWriter(b.logger, uint64(partition), uint64(sectionEndTime.UnixMilli()), sectionStartTime, b.cfg.BlockConfig, b.overrides, b.wal, b.enc) // We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment). // This is so the cycle started exactly at the commit offset, and not at what was (potentially over-) consumed previously. @@ -353,7 +353,7 @@ consumerLoop: break consumerLoop } - err := b.pushTraces(rec.Key, rec.Value, writer, sectionStartTime) // TODO - Batch pushes by tenant + err := b.pushTraces(rec.Key, rec.Value, writer) // TODO - Batch pushes by tenant if err != nil { // All "non-terminal" errors are handled by the TSDBBuilder. return lag.Commit.At, fmt.Errorf("process record in partition %d at offset %d: %w", rec.Partition, rec.Offset, err) @@ -403,14 +403,14 @@ func (b *BlockBuilder) commitState(ctx context.Context, commit kadm.Offset) erro return nil } -func (b *BlockBuilder) pushTraces(tenantBytes, reqBytes []byte, p partitionSectionWriter, sectionStartTime time.Time) error { +func (b *BlockBuilder) pushTraces(tenantBytes, reqBytes []byte, p partitionSectionWriter) error { req, err := b.decoder.Decode(reqBytes) if err != nil { return fmt.Errorf("failed to decode trace: %w", err) } defer b.decoder.Reset() - return p.pushBytes(string(tenantBytes), req, sectionStartTime) + return p.pushBytes(string(tenantBytes), req) } func (b *BlockBuilder) getAssignedActivePartitions() []int32 { diff --git a/modules/blockbuilder/partition_writer.go b/modules/blockbuilder/partition_writer.go index c18c2f367f6..3babeeb47a0 100644 --- a/modules/blockbuilder/partition_writer.go +++ b/modules/blockbuilder/partition_writer.go @@ -18,7 +18,7 @@ import ( ) type partitionSectionWriter interface { - pushBytes(tenant string, req *tempopb.PushBytesRequest, startTime time.Time) error + pushBytes(tenant string, req *tempopb.PushBytesRequest) error flush(ctx context.Context, store tempodb.Writer) error } @@ -27,6 +27,7 @@ type writer struct { blockCfg BlockConfig partition, cycleEndTs uint64 + startTime time.Time overrides Overrides wal *wal.WAL @@ -36,11 +37,12 @@ type writer struct { m map[string]*tenantStore } -func newPartitionSectionWriter(logger log.Logger, partition, cycleEndTs uint64, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer { +func newPartitionSectionWriter(logger log.Logger, partition, cycleEndTs uint64, startTime time.Time, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer { return &writer{ logger: logger, partition: partition, cycleEndTs: cycleEndTs, + startTime: startTime, blockCfg: blockCfg, overrides: overrides, wal: wal, @@ -50,7 +52,7 @@ func newPartitionSectionWriter(logger log.Logger, partition, cycleEndTs uint64, } } -func (p *writer) pushBytes(tenant string, req *tempopb.PushBytesRequest, startTime time.Time) error { +func (p *writer) pushBytes(tenant string, req *tempopb.PushBytesRequest) error { level.Debug(p.logger).Log( "msg", "pushing bytes", "tenant", tenant, @@ -68,7 +70,7 @@ func (p *writer) pushBytes(tenant string, req *tempopb.PushBytesRequest, startTi if err := proto.Unmarshal(trace.Slice, tr); err != nil { return fmt.Errorf("failed to unmarshal trace: %w", err) } - if err := i.AppendTrace(req.Ids[j], tr, startTime); err != nil { + if err := i.AppendTrace(req.Ids[j], tr, p.startTime); err != nil { return err } } diff --git a/modules/blockbuilder/partition_writer_test.go b/modules/blockbuilder/partition_writer_test.go index e8c01205aa7..2ff3fb251b4 100644 --- a/modules/blockbuilder/partition_writer_test.go +++ b/modules/blockbuilder/partition_writer_test.go @@ -15,7 +15,8 @@ import ( func getPartitionWriter(t *testing.T) *writer { logger := log.NewNopLogger() - cycleEndTs := uint64(time.Now().Unix()) + cycleEndTs := uint64(time.Now().Add(2 * time.Minute).Unix()) + startTime := time.Now() blockCfg := BlockConfig{} tmpDir := t.TempDir() w, err := wal.New(&wal.Config{ @@ -26,7 +27,7 @@ func getPartitionWriter(t *testing.T) *writer { }) require.NoError(t, err) - return newPartitionSectionWriter(logger, 1, cycleEndTs, blockCfg, &mockOverrides{}, w, encoding.DefaultEncoding()) + return newPartitionSectionWriter(logger, 1, cycleEndTs, startTime, blockCfg, &mockOverrides{}, w, encoding.DefaultEncoding()) } func TestPushBytes(t *testing.T) { @@ -39,7 +40,7 @@ func TestPushBytes(t *testing.T) { endTime := uint64(now.Add(time.Second).UnixNano()) req := test.MakePushBytesRequest(t, 1, traceID, startTime, endTime) - err := pw.pushBytes(tenant, req, now) + err := pw.pushBytes(tenant, req) require.NoError(t, err) } @@ -53,7 +54,7 @@ func TestPushBytes_UnmarshalError(t *testing.T) { Traces: []tempopb.PreallocBytes{{Slice: []byte{1, 2}}}, } - err := pw.pushBytes(tenant, req, time.Now()) + err := pw.pushBytes(tenant, req) require.Error(t, err) require.Contains(t, err.Error(), "failed to unmarshal trace") } From 5b73700e88b45985b7aa013c02386d0062c980ba Mon Sep 17 00:00:00 2001 From: javiermolinar Date: Thu, 19 Dec 2024 13:25:30 +0100 Subject: [PATCH 4/7] open slack ingestion by using the end cycle time --- modules/blockbuilder/blockbuilder.go | 2 +- modules/blockbuilder/partition_writer.go | 32 +++++------ modules/blockbuilder/partition_writer_test.go | 4 +- modules/blockbuilder/tenant_store.go | 14 ++--- modules/blockbuilder/tenant_store_test.go | 56 ++++++++++++++++++- 5 files changed, 81 insertions(+), 27 deletions(-) diff --git a/modules/blockbuilder/blockbuilder.go b/modules/blockbuilder/blockbuilder.go index 3ed1eab1782..5ec50151531 100644 --- a/modules/blockbuilder/blockbuilder.go +++ b/modules/blockbuilder/blockbuilder.go @@ -297,7 +297,7 @@ func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition in }(time.Now()) // TODO - Review what endTimestamp is used here - writer := newPartitionSectionWriter(b.logger, uint64(partition), uint64(sectionEndTime.UnixMilli()), sectionStartTime, b.cfg.BlockConfig, b.overrides, b.wal, b.enc) + writer := newPartitionSectionWriter(b.logger, uint64(partition), sectionEndTime, sectionStartTime, b.cfg.BlockConfig, b.overrides, b.wal, b.enc) // We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment). // This is so the cycle started exactly at the commit offset, and not at what was (potentially over-) consumed previously. diff --git a/modules/blockbuilder/partition_writer.go b/modules/blockbuilder/partition_writer.go index 3babeeb47a0..70a2e228a4f 100644 --- a/modules/blockbuilder/partition_writer.go +++ b/modules/blockbuilder/partition_writer.go @@ -25,9 +25,9 @@ type partitionSectionWriter interface { type writer struct { logger log.Logger - blockCfg BlockConfig - partition, cycleEndTs uint64 - startTime time.Time + blockCfg BlockConfig + partition uint64 + startSectionTime, endSectionTime time.Time overrides Overrides wal *wal.WAL @@ -37,18 +37,18 @@ type writer struct { m map[string]*tenantStore } -func newPartitionSectionWriter(logger log.Logger, partition, cycleEndTs uint64, startTime time.Time, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer { +func newPartitionSectionWriter(logger log.Logger, partition uint64, endSectionTime, startSectionTime time.Time, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer { return &writer{ - logger: logger, - partition: partition, - cycleEndTs: cycleEndTs, - startTime: startTime, - blockCfg: blockCfg, - overrides: overrides, - wal: wal, - enc: enc, - mtx: sync.Mutex{}, - m: make(map[string]*tenantStore), + logger: logger, + partition: partition, + endSectionTime: endSectionTime, + startSectionTime: startSectionTime, + blockCfg: blockCfg, + overrides: overrides, + wal: wal, + enc: enc, + mtx: sync.Mutex{}, + m: make(map[string]*tenantStore), } } @@ -70,7 +70,7 @@ func (p *writer) pushBytes(tenant string, req *tempopb.PushBytesRequest) error { if err := proto.Unmarshal(trace.Slice, tr); err != nil { return fmt.Errorf("failed to unmarshal trace: %w", err) } - if err := i.AppendTrace(req.Ids[j], tr, p.startTime); err != nil { + if err := i.AppendTrace(req.Ids[j], tr, p.startSectionTime, p.endSectionTime); err != nil { return err } } @@ -97,7 +97,7 @@ func (p *writer) instanceForTenant(tenant string) (*tenantStore, error) { return i, nil } - i, err := newTenantStore(tenant, p.partition, p.cycleEndTs, p.blockCfg, p.logger, p.wal, p.enc, p.overrides) + i, err := newTenantStore(tenant, p.partition, uint64(p.endSectionTime.UnixMilli()), p.blockCfg, p.logger, p.wal, p.enc, p.overrides) if err != nil { return nil, err } diff --git a/modules/blockbuilder/partition_writer_test.go b/modules/blockbuilder/partition_writer_test.go index 2ff3fb251b4..f0c020b8d46 100644 --- a/modules/blockbuilder/partition_writer_test.go +++ b/modules/blockbuilder/partition_writer_test.go @@ -15,7 +15,7 @@ import ( func getPartitionWriter(t *testing.T) *writer { logger := log.NewNopLogger() - cycleEndTs := uint64(time.Now().Add(2 * time.Minute).Unix()) + endTime := time.Now().Add(2 * time.Minute) startTime := time.Now() blockCfg := BlockConfig{} tmpDir := t.TempDir() @@ -27,7 +27,7 @@ func getPartitionWriter(t *testing.T) *writer { }) require.NoError(t, err) - return newPartitionSectionWriter(logger, 1, cycleEndTs, startTime, blockCfg, &mockOverrides{}, w, encoding.DefaultEncoding()) + return newPartitionSectionWriter(logger, 1, endTime, startTime, blockCfg, &mockOverrides{}, w, encoding.DefaultEncoding()) } func TestPushBytes(t *testing.T) { diff --git a/modules/blockbuilder/tenant_store.go b/modules/blockbuilder/tenant_store.go index a7be3d224cc..f1d76bd7570 100644 --- a/modules/blockbuilder/tenant_store.go +++ b/modules/blockbuilder/tenant_store.go @@ -106,13 +106,13 @@ func (s *tenantStore) resetHeadBlock() error { return nil } -func (s *tenantStore) AppendTrace(traceID []byte, tr *tempopb.Trace, startTime time.Time) error { +func (s *tenantStore) AppendTrace(traceID []byte, tr *tempopb.Trace, startCicleTime, endCicleTime time.Time) error { // TODO - Do this async, it slows down consumption if err := s.cutHeadBlock(false); err != nil { return err } start, end := getTraceTimeRange(tr) - start, end = s.adjustTimeRangeForSlack(startTime, start, end) + start, end = s.adjustTimeRangeForSlack(startCicleTime, endCicleTime, start, end) return s.headBlock.AppendTrace(traceID, tr, uint32(start), uint32(end)) } @@ -136,18 +136,18 @@ func getTraceTimeRange(tr *tempopb.Trace) (startSeconds uint64, endSeconds uint6 return } -func (s *tenantStore) adjustTimeRangeForSlack(startTime time.Time, start, end uint64) (uint64, uint64) { - startOfRange := uint64(startTime.Add(-s.headBlock.IngestionSlack()).Unix()) - endOfRange := uint64(startTime.Add(s.headBlock.IngestionSlack()).Unix()) +func (s *tenantStore) adjustTimeRangeForSlack(startCicleTime, endCicleTime time.Time, start, end uint64) (uint64, uint64) { + startOfRange := uint64(startCicleTime.Add(-s.headBlock.IngestionSlack()).Unix()) + endOfRange := uint64(endCicleTime.Add(s.headBlock.IngestionSlack()).Unix()) warn := false if start < startOfRange { warn = true - start = uint64(startTime.Unix()) + start = uint64(startCicleTime.Unix()) } if end > endOfRange || end < start { warn = true - end = uint64(startTime.Unix()) + end = uint64(endCicleTime.Unix()) } if warn { diff --git a/modules/blockbuilder/tenant_store_test.go b/modules/blockbuilder/tenant_store_test.go index 70d2282a3a9..7c66d201b03 100644 --- a/modules/blockbuilder/tenant_store_test.go +++ b/modules/blockbuilder/tenant_store_test.go @@ -37,10 +37,64 @@ func TestAppendTraceHonorCycleTime(t *testing.T) { end := time.Now().Add(-1 * time.Hour) trace := test.MakeTraceWithTimeRange(1, traceID, uint64(start.UnixNano()), uint64(end.UnixNano())) startTime := time.Now().Add(-1 * time.Hour) + endTime := startTime.Add(5 * time.Minute) - err = store.AppendTrace(traceID, trace, startTime) + err = store.AppendTrace(traceID, trace, startTime, endTime) require.NoError(t, err) assert.Equal(t, start.Unix(), store.headBlock.BlockMeta().StartTime.Unix()) assert.Equal(t, end.Unix(), store.headBlock.BlockMeta().EndTime.Unix()) } + +func TestAdjustTimeRangeForSlack(t *testing.T) { + store, err := getTenantStore(t) + require.NoError(t, err) + + startCycleTime := time.Now() + endCycleTime := startCycleTime.Add(10 * time.Minute) + + tests := []struct { + name string + start uint64 + end uint64 + expectedStart uint64 + expectedEnd uint64 + }{ + { + name: "within slack range", + start: uint64(startCycleTime.Add(-2 * time.Minute).Unix()), + end: uint64(endCycleTime.Add(2 * time.Minute).Unix()), + expectedStart: uint64(startCycleTime.Add(-2 * time.Minute).Unix()), + expectedEnd: uint64(endCycleTime.Add(2 * time.Minute).Unix()), + }, + { + name: "start before slack range", + start: uint64(startCycleTime.Add(-10 * time.Minute).Unix()), + end: uint64(endCycleTime.Add(2 * time.Minute).Unix()), + expectedStart: uint64(startCycleTime.Unix()), + expectedEnd: uint64(endCycleTime.Add(2 * time.Minute).Unix()), + }, + { + name: "end after slack range", + start: uint64(startCycleTime.Add(-2 * time.Minute).Unix()), + end: uint64(endCycleTime.Add(20 * time.Minute).Unix()), + expectedStart: uint64(startCycleTime.Add(-2 * time.Minute).Unix()), + expectedEnd: uint64(endCycleTime.Unix()), + }, + { + name: "end before start", + start: uint64(startCycleTime.Add(-2 * time.Minute).Unix()), + end: uint64(startCycleTime.Add(-3 * time.Minute).Unix()), + expectedStart: uint64(startCycleTime.Add(-2 * time.Minute).Unix()), + expectedEnd: uint64(endCycleTime.Unix()), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + start, end := store.adjustTimeRangeForSlack(startCycleTime, endCycleTime, tt.start, tt.end) + assert.Equal(t, tt.expectedStart, start) + assert.Equal(t, tt.expectedEnd, end) + }) + } +} From a8bd98ab26077e7e3435cdd94247f399fdc26609 Mon Sep 17 00:00:00 2001 From: javiermolinar Date: Thu, 19 Dec 2024 16:22:15 +0100 Subject: [PATCH 5/7] fix test --- modules/blockbuilder/tenant_store_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/blockbuilder/tenant_store_test.go b/modules/blockbuilder/tenant_store_test.go index 7c66d201b03..8319ec3a616 100644 --- a/modules/blockbuilder/tenant_store_test.go +++ b/modules/blockbuilder/tenant_store_test.go @@ -15,7 +15,7 @@ import ( func getTenantStore(t *testing.T) (*tenantStore, error) { logger := log.NewNopLogger() - cycleEndTs := uint64(time.Now().Unix()) + cycleEndTs := time.Now().Unix() blockCfg := BlockConfig{} tmpDir := t.TempDir() w, err := wal.New(&wal.Config{ From f3d66cbc455208a92ad513a0a902543177afe69e Mon Sep 17 00:00:00 2001 From: javiermolinar Date: Thu, 19 Dec 2024 16:35:12 +0100 Subject: [PATCH 6/7] fix lint --- modules/blockbuilder/partition_writer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/blockbuilder/partition_writer.go b/modules/blockbuilder/partition_writer.go index c207d3e8981..a087b2e03b9 100644 --- a/modules/blockbuilder/partition_writer.go +++ b/modules/blockbuilder/partition_writer.go @@ -97,7 +97,7 @@ func (p *writer) instanceForTenant(tenant string) (*tenantStore, error) { return i, nil } - i, err := newTenantStore(tenant, p.partition, int64(p.endSectionTime.UnixMilli()), p.blockCfg, p.logger, p.wal, p.enc, p.overrides) + i, err := newTenantStore(tenant, p.partition, p.endSectionTime.UnixMilli(), p.blockCfg, p.logger, p.wal, p.enc, p.overrides) if err != nil { return nil, err } From e5c6c051406a5b07139ac0519863447518efb161 Mon Sep 17 00:00:00 2001 From: javiermolinar Date: Fri, 20 Dec 2024 15:15:16 +0100 Subject: [PATCH 7/7] propagate changes to old parquet encondings --- modules/blockbuilder/blockbuilder.go | 2 +- modules/blockbuilder/partition_writer.go | 2 +- modules/blockbuilder/partition_writer_test.go | 2 +- tempodb/encoding/vparquet2/wal_block.go | 4 +--- tempodb/encoding/vparquet3/wal_block.go | 4 +--- 5 files changed, 5 insertions(+), 9 deletions(-) diff --git a/modules/blockbuilder/blockbuilder.go b/modules/blockbuilder/blockbuilder.go index fcf2e4ba310..53718345868 100644 --- a/modules/blockbuilder/blockbuilder.go +++ b/modules/blockbuilder/blockbuilder.go @@ -297,7 +297,7 @@ func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition in }(time.Now()) // TODO - Review what endTimestamp is used here - writer := newPartitionSectionWriter(b.logger, int64(partition), sectionEndTime, sectionStartTime, b.cfg.BlockConfig, b.overrides, b.wal, b.enc) + writer := newPartitionSectionWriter(b.logger, int64(partition), sectionStartTime, sectionEndTime, b.cfg.BlockConfig, b.overrides, b.wal, b.enc) // We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment). // This is so the cycle started exactly at the commit offset, and not at what was (potentially over-) consumed previously. diff --git a/modules/blockbuilder/partition_writer.go b/modules/blockbuilder/partition_writer.go index a087b2e03b9..6941fe8d9c6 100644 --- a/modules/blockbuilder/partition_writer.go +++ b/modules/blockbuilder/partition_writer.go @@ -37,7 +37,7 @@ type writer struct { m map[string]*tenantStore } -func newPartitionSectionWriter(logger log.Logger, partition int64, endSectionTime, startSectionTime time.Time, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer { +func newPartitionSectionWriter(logger log.Logger, partition int64, startSectionTime, endSectionTime time.Time, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer { return &writer{ logger: logger, partition: partition, diff --git a/modules/blockbuilder/partition_writer_test.go b/modules/blockbuilder/partition_writer_test.go index f0c020b8d46..d3ff0fa77c7 100644 --- a/modules/blockbuilder/partition_writer_test.go +++ b/modules/blockbuilder/partition_writer_test.go @@ -27,7 +27,7 @@ func getPartitionWriter(t *testing.T) *writer { }) require.NoError(t, err) - return newPartitionSectionWriter(logger, 1, endTime, startTime, blockCfg, &mockOverrides{}, w, encoding.DefaultEncoding()) + return newPartitionSectionWriter(logger, 1, startTime, endTime, blockCfg, &mockOverrides{}, w, encoding.DefaultEncoding()) } func TestPushBytes(t *testing.T) { diff --git a/tempodb/encoding/vparquet2/wal_block.go b/tempodb/encoding/vparquet2/wal_block.go index 532094d4d60..a9b0a21931e 100644 --- a/tempodb/encoding/vparquet2/wal_block.go +++ b/tempodb/encoding/vparquet2/wal_block.go @@ -321,15 +321,13 @@ func (b *walBlock) Append(id common.ID, buff []byte, start, end uint32) error { if err != nil { return fmt.Errorf("error preparing trace for read: %w", err) } - + start, end = b.adjustTimeRangeForSlack(start, end) return b.AppendTrace(id, trace, start, end) } func (b *walBlock) AppendTrace(id common.ID, trace *tempopb.Trace, start, end uint32) error { b.buffer = traceToParquet(id, trace, b.buffer) - start, end = b.adjustTimeRangeForSlack(start, end) - // add to current _, err := b.writer.Write([]*Trace{b.buffer}) if err != nil { diff --git a/tempodb/encoding/vparquet3/wal_block.go b/tempodb/encoding/vparquet3/wal_block.go index 2168ae218d2..5d91fead259 100644 --- a/tempodb/encoding/vparquet3/wal_block.go +++ b/tempodb/encoding/vparquet3/wal_block.go @@ -325,7 +325,7 @@ func (b *walBlock) Append(id common.ID, buff []byte, start, end uint32) error { if err != nil { return fmt.Errorf("error preparing trace for read: %w", err) } - + start, end = b.adjustTimeRangeForSlack(start, end) return b.AppendTrace(id, trace, start, end) } @@ -339,8 +339,6 @@ func (b *walBlock) AppendTrace(id common.ID, trace *tempopb.Trace, start, end ui dataquality.WarnRootlessTrace(b.meta.TenantID, dataquality.PhaseTraceFlushedToWal) } - start, end = b.adjustTimeRangeForSlack(start, end) - // add to current _, err := b.writer.Write([]*Trace{b.buffer}) if err != nil {