diff --git a/app/vtinsert/opentelemetry/opentelemetry.go b/app/vtinsert/opentelemetry/opentelemetry.go index 31011d483..b105c5637 100644 --- a/app/vtinsert/opentelemetry/opentelemetry.go +++ b/app/vtinsert/opentelemetry/opentelemetry.go @@ -247,9 +247,10 @@ func pushFieldsFromSpan(span *otelpb.Span, scopeCommonFields []logstorage.Field, // todo: @jiekun the trace ID field MUST be the last field. add extra ways to secure it. logstorage.Field{Name: otelpb.TraceIDField, Value: span.TraceID}, ) - lmp.AddRow(int64(span.EndTimeUnixNano), fields, nil) - // create an entity in trace-id-idx stream, if this trace_id hasn't been seen before. + // Create an entry in the trace-id-idx stream if this trace_id hasn't been seen before. + // The index entry must be written first to ensure that an index always exists for the data. + // During querying, if no index is found, the data must not exist. if !traceIDCache.Has([]byte(span.TraceID)) { lmp.AddRow(int64(span.StartTimeUnixNano), []logstorage.Field{ {Name: "_msg", Value: msgFieldValue}, @@ -258,6 +259,9 @@ func pushFieldsFromSpan(span *otelpb.Span, scopeCommonFields []logstorage.Field, }, []logstorage.Field{{Name: otelpb.TraceIDIndexStreamName, Value: strconv.FormatUint(xxhash.Sum64String(span.TraceID)%otelpb.TraceIDIndexPartitionCount, 10)}}) traceIDCache.Set([]byte(span.TraceID), nil) } + + lmp.AddRow(int64(span.EndTimeUnixNano), fields, nil) + return fields } diff --git a/app/vtselect/internalselect/internalselect.go b/app/vtselect/internalselect/internalselect.go index 2953e688c..d6508733a 100644 --- a/app/vtselect/internalselect/internalselect.go +++ b/app/vtselect/internalselect/internalselect.go @@ -2,6 +2,7 @@ package internalselect import ( "context" + "errors" "fmt" "net/http" "strconv" @@ -18,6 +19,7 @@ import ( "github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage" + vtstoragecommon "github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage/common" "github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage/netselect" ) @@ -125,6 +127,10 @@ func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req defer cp.UpdatePerQueryStatsMetrics() if err := vtstorage.RunQuery(qctx, writeBlock); err != nil { + if errors.Is(err, vtstoragecommon.ErrOutOfRetention) { + w.Header().Set(vtstoragecommon.OutOfRetentionHeaderName, "true") + return nil + } return err } if errGlobal != nil { diff --git a/app/vtselect/traces/query/query.go b/app/vtselect/traces/query/query.go index f766e381a..5117a586c 100644 --- a/app/vtselect/traces/query/query.go +++ b/app/vtselect/traces/query/query.go @@ -2,6 +2,7 @@ package query import ( "context" + "errors" "flag" "fmt" "net/http" @@ -15,6 +16,7 @@ import ( "github.com/cespare/xxhash/v2" "github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage" + vtstoragecommon "github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage/common" otelpb "github.com/VictoriaMetrics/VictoriaTraces/lib/protoparser/opentelemetry/pb" ) @@ -163,17 +165,17 @@ func GetTrace(ctx context.Context, cp *CommonParams, traceID string) ([]*Row, er } q.AddPipeOffsetLimit(0, 1) traceTimestamp, err := findTraceIDTimeSplitTimeRange(ctx, q, cp) + if err != nil && errors.Is(err, vtstoragecommon.ErrOutOfRetention) { + // no hit in the retention period, simply returns empty. + return nil, nil + } if err != nil { + // something wrong when trying to find the trace_id's start time. return nil, fmt.Errorf("cannot find trace_id %q start time: %s", traceID, err) } - // fast path: trace start time found, search in [trace start time, trace start time + *traceMaxDurationWindow] time range. - if !traceTimestamp.IsZero() { - return findSpansByTraceIDAndTime(ctx, cp, traceID, traceTimestamp.Add(-*traceMaxDurationWindow), traceTimestamp.Add(*traceMaxDurationWindow)) - } - // slow path: if trace start time not exist, probably the root span was not available. - // try to search from now to 0 timestamp. - return findSpansByTraceID(ctx, cp, traceID) + // trace start time found, search in [trace start time, trace start time + *traceMaxDurationWindow] time range. + return findSpansByTraceIDAndTime(ctx, cp, traceID, traceTimestamp.Add(-*traceMaxDurationWindow), traceTimestamp.Add(*traceMaxDurationWindow)) } // GetTraceList returns multiple traceIDs and spans of them in []*Row format. @@ -346,6 +348,9 @@ func findTraceIDsSplitTimeRange(ctx context.Context, q *logstorage.Query, cp *Co qClone := q.CloneWithTimeFilter(currentTime.UnixNano(), currentStartTime.UnixNano(), endTime.UnixNano()) qctx = qctx.WithQuery(qClone) if err := vtstorage.RunQuery(qctx, writeBlock); err != nil { + if errors.Is(err, vtstoragecommon.ErrOutOfRetention) { + return nil, time.Time{}, nil + } return nil, time.Time{}, err } @@ -355,7 +360,7 @@ func findTraceIDsSplitTimeRange(ctx context.Context, q *logstorage.Query, cp *Co if err != nil { return nil, maxStartTime, err } - return traceIDList, maxStartTime, nil + return checkTraceIDList(traceIDList), maxStartTime, nil } // not enough trace_id, clear the result, extend the time range and try again. @@ -384,8 +389,10 @@ func findTraceIDsSplitTimeRange(ctx context.Context, q *logstorage.Query, cp *Co } // findTraceIDTimeSplitTimeRange try to search from {trace_id_idx_stream="xx"} stream, which contains -// the trace_id and start time of the root span. It returns the start time of the trace if found. -// Otherwise, the root span may not reach VictoriaTraces, and zero time is returned. +// the trace_id and the rough start time of this trace. It returns the start time of the trace if found. +// +// If the span with this trace_id never reach VictoriaTraces, the search will to through the whole time range within +// the retention period, and returns an ErrOutOfRetention. func findTraceIDTimeSplitTimeRange(ctx context.Context, q *logstorage.Query, cp *CommonParams) (time.Time, error) { traceIDStartTimeInt := int64(0) var missingTimeColumn atomic.Bool @@ -420,13 +427,15 @@ func findTraceIDTimeSplitTimeRange(ctx context.Context, q *logstorage.Query, cp currentTime := time.Now() startTime := currentTime.Add(-*traceSearchStep) endTime := currentTime - for startTime.UnixNano() > 0 { // todo: no need to search time range before retention period. + for startTime.UnixNano() > 0 { qq := q.CloneWithTimeFilter(currentTime.UnixNano(), startTime.UnixNano(), endTime.UnixNano()) qctx = qctx.WithQuery(qq) if err := vtstorage.RunQuery(qctx, writeBlock); err != nil { + // this could be either a ErrOutOfRetention, or a real error. return time.Time{}, err } + if missingTimeColumn.Load() { return time.Time{}, fmt.Errorf("missing _time column in the result for the query [%s]", qq) } @@ -441,50 +450,7 @@ func findTraceIDTimeSplitTimeRange(ctx context.Context, q *logstorage.Query, cp // found result, perform extra search for traceMaxDurationWindow and then break. return time.Unix(traceIDStartTimeInt/1e9, traceIDStartTimeInt%1e9), nil } - - return time.Time{}, nil -} - -// findSpansByTraceID searches for spans from now to 0 time with steps. -// In order to avoid scanning all data blocks, search is performed on time range splitting by traceSearchStep. -// Once a trace is found, it assumes other spans will exist on the same time range, and only search this -// time range (with traceMaxDurationWindow). -// -// e.g. -// 1. find traces span on [now-traceSearchStep, now], no hit. -// 2. find traces span on [now-2 * traceSearchStep, now - traceSearchStep], hit. -// 3. make sure to include all the spans by an additional search on: [now-2 * traceSearchStep-traceMaxDurationWindow, now-2 * traceSearchStep]. -// 4. skip [0, now-2 * traceSearchStep-traceMaxDurationWindow] and return. -func findSpansByTraceID(ctx context.Context, cp *CommonParams, traceID string) ([]*Row, error) { - // query: trace_id:traceID - currentTime := time.Now() - startTime := currentTime.Add(-*traceSearchStep) - endTime := currentTime - var ( - rows []*Row - err error - ) - for startTime.UnixNano() > 0 { // todo: no need to search time range before retention period. - rows, err = findSpansByTraceIDAndTime(ctx, cp, traceID, startTime, endTime) - if err != nil { - return nil, err - } - // no hit in this time range, continue with step. - if len(rows) == 0 { - endTime = startTime - startTime = startTime.Add(-*traceSearchStep) - continue - } - - // found result, perform extra search for traceMaxDurationWindow and then break. - extraRows, err := findSpansByTraceIDAndTime(ctx, cp, traceID, startTime.Add(-*traceMaxDurationWindow), startTime) - if err != nil { - return nil, err - } - rows = append(rows, extraRows...) - break - } - return rows, nil + return time.Time{}, vtstoragecommon.ErrOutOfRetention } // findSpansByTraceIDAndTime search for spans in given time range. @@ -495,7 +461,6 @@ func findSpansByTraceIDAndTime(ctx context.Context, cp *CommonParams, traceID st if err != nil { return nil, fmt.Errorf("cannot parse query [%s]: %s", qStr, err) } - ctxWithCancel, cancel := context.WithCancel(ctx) cp.Query = q qctx := cp.NewQueryContext(ctxWithCancel) diff --git a/app/vtstorage/common/common.go b/app/vtstorage/common/common.go new file mode 100644 index 000000000..12d13aaaf --- /dev/null +++ b/app/vtstorage/common/common.go @@ -0,0 +1,13 @@ +package common + +import "errors" + +const ( + // OutOfRetentionHeaderName header is for communication between vtstorage and vtselect, to notify + // vtselect that the query time range is completely out of the retention period. + OutOfRetentionHeaderName = "VT-Out-Of-Retention" +) + +var ( + ErrOutOfRetention = errors.New("request time out of retention") +) diff --git a/app/vtstorage/main.go b/app/vtstorage/main.go index 1f51a476e..ad5a495d5 100644 --- a/app/vtstorage/main.go +++ b/app/vtstorage/main.go @@ -10,6 +10,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" @@ -17,6 +18,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/metrics" + "github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage/common" "github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage/netinsert" "github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage/netselect" ) @@ -419,6 +421,12 @@ func (*Storage) MustAddRows(lr *logstorage.LogRows) { } } +// isTimestampOutOfRetention check if nanosecond timestamp is earlier than retention. +func isTimestampOutOfRetention(timestamp int64) bool { + minAllowedTimestamp := int64(fasttime.UnixTimestamp()*1000) - retentionPeriod.Milliseconds() + return timestamp/1000000 < minAllowedTimestamp +} + // RunQuery runs the given qctx and calls writeBlock for the returned data blocks func RunQuery(qctx *logstorage.QueryContext, writeBlock logstorage.WriteDataBlockFunc) error { qOpt, offset, limit := qctx.Query.GetLastNResultsQuery() @@ -428,6 +436,10 @@ func RunQuery(qctx *logstorage.QueryContext, writeBlock logstorage.WriteDataBloc } if localStorage != nil { + _, endTimestamp := qctx.Query.GetFilterTimeRange() + if isTimestampOutOfRetention(endTimestamp) { + return common.ErrOutOfRetention + } return localStorage.RunQuery(qctx, writeBlock) } return netstorageSelect.RunQuery(qctx, writeBlock) diff --git a/app/vtstorage/main_test.go b/app/vtstorage/main_test.go new file mode 100644 index 000000000..353b664f1 --- /dev/null +++ b/app/vtstorage/main_test.go @@ -0,0 +1,46 @@ +package vtstorage + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" + + "github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage/common" +) + +func TestRunQueryOutOfRetention(t *testing.T) { + // Create the test storage + storagePath := t.Name() + cfg := &logstorage.StorageConfig{ + Retention: 7 * 24 * time.Hour, + } + localStorage = logstorage.MustOpenStorage(storagePath, cfg) + defer func() { + // Close and delete the test storage + localStorage.MustClose() + fs.MustRemoveDir(storagePath) + }() + + query, _ := logstorage.ParseQuery("*") + // add a time filter which within the default retention period (7d). + query.AddTimeFilter(0, time.Now().Add(-retentionPeriod.Duration()+5*time.Second).UnixNano()) + + // the query should be executed with empty result. + qctx := logstorage.NewQueryContext(context.TODO(), &logstorage.QueryStats{}, []logstorage.TenantID{}, query, false) + if err := RunQuery(qctx, func(workerID uint, db *logstorage.DataBlock) {}); err != nil { + t.Fatalf("RunQuery returns error for correct query") + } + + // add a time filter which obviously out of the default retention period (7d). + query.AddTimeFilter(0, time.Now().Add(-retentionPeriod.Duration()-10*time.Second).UnixNano()) + + // the query should stop with ErrOutOfRetention error + qctx = logstorage.NewQueryContext(context.TODO(), &logstorage.QueryStats{}, []logstorage.TenantID{}, query, false) + if !errors.Is(RunQuery(qctx, nil), common.ErrOutOfRetention) { + t.Fatalf("RunQuery fail to returns ErrOutOfRetention for query which with too small endTimestamp") + } +} diff --git a/app/vtstorage/netselect/netselect.go b/app/vtstorage/netselect/netselect.go index 8f223e180..bc854e6f5 100644 --- a/app/vtstorage/netselect/netselect.go +++ b/app/vtstorage/netselect/netselect.go @@ -22,6 +22,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/metrics" + + "github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage/common" ) const ( @@ -292,6 +294,12 @@ func (sn *storageNode) getResponseBodyForPathAndArgs(ctx context.Context, path s } } + if resp.Header.Get(common.OutOfRetentionHeaderName) != "" { + // the netstorage will set this header only when the request time range + // is completely out of the retention period. + return nil, "", common.ErrOutOfRetention + } + if resp.StatusCode != http.StatusOK { responseBody, err := io.ReadAll(resp.Body) if err != nil { diff --git a/docs/victoriatraces/changelog/CHANGELOG.md b/docs/victoriatraces/changelog/CHANGELOG.md index 85ffa0d69..bef0f4773 100644 --- a/docs/victoriatraces/changelog/CHANGELOG.md +++ b/docs/victoriatraces/changelog/CHANGELOG.md @@ -11,6 +11,7 @@ The following `tip` changes can be tested by building VictoriaTraces components * [How to build single-node VictoriaTraces](https://docs.victoriametrics.com/victoriatraces/#how-to-build-from-sources) ## tip +* BUGFIX: [Single-node VictoriaTraces](https://docs.victoriametrics.com/victoriatraces/) and [VictoriaTraces cluster](https://docs.victoriametrics.com/victoriatraces/cluster/): stop query at the earlier timestamp of the retention period when searching by a non-existed trace ID, and response earlier. See [#48](https://github.com/VictoriaMetrics/VictoriaTraces/issues/48) for details. Thank @JayiceZ for [the pull request](https://github.com/VictoriaMetrics/VictoriaTraces/pull/49). ## [v0.4.0](https://github.com/VictoriaMetrics/VictoriaTraces/releases/tag/v0.4.0) diff --git a/docs/victoriatraces/roadmap.md b/docs/victoriatraces/roadmap.md index 792db2e11..fc480a6e3 100644 --- a/docs/victoriatraces/roadmap.md +++ b/docs/victoriatraces/roadmap.md @@ -15,7 +15,6 @@ aliases: The following items need to be completed before general availability (GA) version: - [ ] Finalize the data structure and commit to backward compatibility. -- [ ] Finalize the data distribution algorithm in the cluster version. The following functionality is planned in the future versions of VictoriaTraces after GA: - [ ] Provide web UI to visualize traces.