Skip to content

Add CEL support for Data Quality rules #313

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 20, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions schemaregistry/package.json
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@
"@aws-sdk/credential-providers": "^3.637.0",
"@azure/identity": "^4.4.1",
"@azure/keyvault-keys": "^4.8.0",
"@bufbuild/cel": "^0.1.0",
"@bufbuild/protobuf": "^2.0.0",
"@criteria/json-schema": "^0.10.0",
"@criteria/json-schema-validation": "^0.10.0",
87 changes: 87 additions & 0 deletions schemaregistry/rules/cel/cel-executor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import {RuleRegistry} from "../../serde/rule-registry";
import {RuleContext, RuleExecutor} from "../../serde/serde";
import {ClientConfig} from "../../rest-service";
import stringify from "json-stringify-deterministic";
import {LRUCache} from "lru-cache";
import {createEnv} from "@bufbuild/cel";
import {createRegistry} from "@bufbuild/protobuf";

/**
 * Rule executor registered under the rule type "CEL".
 *
 * Rule expressions are parsed, planned and evaluated through the
 * `@bufbuild/cel` environment held in `env`. An expression may be written
 * as `guard ; body`: the text before the first `;` is evaluated first and
 * the body is skipped when the guard evaluates to exactly `false`.
 */
export class CelExecutor implements RuleExecutor {
  /** Configuration handed to `configure`; `null` until then. */
  config: Map<string, string> | null = null
  /** CEL environment shared by every evaluation made through this executor. */
  env = createEnv("", createRegistry());
  /** Planned programs, keyed by the deterministic JSON of expression, schema type and schema. */
  cache: LRUCache<string, any> = new LRUCache({max: 1000})

  /**
   * Creates an executor, registers it with the `RuleRegistry`, and returns it.
   */
  static register(): CelExecutor {
    const instance = new CelExecutor()
    RuleRegistry.registerRuleExecutor(instance)
    return instance
  }

  /**
   * Stores the rule configuration. `clientConfig` is not used here.
   */
  configure(clientConfig: ClientConfig, config: Map<string, string>) {
    this.config = config
  }

  /** Rule type handled by this executor. */
  type(): string {
    return "CEL"
  }

  /**
   * Evaluates the rule of `ctx` with the message exposed to the expression
   * under the name `message`.
   */
  async transform(ctx: RuleContext, msg: any): Promise<any> {
    return await this.execute(ctx, msg, {message: msg})
  }

  /**
   * Evaluates the rule expression of `ctx`, honouring an optional guard.
   *
   * @param ctx  rule context supplying the expression and the rule kind
   * @param msg  value returned unchanged when there is no expression or when
   *             the guard of a non-CONDITION rule is `false`
   * @param args names and values exposed to the expression
   * @returns the evaluation result; `true` when the guard of a CONDITION
   *          rule is `false`; otherwise `msg` as described above
   */
  async execute(ctx: RuleContext, msg: any, args: { [key: string]: any }): Promise<any> {
    const fullExpr = ctx.rule.expr
    if (fullExpr == null) {
      return msg
    }

    const separatorAt = fullExpr.indexOf(';')
    if (separatorAt < 0) {
      // No guard: the whole text is the expression.
      return await this.executeRule(ctx, fullExpr, msg, args)
    }

    const guard = fullExpr.substring(0, separatorAt)
    if (guard.trim().length != 0) {
      const guardResult = await this.executeRule(ctx, guard, msg, args)
      if (guardResult === false) {
        // Guard rejected: skip the body. A skipped condition counts as
        // passed, a skipped transform leaves the value untouched.
        return ctx.rule.kind === 'CONDITION' ? true : msg
      }
    }
    return await this.executeRule(ctx, fullExpr.substring(separatorAt + 1), msg, args)
  }

  /**
   * Plans `expr` (reusing a cached plan when one exists), publishes `args`
   * to the environment, and evaluates the plan. `obj` is not used.
   *
   * NOTE(review): the bindings are written onto the shared `env` and are
   * not cleared in this method, so names set by an earlier call appear to
   * remain visible to later ones -- confirm this is intended.
   */
  async executeRule(ctx: RuleContext, expr: string, obj: any, args: { [key: string]: any }): Promise<any> {
    const {schema, schemaType} = ctx.target
    const keyParts: RuleWithArgs = {
      rule: expr,
      scriptType: schemaType,
      schema: schema
    }
    const cacheKey = stringify(keyParts)

    let program = this.cache.get(cacheKey)
    if (program == null) {
      program = this.env.plan(this.env.parse(expr))
      this.cache.set(cacheKey, program)
    }

    Object.entries(args).forEach(([name, value]) => {
      this.env.set(name, value)
    })
    return this.env.eval(program)
  }

