Skip to content

Commit

Permalink
Fix field validation
Browse files Browse the repository at this point in the history
  • Loading branch information
begelundmuller committed Jul 3, 2024
1 parent 9f59f37 commit bbeb2bc
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 18 deletions.
2 changes: 1 addition & 1 deletion runtime/pkg/metricssql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ func (t *transformer) transformValueExpr(_ context.Context, node ast.ValueExpr)
func (t *transformer) fromQueryForMetricsView(ctx context.Context, mv *runtimev1.Resource) (string, error) {
spec := mv.GetMetricsView().State.ValidSpec
if spec == nil {
return "", fmt.Errorf("metrics view %q is not ready for querying, reconcile status: %q", mv.Meta.GetName(), mv.Meta.ReconcileStatus)
return "", fmt.Errorf("metrics view %q is not valid: (status: %q, error: %q)", mv.Meta.GetName(), mv.Meta.ReconcileStatus, mv.Meta.ReconcileError)
}

olap, release, err := t.controller.Runtime.OLAP(ctx, t.instanceID, spec.Connector)
Expand Down
39 changes: 22 additions & 17 deletions runtime/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,41 +75,46 @@ func (r *Runtime) ValidateMetricsView(ctx context.Context, instanceID string, mv
}
return nil, fmt.Errorf("could not find table %q: %w", mv.Table, err)
}

fields := make(map[string]*runtimev1.StructType_Field, len(t.Schema.Fields))
cols := make(map[string]*runtimev1.StructType_Field, len(t.Schema.Fields))
for _, f := range t.Schema.Fields {
fields[strings.ToLower(f.Name)] = f
cols[strings.ToLower(f.Name)] = f
}

// Check time dimension exists
if mv.TimeDimension != "" {
f, ok := cols[strings.ToLower(mv.TimeDimension)]
if !ok {
res.TimeDimensionErr = fmt.Errorf("timeseries %q is not a column in table %q", mv.TimeDimension, mv.Table)
} else if f.Type.Code != runtimev1.Type_CODE_TIMESTAMP && f.Type.Code != runtimev1.Type_CODE_DATE {
res.TimeDimensionErr = fmt.Errorf("timeseries %q is not a TIMESTAMP column", mv.TimeDimension)
}
}

// Check security policy rules apply to fields that exist
fields := make(map[string]bool, len(mv.Dimensions)+len(mv.Measures))
for _, d := range mv.Dimensions {
fields[strings.ToLower(d.Name)] = true
}
for _, m := range mv.Measures {
fields[strings.ToLower(m.Name)] = true
}
for _, rule := range mv.SecurityRules {
fa := rule.GetFieldAccess()
if fa == nil {
continue
}

for _, f := range fa.Fields {
if _, ok := fields[strings.ToLower(f)]; !ok {
res.OtherErrs = append(res.OtherErrs, fmt.Errorf("field %q referenced in 'security' is not a dimension or measure", f))
}
}
}

// Check time dimension exists
if mv.TimeDimension != "" {
f, ok := fields[strings.ToLower(mv.TimeDimension)]
if !ok {
res.TimeDimensionErr = fmt.Errorf("timeseries %q is not a column in table %q", mv.TimeDimension, mv.Table)
} else if f.Type.Code != runtimev1.Type_CODE_TIMESTAMP && f.Type.Code != runtimev1.Type_CODE_DATE {
res.TimeDimensionErr = fmt.Errorf("timeseries %q is not a TIMESTAMP column", mv.TimeDimension)
}
}

// For performance, attempt to validate all dimensions and measures at once
err = validateAllDimensionsAndMeasures(ctx, olap, t, mv)
if err != nil {
// One or more dimension/measure expressions failed to validate. We need to check each one individually to provide useful errors.
validateIndividualDimensionsAndMeasures(ctx, olap, t, mv, fields, res)
validateIndividualDimensionsAndMeasures(ctx, olap, t, mv, cols, res)
}

// Pinot does have any native support for time shift using time grain specifiers
Expand Down Expand Up @@ -191,7 +196,7 @@ func validateAllDimensionsAndMeasures(ctx context.Context, olap drivers.OLAPStor

// validateIndividualDimensionsAndMeasures validates each dimension and measure individually.
// It adds validation errors to the provided res.
func validateIndividualDimensionsAndMeasures(ctx context.Context, olap drivers.OLAPStore, t *drivers.Table, mv *runtimev1.MetricsViewSpec, fields map[string]*runtimev1.StructType_Field, res *ValidateMetricsViewResult) {
func validateIndividualDimensionsAndMeasures(ctx context.Context, olap drivers.OLAPStore, t *drivers.Table, mv *runtimev1.MetricsViewSpec, cols map[string]*runtimev1.StructType_Field, res *ValidateMetricsViewResult) {
// Validate dimensions and measures concurrently with a limit of 10 concurrent validations
var mu sync.Mutex
var grp errgroup.Group
Expand All @@ -202,7 +207,7 @@ func validateIndividualDimensionsAndMeasures(ctx context.Context, olap drivers.O
idx := idx
d := d
grp.Go(func() error {
err := validateDimension(ctx, olap, t, d, fields)
err := validateDimension(ctx, olap, t, d, cols)
if err != nil {
mu.Lock()
defer mu.Unlock()
Expand Down

0 comments on commit bbeb2bc

Please sign in to comment.