Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rythm: fix ingestion slack time range #4459

Open
wants to merge 9 commits into
base: main-rhythm
Choose a base branch
from
10 changes: 5 additions & 5 deletions modules/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, pa

// Continue consuming in sections until we're caught up.
for !sectionEndTime.After(cycleEndTime) {
newCommitAt, err := b.consumePartitionSection(ctx, partition, sectionEndTime, partitionLag)
newCommitAt, err := b.consumePartitionSection(ctx, partition, commitRecTs, sectionEndTime, partitionLag)
if err != nil {
return fmt.Errorf("failed to consume partition section: %w", err)
}
Expand All @@ -281,7 +281,7 @@ func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, pa
return nil
}

func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition int32, sectionEndTime time.Time, lag kadm.GroupMemberLag) (int64, error) {
func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition int32, sectionStartTime time.Time, sectionEndTime time.Time, lag kadm.GroupMemberLag) (int64, error) {
level.Info(b.logger).Log(
"msg", "consuming partition section",
"partition", partition,
Expand Down Expand Up @@ -353,7 +353,7 @@ consumerLoop:
break consumerLoop
}

err := b.pushTraces(rec.Key, rec.Value, writer) // TODO - Batch pushes by tenant
err := b.pushTraces(rec.Key, rec.Value, writer, sectionStartTime) // TODO - Batch pushes by tenant
if err != nil {
// All "non-terminal" errors are handled by the TSDBBuilder.
return lag.Commit.At, fmt.Errorf("process record in partition %d at offset %d: %w", rec.Partition, rec.Offset, err)
Expand Down Expand Up @@ -403,14 +403,14 @@ func (b *BlockBuilder) commitState(ctx context.Context, commit kadm.Offset) erro
return nil
}

func (b *BlockBuilder) pushTraces(tenantBytes, reqBytes []byte, p partitionSectionWriter) error {
func (b *BlockBuilder) pushTraces(tenantBytes, reqBytes []byte, p partitionSectionWriter, sectionStartTime time.Time) error {
req, err := b.decoder.Decode(reqBytes)
if err != nil {
return fmt.Errorf("failed to decode trace: %w", err)
}
defer b.decoder.Reset()

return p.pushBytes(string(tenantBytes), req)
return p.pushBytes(string(tenantBytes), req, sectionStartTime)
}

func (b *BlockBuilder) getAssignedActivePartitions() []int32 {
Expand Down
6 changes: 4 additions & 2 deletions modules/blockbuilder/blockbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,10 @@ func countFlushedTraces(store storage.Store) int {

func sendReq(t *testing.T, ctx context.Context, client *kgo.Client) []*kgo.Record {
traceID := generateTraceID(t)

req := test.MakePushBytesRequest(t, 10, traceID)
now := time.Now()
startTime := uint64(now.UnixNano())
endTime := uint64(now.Add(time.Second).UnixNano())
req := test.MakePushBytesRequest(t, 10, traceID, startTime, endTime)
records, err := ingest.Encode(0, util.FakeTenantID, req, 1_000_000)
require.NoError(t, err)

Expand Down
24 changes: 3 additions & 21 deletions modules/blockbuilder/partition_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

type partitionSectionWriter interface {
pushBytes(tenant string, req *tempopb.PushBytesRequest) error
pushBytes(tenant string, req *tempopb.PushBytesRequest, startTime time.Time) error
javiermolinar marked this conversation as resolved.
Show resolved Hide resolved
flush(ctx context.Context, store tempodb.Writer) error
}

Expand Down Expand Up @@ -50,7 +50,7 @@ func newPartitionSectionWriter(logger log.Logger, partition, cycleEndTs int64, b
}
}

func (p *writer) pushBytes(tenant string, req *tempopb.PushBytesRequest) error {
func (p *writer) pushBytes(tenant string, req *tempopb.PushBytesRequest, startTime time.Time) error {
level.Debug(p.logger).Log(
"msg", "pushing bytes",
"tenant", tenant,
Expand All @@ -68,25 +68,7 @@ func (p *writer) pushBytes(tenant string, req *tempopb.PushBytesRequest) error {
if err := proto.Unmarshal(trace.Slice, tr); err != nil {
return fmt.Errorf("failed to unmarshal trace: %w", err)
}

var start, end uint64
for _, b := range tr.ResourceSpans {
for _, ss := range b.ScopeSpans {
for _, s := range ss.Spans {
if start == 0 || s.StartTimeUnixNano < start {
start = s.StartTimeUnixNano
}
if s.EndTimeUnixNano > end {
end = s.EndTimeUnixNano
}
}
}
}

startSeconds := uint32(start / uint64(time.Second))
endSeconds := uint32(end / uint64(time.Second))

if err := i.AppendTrace(req.Ids[j], tr, startSeconds, endSeconds); err != nil {
if err := i.AppendTrace(req.Ids[j], tr, startTime); err != nil {
return err
}
}
Expand Down
59 changes: 59 additions & 0 deletions modules/blockbuilder/partition_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package blockbuilder

import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/wal"
"github.com/stretchr/testify/require"
)

func getPartitionWriter(t *testing.T) *writer {
logger := log.NewNopLogger()
cycleEndTs := time.Now().Unix()
blockCfg := BlockConfig{}
tmpDir := t.TempDir()
w, err := wal.New(&wal.Config{
Filepath: tmpDir,
Encoding: backend.EncNone,
IngestionSlack: 3 * time.Minute,
Version: encoding.DefaultEncoding().Version(),
})
require.NoError(t, err)

return newPartitionSectionWriter(logger, 1, cycleEndTs, blockCfg, &mockOverrides{}, w, encoding.DefaultEncoding())
}

func TestPushBytes(t *testing.T) {
pw := getPartitionWriter(t)

tenant := "test-tenant"
traceID := generateTraceID(t)
now := time.Now()
startTime := uint64(now.UnixNano())
endTime := uint64(now.Add(time.Second).UnixNano())
req := test.MakePushBytesRequest(t, 1, traceID, startTime, endTime)

err := pw.pushBytes(tenant, req, now)
require.NoError(t, err)
}

func TestPushBytes_UnmarshalError(t *testing.T) {
pw := getPartitionWriter(t)

tenant := "test-tenant"
traceID := []byte{1, 2, 3, 4}
req := &tempopb.PushBytesRequest{
Ids: [][]byte{traceID},
Traces: []tempopb.PreallocBytes{{Slice: []byte{1, 2}}},
}

err := pw.pushBytes(tenant, req, time.Now())
require.Error(t, err)
require.Contains(t, err.Error(), "failed to unmarshal trace")
}
48 changes: 46 additions & 2 deletions modules/blockbuilder/tenant_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package blockbuilder
import (
"context"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/tempo/modules/blockbuilder/util"
"github.com/grafana/tempo/pkg/dataquality"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/tempodb"
Expand Down Expand Up @@ -104,13 +106,55 @@ func (s *tenantStore) resetHeadBlock() error {
return nil
}

func (s *tenantStore) AppendTrace(traceID []byte, tr *tempopb.Trace, start, end uint32) error {
func (s *tenantStore) AppendTrace(traceID []byte, tr *tempopb.Trace, startTime time.Time) error {
// TODO - Do this async, it slows down consumption
if err := s.cutHeadBlock(false); err != nil {
return err
}
start, end := getTraceTimeRange(tr)
start, end = s.adjustTimeRangeForSlack(startTime, start, end)

return s.headBlock.AppendTrace(traceID, tr, start, end)
return s.headBlock.AppendTrace(traceID, tr, uint32(start), uint32(end))
}

func getTraceTimeRange(tr *tempopb.Trace) (startSeconds uint64, endSeconds uint64) {
for _, b := range tr.ResourceSpans {
for _, ss := range b.ScopeSpans {
for _, s := range ss.Spans {
if startSeconds == 0 || s.StartTimeUnixNano < startSeconds {
startSeconds = s.StartTimeUnixNano
}
if s.EndTimeUnixNano > endSeconds {
endSeconds = s.EndTimeUnixNano
}
}
}
}
startSeconds = startSeconds / uint64(time.Second)
endSeconds = endSeconds / uint64(time.Second)

return
}

func (s *tenantStore) adjustTimeRangeForSlack(startTime time.Time, start, end uint64) (uint64, uint64) {
javiermolinar marked this conversation as resolved.
Show resolved Hide resolved
startOfRange := uint64(startTime.Add(-s.headBlock.IngestionSlack()).Unix())
endOfRange := uint64(startTime.Add(s.headBlock.IngestionSlack()).Unix())

warn := false
if start < startOfRange {
warn = true
start = uint64(startTime.Unix())
}
if end > endOfRange || end < start {
warn = true
end = uint64(startTime.Unix())
}

if warn {
dataquality.WarnOutsideIngestionSlack(s.headBlock.BlockMeta().TenantID)
}

return start, end
}

func (s *tenantStore) Flush(ctx context.Context, store tempodb.Writer) error {
Expand Down
46 changes: 46 additions & 0 deletions modules/blockbuilder/tenant_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package blockbuilder

import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/wal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func getTenantStore(t *testing.T) (*tenantStore, error) {
logger := log.NewNopLogger()
cycleEndTs := time.Now().Unix()
blockCfg := BlockConfig{}
tmpDir := t.TempDir()
w, err := wal.New(&wal.Config{
Filepath: tmpDir,
Encoding: backend.EncNone,
IngestionSlack: 3 * time.Minute,
Version: encoding.DefaultEncoding().Version(),
})
require.NoError(t, err)
return newTenantStore("test-tenant", 1, cycleEndTs, blockCfg, logger, w, encoding.DefaultEncoding(), &mockOverrides{})
}

func TestAppendTraceHonorCycleTime(t *testing.T) {
store, err := getTenantStore(t)
require.NoError(t, err)

traceID := []byte("test-trace-id")
start := time.Now().Add(-1 * time.Hour)
end := time.Now().Add(-1 * time.Hour)
trace := test.MakeTraceWithTimeRange(1, traceID, uint64(start.UnixNano()), uint64(end.UnixNano()))
startTime := time.Now().Add(-1 * time.Hour)

err = store.AppendTrace(traceID, trace, startTime)
require.NoError(t, err)

assert.Equal(t, start.Unix(), store.headBlock.BlockMeta().StartTime.Unix())
assert.Equal(t, end.Unix(), store.headBlock.BlockMeta().EndTime.Unix())
}
25 changes: 25 additions & 0 deletions modules/generator/processor/localblocks/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/golang/groupcache/lru"
"github.com/google/uuid"
"github.com/grafana/tempo/modules/ingester"
"github.com/grafana/tempo/pkg/dataquality"
"github.com/grafana/tempo/pkg/flushqueues"
"github.com/grafana/tempo/pkg/tracesizes"
"github.com/grafana/tempo/tempodb"
Expand Down Expand Up @@ -662,6 +663,8 @@ func (p *Processor) writeHeadBlock(id common.ID, tr *tempopb.Trace) error {
startSeconds := uint32(start / uint64(time.Second))
endSeconds := uint32(end / uint64(time.Second))

startSeconds, endSeconds = p.adjustTimeRangeForSlack(startSeconds, endSeconds)

err := p.headBlock.AppendTrace(id, tr, startSeconds, endSeconds)
if err != nil {
return err
Expand All @@ -670,6 +673,28 @@ func (p *Processor) writeHeadBlock(id common.ID, tr *tempopb.Trace) error {
return nil
}

func (p *Processor) adjustTimeRangeForSlack(start, end uint32) (uint32, uint32) {
now := time.Now()
startOfRange := uint32(-now.Add(p.headBlock.IngestionSlack()).Unix())
endOfRange := uint32(now.Add(p.headBlock.IngestionSlack()).Unix())

warn := false
if start < startOfRange {
warn = true
start = uint32(now.Unix())
}
if end > endOfRange || end < start {
warn = true
end = uint32(now.Unix())
}

if warn {
dataquality.WarnOutsideIngestionSlack(p.headBlock.BlockMeta().TenantID)
}

return start, end
}

func (p *Processor) resetHeadBlock() error {
meta := &backend.BlockMeta{
BlockID: backend.NewUUID(),
Expand Down
2 changes: 1 addition & 1 deletion modules/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ func makeBatchWithMaxBytes(maxBytes int, traceID []byte) *v1_trace.ResourceSpans
batch := test.MakeBatch(1, traceID)

for batch.Size() < maxBytes {
batch.ScopeSpans[0].Spans = append(batch.ScopeSpans[0].Spans, test.MakeSpanWithAttributeCount(traceID, 0))
batch.ScopeSpans[0].Spans = append(batch.ScopeSpans[0].Spans, test.MakeSpan(traceID))
}

return batch
Expand Down
Loading
Loading