  /** Nothing to release. */
  async close(): Promise<void> {
  }
}

/**
 * Shape of the object that is serialized deterministically to build the
 * key under which a planned CEL program is cached.
 */
interface RuleWithArgs {
  /** Schema text of the rule target. */
  schema?: string
  /** Schema type of the rule target. */
  scriptType?: string
  /** CEL expression text. */
  rule?: string
}
60 changes: 60 additions & 0 deletions schemaregistry/rules/cel/cel-field-executor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import {RuleRegistry} from "../../serde/rule-registry";
import {
FieldContext,
FieldRuleExecutor,
FieldTransform,
RuleContext,
} from "../../serde/serde";
import {ClientConfig} from "../../rest-service";
import {CelExecutor} from "./cel-executor";

/**
 * Field-level rule executor registered under the rule type "CEL_FIELD".
 *
 * Every transform it creates forwards to the single `CelExecutor` held in
 * `executor`.
 */
export class CelFieldExecutor extends FieldRuleExecutor {
  /** Executor that evaluates the CEL expression for each field. */
  executor: CelExecutor = new CelExecutor()

  /**
   * Creates an executor, registers it with the `RuleRegistry`, and returns it.
   */
  static register(): CelFieldExecutor {
    const instance = new CelFieldExecutor()
    RuleRegistry.registerRuleExecutor(instance)
    return instance
  }

  /**
   * Stores the rule configuration on this executor. `clientConfig` is not
   * used, and nothing is passed on to the wrapped `CelExecutor`.
   */
  configure(clientConfig: ClientConfig, config: Map<string, string>) {
    this.config = config
  }

  /** Rule type handled by this executor. */
  type(): string {
    return "CEL_FIELD"
  }

  /**
   * Builds the per-field transform. `ctx` is not consulted; the transform
   * only receives the wrapped executor.
   */
  override newTransform(ctx: RuleContext): FieldTransform {
    return new CelFieldExecutorTransform(this.executor)
  }

  /** Nothing to release. */
  async close(): Promise<void> {
  }
}

/**
 * Field transform that evaluates the rule's CEL expression against a
 * single field value.
 */
export class CelFieldExecutorTransform implements FieldTransform {
  /**
   * @param executor executor used to evaluate the expression
   */
  constructor(private executor: CelExecutor) {
  }

