Skip to content

Commit 1286dc0

Browse files
authored
Add CEL support for Data Quality rules (#313)
* Add CEL support for Data Quality rules
* Minor cleanup
1 parent af20c7c commit 1286dc0

File tree

5 files changed

+569
-1
lines changed

5 files changed

+569
-1
lines changed

package-lock.json

Lines changed: 29 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

schemaregistry/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
"@aws-sdk/credential-providers": "^3.637.0",
3434
"@azure/identity": "^4.4.1",
3535
"@azure/keyvault-keys": "^4.8.0",
36+
"@bufbuild/cel": "^0.1.0",
3637
"@bufbuild/protobuf": "^2.0.0",
3738
"@criteria/json-schema": "^0.10.0",
3839
"@criteria/json-schema-validation": "^0.10.0",
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import {RuleRegistry} from "../../serde/rule-registry";
2+
import {RuleContext, RuleExecutor} from "../../serde/serde";
3+
import {ClientConfig} from "../../rest-service";
4+
import stringify from "json-stringify-deterministic";
5+
import {LRUCache} from "lru-cache";
6+
import {createEnv} from "@bufbuild/cel";
7+
import {createRegistry} from "@bufbuild/protobuf";
8+
9+
/**
 * Rule executor that reports the type name "CEL" and evaluates a rule's CEL
 * expression with the message bound to the variable `message`.
 *
 * A rule expression may be written as `guard;expr`. The text before the first
 * `;` is treated as a guard: when it is non-blank and evaluates to exactly
 * `false`, the remainder is skipped (see {@link CelExecutor.execute}).
 */
export class CelExecutor implements RuleExecutor {
  /** Configuration map received through `configure`; `null` until then. */
  config: Map<string, string> | null = null;

  /** CEL environment used for parsing, planning and evaluating every expression. */
  env = createEnv("", createRegistry());

  /** Planned programs keyed by a serialized rule descriptor; capped at 1000 entries. */
  cache: LRUCache<string, any> = new LRUCache({max: 1000});

  /**
   * Creates a new executor and registers it with the {@link RuleRegistry}.
   *
   * @returns the executor that was registered
   */
  static register(): CelExecutor {
    const instance = new CelExecutor();
    RuleRegistry.registerRuleExecutor(instance);
    return instance;
  }

  /**
   * Stores the rule configuration on this executor.
   *
   * @param clientConfig - client configuration; not used by this executor
   * @param config - configuration entries to remember
   */
  configure(clientConfig: ClientConfig, config: Map<string, string>): void {
    this.config = config;
  }

  /** @returns the rule type name handled by this executor, `"CEL"` */
  type(): string {
    return "CEL";
  }

  /**
   * Runs the rule against a message, exposing the message to the expression
   * as `message`.
   *
   * @param ctx - context of the rule being applied
   * @param msg - the message under evaluation
   * @returns whatever {@link CelExecutor.execute} yields for this rule
   */
  async transform(ctx: RuleContext, msg: any): Promise<any> {
    return await this.execute(ctx, msg, {message: msg});
  }

  /**
   * Evaluates the rule expression, honouring an optional `guard;` prefix.
   *
   * @param ctx - context of the rule being applied
   * @param msg - value returned untouched when there is nothing to evaluate
   * @param args - variable bindings made available to the expression
   * @returns `msg` when the rule has no expression; when a guard evaluates to
   *   exactly `false`, `true` for rules of kind `CONDITION` and `msg`
   *   otherwise; in every other case the result of the (remaining) expression
   */
  async execute(ctx: RuleContext, msg: any, args: { [key: string]: any }): Promise<any> {
    const fullExpr = ctx.rule.expr;
    if (fullExpr == null) {
      return msg;
    }

    const separatorAt = fullExpr.indexOf(';');
    if (separatorAt < 0) {
      // No guard part: the whole text is the expression.
      return await this.executeRule(ctx, fullExpr, msg, args);
    }

    const guard = fullExpr.slice(0, separatorAt);
    if (guard.trim() !== '') {
      const guardOutcome = await this.executeRule(ctx, guard, msg, args);
      if (guardOutcome === false) {
        // Guard rejected the input: skip the expression. A condition rule
        // passes, any other rule hands the message back unchanged.
        return ctx.rule.kind === 'CONDITION' ? true : msg;
      }
    }

    return await this.executeRule(ctx, fullExpr.slice(separatorAt + 1), msg, args);
  }

  /**
   * Plans (or fetches from the cache) the program for `expr`, binds `args`
   * on the environment and evaluates the program.
   *
   * NOTE(review): bindings are written to the shared `env` and nothing in
   * this class clears them — confirm that values set by an earlier call
   * cannot leak into a later evaluation.
   *
   * @param ctx - context supplying the target schema and schema type, which
   *   form part of the cache key
   * @param expr - CEL expression text to evaluate
   * @param obj - not used by this implementation
   * @param args - variable bindings set on the environment before evaluating
   * @returns the value produced by evaluating the program
   */
  async executeRule(ctx: RuleContext, expr: string, obj: any, args: { [key: string]: any }): Promise<any> {
    const {schema, schemaType} = ctx.target;
    const descriptor: RuleWithArgs = {
      rule: expr,
      scriptType: schemaType,
      schema: schema,
    };
    const cacheKey = stringify(descriptor);

    let program = this.cache.get(cacheKey);
    if (program == null) {
      program = this.env.plan(this.env.parse(expr));
      this.cache.set(cacheKey, program);
    }

    Object.entries(args).forEach(([name, value]) => {
      this.env.set(name, value);
    });
    return this.env.eval(program);
  }

  /** Nothing to release; present to satisfy the executor contract. */
  async close(): Promise<void> {
  }
}
82+
83+
/**
 * Descriptor that is serialized to build the cache key for a planned
 * expression. All members are optional.
 */
interface RuleWithArgs {
  /** Expression text of the rule. */
  rule?: string;
  /** Schema type of the rule target. */
  scriptType?: string;
  /** Schema of the rule target. */
  schema?: string;
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import {RuleRegistry} from "../../serde/rule-registry";
2+
import {
3+
FieldContext,
4+
FieldRuleExecutor,
5+
FieldTransform,
6+
RuleContext,
7+
} from "../../serde/serde";
8+
import {ClientConfig} from "../../rest-service";
9+
import {CelExecutor} from "./cel-executor";
10+
11+
/**
 * Field-level rule executor that reports the type name "CEL_FIELD". Each
 * transform it creates delegates evaluation to one shared {@link CelExecutor}.
 */
export class CelFieldExecutor extends FieldRuleExecutor {
  /** Executor handed to every transform created by this instance. */
  executor = new CelExecutor();

  /**
   * Creates a new field executor and registers it with the
   * {@link RuleRegistry}.
   *
   * @returns the executor that was registered
   */
  static register(): CelFieldExecutor {
    const instance = new CelFieldExecutor();
    RuleRegistry.registerRuleExecutor(instance);
    return instance;
  }

  /**
   * Stores the rule configuration on this executor.
   *
   * @param clientConfig - client configuration; not used by this executor
   * @param config - configuration entries to remember
   */
  configure(clientConfig: ClientConfig, config: Map<string, string>): void {
    this.config = config;
  }

  /** @returns the rule type name handled by this executor, `"CEL_FIELD"` */
  type(): string {
    return "CEL_FIELD";
  }

  /**
   * Builds the transform that is applied to individual fields.
   *
   * @param ctx - context of the rule being applied; not used here
   * @returns a new transform backed by this instance's `executor`
   */
  override newTransform(ctx: RuleContext): FieldTransform {
    return new CelFieldExecutorTransform(this.executor);
  }

  /** Nothing to release; present to satisfy the executor contract. */
  async close(): Promise<void> {
  }
}
35+
36+
/**
 * Field transform that evaluates the rule's CEL expression for a single
 * primitive field value through a {@link CelExecutor}.
 */
export class CelFieldExecutorTransform implements FieldTransform {
  /**
   * @param executor - executor used to evaluate the expression for each field
   */
  constructor(private readonly executor: CelExecutor) {
  }

  /**
   * Applies the rule to one field.
   *
   * The expression sees these variables: `value`, `fullName`, `name`,
   * `typeName`, `tags` (an array copy of the field's tags) and `message`
   * (the message containing the field).
   *
   * @param ctx - context of the rule being applied
   * @param fieldCtx - description of the field being visited
   * @param fieldValue - current value of the field
   * @returns `null` when the value is `null` or `undefined`; the value itself
   *   when the field is not primitive; otherwise the executor's result
   */
  async transform(ctx: RuleContext, fieldCtx: FieldContext, fieldValue: any): Promise<any> {
    if (fieldValue == null) {
      return null;
    }
    if (fieldCtx.isPrimitive()) {
      return await this.executor.execute(ctx, fieldValue, {
        value: fieldValue,
        fullName: fieldCtx.fullName,
        name: fieldCtx.name,
        typeName: fieldCtx.typeName(),
        tags: Array.from(fieldCtx.tags),
        message: fieldCtx.containingMessage,
      });
    }
    // Non-primitive fields are passed through untouched.
    return fieldValue;
  }
}

0 commit comments

Comments
 (0)