Skip to content

Commit

Permalink
support boolean expression for native query
Browse files Browse the repository at this point in the history
  • Loading branch information
hgiasac committed Oct 10, 2024
1 parent 78cbf86 commit eb0b666
Show file tree
Hide file tree
Showing 12 changed files with 679 additions and 277 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ ci-build-configuration: clean
build-supergraph-test:
cd tests/engine && \
ddn connector-link update prometheus --add-all-resources --subgraph ./app/subgraph.yaml && \
ddn supergraph build local
ddn supergraph build local && \
docker compose up -d --build engine

.PHONY: generate-api-types
generate-api-types:
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ The `timestamp` and `value` fields are the result of the instant query. If the r
- `offset`: the offset modifier allows changing the time offset for individual instant and range vectors in a query.
- `timeout`: the evaluation timeout of the request.
- `fn`: the array of composable PromQL functions.
- `flat`: flatten grouped values out the root array. Use the runtime setting if the value is null.

#### Aggregation

Expand Down Expand Up @@ -154,6 +155,7 @@ The native query is exposed as a read-only function with 2 required fields `job`
- `time`: Evaluation timestamp. Use this argument if you want to run an instant query.
- `step`: the query resolution step width in duration format or float number of seconds. The step should be explicitly set for range queries. Even though the connector can estimate the approximate step width the result may be empty due to too far interval.
- `timeout`: the evaluation timeout of the request.
- `flat`: flatten grouped values out the root array. Use the runtime setting if the value is null

### Prometheus APIs

Expand Down Expand Up @@ -259,7 +261,7 @@ runtime:

#### Flatten values

By default, values are grouped by the label set. If you want to flatten the values array set `flat=true`.
By default, values are grouped by the label set. If you want to flatten out the values array, set `flat=true`.

#### Unix timestamp's unit

Expand Down
35 changes: 0 additions & 35 deletions connector/internal/expression_label.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package internal

