Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rhythm: fix ingestion slack time range #4459

Open
wants to merge 9 commits into
base: main-rhythm
Choose a base branch
from
6 changes: 3 additions & 3 deletions modules/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, pa

// Continue consuming in sections until we're caught up.
for !sectionEndTime.After(cycleEndTime) {
newCommitAt, err := b.consumePartitionSection(ctx, partition, sectionEndTime, partitionLag)
newCommitAt, err := b.consumePartitionSection(ctx, partition, commitRecTs, sectionEndTime, partitionLag)
if err != nil {
return fmt.Errorf("failed to consume partition section: %w", err)
}
Expand All @@ -281,7 +281,7 @@ func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, pa
return nil
}

func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition int32, sectionEndTime time.Time, lag kadm.GroupMemberLag) (int64, error) {
func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition int32, sectionStartTime time.Time, sectionEndTime time.Time, lag kadm.GroupMemberLag) (int64, error) {
level.Info(b.logger).Log(
"msg", "consuming partition section",
"partition", partition,
Expand All @@ -297,7 +297,7 @@ func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition in
}(time.Now())

// TODO - Review what endTimestamp is used here
writer := newPartitionSectionWriter(b.logger, int64(partition), sectionEndTime.UnixMilli(), b.cfg.BlockConfig, b.overrides, b.wal, b.enc)
writer := newPartitionSectionWriter(b.logger, int64(partition), sectionEndTime, sectionStartTime, b.cfg.BlockConfig, b.overrides, b.wal, b.enc)
javiermolinar marked this conversation as resolved.
Show resolved Hide resolved

// We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment).
// This is so the cycle started exactly at the commit offset, and not at what was (potentially over-) consumed previously.
Expand Down
6 changes: 4 additions & 2 deletions modules/blockbuilder/blockbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,10 @@ func countFlushedTraces(store storage.Store) int {

func sendReq(t *testing.T, ctx context.Context, client *kgo.Client) []*kgo.Record {
traceID := generateTraceID(t)

req := test.MakePushBytesRequest(t, 10, traceID)
now := time.Now()
startTime := uint64(now.UnixNano())
endTime := uint64(now.Add(time.Second).UnixNano())
req := test.MakePushBytesRequest(t, 10, traceID, startTime, endTime)
records, err := ingest.Encode(0, util.FakeTenantID, req, 1_000_000)
require.NoError(t, err)

Expand Down
48 changes: 16 additions & 32 deletions modules/blockbuilder/partition_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ type partitionSectionWriter interface {
type writer struct {
logger log.Logger

blockCfg BlockConfig
partition, cycleEndTs int64
blockCfg BlockConfig
partition int64
startSectionTime, endSectionTime time.Time

overrides Overrides
wal *wal.WAL
Expand All @@ -36,17 +37,18 @@ type writer struct {
m map[string]*tenantStore
}

func newPartitionSectionWriter(logger log.Logger, partition, cycleEndTs int64, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer {
func newPartitionSectionWriter(logger log.Logger, partition int64, endSectionTime, startSectionTime time.Time, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer {
return &writer{
logger: logger,
partition: partition,
cycleEndTs: cycleEndTs,
blockCfg: blockCfg,
overrides: overrides,
wal: wal,
enc: enc,
mtx: sync.Mutex{},
m: make(map[string]*tenantStore),
logger: logger,
partition: partition,
endSectionTime: endSectionTime,
startSectionTime: startSectionTime,
blockCfg: blockCfg,
overrides: overrides,
wal: wal,
enc: enc,
mtx: sync.Mutex{},
m: make(map[string]*tenantStore),
}
}

Expand All @@ -68,25 +70,7 @@ func (p *writer) pushBytes(tenant string, req *tempopb.PushBytesRequest) error {
if err := proto.Unmarshal(trace.Slice, tr); err != nil {
return fmt.Errorf("failed to unmarshal trace: %w", err)
}

var start, end uint64
for _, b := range tr.ResourceSpans {
for _, ss := range b.ScopeSpans {
for _, s := range ss.Spans {
if start == 0 || s.StartTimeUnixNano < start {
start = s.StartTimeUnixNano
}
if s.EndTimeUnixNano > end {
end = s.EndTimeUnixNano
}
}
}
}

startSeconds := uint32(start / uint64(time.Second))
endSeconds := uint32(end / uint64(time.Second))

if err := i.AppendTrace(req.Ids[j], tr, startSeconds, endSeconds); err != nil {
if err := i.AppendTrace(req.Ids[j], tr, p.startSectionTime, p.endSectionTime); err != nil {
return err
}
}
Expand All @@ -113,7 +97,7 @@ func (p *writer) instanceForTenant(tenant string) (*tenantStore, error) {
return i, nil
}

i, err := newTenantStore(tenant, p.partition, p.cycleEndTs, p.blockCfg, p.logger, p.wal, p.enc, p.overrides)
i, err := newTenantStore(tenant, p.partition, p.endSectionTime.UnixMilli(), p.blockCfg, p.logger, p.wal, p.enc, p.overrides)
if err != nil {
return nil, err
}
Expand Down
60 changes: 60 additions & 0 deletions modules/blockbuilder/partition_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package blockbuilder

import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/wal"
"github.com/stretchr/testify/require"
)

// getPartitionWriter builds a partition section writer backed by a throwaway
// WAL in a temp dir, with a 2-minute section window and 3 minutes of slack.
func getPartitionWriter(t *testing.T) *writer {
	startTime := time.Now()
	endTime := time.Now().Add(2 * time.Minute)
	w, err := wal.New(&wal.Config{
		Filepath:       t.TempDir(),
		Encoding:       backend.EncNone,
		IngestionSlack: 3 * time.Minute,
		Version:        encoding.DefaultEncoding().Version(),
	})
	require.NoError(t, err)

	return newPartitionSectionWriter(log.NewNopLogger(), 1, endTime, startTime, BlockConfig{}, &mockOverrides{}, w, encoding.DefaultEncoding())
}

// TestPushBytes verifies that a well-formed push request is accepted.
func TestPushBytes(t *testing.T) {
	pw := getPartitionWriter(t)

	now := time.Now()
	req := test.MakePushBytesRequest(t, 1, generateTraceID(t),
		uint64(now.UnixNano()), uint64(now.Add(time.Second).UnixNano()))

	require.NoError(t, pw.pushBytes("test-tenant", req))
}

// TestPushBytes_UnmarshalError verifies that garbage trace bytes surface an
// unmarshal error rather than being silently dropped.
func TestPushBytes_UnmarshalError(t *testing.T) {
	pw := getPartitionWriter(t)

	req := &tempopb.PushBytesRequest{
		Ids:    [][]byte{{1, 2, 3, 4}},
		Traces: []tempopb.PreallocBytes{{Slice: []byte{1, 2}}},
	}

	err := pw.pushBytes("test-tenant", req)
	require.Error(t, err)
	require.Contains(t, err.Error(), "failed to unmarshal trace")
}
48 changes: 46 additions & 2 deletions modules/blockbuilder/tenant_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package blockbuilder
import (
"context"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/tempo/modules/blockbuilder/util"
"github.com/grafana/tempo/pkg/dataquality"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/tempodb"
Expand Down Expand Up @@ -104,13 +106,55 @@ func (s *tenantStore) resetHeadBlock() error {
return nil
}

func (s *tenantStore) AppendTrace(traceID []byte, tr *tempopb.Trace, start, end uint32) error {
func (s *tenantStore) AppendTrace(traceID []byte, tr *tempopb.Trace, startCicleTime, endCicleTime time.Time) error {
// TODO - Do this async, it slows down consumption
if err := s.cutHeadBlock(false); err != nil {
return err
}
start, end := getTraceTimeRange(tr)
start, end = s.adjustTimeRangeForSlack(startCicleTime, endCicleTime, start, end)

return s.headBlock.AppendTrace(traceID, tr, start, end)
return s.headBlock.AppendTrace(traceID, tr, uint32(start), uint32(end))
}

// getTraceTimeRange scans every span in the trace and returns the earliest
// span start and latest span end, both in Unix seconds. A trace with no
// spans yields (0, 0).
// Fix: the original reused the "...Seconds" named results to accumulate
// nanosecond values throughout the loop, which was misleading; accumulate in
// explicitly-named nanosecond locals and convert once.
func getTraceTimeRange(tr *tempopb.Trace) (startSeconds uint64, endSeconds uint64) {
	var startNanos, endNanos uint64
	for _, rs := range tr.ResourceSpans {
		for _, ss := range rs.ScopeSpans {
			for _, s := range ss.Spans {
				if startNanos == 0 || s.StartTimeUnixNano < startNanos {
					startNanos = s.StartTimeUnixNano
				}
				if s.EndTimeUnixNano > endNanos {
					endNanos = s.EndTimeUnixNano
				}
			}
		}
	}

	return startNanos / uint64(time.Second), endNanos / uint64(time.Second)
}

// adjustTimeRangeForSlack clamps a trace's [start, end] (Unix seconds) to the
// cycle window widened by the head block's ingestion slack. A value outside
// the widened window is replaced by the corresponding cycle boundary itself
// (not the slack boundary), and a data-quality warning is recorded for the
// tenant. Fix: parameter names were misspelled "Cicle".
func (s *tenantStore) adjustTimeRangeForSlack(startCycleTime, endCycleTime time.Time, start, end uint64) (uint64, uint64) {
	startOfRange := uint64(startCycleTime.Add(-s.headBlock.IngestionSlack()).Unix())
	endOfRange := uint64(endCycleTime.Add(s.headBlock.IngestionSlack()).Unix())

	warn := false
	if start < startOfRange {
		warn = true
		// Clamp to the cycle start rather than the slack boundary.
		start = uint64(startCycleTime.Unix())
	}
	if end > endOfRange || end < start {
		warn = true
		end = uint64(endCycleTime.Unix())
	}

	if warn {
		dataquality.WarnOutsideIngestionSlack(s.headBlock.BlockMeta().TenantID)
	}

	return start, end
}

func (s *tenantStore) Flush(ctx context.Context, store tempodb.Writer) error {
Expand Down
100 changes: 100 additions & 0 deletions modules/blockbuilder/tenant_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package blockbuilder

import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/wal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// getTenantStore builds a tenant store backed by a throwaway WAL in a temp
// dir, configured with 3 minutes of ingestion slack.
func getTenantStore(t *testing.T) (*tenantStore, error) {
	w, err := wal.New(&wal.Config{
		Filepath:       t.TempDir(),
		Encoding:       backend.EncNone,
		IngestionSlack: 3 * time.Minute,
		Version:        encoding.DefaultEncoding().Version(),
	})
	require.NoError(t, err)

	return newTenantStore("test-tenant", 1, time.Now().Unix(), BlockConfig{}, log.NewNopLogger(), w, encoding.DefaultEncoding(), &mockOverrides{})
}

// TestAppendTraceHonorCycleTime verifies that a trace whose span times fall
// inside the cycle window (plus slack) keeps its own start/end in the block
// meta instead of being clamped.
// Fix: the original called time.Now() four separate times to build fixture
// timestamps; anchoring everything to a single instant keeps the fixture
// internally consistent and removes any second-boundary flakiness.
func TestAppendTraceHonorCycleTime(t *testing.T) {
	store, err := getTenantStore(t)
	require.NoError(t, err)

	now := time.Now()
	traceID := []byte("test-trace-id")
	start := now.Add(-1 * time.Hour)
	end := now.Add(-1 * time.Hour)
	trace := test.MakeTraceWithTimeRange(1, traceID, uint64(start.UnixNano()), uint64(end.UnixNano()))
	startTime := now.Add(-1 * time.Hour)
	endTime := startTime.Add(5 * time.Minute)

	err = store.AppendTrace(traceID, trace, startTime, endTime)
	require.NoError(t, err)

	assert.Equal(t, start.Unix(), store.headBlock.BlockMeta().StartTime.Unix())
	assert.Equal(t, end.Unix(), store.headBlock.BlockMeta().EndTime.Unix())
}

// TestAdjustTimeRangeForSlack covers the four clamp cases: fully inside the
// slack window, start too early, end too late, and end before start.
func TestAdjustTimeRangeForSlack(t *testing.T) {
	store, err := getTenantStore(t)
	require.NoError(t, err)

	cycleStart := time.Now()
	cycleEnd := cycleStart.Add(10 * time.Minute)

	// ts converts "base shifted by offset" to Unix seconds.
	ts := func(base time.Time, offset time.Duration) uint64 {
		return uint64(base.Add(offset).Unix())
	}

	cases := []struct {
		name               string
		start, end         uint64
		wantStart, wantEnd uint64
	}{
		{
			name:      "within slack range",
			start:     ts(cycleStart, -2*time.Minute),
			end:       ts(cycleEnd, 2*time.Minute),
			wantStart: ts(cycleStart, -2*time.Minute),
			wantEnd:   ts(cycleEnd, 2*time.Minute),
		},
		{
			name:      "start before slack range",
			start:     ts(cycleStart, -10*time.Minute),
			end:       ts(cycleEnd, 2*time.Minute),
			wantStart: ts(cycleStart, 0),
			wantEnd:   ts(cycleEnd, 2*time.Minute),
		},
		{
			name:      "end after slack range",
			start:     ts(cycleStart, -2*time.Minute),
			end:       ts(cycleEnd, 20*time.Minute),
			wantStart: ts(cycleStart, -2*time.Minute),
			wantEnd:   ts(cycleEnd, 0),
		},
		{
			name:      "end before start",
			start:     ts(cycleStart, -2*time.Minute),
			end:       ts(cycleStart, -3*time.Minute),
			wantStart: ts(cycleStart, -2*time.Minute),
			wantEnd:   ts(cycleEnd, 0),
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			gotStart, gotEnd := store.adjustTimeRangeForSlack(cycleStart, cycleEnd, tc.start, tc.end)
			assert.Equal(t, tc.wantStart, gotStart)
			assert.Equal(t, tc.wantEnd, gotEnd)
		})
	}
}
25 changes: 25 additions & 0 deletions modules/generator/processor/localblocks/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/golang/groupcache/lru"
"github.com/google/uuid"
"github.com/grafana/tempo/modules/ingester"
"github.com/grafana/tempo/pkg/dataquality"
"github.com/grafana/tempo/pkg/flushqueues"
"github.com/grafana/tempo/pkg/tracesizes"
"github.com/grafana/tempo/tempodb"
Expand Down Expand Up @@ -662,6 +663,8 @@ func (p *Processor) writeHeadBlock(id common.ID, tr *tempopb.Trace) error {
startSeconds := uint32(start / uint64(time.Second))
endSeconds := uint32(end / uint64(time.Second))

startSeconds, endSeconds = p.adjustTimeRangeForSlack(startSeconds, endSeconds)

err := p.headBlock.AppendTrace(id, tr, startSeconds, endSeconds)
if err != nil {
return err
Expand All @@ -670,6 +673,28 @@ func (p *Processor) writeHeadBlock(id common.ID, tr *tempopb.Trace) error {
return nil
}

// adjustTimeRangeForSlack clamps the given start/end timestamps (Unix
// seconds) to the window [now - slack, now + slack]. A timestamp outside the
// window is replaced with "now", and a data-quality warning is recorded for
// the head block's tenant.
func (p *Processor) adjustTimeRangeForSlack(start, end uint32) (uint32, uint32) {
	now := time.Now()
	// Bug fix: the lower bound must be (now - slack). The previous code
	// negated the whole timestamp — uint32(-now.Add(slack).Unix()) — which
	// wraps the negative value to a huge uint32, so start < startOfRange was
	// true for virtually every trace and start was always reset to now.
	startOfRange := uint32(now.Add(-p.headBlock.IngestionSlack()).Unix())
	endOfRange := uint32(now.Add(p.headBlock.IngestionSlack()).Unix())

	warn := false
	if start < startOfRange {
		warn = true
		start = uint32(now.Unix())
	}
	if end > endOfRange || end < start {
		warn = true
		end = uint32(now.Unix())
	}

	if warn {
		dataquality.WarnOutsideIngestionSlack(p.headBlock.BlockMeta().TenantID)
	}

	return start, end
}

func (p *Processor) resetHeadBlock() error {
meta := &backend.BlockMeta{
BlockID: backend.NewUUID(),
Expand Down
Loading