Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions app/vtinsert/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,10 @@ func pushFieldsFromSpan(span *otelpb.Span, scopeCommonFields []logstorage.Field,
// todo: @jiekun the trace ID field MUST be the last field. add extra ways to secure it.
logstorage.Field{Name: otelpb.TraceIDField, Value: span.TraceID},
)
lmp.AddRow(int64(span.EndTimeUnixNano), fields, nil)

// create an entity in trace-id-idx stream, if this trace_id hasn't been seen before.
// Create an entry in the trace-id-idx stream if this trace_id hasn't been seen before.
// The index entry must be written first to ensure that an index always exists for the data.
// During querying, if no index is found, the data must not exist.
if !traceIDCache.Has([]byte(span.TraceID)) {
lmp.AddRow(int64(span.StartTimeUnixNano), []logstorage.Field{
{Name: "_msg", Value: msgFieldValue},
Expand All @@ -258,6 +259,9 @@ func pushFieldsFromSpan(span *otelpb.Span, scopeCommonFields []logstorage.Field,
}, []logstorage.Field{{Name: otelpb.TraceIDIndexStreamName, Value: strconv.FormatUint(xxhash.Sum64String(span.TraceID)%otelpb.TraceIDIndexPartitionCount, 10)}})
traceIDCache.Set([]byte(span.TraceID), nil)
}

lmp.AddRow(int64(span.EndTimeUnixNano), fields, nil)

return fields
}

Expand Down
6 changes: 6 additions & 0 deletions app/vtselect/internalselect/internalselect.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internalselect

import (
"context"
"errors"
"fmt"
"net/http"
"strconv"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/VictoriaMetrics/metrics"

"github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage"
vtstoragecommon "github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage/common"
"github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage/netselect"
)

Expand Down Expand Up @@ -125,6 +127,10 @@ func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
defer cp.UpdatePerQueryStatsMetrics()

if err := vtstorage.RunQuery(qctx, writeBlock); err != nil {
if errors.Is(err, vtstoragecommon.ErrOutOfRetention) {
w.Header().Set(vtstoragecommon.OutOfRetentionHeaderName, "true")
return nil
}
return err
}
if errGlobal != nil {
Expand Down
77 changes: 21 additions & 56 deletions app/vtselect/traces/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package query

import (
"context"
"errors"
"flag"
"fmt"
"net/http"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/cespare/xxhash/v2"

"github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage"
vtstoragecommon "github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage/common"
otelpb "github.com/VictoriaMetrics/VictoriaTraces/lib/protoparser/opentelemetry/pb"
)

Expand Down Expand Up @@ -163,17 +165,17 @@ func GetTrace(ctx context.Context, cp *CommonParams, traceID string) ([]*Row, er
}
q.AddPipeOffsetLimit(0, 1)
traceTimestamp, err := findTraceIDTimeSplitTimeRange(ctx, q, cp)
if err != nil && errors.Is(err, vtstoragecommon.ErrOutOfRetention) {
// no hit in the retention period, simply returns empty.
return nil, nil
}
if err != nil {
// something wrong when trying to find the trace_id's start time.
return nil, fmt.Errorf("cannot find trace_id %q start time: %s", traceID, err)
}

// fast path: trace start time found, search in [trace start time, trace start time + *traceMaxDurationWindow] time range.
if !traceTimestamp.IsZero() {
return findSpansByTraceIDAndTime(ctx, cp, traceID, traceTimestamp.Add(-*traceMaxDurationWindow), traceTimestamp.Add(*traceMaxDurationWindow))
}
// slow path: if trace start time not exist, probably the root span was not available.
// try to search from now to 0 timestamp.
return findSpansByTraceID(ctx, cp, traceID)
// trace start time found, search in [trace start time, trace start time + *traceMaxDurationWindow] time range.
return findSpansByTraceIDAndTime(ctx, cp, traceID, traceTimestamp.Add(-*traceMaxDurationWindow), traceTimestamp.Add(*traceMaxDurationWindow))
}