import (
"encoding/json"
"fmt"
"regexp"
"slices"
Expand Down Expand Up @@ -212,37 +211,3 @@ func (le *LabelExpressionBuilder) evalLabelComparison(operator string, value any

return true, nil
}

func decodeStringSlice(value any) ([]string, error) {
if utils.IsNil(value) {
return nil, nil
}
var err error
sliceValue := []string{}
if str, ok := value.(string); ok {
// try to parse the slice from the json string
err = json.Unmarshal([]byte(str), &sliceValue)
} else {
sliceValue, err = utils.DecodeStringSlice(value)
}
if err != nil {
return nil, err
}

return sliceValue, nil
}

func intersection[T comparable](sliceA []T, sliceB []T) []T {
var result []T
if len(sliceA) == 0 || len(sliceB) == 0 {
return result
}

for _, a := range sliceA {
if slices.Contains(sliceB, a) {
result = append(result, a)
}
}

return result
}
145 changes: 28 additions & 117 deletions connector/internal/native_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ import (
"context"
"fmt"
"strconv"
"time"

"github.com/hasura/ndc-prometheus/connector/client"
"github.com/hasura/ndc-prometheus/connector/metadata"
"github.com/hasura/ndc-sdk-go/schema"
"github.com/hasura/ndc-sdk-go/utils"
"github.com/prometheus/common/model"
"go.opentelemetry.io/otel/trace"
)

Expand All @@ -21,6 +19,14 @@ type nativeQueryParameters struct {
End any
Timeout any
Step any
Where map[string]NativeQueryLabelBoolExp
}

// NewNativeQueryParameters creates a nativeQueryParameters instance
func NewNativeQueryParameters() *nativeQueryParameters {
return &nativeQueryParameters{
Where: make(map[string]NativeQueryLabelBoolExp),
}
}

type NativeQueryExecutor struct {
Expand All @@ -37,7 +43,7 @@ type NativeQueryExecutor struct {
// Explain explains the query request
func (nqe *NativeQueryExecutor) Explain(ctx context.Context) (*nativeQueryParameters, string, error) {
var err error
params := &nativeQueryParameters{}
params := NewNativeQueryParameters()
queryString := nqe.NativeQuery.Query
for key, arg := range nqe.Arguments {
switch key {
Expand All @@ -51,6 +57,16 @@ func (nqe *NativeQueryExecutor) Explain(ctx context.Context) (*nativeQueryParame
params.Timestamp = arg
case metadata.ArgumentKeyTimeout:
params.Timeout = arg
case metadata.ArgumentKeyWhere:
if utils.IsNil(arg) {
continue
}

boolExps, err := decodeNativeQueryLabelBoolExps(arg)
if err != nil {
return nil, "", schema.UnprocessableContentError(err.Error(), nil)
}
params.Where = boolExps
default:
argInfo, ok := nqe.NativeQuery.Arguments[key]
if ok {
Expand Down Expand Up @@ -99,7 +115,7 @@ func (nqe *NativeQueryExecutor) Explain(ctx context.Context) (*nativeQueryParame

// ExplainRaw explains the raw promQL query request
func (nqe *NativeQueryExecutor) ExplainRaw(ctx context.Context) (*nativeQueryParameters, string, error) {
params := &nativeQueryParameters{}
params := NewNativeQueryParameters()
var err error
var queryString string
for key, arg := range nqe.Arguments {
Expand Down Expand Up @@ -199,6 +215,11 @@ func (nqe *NativeQueryExecutor) queryInstant(ctx context.Context, queryString st
if err != nil {
return nil, schema.UnprocessableContentError(err.Error(), nil)
}

span := trace.SpanFromContext(ctx)
span.AddEvent("post_filter_results", trace.WithAttributes(utils.JSONAttribute("where", params.Where)))
vector = nqe.filterVectorResults(vector, params.Where)

results := createQueryResultsFromVector(vector, nqe.NativeQuery.Labels, nqe.Runtime, flat)

return results, nil
Expand All @@ -210,120 +231,10 @@ func (nqe *NativeQueryExecutor) queryRange(ctx context.Context, queryString stri
return nil, schema.UnprocessableContentError(err.Error(), nil)
}

span := trace.SpanFromContext(ctx)
span.AddEvent("post_filter_results", trace.WithAttributes(utils.JSONAttribute("where", params.Where)))
matrix = nqe.filterMatrixResults(matrix, params.Where)
results := createQueryResultsFromMatrix(matrix, nqe.NativeQuery.Labels, nqe.Runtime, flat)

return results, nil
}

func createQueryResultsFromVector(vector model.Vector, labels map[string]metadata.LabelInfo, runtime *metadata.RuntimeSettings, flat bool) []map[string]any {
results := make([]map[string]any, len(vector))
for i, item := range vector {
ts := formatTimestamp(item.Timestamp, runtime.Format.Timestamp, runtime.UnixTimeUnit)
value := formatValue(item.Value, runtime.Format.Value)
r := map[string]any{
metadata.TimestampKey: ts,
metadata.ValueKey: value,
metadata.LabelsKey: item.Metric,
}

for label := range labels {
r[label] = string(item.Metric[model.LabelName(label)])
}
if !flat {
r[metadata.ValuesKey] = []map[string]any{
{
metadata.TimestampKey: ts,
metadata.ValueKey: value,
},
}
}

results[i] = r
}

return results
}

func createQueryResultsFromMatrix(matrix model.Matrix, labels map[string]metadata.LabelInfo, runtime *metadata.RuntimeSettings, flat bool) []map[string]any {
if flat {
return createFlatQueryResultsFromMatrix(matrix, labels, runtime)
}

return createGroupQueryResultsFromMatrix(matrix, labels, runtime)
}

func createGroupQueryResultsFromMatrix(matrix model.Matrix, labels map[string]metadata.LabelInfo, runtime *metadata.RuntimeSettings) []map[string]any {
results := make([]map[string]any, len(matrix))
for i, item := range matrix {
r := map[string]any{
metadata.LabelsKey: item.Metric,
}

for label := range labels {
r[label] = string(item.Metric[model.LabelName(label)])
}

valuesLen := len(item.Values)
values := make([]map[string]any, valuesLen)
for i, value := range item.Values {
ts := formatTimestamp(value.Timestamp, runtime.Format.Timestamp, runtime.UnixTimeUnit)
v := formatValue(value.Value, runtime.Format.Value)
values[i] = map[string]any{
metadata.TimestampKey: ts,
metadata.ValueKey: v,
}
if i == valuesLen-1 {
r[metadata.TimestampKey] = ts
r[metadata.ValueKey] = v
}
}

r[metadata.ValuesKey] = values
results[i] = r
}

return results
}

func createFlatQueryResultsFromMatrix(matrix model.Matrix, labels map[string]metadata.LabelInfo, runtime *metadata.RuntimeSettings) []map[string]any {
results := []map[string]any{}

for _, item := range matrix {
for _, value := range item.Values {
ts := formatTimestamp(value.Timestamp, runtime.Format.Timestamp, runtime.UnixTimeUnit)
v := formatValue(value.Value, runtime.Format.Value)
r := map[string]any{
metadata.LabelsKey: item.Metric,
metadata.TimestampKey: ts,
metadata.ValueKey: v,
metadata.ValuesKey: nil,
}

for label := range labels {
r[label] = string(item.Metric[model.LabelName(label)])
}

results = append(results, r)
}
}

return results
}

func formatTimestamp(ts model.Time, format metadata.TimestampFormat, unixTime client.UnixTimeUnit) any {
switch format {
case metadata.TimestampRFC3339:
return ts.Time().Format(time.RFC3339)
default:
return ts.Unix() * int64(time.Second/unixTime.Duration())
}
}

func formatValue(value model.SampleValue, format metadata.ValueFormat) any {
switch format {
case metadata.ValueFloat64:
return float64(value)
default:
return value.String()
}
}
Loading

0 comments on commit eb0b666

Please sign in to comment.