  /**
   * Evaluates the rule for one field.
   *
   * @param ctx        rule context passed through to the executor
   * @param fieldCtx   description of the field being visited
   * @param fieldValue current value of the field
   * @returns `null` when the value is null or undefined, the value itself
   *          when the field is not primitive, otherwise whatever the
   *          executor returns
   */
  async transform(ctx: RuleContext, fieldCtx: FieldContext, fieldValue: any): Promise<any> {
    if (fieldValue == null) {
      return null
    }
    if (fieldCtx.isPrimitive()) {
      // Names made available to the CEL expression.
      const bindings = {
        value: fieldValue,
        fullName: fieldCtx.fullName,
        name: fieldCtx.name,
        typeName: fieldCtx.typeName(),
        tags: Array.from(fieldCtx.tags),
        message: fieldCtx.containingMessage
      }
      return await this.executor.execute(ctx, fieldValue, bindings)
    }
    return fieldValue
  }
}
393 changes: 392 additions & 1 deletion schemaregistry/test/serde/avro.spec.ts
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ import {
AvroSerializer,
AvroSerializerConfig
} from "../../serde/avro";
import {HeaderSchemaIdSerializer, SerdeType, Serializer} from "../../serde/serde";
import {HeaderSchemaIdSerializer, SerdeType, SerializationError, Serializer} from "../../serde/serde";
import {
Client,
Rule,
@@ -30,6 +30,8 @@ import {RuleRegistry} from "@confluentinc/schemaregistry/serde/rule-registry";
import {
clearKmsClients
} from "@confluentinc/schemaregistry/rules/encryption/kms-registry";
import {CelExecutor} from "../../rules/cel/cel-executor";
import {CelFieldExecutor} from "../../rules/cel/cel-field-executor";

const rootSchema = `
{
@@ -317,6 +319,8 @@ class FakeClock extends Clock {
}

const fieldEncryptionExecutor = FieldEncryptionExecutor.registerWithClock(new FakeClock())
CelExecutor.register()
CelFieldExecutor.register()
JsonataExecutor.register()
AwsKmsDriver.register()
AzureKmsDriver.register()
@@ -656,6 +660,393 @@ describe('AvroSerializer', () => {
expect(obj2.fieldToDelete).toEqual(undefined);
expect(obj2.newOptionalField).toEqual("optional");
})
// A CEL CONDITION rule whose expression holds for the payload: serialization
// succeeds and the payload round-trips unchanged.
it('cel condition', async () => {
  const conf: ClientConfig = {
    baseURLs: [baseURL],
    cacheCapacity: 1000
  }
  const client = SchemaRegistryClient.newClient(conf)
  const serConfig: AvroSerializerConfig = {
    useLatestVersion: true,
  }
  const ser = new AvroSerializer(client, SerdeType.VALUE, serConfig)

  const celRule: Rule = {
    name: 'test-cel',
    kind: 'CONDITION',
    mode: RuleMode.WRITE,
    type: 'CEL',
    expr: "message.stringField == 'hi'"
  }
  const ruleSet: RuleSet = {
    domainRules: [celRule]
  }

  const info: SchemaInfo = {
    schemaType: 'AVRO',
    schema: demoSchema,
    ruleSet
  }

  await client.register(subject, info, false)

  const obj = {
    intField: 123,
    doubleField: 45.67,
    stringField: 'hi',
    boolField: true,
    bytesField: Buffer.from([1, 2]),
  }
  const bytes = await ser.serialize(topic, obj)

  const deserConfig: AvroDeserializerConfig = {
    ruleConfig: {
      secret: 'mysecret'
    }
  }
  const deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig)
  const obj2 = await deser.deserialize(topic, bytes)
  expect(obj2.intField).toEqual(obj.intField);
  // The second argument of toBeCloseTo is a number of decimal digits, not a
  // tolerance; 3 digits checks agreement to within 0.0005.
  expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 3);
  expect(obj2.stringField).toEqual(obj.stringField);
  expect(obj2.boolField).toEqual(obj.boolField);
  expect(obj2.bytesField).toEqual(obj.bytesField);
})
// A CEL CONDITION rule whose expression is false for the payload:
// serialization must reject with a SerializationError.
it('cel condition fail', async () => {
  const conf: ClientConfig = {
    baseURLs: [baseURL],
    cacheCapacity: 1000
  }
  const client = SchemaRegistryClient.newClient(conf)
  const serConfig: AvroSerializerConfig = {
    useLatestVersion: true,
  }
  const ser = new AvroSerializer(client, SerdeType.VALUE, serConfig)

  const celRule: Rule = {
    name: 'test-cel',
    kind: 'CONDITION',
    mode: RuleMode.WRITE,
    type: 'CEL',
    expr: "message.stringField != 'hi'"
  }
  const ruleSet: RuleSet = {
    domainRules: [celRule]
  }

  const info: SchemaInfo = {
    schemaType: 'AVRO',
    schema: demoSchema,
    ruleSet
  }

  await client.register(subject, info, false)

  const obj = {
    intField: 123,
    doubleField: 45.67,
    stringField: 'hi',
    boolField: true,
    bytesField: Buffer.from([1, 2]),
  }
  // `rejects` fails with a clear message when serialize unexpectedly
  // resolves, instead of routing a sentinel assertion through a catch block.
  await expect(ser.serialize(topic, obj)).rejects.toBeInstanceOf(SerializationError)
})
// A CEL CONDITION rule whose expression is false for the payload but whose
// onFailure is 'NONE': the test expects serialization to succeed anyway and
// the payload to round-trip unchanged.
it('cel condition ignore fail', async () => {
  const conf: ClientConfig = {
    baseURLs: [baseURL],
    cacheCapacity: 1000
  }
  const client = SchemaRegistryClient.newClient(conf)
  const serConfig: AvroSerializerConfig = {
    useLatestVersion: true,
  }
  const ser = new AvroSerializer(client, SerdeType.VALUE, serConfig)

  const celRule: Rule = {
    name: 'test-cel',
    kind: 'CONDITION',
    mode: RuleMode.WRITE,
    type: 'CEL',
    expr: "message.stringField != 'hi'",
    onFailure: 'NONE'
  }
  const ruleSet: RuleSet = {
    domainRules: [celRule]
  }

  const info: SchemaInfo = {
    schemaType: 'AVRO',
    schema: demoSchema,
    ruleSet
  }

  await client.register(subject, info, false)

  const obj = {
    intField: 123,
    doubleField: 45.67,
    stringField: 'hi',
    boolField: true,
    bytesField: Buffer.from([1, 2]),
  }
  const bytes = await ser.serialize(topic, obj)

  const deserConfig: AvroDeserializerConfig = {
    ruleConfig: {
      secret: 'mysecret'
    }
  }
  const deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig)
  const obj2 = await deser.deserialize(topic, bytes)
  expect(obj2.intField).toEqual(obj.intField);
  // The second argument of toBeCloseTo is a number of decimal digits, not a
  // tolerance; 3 digits checks agreement to within 0.0005.
  expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 3);
  expect(obj2.stringField).toEqual(obj.stringField);
  expect(obj2.boolField).toEqual(obj.boolField);
  expect(obj2.bytesField).toEqual(obj.bytesField);
})
// A CEL_FIELD TRANSFORM rule guarded on the field name: only stringField
// gets '-suffix' appended; every other field round-trips unchanged.
it('cel field transform', async () => {
  const conf: ClientConfig = {
    baseURLs: [baseURL],
    cacheCapacity: 1000
  }
  const client = SchemaRegistryClient.newClient(conf)
  const serConfig: AvroSerializerConfig = {
    useLatestVersion: true,
  }
  const ser = new AvroSerializer(client, SerdeType.VALUE, serConfig)

  const celRule: Rule = {
    name: 'test-cel',
    kind: 'TRANSFORM',
    mode: RuleMode.WRITE,
    type: 'CEL_FIELD',
    expr: "name == 'stringField' ; value + '-suffix'"
  }
  const ruleSet: RuleSet = {
    domainRules: [celRule]
  }

  const info: SchemaInfo = {
    schemaType: 'AVRO',
    schema: demoSchema,
    ruleSet
  }

  await client.register(subject, info, false)

  const obj = {
    intField: 123,
    doubleField: 45.67,
    stringField: 'hi',
    boolField: true,
    bytesField: Buffer.from([1, 2]),
  }
  const bytes = await ser.serialize(topic, obj)

  const deserConfig: AvroDeserializerConfig = {
    ruleConfig: {
      secret: 'mysecret'
    }
  }
  const deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig)
  const obj2 = await deser.deserialize(topic, bytes)
  expect(obj2.intField).toEqual(obj.intField);
  // The second argument of toBeCloseTo is a number of decimal digits, not a
  // tolerance; 3 digits checks agreement to within 0.0005.
  expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 3);
  expect(obj2.stringField).toEqual('hi-suffix');
  expect(obj2.boolField).toEqual(obj.boolField);
  expect(obj2.bytesField).toEqual(obj.bytesField);
})
// A CEL_FIELD TRANSFORM rule guarded on the type name: the string values
// inside the array, map and union fields all get '-suffix' appended.
it('cel field complex transform', async () => {
  const client = SchemaRegistryClient.newClient({
    baseURLs: [baseURL],
    cacheCapacity: 1000
  })
  const serializerConfig: AvroSerializerConfig = {useLatestVersion: true}
  const serializer = new AvroSerializer(client, SerdeType.VALUE, serializerConfig)

  const celRule: Rule = {
    name: 'test-cel',
    kind: 'TRANSFORM',
    mode: RuleMode.WRITE,
    type: 'CEL_FIELD',
    expr: "typeName == 'STRING' ; value + '-suffix'",
  }
  const schemaInfo: SchemaInfo = {
    schemaType: 'AVRO',
    schema: complexSchema,
    ruleSet: {domainRules: [celRule]}
  }
  await client.register(subject, schemaInfo, false)

  const payload = {
    arrayField: [ 'hello' ],
    mapField: { 'key': 'world' },
    unionField: 'bye',
  }
  const encoded = await serializer.serialize(topic, payload)

  const deserializerConfig: AvroDeserializerConfig = {}
  const deserializer = new AvroDeserializer(client, SerdeType.VALUE, deserializerConfig)
  const decoded = await deserializer.deserialize(topic, encoded)

  expect(decoded.arrayField).toEqual([ 'hello-suffix' ]);
  expect(decoded.mapField).toEqual({ 'key': 'world-suffix' });
  expect(decoded.unionField).toEqual('bye-suffix');
})
// Same rule as the complex transform case, but the union field is null:
// the array and map strings are suffixed while the null stays null.
it('cel field complex transform with null', async () => {
  const client = SchemaRegistryClient.newClient({
    baseURLs: [baseURL],
    cacheCapacity: 1000
  })
  const serializerConfig: AvroSerializerConfig = {useLatestVersion: true}
  const serializer = new AvroSerializer(client, SerdeType.VALUE, serializerConfig)

  const celRule: Rule = {
    name: 'test-cel',
    kind: 'TRANSFORM',
    mode: RuleMode.WRITE,
    type: 'CEL_FIELD',
    expr: "typeName == 'STRING' ; value + '-suffix'",
  }
  const schemaInfo: SchemaInfo = {
    schemaType: 'AVRO',
    schema: complexSchema,
    ruleSet: {domainRules: [celRule]}
  }
  await client.register(subject, schemaInfo, false)

  const payload = {
    arrayField: [ 'hello' ],
    mapField: { 'key': 'world' },
    unionField: null,
  }
  const encoded = await serializer.serialize(topic, payload)

  const deserializerConfig: AvroDeserializerConfig = {}
  const deserializer = new AvroDeserializer(client, SerdeType.VALUE, deserializerConfig)
  const decoded = await deserializer.deserialize(topic, encoded)

  expect(decoded.arrayField).toEqual([ 'hello-suffix' ]);
  expect(decoded.mapField).toEqual({ 'key': 'world-suffix' });
  expect(decoded.unionField).toEqual(null);
})
// A CEL_FIELD CONDITION rule guarded on the field name whose expression
// holds for stringField: serialization succeeds and the payload
// round-trips unchanged.
it('cel field condition', async () => {
  const conf: ClientConfig = {
    baseURLs: [baseURL],
    cacheCapacity: 1000
  }
  const client = SchemaRegistryClient.newClient(conf)
  const serConfig: AvroSerializerConfig = {
    useLatestVersion: true,
  }
  const ser = new AvroSerializer(client, SerdeType.VALUE, serConfig)

  const celRule: Rule = {
    name: 'test-cel',
    kind: 'CONDITION',
    mode: RuleMode.WRITE,
    type: 'CEL_FIELD',
    expr: "name == 'stringField' ; value == 'hi'"
  }
  const ruleSet: RuleSet = {
    domainRules: [celRule]
  }

  const info: SchemaInfo = {
    schemaType: 'AVRO',
    schema: demoSchema,
    ruleSet
  }

  await client.register(subject, info, false)

  const obj = {
    intField: 123,
    doubleField: 45.67,
    stringField: 'hi',
    boolField: true,
    bytesField: Buffer.from([1, 2]),
  }
  const bytes = await ser.serialize(topic, obj)

  const deserConfig: AvroDeserializerConfig = {
    ruleConfig: {
      secret: 'mysecret'
    }
  }
  const deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig)
  const obj2 = await deser.deserialize(topic, bytes)
  expect(obj2.intField).toEqual(obj.intField);
  // The second argument of toBeCloseTo is a number of decimal digits, not a
  // tolerance; 3 digits checks agreement to within 0.0005.
  expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 3);
  expect(obj2.stringField).toEqual(obj.stringField);
  expect(obj2.boolField).toEqual(obj.boolField);
  expect(obj2.bytesField).toEqual(obj.bytesField);
})
// A CEL_FIELD CONDITION rule guarded on the field name whose expression is
// false for stringField: serialization must reject with a SerializationError.
it('cel field condition fail', async () => {
  const conf: ClientConfig = {
    baseURLs: [baseURL],
    cacheCapacity: 1000
  }
  const client = SchemaRegistryClient.newClient(conf)
  const serConfig: AvroSerializerConfig = {
    useLatestVersion: true,
  }
  const ser = new AvroSerializer(client, SerdeType.VALUE, serConfig)

  const celRule: Rule = {
    name: 'test-cel',
    kind: 'CONDITION',
    mode: RuleMode.WRITE,
    type: 'CEL_FIELD',
    expr: "name == 'stringField' ; value != 'hi'"
  }
  const ruleSet: RuleSet = {
    domainRules: [celRule]
  }

  const info: SchemaInfo = {
    schemaType: 'AVRO',
    schema: demoSchema,
    ruleSet
  }

  await client.register(subject, info, false)

  const obj = {
    intField: 123,
    doubleField: 45.67,
    stringField: 'hi',
    boolField: true,
    bytesField: Buffer.from([1, 2]),
  }
  // `rejects` fails with a clear message when serialize unexpectedly
  // resolves, instead of routing a sentinel assertion through a catch block.
  await expect(ser.serialize(topic, obj)).rejects.toBeInstanceOf(SerializationError)
})
it('basic encryption', async () => {
let conf: ClientConfig = {
baseURLs: [baseURL],