Skip to content

Commit

Permalink
add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hgiasac committed Oct 11, 2024
1 parent a936e0a commit 3e87125
Show file tree
Hide file tree
Showing 12 changed files with 492 additions and 95 deletions.
31 changes: 13 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ This connector is built using the [Go Data Connector SDK](https://github.com/has

## Features

### Metrics Collection
### Metrics

#### How it works

The connector can introspect and automatically transform available metrics on the Prometheus server to collection queries. Each collection has a common structure:
The connector can introspect and automatically transform available metrics on the Prometheus server to collection queries. Each metric has a common structure:

```gql
{
Expand Down Expand Up @@ -108,7 +108,7 @@ The equivalent GraphQL query will be:

#### How it works

When simple queries don't meet your need you can define native queries in [the configuration file](./tests/configuration/configuration.yaml) with prepared variables with the `${<name>}` template.
When simple queries don't meet your need you can define native queries in [the configuration file](./tests/configuration/configuration.yaml) with prepared variables with the `${<name>}` template. Native queries are defined as collections.

```yaml
metadata:
Expand All @@ -131,16 +131,17 @@ The native query is exposed as a read-only function with 2 required fields `job`
```gql
{
service_up(
start: "2024-09-24T00:00:00Z"
job: "node"
instance: "localhost:9090"
args: { step: "1m", job: "node", instance: "node-exporter:9100" }
where: {
timestamp: { _gt: "2024-10-11T00:00:00Z" }
job: { _in: ["node"] }
}
) {
timestamp
value
labels
job
instance
values {
value
timestamp
value
}
}
}
Expand All @@ -149,14 +150,8 @@ The native query is exposed as a read-only function with 2 required fields `job`
> [!NOTE]
> Labels aren't automatically added. You need to define them manually.

#### Common arguments

- `start` & `end`: time range arguments for the range query.
- `time`: Evaluation timestamp. Use this argument if you want to run an instant query.
- `step`: the query resolution step width in duration format or float number of seconds. The step should be explicitly set for range queries. Even though the connector can estimate the approximate step width the result may be empty due to too far interval.
- `timeout`: the evaluation timeout of the request.
- `flat`: flatten grouped values out of the root array. Use the runtime setting if the value is null
- `where`: boolean expression to post-filter results by labels. This argument is designed for permissions.
> [!NOTE]
> Label and value boolean expressions in `where` are used to filter results after the query was executed.

### Prometheus APIs

Expand Down
6 changes: 1 addition & 5 deletions connector/internal/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,7 @@ func (qce *QueryCollectionExecutor) evalValueComparisonCondition(operator *schem
if operator == nil {
return "", nil
}
value, err := qce.getComparisonValue(operator.Value)
if err != nil {
return "", fmt.Errorf("invalid value expression: %s", err)
}
v, err := utils.DecodeNullableFloat[float64](value)
v, err := getComparisonValueFloat64(operator.Value, qce.Variables)
if err != nil {
return "", fmt.Errorf("invalid value expression: %s", err)
}
Expand Down
74 changes: 61 additions & 13 deletions connector/internal/native_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (nqe *NativeQueryExecutor) queryRange(ctx context.Context, queryString stri

span := trace.SpanFromContext(ctx)
span.AddEvent("post_filter_results", trace.WithAttributes(utils.JSONAttribute("expression", params.Expression)))
matrix, err = nqe.filterMatrixResults(matrix, params.Expression)
matrix, err = nqe.filterMatrixResults(matrix, params)
if err != nil {
return nil, schema.UnprocessableContentError(err.Error(), nil)
}
Expand All @@ -176,7 +176,7 @@ func (nqe *NativeQueryExecutor) filterVectorResults(vector model.Vector, expr sc
}
results := model.Vector{}
for _, item := range vector {
valid, err := nqe.validateLabelBoolExp(item.Metric, expr)
valid, err := nqe.validateBoolExp(item.Metric, item.Value, expr)
if err != nil {
return nil, err
}
Expand All @@ -187,34 +187,82 @@ func (nqe *NativeQueryExecutor) filterVectorResults(vector model.Vector, expr sc
return results, nil
}

func (nqe *NativeQueryExecutor) filterMatrixResults(matrix model.Matrix, expr schema.Expression) (model.Matrix, error) {
if expr == nil || len(matrix) == 0 {
func (nqe *NativeQueryExecutor) filterMatrixResults(matrix model.Matrix, params *NativeQueryRequest) (model.Matrix, error) {
if params.Expression == nil || len(matrix) == 0 {
return matrix, nil
}
results := model.Matrix{}
for _, item := range matrix {
valid, err := nqe.validateLabelBoolExp(item.Metric, expr)
if err != nil {
return nil, err
if !params.HasValueBoolExp {
valid, err := nqe.validateBoolExp(item.Metric, 0, params.Expression)
if err != nil {
return nil, err
}
if valid {
results = append(results, item)
}
continue
}
if valid {
results = append(results, item)

newItem := model.SampleStream{
Metric: item.Metric,
Histograms: item.Histograms,
}

for _, v := range item.Values {
valid, err := nqe.validateBoolExp(item.Metric, v.Value, params.Expression)
if err != nil {
return nil, err
}
if valid {
newItem.Values = append(newItem.Values, v)
}
}

if len(newItem.Values) > 0 {
results = append(results, &newItem)
}
}
return results, nil
}

func (nqe *NativeQueryExecutor) validateLabelBoolExp(labels model.Metric, expr schema.Expression) (bool, error) {
func (nqe *NativeQueryExecutor) validateBoolExp(labels model.Metric, value model.SampleValue, expr schema.Expression) (bool, error) {
switch exprs := expr.Interface().(type) {
case *schema.ExpressionAnd:
for _, e := range exprs.Expressions {
valid, err := nqe.validateLabelBoolExp(labels, e)
valid, err := nqe.validateBoolExp(labels, value, e)
if !valid || err != nil {
return false, err
}
}
return true, nil
case *schema.ExpressionBinaryComparisonOperator:
if exprs.Column.Name == metadata.ValueKey {
floatValue, err := getComparisonValueFloat64(exprs.Value, nqe.Variables)
if err != nil {
return false, err
}
if floatValue == nil {
return true, nil
}
switch exprs.Operator {
case metadata.Equal:
return value.Equal(model.SampleValue(*floatValue)), nil
case metadata.NotEqual:
return !value.Equal(model.SampleValue(*floatValue)), nil
case metadata.Least:
return float64(value) < *floatValue, nil
case metadata.LeastOrEqual:
return float64(value) <= *floatValue, nil
case metadata.Greater:
return float64(value) > *floatValue, nil
case metadata.GreaterOrEqual:
return float64(value) >= *floatValue, nil
default:
return false, fmt.Errorf("unsupported value operator %s", exprs.Operator)
}
}

labelValue := labels[model.LabelName(exprs.Column.Name)]
switch exprs.Operator {
case metadata.Equal, metadata.NotEqual, metadata.Regex, metadata.NotRegex:
Expand Down Expand Up @@ -255,7 +303,7 @@ func (nqe *NativeQueryExecutor) validateLabelBoolExp(labels model.Metric, expr s
}
}
case *schema.ExpressionNot:
valid, err := nqe.validateLabelBoolExp(labels, exprs.Expression)
valid, err := nqe.validateBoolExp(labels, value, exprs.Expression)
if err != nil {
return false, err
}
Expand All @@ -265,7 +313,7 @@ func (nqe *NativeQueryExecutor) validateLabelBoolExp(labels model.Metric, expr s
return true, nil
}
for _, e := range exprs.Expressions {
valid, err := nqe.validateLabelBoolExp(labels, e)
valid, err := nqe.validateBoolExp(labels, value, e)
if err != nil {
return false, err
}
Expand Down
34 changes: 26 additions & 8 deletions connector/internal/native_query_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ import (

// NativeQueryRequest the structured native request which is evaluated from the raw expression
type NativeQueryRequest struct {
Timestamp any
Start any
End any
Timeout any
Step any
OrderBy []ColumnOrder
Variables map[string]any
Expression schema.Expression
Timestamp any
Start any
End any
Timeout any
Step any
OrderBy []ColumnOrder
Variables map[string]any
Expression schema.Expression
HasValueBoolExp bool
}

// EvalNativeQueryRequest evaluates the requested collection data of the query request
Expand Down Expand Up @@ -61,6 +62,20 @@ func (pr *NativeQueryRequest) evalQueryPredicate(expression schema.Expression) (
}
}
return schema.NewExpressionAnd(exprs...), nil
case *schema.ExpressionOr:
exprs := []schema.ExpressionEncoder{}
for _, nestedExpr := range expr.Expressions {
evalExpr, err := pr.evalQueryPredicate(nestedExpr)
if err != nil {
return nil, err
}
if evalExpr != nil {
exprs = append(exprs, evalExpr)
}
}
return schema.NewExpressionOr(exprs...), nil
case *schema.ExpressionNot, *schema.ExpressionUnaryComparisonOperator:
return expr, nil
case *schema.ExpressionBinaryComparisonOperator:
if expr.Column.Type != schema.ComparisonTargetTypeColumn {
return nil, fmt.Errorf("%s: unsupported comparison target `%s`", expr.Column.Name, expr.Column.Type)
Expand Down Expand Up @@ -102,6 +117,9 @@ func (pr *NativeQueryRequest) evalQueryPredicate(expression schema.Expression) (
default:
return nil, fmt.Errorf("unsupported operator `%s` for the timestamp", expr.Operator)
}
case metadata.ValueKey:
pr.HasValueBoolExp = true
return expr, nil
default:
return expr, nil
}
Expand Down
Loading

0 comments on commit 3e87125

Please sign in to comment.