Skip to content

Commit

Permalink
sort labels enum and add flat argument (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
hgiasac authored Oct 10, 2024
1 parent 1a00a73 commit 78cbf86
Show file tree
Hide file tree
Showing 42 changed files with 480 additions and 133 deletions.
29 changes: 25 additions & 4 deletions configuration/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ type updateCommand struct {
Exclude []*regexp.Regexp
ExcludeLabels []ExcludeLabels

coroutines int
existedMetrics map[string]any
lock sync.Mutex
coroutines int
apiFormatExists bool
existedMetrics map[string]any
lock sync.Mutex
}

// SetMetadataMetric sets the metadata metric item
Expand Down Expand Up @@ -246,6 +247,7 @@ func (uc *updateCommand) validateNativeQueries(ctx context.Context) error {
if len(uc.Config.Metadata.NativeOperations.Queries) == 0 {
return nil
}
uc.checkAPIFormatQueryExist(ctx)

newNativeQueries := make(map[string]metadata.NativeQuery)
for key, nativeQuery := range uc.Config.Metadata.NativeOperations.Queries {
Expand All @@ -271,7 +273,7 @@ func (uc *updateCommand) validateNativeQueries(ctx context.Context) error {
return fmt.Errorf("invalid argument type `%s` in the native query `%s`", k, key)
}
}
_, err = uc.Client.FormatQuery(ctx, query)
err = uc.validateQuery(ctx, query)
if err != nil {
return fmt.Errorf("invalid native query %s: %s", key, err)
}
Expand All @@ -291,6 +293,25 @@ func (uc *updateCommand) validateNativeQueries(ctx context.Context) error {
return nil
}

func (uc *updateCommand) checkAPIFormatQueryExist(ctx context.Context) {
_, err := uc.Client.FormatQuery(ctx, "up")
uc.apiFormatExists = err == nil

if err != nil {
slog.Debug("failed to request /api/v1/format_query endpoint", slog.String("error", err.Error()))
}
}

func (uc *updateCommand) validateQuery(ctx context.Context, query string) error {
if uc.apiFormatExists {
_, err := uc.Client.FormatQuery(ctx, query)
return err
}

_, _, err := uc.Client.Query(ctx, query, "now", "30s")
return err
}

func (uc *updateCommand) writeConfigFile() error {
var buf bytes.Buffer
writer := bufio.NewWriter(&buf)
Expand Down
23 changes: 17 additions & 6 deletions connector/internal/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,21 @@ func (qce *QueryCollectionExecutor) Execute(ctx context.Context) (*schema.RowSet
}, nil
}

flat, err := utils.DecodeNullableBoolean(qce.Arguments[metadata.ArgumentKeyFlat])
if err != nil {
return nil, schema.UnprocessableContentError(fmt.Sprintf("expected boolean type for the flat field, got: %v", err), map[string]any{
"field": metadata.ArgumentKeyFlat,
})
}
if flat == nil {
flat = &qce.Runtime.Flat
}

var rawResults []map[string]any
if expressions.Timestamp != nil {
rawResults, err = qce.queryInstant(ctx, queryString, expressions)
rawResults, err = qce.queryInstant(ctx, queryString, expressions, *flat)
} else {
rawResults, err = qce.queryRange(ctx, queryString, expressions)
rawResults, err = qce.queryRange(ctx, queryString, expressions, *flat)
}

if err != nil {
Expand All @@ -93,8 +103,9 @@ func (qce *QueryCollectionExecutor) Execute(ctx context.Context) (*schema.RowSet
}, nil
}

func (qce *QueryCollectionExecutor) queryInstant(ctx context.Context, queryString string, predicate *CollectionRequest) ([]map[string]any, error) {
func (qce *QueryCollectionExecutor) queryInstant(ctx context.Context, queryString string, predicate *CollectionRequest, flat bool) ([]map[string]any, error) {
timeout := qce.Arguments[metadata.ArgumentKeyTimeout]

timestamp, err := qce.getComparisonValue(predicate.Timestamp)
if err != nil {
return nil, schema.UnprocessableContentError(err.Error(), map[string]any{
Expand All @@ -119,11 +130,11 @@ func (qce *QueryCollectionExecutor) queryInstant(ctx context.Context, queryStrin
vector = vector[:*qce.Request.Query.Limit]
}

results := createQueryResultsFromVector(vector, qce.Metric.Labels, qce.Runtime)
results := createQueryResultsFromVector(vector, qce.Metric.Labels, qce.Runtime, flat)
return results, nil
}

func (qce *QueryCollectionExecutor) queryRange(ctx context.Context, queryString string, predicate *CollectionRequest) ([]map[string]any, error) {
func (qce *QueryCollectionExecutor) queryRange(ctx context.Context, queryString string, predicate *CollectionRequest, flat bool) ([]map[string]any, error) {
step := qce.Arguments[metadata.ArgumentKeyStep]
timeout := qce.Arguments[metadata.ArgumentKeyTimeout]

Expand Down Expand Up @@ -154,7 +165,7 @@ func (qce *QueryCollectionExecutor) queryRange(ctx context.Context, queryString
}
matrix = matrix[*qce.Request.Query.Offset:]
}
results := createQueryResultsFromMatrix(matrix, qce.Metric.Labels, qce.Runtime)
results := createQueryResultsFromMatrix(matrix, qce.Metric.Labels, qce.Runtime, flat)

if qce.Request.Query.Limit != nil && *qce.Request.Query.Limit < len(results) {
results = results[:*qce.Request.Query.Limit]
Expand Down
29 changes: 2 additions & 27 deletions connector/internal/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ var testCases = []struct {
QueryString: `go_gc_duration_seconds{job!~"localhost:9090|node|prometheus"}`,
},
{
Name: "label_expressions_nin",
Name: "label_expressions_regex",
Request: schema.QueryRequest{
Collection: "go_gc_duration_seconds",
Arguments: schema.QueryRequestArguments{},
Expand Down Expand Up @@ -243,7 +243,7 @@ var testCases = []struct {
QueryString: `go_gc_duration_seconds{job=~"node.*"}`,
},
{
Name: "label_expressions_nin",
Name: "label_expressions_nregex",
Request: schema.QueryRequest{
Collection: "go_gc_duration_seconds",
Arguments: schema.QueryRequestArguments{},
Expand Down Expand Up @@ -296,31 +296,6 @@ var testCases = []struct {
},
QueryString: `go_gc_duration_seconds{job=~"ndc-prometheus|node|prometheus"}`,
},
{
Name: "label_expressions_in_string_pg",
Request: schema.QueryRequest{
Collection: "go_gc_duration_seconds",
Arguments: schema.QueryRequestArguments{
"timeout": schema.NewArgumentLiteral("5m").Encode(),
},
Query: schema.Query{
Predicate: schema.NewExpressionAnd(
schema.NewExpressionBinaryComparisonOperator(*schema.NewComparisonTargetColumn("job", nil, nil), "_in", schema.NewComparisonValueScalar(`{ndc-prometheus,node,prometheus}`)),
).Encode(),
},
},
Predicate: CollectionRequest{
LabelExpressions: map[string]*LabelExpression{
"job": {
Name: "job",
Expressions: []schema.ExpressionBinaryComparisonOperator{
*schema.NewExpressionBinaryComparisonOperator(*schema.NewComparisonTargetColumn("job", nil, nil), "_in", schema.NewComparisonValueScalar(`{ndc-prometheus,node,prometheus}`)),
},
},
},
},
QueryString: `go_gc_duration_seconds{job=~"ndc-prometheus|node|prometheus"}`,
},
{
Name: "label_expressions_eq_neq",
Request: schema.QueryRequest{
Expand Down
11 changes: 2 additions & 9 deletions connector/internal/expression_label.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"github.com/hasura/ndc-sdk-go/utils"
)

var pgArrayStringRegex = regexp.MustCompile(`^{([\w-,]+)}$`)

// LabelExpressionField the structured data of a label field expression
type LabelExpressionField struct {
Value string
Expand Down Expand Up @@ -222,13 +220,8 @@ func decodeStringSlice(value any) ([]string, error) {
var err error
sliceValue := []string{}
if str, ok := value.(string); ok {
matches := pgArrayStringRegex.FindStringSubmatch(str)
if len(matches) > 1 {
sliceValue = strings.Split(matches[1], ",")
} else {
// try to parse the slice from the json string
err = json.Unmarshal([]byte(str), &sliceValue)
}
// try to parse the slice from the json string
err = json.Unmarshal([]byte(str), &sliceValue)
} else {
sliceValue, err = utils.DecodeStringSlice(value)
}
Expand Down
30 changes: 20 additions & 10 deletions connector/internal/native_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,21 @@ func (nqe *NativeQueryExecutor) execute(ctx context.Context, params *nativeQuery
return nil, schema.UnprocessableContentError(err.Error(), nil)
}

flat, err := utils.DecodeNullableBoolean(nqe.Arguments[metadata.ArgumentKeyFlat])
if err != nil {
return nil, schema.UnprocessableContentError(fmt.Sprintf("expected boolean type for the flat field, got: %v", err), map[string]any{
"field": metadata.ArgumentKeyFlat,
})
}
if flat == nil {
flat = &nqe.Runtime.Flat
}

var rawResults []map[string]any
if _, ok := nqe.Arguments[metadata.ArgumentKeyTime]; ok {
rawResults, err = nqe.queryInstant(ctx, queryString, params)
rawResults, err = nqe.queryInstant(ctx, queryString, params, *flat)
} else {
rawResults, err = nqe.queryRange(ctx, queryString, params)
rawResults, err = nqe.queryRange(ctx, queryString, params, *flat)
}

if err != nil {
Expand All @@ -184,28 +194,28 @@ func (nqe *NativeQueryExecutor) execute(ctx context.Context, params *nativeQuery
}, nil
}

func (nqe *NativeQueryExecutor) queryInstant(ctx context.Context, queryString string, params *nativeQueryParameters) ([]map[string]any, error) {
func (nqe *NativeQueryExecutor) queryInstant(ctx context.Context, queryString string, params *nativeQueryParameters, flat bool) ([]map[string]any, error) {
vector, _, err := nqe.Client.Query(ctx, queryString, params.Timestamp, params.Timeout)
if err != nil {
return nil, schema.UnprocessableContentError(err.Error(), nil)
}
results := createQueryResultsFromVector(vector, nqe.NativeQuery.Labels, nqe.Runtime)
results := createQueryResultsFromVector(vector, nqe.NativeQuery.Labels, nqe.Runtime, flat)

return results, nil
}

func (nqe *NativeQueryExecutor) queryRange(ctx context.Context, queryString string, params *nativeQueryParameters) ([]map[string]any, error) {
func (nqe *NativeQueryExecutor) queryRange(ctx context.Context, queryString string, params *nativeQueryParameters, flat bool) ([]map[string]any, error) {
matrix, _, err := nqe.Client.QueryRange(ctx, queryString, params.Start, params.End, params.Step, params.Timeout)
if err != nil {
return nil, schema.UnprocessableContentError(err.Error(), nil)
}

results := createQueryResultsFromMatrix(matrix, nqe.NativeQuery.Labels, nqe.Runtime)
results := createQueryResultsFromMatrix(matrix, nqe.NativeQuery.Labels, nqe.Runtime, flat)

return results, nil
}

func createQueryResultsFromVector(vector model.Vector, labels map[string]metadata.LabelInfo, runtime *metadata.RuntimeSettings) []map[string]any {
func createQueryResultsFromVector(vector model.Vector, labels map[string]metadata.LabelInfo, runtime *metadata.RuntimeSettings, flat bool) []map[string]any {
results := make([]map[string]any, len(vector))
for i, item := range vector {
ts := formatTimestamp(item.Timestamp, runtime.Format.Timestamp, runtime.UnixTimeUnit)
Expand All @@ -219,7 +229,7 @@ func createQueryResultsFromVector(vector model.Vector, labels map[string]metadat
for label := range labels {
r[label] = string(item.Metric[model.LabelName(label)])
}
if !runtime.Flat {
if !flat {
r[metadata.ValuesKey] = []map[string]any{
{
metadata.TimestampKey: ts,
Expand All @@ -234,8 +244,8 @@ func createQueryResultsFromVector(vector model.Vector, labels map[string]metadat
return results
}

func createQueryResultsFromMatrix(matrix model.Matrix, labels map[string]metadata.LabelInfo, runtime *metadata.RuntimeSettings) []map[string]any {
if runtime.Flat {
func createQueryResultsFromMatrix(matrix model.Matrix, labels map[string]metadata.LabelInfo, runtime *metadata.RuntimeSettings, flat bool) []map[string]any {
if flat {
return createFlatQueryResultsFromMatrix(matrix, labels, runtime)
}

Expand Down
9 changes: 7 additions & 2 deletions connector/metadata/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ var defaultObjectTypes = map[string]schema.ObjectType{
}

const (
ArgumentKeyFlat = "flat"
ArgumentKeyTime = "time"
ArgumentKeyTimeout = "timeout"
ArgumentKeyStart = "start"
Expand Down Expand Up @@ -293,11 +294,15 @@ var defaultArgumentInfos = map[string]schema.ArgumentInfo{
Type: schema.NewNullableNamedType(string(ScalarTimestamp)).Encode(),
},
ArgumentKeyStep: {
Description: utils.ToPtr("Query resolution step width in duration format or float number of seconds."),
Description: utils.ToPtr("Query resolution step width in duration format or float number of seconds"),
Type: schema.NewNullableNamedType(string(ScalarDuration)).Encode(),
},
ArgumentKeyOffset: {
Description: utils.ToPtr("The offset modifier allows changing the time offset for individual instant and range vectors in a query."),
Description: utils.ToPtr("The offset modifier allows changing the time offset for individual instant and range vectors in a query"),
Type: schema.NewNullableNamedType(string(ScalarDuration)).Encode(),
},
ArgumentKeyFlat: {
Description: utils.ToPtr("Flatten nested the values group to the root array"),
Type: schema.NewNullableNamedType(string(ScalarBoolean)).Encode(),
},
}
2 changes: 1 addition & 1 deletion connector/metadata/native_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func ReplaceNativeQueryVariable(query string, name string, value string) string

func createNativeQueryArguments() schema.FunctionInfoArguments {
arguments := schema.FunctionInfoArguments{}
for _, key := range []string{ArgumentKeyStart, ArgumentKeyEnd, ArgumentKeyStep, ArgumentKeyTime, ArgumentKeyTimeout} {
for _, key := range []string{ArgumentKeyStart, ArgumentKeyEnd, ArgumentKeyStep, ArgumentKeyTime, ArgumentKeyTimeout, ArgumentKeyFlat} {
arguments[key] = defaultArgumentInfos[key]
}
return arguments
Expand Down
2 changes: 1 addition & 1 deletion connector/metadata/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func createQueryResultValuesObjectFields() schema.ObjectTypeFields {

func createCollectionArguments() schema.CollectionInfoArguments {
arguments := schema.CollectionInfoArguments{}
for _, key := range []string{ArgumentKeyStep, ArgumentKeyTimeout, ArgumentKeyOffset} {
for _, key := range []string{ArgumentKeyStep, ArgumentKeyTimeout, ArgumentKeyOffset, ArgumentKeyFlat} {
arguments[key] = defaultArgumentInfos[key]
}
return arguments
Expand Down
2 changes: 2 additions & 0 deletions connector/metadata/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metadata

import (
"fmt"
"slices"

"github.com/hasura/ndc-sdk-go/schema"
"github.com/hasura/ndc-sdk-go/utils"
Expand Down Expand Up @@ -109,6 +110,7 @@ func (scb *connectorSchemaBuilder) buildMetricsItem(name string, info MetricInfo
objectName := strcase.ToCamel(name)
scb.ObjectTypes[objectName] = objectType

slices.Sort(labelEnums)
labelEnumScalarName := fmt.Sprintf("%sLabel", objectName)
scalarType := schema.NewScalarType()
scalarType.Representation = schema.NewTypeRepresentationEnum(labelEnums).Encode()
Expand Down
5 changes: 2 additions & 3 deletions tests/configuration/configuration.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# yaml-language-server: $schema=../../jsonschema/configuration.json
# yaml-language-server: $schema=https://raw.githubusercontent.com/hasura/ndc-prometheus/main/jsonschema/configuration.json
connection_settings:
url:
env: CONNECTION_URL
Expand Down Expand Up @@ -110,7 +110,6 @@ metadata:
instance: {}
job: {}
otel_scope_name: {}
otel_scope_version: {}
process_cpu_seconds_total:
type: counter
description: Total user and system CPU time spent in seconds.
Expand Down Expand Up @@ -211,7 +210,7 @@ metadata:
runtime:
flat: true
unix_time_unit: s
concurrency_limit: 3
format:
timestamp: rfc3339
value: float64
concurrency_limit: 3
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,9 @@ definition:
- name: timeout
type: Duration
description: Evaluation timeout
- name: flat
type: Boolean
description: Flatten nested the values group to the root array
source:
dataConnectorName: prometheus
collection: http_client_duration_milliseconds_bucket
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,9 @@ definition:
- name: timeout
type: Duration
description: Evaluation timeout
- name: flat
type: Boolean
description: Flatten nested the values group to the root array
source:
dataConnectorName: prometheus
collection: http_client_duration_milliseconds_count
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,9 @@ definition:
- name: timeout
type: Duration
description: Evaluation timeout
- name: flat
type: Boolean
description: Flatten nested the values group to the root array
source:
dataConnectorName: prometheus
collection: http_client_duration_milliseconds_sum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,9 @@ definition:
- name: timeout
type: Duration
description: Evaluation timeout
- name: flat
type: Boolean
description: Flatten nested the values group to the root array
source:
dataConnectorName: prometheus
collection: http_client_request_size_bytes_total
Expand Down
Loading

0 comments on commit 78cbf86

Please sign in to comment.