// GetTraceList returns multiple traceIDs and spans of them in []*Row format.
Expand Down Expand Up @@ -346,6 +348,9 @@ func findTraceIDsSplitTimeRange(ctx context.Context, q *logstorage.Query, cp *Co
qClone := q.CloneWithTimeFilter(currentTime.UnixNano(), currentStartTime.UnixNano(), endTime.UnixNano())
qctx = qctx.WithQuery(qClone)
if err := vtstorage.RunQuery(qctx, writeBlock); err != nil {
if errors.Is(err, vtstoragecommon.ErrOutOfRetention) {
return nil, time.Time{}, nil
}
return nil, time.Time{}, err
}

Expand All @@ -355,7 +360,7 @@ func findTraceIDsSplitTimeRange(ctx context.Context, q *logstorage.Query, cp *Co
if err != nil {
return nil, maxStartTime, err
}
return traceIDList, maxStartTime, nil
return checkTraceIDList(traceIDList), maxStartTime, nil
}

// not enough trace_id, clear the result, extend the time range and try again.
Expand Down Expand Up @@ -384,8 +389,10 @@ func findTraceIDsSplitTimeRange(ctx context.Context, q *logstorage.Query, cp *Co
}

// findTraceIDTimeSplitTimeRange try to search from {trace_id_idx_stream="xx"} stream, which contains
// the trace_id and start time of the root span. It returns the start time of the trace if found.
// Otherwise, the root span may not reach VictoriaTraces, and zero time is returned.
// the trace_id and the rough start time of this trace. It returns the start time of the trace if found.
//
// If no span with this trace_id has ever reached VictoriaTraces, the search will go through the whole time range within
// the retention period and return ErrOutOfRetention.
func findTraceIDTimeSplitTimeRange(ctx context.Context, q *logstorage.Query, cp *CommonParams) (time.Time, error) {
traceIDStartTimeInt := int64(0)
var missingTimeColumn atomic.Bool
Expand Down Expand Up @@ -420,13 +427,15 @@ func findTraceIDTimeSplitTimeRange(ctx context.Context, q *logstorage.Query, cp
currentTime := time.Now()
startTime := currentTime.Add(-*traceSearchStep)
endTime := currentTime
for startTime.UnixNano() > 0 { // todo: no need to search time range before retention period.
for startTime.UnixNano() > 0 {
qq := q.CloneWithTimeFilter(currentTime.UnixNano(), startTime.UnixNano(), endTime.UnixNano())
qctx = qctx.WithQuery(qq)

if err := vtstorage.RunQuery(qctx, writeBlock); err != nil {
// this could be either a ErrOutOfRetention, or a real error.
return time.Time{}, err
}

if missingTimeColumn.Load() {
return time.Time{}, fmt.Errorf("missing _time column in the result for the query [%s]", qq)
}
Expand All @@ -441,50 +450,7 @@ func findTraceIDTimeSplitTimeRange(ctx context.Context, q *logstorage.Query, cp
// found result, perform extra search for traceMaxDurationWindow and then break.
return time.Unix(traceIDStartTimeInt/1e9, traceIDStartTimeInt%1e9), nil
}

return time.Time{}, nil
}

// findSpansByTraceID searches for spans from now to 0 time with steps.
// In order to avoid scanning all data blocks, search is performed on time range splitting by traceSearchStep.
// Once a trace is found, it assumes other spans will exist on the same time range, and only search this
// time range (with traceMaxDurationWindow).
//
// e.g.
// 1. find traces span on [now-traceSearchStep, now], no hit.
// 2. find traces span on [now-2 * traceSearchStep, now - traceSearchStep], hit.
// 3. make sure to include all the spans by an additional search on: [now-2 * traceSearchStep-traceMaxDurationWindow, now-2 * traceSearchStep].
// 4. skip [0, now-2 * traceSearchStep-traceMaxDurationWindow] and return.
func findSpansByTraceID(ctx context.Context, cp *CommonParams, traceID string) ([]*Row, error) {
// query: trace_id:traceID
currentTime := time.Now()
startTime := currentTime.Add(-*traceSearchStep)
endTime := currentTime
var (
rows []*Row
err error
)
for startTime.UnixNano() > 0 { // todo: no need to search time range before retention period.
rows, err = findSpansByTraceIDAndTime(ctx, cp, traceID, startTime, endTime)
if err != nil {
return nil, err
}
// no hit in this time range, continue with step.
if len(rows) == 0 {
endTime = startTime
startTime = startTime.Add(-*traceSearchStep)
continue
}

// found result, perform extra search for traceMaxDurationWindow and then break.
extraRows, err := findSpansByTraceIDAndTime(ctx, cp, traceID, startTime.Add(-*traceMaxDurationWindow), startTime)
if err != nil {
return nil, err
}
rows = append(rows, extraRows...)
break
}
return rows, nil
return time.Time{}, vtstoragecommon.ErrOutOfRetention
}

// findSpansByTraceIDAndTime search for spans in given time range.
Expand All @@ -495,7 +461,6 @@ func findSpansByTraceIDAndTime(ctx context.Context, cp *CommonParams, traceID st
if err != nil {
return nil, fmt.Errorf("cannot parse query [%s]: %s", qStr, err)
}

ctxWithCancel, cancel := context.WithCancel(ctx)
cp.Query = q
qctx := cp.NewQueryContext(ctxWithCancel)
Expand Down
13 changes: 13 additions & 0 deletions app/vtstorage/common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package common

import "errors"

// OutOfRetentionHeaderName is the name of the HTTP response header used for communication
// between vtstorage and vtselect. vtstorage sets it to notify vtselect that the query
// time range is completely out of the retention period.
const OutOfRetentionHeaderName = "VT-Out-Of-Retention"

// ErrOutOfRetention is the sentinel error reported when the requested time range
// lies completely before the retention period. Compare against it with errors.Is.
var ErrOutOfRetention = errors.New("request time out of retention")
12 changes: 12 additions & 0 deletions app/vtstorage/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
"time"

"github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/metrics"

"github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage/common"
"github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage/netinsert"
"github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage/netselect"
)
Expand Down Expand Up @@ -419,6 +421,12 @@ func (*Storage) MustAddRows(lr *logstorage.LogRows) {
}
}

// isTimestampOutOfRetention reports whether the given timestamp (in nanoseconds)
// is earlier than the start of the retention period.
//
// The comparison is performed in milliseconds: the timestamp is truncated to
// milliseconds and compared against (current time - retentionPeriod).
func isTimestampOutOfRetention(timestamp int64) bool {
	nowMs := int64(fasttime.UnixTimestamp() * 1000)
	retentionStartMs := nowMs - retentionPeriod.Milliseconds()
	timestampMs := timestamp / 1000000
	return timestampMs < retentionStartMs
}

// RunQuery runs the given qctx and calls writeBlock for the returned data blocks
func RunQuery(qctx *logstorage.QueryContext, writeBlock logstorage.WriteDataBlockFunc) error {
qOpt, offset, limit := qctx.Query.GetLastNResultsQuery()
Expand All @@ -428,6 +436,10 @@ func RunQuery(qctx *logstorage.QueryContext, writeBlock logstorage.WriteDataBloc
}

if localStorage != nil {
_, endTimestamp := qctx.Query.GetFilterTimeRange()
if isTimestampOutOfRetention(endTimestamp) {
return common.ErrOutOfRetention
}
return localStorage.RunQuery(qctx, writeBlock)
}
return netstorageSelect.RunQuery(qctx, writeBlock)
Expand Down
46 changes: 46 additions & 0 deletions app/vtstorage/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package vtstorage

import (
"context"
"errors"
"testing"
"time"

"github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"

"github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage/common"
)

// TestRunQueryOutOfRetention verifies that RunQuery executes a query whose end timestamp
// is inside the retention period, and returns common.ErrOutOfRetention for a query whose
// end timestamp is earlier than the retention period.
func TestRunQueryOutOfRetention(t *testing.T) {
	// Create the test storage. RunQuery reads the package-level localStorage,
	// so remember its previous value and restore it once the test is done;
	// otherwise the variable would be left pointing at a closed storage.
	storagePath := t.Name()
	cfg := &logstorage.StorageConfig{
		Retention: 7 * 24 * time.Hour,
	}
	prevStorage := localStorage
	localStorage = logstorage.MustOpenStorage(storagePath, cfg)
	defer func() {
		// Close and delete the test storage.
		localStorage.MustClose()
		fs.MustRemoveDir(storagePath)
		localStorage = prevStorage
	}()

	query, err := logstorage.ParseQuery("*")
	if err != nil {
		t.Fatalf("cannot parse query: %s", err)
	}
	// Add a time filter whose end is within the default retention period (7d).
	query.AddTimeFilter(0, time.Now().Add(-retentionPeriod.Duration()+5*time.Second).UnixNano())

	// The query should be executed and produce an empty result.
	qctx := logstorage.NewQueryContext(context.TODO(), &logstorage.QueryStats{}, []logstorage.TenantID{}, query, false)
	if err := RunQuery(qctx, func(workerID uint, db *logstorage.DataBlock) {}); err != nil {
		t.Fatalf("RunQuery returns error for correct query: %s", err)
	}

	// Add a time filter whose end is clearly out of the default retention period (7d).
	query.AddTimeFilter(0, time.Now().Add(-retentionPeriod.Duration()-10*time.Second).UnixNano())

	// The query should stop with ErrOutOfRetention before reaching the storage,
	// so no writeBlock callback is needed.
	qctx = logstorage.NewQueryContext(context.TODO(), &logstorage.QueryStats{}, []logstorage.TenantID{}, query, false)
	if err := RunQuery(qctx, nil); !errors.Is(err, common.ErrOutOfRetention) {
		t.Fatalf("RunQuery fail to returns ErrOutOfRetention for query which with too small endTimestamp; got: %v", err)
	}
}
8 changes: 8 additions & 0 deletions app/vtstorage/netselect/netselect.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/metrics"

"github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage/common"
)

const (
Expand Down Expand Up @@ -292,6 +294,12 @@ func (sn *storageNode) getResponseBodyForPathAndArgs(ctx context.Context, path s
}
}

if resp.Header.Get(common.OutOfRetentionHeaderName) != "" {
// the netstorage will set this header only when the request time range
// is completely out of the retention period.
return nil, "", common.ErrOutOfRetention
}

if resp.StatusCode != http.StatusOK {
responseBody, err := io.ReadAll(resp.Body)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions docs/victoriatraces/changelog/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The following `tip` changes can be tested by building VictoriaTraces components
* [How to build single-node VictoriaTraces](https://docs.victoriametrics.com/victoriatraces/#how-to-build-from-sources)

## tip
* BUGFIX: [Single-node VictoriaTraces](https://docs.victoriametrics.com/victoriatraces/) and [VictoriaTraces cluster](https://docs.victoriametrics.com/victoriatraces/cluster/): stop the query at the earliest timestamp of the retention period when searching by a non-existent trace ID, so the response is returned earlier. See [#48](https://github.com/VictoriaMetrics/VictoriaTraces/issues/48) for details. Thanks to @JayiceZ for [the pull request](https://github.com/VictoriaMetrics/VictoriaTraces/pull/49).

## [v0.4.0](https://github.com/VictoriaMetrics/VictoriaTraces/releases/tag/v0.4.0)

Expand Down
1 change: 0 additions & 1 deletion docs/victoriatraces/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ aliases:

The following items need to be completed before general availability (GA) version:
- [ ] Finalize the data structure and commit to backward compatibility.
- [ ] Finalize the data distribution algorithm in the cluster version.

The following functionality is planned in the future versions of VictoriaTraces after GA:
- [ ] Provide web UI to visualize traces.
Expand Down