diff --git a/schemaregistry/rules/encryption/encrypt-executor.ts b/schemaregistry/rules/encryption/encrypt-executor.ts index da96734d..c0973229 100644 --- a/schemaregistry/rules/encryption/encrypt-executor.ts +++ b/schemaregistry/rules/encryption/encrypt-executor.ts @@ -6,6 +6,7 @@ import { MAGIC_BYTE_V0, RuleContext, RuleError, + RuleExecutor, } from "../../serde/serde"; import {RuleMode,} from "../../schemaregistry-client"; import {DekClient, Dek, DekRegistryClient, Kek} from "./dekregistry/dekregistry-client"; @@ -61,29 +62,29 @@ export class Clock { } } -export class FieldEncryptionExecutor extends FieldRuleExecutor { +export class EncryptionExecutor implements RuleExecutor { + config: Map | null = null client: DekClient | null = null clock: Clock /** * Register the field encryption executor with the rule registry. */ - static register(): FieldEncryptionExecutor { + static register(): EncryptionExecutor { return this.registerWithClock(new Clock()) } - static registerWithClock(clock: Clock): FieldEncryptionExecutor { - const executor = new FieldEncryptionExecutor(clock) + static registerWithClock(clock: Clock): EncryptionExecutor { + const executor = new EncryptionExecutor(clock) RuleRegistry.registerRuleExecutor(executor) return executor } constructor(clock: Clock = new Clock()) { - super() this.clock = clock } - override configure(clientConfig: ClientConfig, config: Map) { + configure(clientConfig: ClientConfig, config: Map) { if (this.client != null) { if (!deepEqual(this.client.config(), clientConfig)) { throw new RuleError('executor already configured') @@ -110,20 +111,23 @@ export class FieldEncryptionExecutor extends FieldRuleExecutor { } } - override type(): string { - return 'ENCRYPT' + type(): string { + return 'ENCRYPT_PAYLOAD' } - override newTransform(ctx: RuleContext): FieldTransform { + async transform(ctx: RuleContext, msg: any): Promise { + const transform = this.newTransform(ctx) + return await transform.transform(ctx, FieldType.BYTES, msg) + } + + newTransform(ctx: RuleContext): EncryptionExecutorTransform { const cryptor = this.getCryptor(ctx) const kekName = this.getKekName(ctx) const dekExpiryDays = this.getDekExpiryDays(ctx) - const transform = - new FieldEncryptionExecutorTransform(this, cryptor, kekName, dekExpiryDays) - return transform + return new EncryptionExecutorTransform(this, cryptor, kekName, dekExpiryDays) } - override async close(): Promise { + async close(): Promise { if (this.client != null) { await this.client.close() } @@ -135,8 +139,7 @@ export class FieldEncryptionExecutor extends FieldRuleExecutor { if (dekAlgorithmStr != null) { dekAlgorithm = DekFormat[dekAlgorithmStr as keyof typeof DekFormat] } - const cryptor = new Cryptor(dekAlgorithm) - return cryptor + return new Cryptor(dekAlgorithm) } private getKekName(ctx: RuleContext): string { @@ -269,15 +272,15 @@ export class Cryptor { } } -export class FieldEncryptionExecutorTransform implements FieldTransform { - private executor: FieldEncryptionExecutor +export class EncryptionExecutorTransform { + private executor: EncryptionExecutor private cryptor: Cryptor private kekName: string private kek: Kek | null = null private dekExpiryDays: number constructor( - executor: FieldEncryptionExecutor, + executor: EncryptionExecutor, cryptor: Cryptor, kekName: string, dekExpiryDays: number, @@ -481,15 +484,15 @@ export class FieldEncryptionExecutorTransform implements FieldTransform { (now - dek.ts!) / MILLIS_IN_DAY >= this.dekExpiryDays } - async transform(ctx: RuleContext, fieldCtx: FieldContext, fieldValue: any): Promise { + async transform(ctx: RuleContext, fieldType: FieldType, fieldValue: any): Promise { if (fieldValue == null) { return null } switch (ctx.ruleMode) { case RuleMode.WRITE: { - let plaintext = this.toBytes(fieldCtx.type, fieldValue) + let plaintext = this.toBytes(fieldType, fieldValue) if (plaintext == null) { - throw new RuleError(`type ${fieldCtx.type} not supported for encryption`) + throw new RuleError(`type ${fieldType} not supported for encryption`) } let version: number | null = null if (this.isDekRotated()) { @@ -501,18 +504,18 @@ export class FieldEncryptionExecutorTransform implements FieldTransform { if (this.isDekRotated()) { ciphertext = this.prefixVersion(dek.version!, ciphertext) } - if (fieldCtx.type === FieldType.STRING) { + if (fieldType === FieldType.STRING) { return ciphertext.toString('base64') } else { - return this.toObject(fieldCtx.type, ciphertext) + return this.toObject(fieldType, ciphertext) } } case RuleMode.READ: { let ciphertext - if (fieldCtx.type === FieldType.STRING) { + if (fieldType === FieldType.STRING) { ciphertext = Buffer.from(fieldValue, 'base64') } else { - ciphertext = this.toBytes(fieldCtx.type, fieldValue) + ciphertext = this.toBytes(fieldType, fieldValue) } if (ciphertext == null) { return fieldValue @@ -528,7 +531,7 @@ export class FieldEncryptionExecutorTransform implements FieldTransform { let dek = await this.getOrCreateDek(ctx, version) let keyMaterialBytes = await this.executor.client!.getDekKeyMaterialBytes(dek) let plaintext = await this.cryptor.decrypt(keyMaterialBytes!, ciphertext) - return this.toObject(fieldCtx.type, plaintext) + return this.toObject(fieldType, plaintext) } default: throw new RuleError(`unsupported rule mode ${ctx.ruleMode}`) @@ -586,3 +589,55 @@ function getKmsClient(config: Map, kek: Kek): KmsClient { } return kmsClient } + +export class FieldEncryptionExecutor extends FieldRuleExecutor { + executor: EncryptionExecutor + + /** + * Register the field encryption executor with the rule registry. + */ + static register(): FieldEncryptionExecutor { + return this.registerWithClock(new Clock()) + } + + static registerWithClock(clock: Clock): FieldEncryptionExecutor { + const executor = new FieldEncryptionExecutor(clock) + RuleRegistry.registerRuleExecutor(executor) + return executor + } + + constructor(clock: Clock = new Clock()) { + super() + this.executor = new EncryptionExecutor(clock) + } + + override configure(clientConfig: ClientConfig, config: Map) { + this.executor.configure(clientConfig, config) + } + + override type(): string { + return 'ENCRYPT' + } + + override newTransform(ctx: RuleContext): FieldTransform { + const executorTransform = this.executor.newTransform(ctx) + return new FieldEncryptionExecutorTransform(executorTransform) + } + + override async close(): Promise { + return this.executor.close() + } +} + +export class FieldEncryptionExecutorTransform implements FieldTransform { + private executorTransform: EncryptionExecutorTransform + + constructor(executorTransform: EncryptionExecutorTransform) { + this.executorTransform = executorTransform + } + + async transform(ctx: RuleContext, fieldCtx: FieldContext, fieldValue: any): Promise { + return await this.executorTransform.transform(ctx, fieldCtx.type, fieldValue) + } +} + diff --git a/schemaregistry/schemaregistry-client.ts b/schemaregistry/schemaregistry-client.ts index aeeccf9e..e752fd0d 100644 --- a/schemaregistry/schemaregistry-client.ts +++ b/schemaregistry/schemaregistry-client.ts @@ -46,6 +46,12 @@ export interface Rule { disabled?: boolean } +export enum RulePhase { + MIGRATION = 'MIGRATION', + DOMAIN = 'DOMAIN', + ENCODING = 'ENCODING', +} + export enum RuleMode { UPGRADE = 'UPGRADE', DOWNGRADE = 'DOWNGRADE', @@ -111,6 +117,7 @@ export interface Metadata { export interface RuleSet { migrationRules?: Rule[]; domainRules?: Rule[]; + encodingRules?: Rule[]; } /** diff --git a/schemaregistry/serde/avro.ts b/schemaregistry/serde/avro.ts index 3d47a9d5..984d8bfb 100644 --- a/schemaregistry/serde/avro.ts +++ b/schemaregistry/serde/avro.ts @@ -7,7 +7,7 @@ import { Serializer, SerializerConfig } from "./serde"; import { - Client, RuleMode, + Client, RuleMode, RulePhase, SchemaInfo } from "../schemaregistry-client"; import avro, {ForSchemaOptions, Type, types} from "avsc"; @@ -93,7 +93,9 @@ export class AvroSerializer extends Serializer implements AvroSerde { const subject = this.subjectName(topic, info) msg = await this.executeRules( subject, topic, RuleMode.WRITE, null, info, msg, getInlineTags(info, deps)) - const msgBytes = avroType.toBuffer(msg) + let msgBytes = avroType.toBuffer(msg) + msgBytes = await this.executeRulesWithPhase( + subject, topic, RulePhase.ENCODING, RuleMode.WRITE, null, info, msgBytes, null) return this.serializeSchemaId(topic, msgBytes, schemaId, headers) } @@ -186,6 +188,8 @@ export class AvroDeserializer extends Deserializer implements AvroSerde { const [info, bytesRead] = await this.getWriterSchema(topic, payload, schemaId, headers) payload = payload.subarray(bytesRead) const subject = this.subjectName(topic, info) + payload = await this.executeRulesWithPhase( + subject, topic, RulePhase.ENCODING, RuleMode.READ, null, info, payload, null) const readerMeta = await this.getReaderSchema(subject) let migrations: Migration[] = [] if (readerMeta != null) { diff --git a/schemaregistry/serde/json.ts b/schemaregistry/serde/json.ts index 860a70f3..93736f80 100644 --- a/schemaregistry/serde/json.ts +++ b/schemaregistry/serde/json.ts @@ -7,7 +7,7 @@ import { Serializer, SerializerConfig } from "./serde"; import { - Client, RuleMode, + Client, RuleMode, RulePhase, SchemaInfo } from "../schemaregistry-client"; import Ajv, {ErrorObject} from "ajv"; @@ -109,13 +109,15 @@ export class JsonSerializer extends Serializer implements JsonSerde { const [schemaId, info] = await this.getSchemaId(JSON_TYPE, topic, msg, schema) const subject = this.subjectName(topic, info) msg = await this.executeRules(subject, topic, RuleMode.WRITE, null, info, msg, null) - const msgBytes = Buffer.from(JSON.stringify(msg)) if ((this.conf as JsonSerdeConfig).validate) { const validate = await this.toValidateFunction(info) if (validate != null && !validate(msg)) { throw new SerializationError('Invalid message') } } + let msgBytes = Buffer.from(JSON.stringify(msg)) + msgBytes = await this.executeRulesWithPhase( + subject, topic, RulePhase.ENCODING, RuleMode.WRITE, null, info, msgBytes, null) return this.serializeSchemaId(topic, msgBytes, schemaId, headers) } @@ -198,6 +200,8 @@ export class JsonDeserializer extends Deserializer implements JsonSerde { const [info, bytesRead] = await this.getWriterSchema(topic, payload, schemaId, headers) payload = payload.subarray(bytesRead) const subject = this.subjectName(topic, info) + payload = await this.executeRulesWithPhase( + subject, topic, RulePhase.ENCODING, RuleMode.READ, null, info, payload, null) const readerMeta = await this.getReaderSchema(subject) let migrations: Migration[] = [] if (readerMeta != null) { diff --git a/schemaregistry/serde/protobuf.ts b/schemaregistry/serde/protobuf.ts index 57977e42..401d2601 100644 --- a/schemaregistry/serde/protobuf.ts +++ b/schemaregistry/serde/protobuf.ts @@ -9,7 +9,7 @@ import { SerializerConfig } from "./serde"; import { - Client, Reference, RuleMode, + Client, Reference, RuleMode, RulePhase, SchemaInfo, SchemaMetadata } from "../schemaregistry-client"; @@ -165,7 +165,9 @@ export class ProtobufSerializer extends Serializer implements ProtobufSerde { const subject = this.subjectName(topic, info) msg = await this.executeRules(subject, topic, RuleMode.WRITE, null, info, msg, null) schemaId.messageIndexes = this.toMessageIndexArray(messageDesc) - const msgBytes = Buffer.from(toBinary(messageDesc, msg)) + let msgBytes = Buffer.from(toBinary(messageDesc, msg)) + msgBytes = await this.executeRulesWithPhase( + subject, topic, RulePhase.ENCODING, RuleMode.WRITE, null, info, msgBytes, null) return this.serializeSchemaId(topic, msgBytes, schemaId, headers) } @@ -381,6 +383,8 @@ export class ProtobufDeserializer extends Deserializer implements ProtobufSerde const messageDesc = this.toMessageDescFromIndexes(fd, schemaId.messageIndexes!) const subject = this.subjectName(topic, info) + payload = await this.executeRulesWithPhase( + subject, topic, RulePhase.ENCODING, RuleMode.READ, null, info, payload, null) const readerMeta = await this.getReaderSchema(subject, 'serialized') const msgBytes = payload diff --git a/schemaregistry/serde/serde.ts b/schemaregistry/serde/serde.ts index 19fefe65..fcc851c9 100644 --- a/schemaregistry/serde/serde.ts +++ b/schemaregistry/serde/serde.ts @@ -2,7 +2,7 @@ import {match} from './wildcard-matcher'; import { Client, Rule, - RuleMode, + RuleMode, RulePhase, RuleSet, SchemaInfo, SchemaMetadata @@ -220,8 +220,15 @@ export abstract class Serde { } async executeRules(subject: string, topic: string, ruleMode: RuleMode, - source: SchemaInfo | null, target: SchemaInfo | null, msg: any, - inlineTags: Map> | null): Promise { + source: SchemaInfo | null, target: SchemaInfo | null, msg: any, + inlineTags: Map> | null): Promise { + return await this.executeRulesWithPhase( + subject, topic, RulePhase.DOMAIN, ruleMode, source, target, msg, inlineTags) + } + + async executeRulesWithPhase(subject: string, topic: string, rulePhase: RulePhase, ruleMode: RuleMode, + source: SchemaInfo | null, target: SchemaInfo | null, msg: any, + inlineTags: Map> | null): Promise { if (msg == null || target == null) { return msg } @@ -234,7 +241,11 @@ export abstract class Serde { rules = source?.ruleSet?.migrationRules?.map(x => x).reverse() break default: - rules = target.ruleSet?.domainRules + if (rulePhase === RulePhase.ENCODING) { + rules = target.ruleSet?.encodingRules + } else { + rules = target.ruleSet?.domainRules + } if (ruleMode === RuleMode.READ) { // Execute read rules in reverse order for symmetry rules = rules?.map(x => x).reverse() @@ -509,21 +520,35 @@ export abstract class Deserializer extends Serde { return null } - hasRules(ruleSet: RuleSet, mode: RuleMode): boolean { + hasRules(ruleSet: RuleSet, phase: RulePhase, mode: RuleMode): boolean { + if (ruleSet == null) { + return false + } + let rules: Rule[] | undefined + switch (phase) { + case RulePhase.MIGRATION: + rules = ruleSet.migrationRules + break + case RulePhase.DOMAIN: + rules = ruleSet.domainRules + break + case RulePhase.ENCODING: + rules = ruleSet.encodingRules + } switch (mode) { case RuleMode.UPGRADE: case RuleMode.DOWNGRADE: - return this.checkRules(ruleSet?.migrationRules, (ruleMode: RuleMode): boolean => + return this.checkRules(rules, (ruleMode: RuleMode): boolean => ruleMode === mode || ruleMode === RuleMode.UPDOWN) case RuleMode.UPDOWN: - return this.checkRules(ruleSet?.migrationRules, (ruleMode: RuleMode): boolean => + return this.checkRules(rules, (ruleMode: RuleMode): boolean => ruleMode === mode) case RuleMode.WRITE: case RuleMode.READ: - return this.checkRules(ruleSet?.domainRules, (ruleMode: RuleMode): boolean => + return this.checkRules(rules, (ruleMode: RuleMode): boolean => ruleMode === mode || ruleMode === RuleMode.WRITEREAD) case RuleMode.WRITEREAD: - return this.checkRules(ruleSet?.domainRules, (ruleMode: RuleMode): boolean => + return this.checkRules(rules, (ruleMode: RuleMode): boolean => ruleMode === mode) } } @@ -576,7 +601,7 @@ export abstract class Deserializer extends Serde { previous = version continue } - if (version.ruleSet != null && this.hasRules(version.ruleSet, migrationMode)) { + if (version.ruleSet != null && this.hasRules(version.ruleSet, RulePhase.MIGRATION, migrationMode)) { let m: Migration if (migrationMode === RuleMode.UPGRADE) { m = { @@ -620,7 +645,9 @@ export abstract class Deserializer extends Serde { async executeMigrations(migrations: Migration[], subject: string, topic: string, msg: any): Promise { for (let migration of migrations) { // TODO fix source, target? - msg = await this.executeRules(subject, topic, migration.ruleMode, migration.source, migration.target, msg, null) + msg = await this.executeRulesWithPhase( + subject, topic, RulePhase.MIGRATION, migration.ruleMode, + migration.source, migration.target, msg, null) } return msg } diff --git a/schemaregistry/test/serde/avro.spec.ts b/schemaregistry/test/serde/avro.spec.ts index dd26eb7b..a07d8e99 100644 --- a/schemaregistry/test/serde/avro.spec.ts +++ b/schemaregistry/test/serde/avro.spec.ts @@ -17,7 +17,7 @@ import { } from "../../schemaregistry-client"; import {LocalKmsDriver} from "../../rules/encryption/localkms/local-driver"; import { - Clock, + Clock, EncryptionExecutor, FieldEncryptionExecutor } from "../../rules/encryption/encrypt-executor"; import {GcpKmsDriver} from "../../rules/encryption/gcpkms/gcp-driver"; @@ -318,6 +318,7 @@ class FakeClock extends Clock { } } +const encryptionExecutor = EncryptionExecutor.registerWithClock(new FakeClock()) const fieldEncryptionExecutor = FieldEncryptionExecutor.registerWithClock(new FakeClock()) CelExecutor.register() CelFieldExecutor.register() @@ -480,7 +481,7 @@ describe('AvroSerializer', () => { } }; const ser = new AvroSerializer(client, SerdeType.VALUE, serConfig); - const dekClient = fieldEncryptionExecutor.client!; + const dekClient = fieldEncryptionExecutor.executor.client!; const encRule: Rule = { name: 'test-encrypt', @@ -518,7 +519,7 @@ describe('AvroSerializer', () => { } }; const deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig); - fieldEncryptionExecutor.client = dekClient; + fieldEncryptionExecutor.executor.client = dekClient; const obj2 = await deser.deserialize(topic, bytes); expect(obj2.color).toEqual(obj.color); }) @@ -1060,7 +1061,7 @@ describe('AvroSerializer', () => { } } let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig) - let dekClient = fieldEncryptionExecutor.client! + let dekClient = fieldEncryptionExecutor.executor.client! let encRule: Rule = { name: 'test-encrypt', @@ -1106,7 +1107,7 @@ describe('AvroSerializer', () => { } } let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig) - fieldEncryptionExecutor.client = dekClient + fieldEncryptionExecutor.executor.client = dekClient let obj2 = await deser.deserialize(topic, bytes) expect(obj2.intField).toEqual(obj.intField); expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001); @@ -1130,6 +1131,68 @@ describe('AvroSerializer', () => { expect(obj2.stringField).not.toEqual("hi"); expect(obj2.bytesField).not.toEqual(Buffer.from([1, 2])); }) + it('payload encryption', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let serConfig: AvroSerializerConfig = { + useLatestVersion: true, + ruleConfig: { + secret: 'mysecret' + } + } + let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig) + let dekClient = encryptionExecutor.client! + + let encRule: Rule = { + name: 'test-encrypt', + kind: 'TRANSFORM', + mode: RuleMode.WRITEREAD, + type: 'ENCRYPT_PAYLOAD', + params: { + 'encrypt.kek.name': 'kek1', + 'encrypt.kms.type': 'local-kms', + 'encrypt.kms.key.id': 'mykey', + }, + onFailure: 'ERROR,NONE' + } + let ruleSet: RuleSet = { + encodingRules: [encRule] + } + + let info: SchemaInfo = { + schemaType: 'AVRO', + schema: demoSchema, + ruleSet + } + + await client.register(subject, info, false) + + let obj = { + intField: 123, + doubleField: 45.67, + stringField: 'hi', + boolField: true, + bytesField: Buffer.from([1, 2]), + } + let bytes = await ser.serialize(topic, obj) + + let deserConfig: AvroDeserializerConfig = { + ruleConfig: { + secret: 'mysecret' + } + } + let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig) + encryptionExecutor.client = dekClient + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2.intField).toEqual(obj.intField); + expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001); + expect(obj2.stringField).toEqual(obj.stringField); + expect(obj2.boolField).toEqual(obj.boolField); + expect(obj2.bytesField).toEqual(obj.bytesField); + }) it('deterministic encryption', async () => { let conf: ClientConfig = { baseURLs: [baseURL], @@ -1143,7 +1206,7 @@ describe('AvroSerializer', () => { } } let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig) - let dekClient = fieldEncryptionExecutor.client! + let dekClient = fieldEncryptionExecutor.executor.client! let encRule: Rule = { name: 'test-encrypt', @@ -1190,7 +1253,7 @@ describe('AvroSerializer', () => { } } let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig) - fieldEncryptionExecutor.client = dekClient + fieldEncryptionExecutor.executor.client = dekClient let obj2 = await deser.deserialize(topic, bytes) expect(obj2.intField).toEqual(obj.intField); expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001); @@ -1227,7 +1290,7 @@ describe('AvroSerializer', () => { } } let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig) - let dekClient = fieldEncryptionExecutor.client! + let dekClient = fieldEncryptionExecutor.executor.client! let encRule: Rule = { name: 'test-encrypt', @@ -1273,7 +1336,7 @@ describe('AvroSerializer', () => { } } let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig) - fieldEncryptionExecutor.client = dekClient + fieldEncryptionExecutor.executor.client = dekClient let obj2 = await deser.deserialize(topic, bytes) expect(obj2.intField).toEqual(obj.intField); expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001); @@ -1283,7 +1346,7 @@ describe('AvroSerializer', () => { }) it('basic encryption with dek rotation', async () => { const fieldEncryptionExecutor = FieldEncryptionExecutor.registerWithClock(new FakeClock()); - (fieldEncryptionExecutor.clock as FakeClock).fixedNow = Date.now() + (fieldEncryptionExecutor.executor.clock as FakeClock).fixedNow = Date.now() let conf: ClientConfig = { baseURLs: [baseURL], cacheCapacity: 1000 @@ -1296,7 +1359,7 @@ describe('AvroSerializer', () => { } } let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig) - let dekClient = fieldEncryptionExecutor.client! + let dekClient = fieldEncryptionExecutor.executor.client! let encRule: Rule = { name: 'test-encrypt', @@ -1342,7 +1405,7 @@ describe('AvroSerializer', () => { } } let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig) - fieldEncryptionExecutor.client = dekClient + fieldEncryptionExecutor.executor.client = dekClient let obj2 = await deser.deserialize(topic, bytes) expect(obj2.intField).toEqual(obj.intField); expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001); @@ -1354,7 +1417,7 @@ describe('AvroSerializer', () => { expect(1).toEqual(dek.version); // advance time by 2 days - (fieldEncryptionExecutor.clock as FakeClock).fixedNow += 2 * 24 * 60 * 60 * 1000 + (fieldEncryptionExecutor.executor.clock as FakeClock).fixedNow += 2 * 24 * 60 * 60 * 1000 bytes = await ser.serialize(topic, obj) @@ -1372,7 +1435,7 @@ describe('AvroSerializer', () => { expect(2).toEqual(dek.version); // advance time by 2 days - (fieldEncryptionExecutor.clock as FakeClock).fixedNow += 2 * 24 * 60 * 60 * 1000 + (fieldEncryptionExecutor.executor.clock as FakeClock).fixedNow += 2 * 24 * 60 * 60 * 1000 bytes = await ser.serialize(topic, obj) @@ -1432,7 +1495,7 @@ describe('AvroSerializer', () => { } } let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig) - let dekClient = fieldEncryptionExecutor.client! + let dekClient = fieldEncryptionExecutor.executor.client! await dekClient.registerKek("kek1", "local-kms", "mykey", false) const encryptedDek = "07V2ndh02DA73p+dTybwZFm7DKQSZN1tEwQh+FoX1DZLk4Yj2LLu4omYjp/84tAg3BYlkfGSz+zZacJHIE4=" @@ -1486,7 +1549,7 @@ describe('AvroSerializer', () => { } } let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig) - let dekClient = fieldEncryptionExecutor.client! + let dekClient = fieldEncryptionExecutor.executor.client! await dekClient.registerKek("kek1", "local-kms", "mykey", false) const encryptedDek = "YSx3DTlAHrmpoDChquJMifmPntBzxgRVdMzgYL82rgWBKn7aUSnG+WIu9ozBNS3y2vXd++mBtK07w4/W/G6w0da39X9hfOVZsGnkSvry/QRht84V8yz3dqKxGMOK5A==" @@ -1540,7 +1603,7 @@ describe('AvroSerializer', () => { } } let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig) - let dekClient = fieldEncryptionExecutor.client! + let dekClient = fieldEncryptionExecutor.executor.client! await dekClient.registerKek("kek1", "local-kms", "mykey", false) const encryptedDek = "W/v6hOQYq1idVAcs1pPWz9UUONMVZW4IrglTnG88TsWjeCjxmtRQ4VaNe/I5dCfm2zyY9Cu0nqdvqImtUk4=" @@ -1563,7 +1626,7 @@ describe('AvroSerializer', () => { } } let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig) - let dekClient = fieldEncryptionExecutor.client! + let dekClient = fieldEncryptionExecutor.executor.client! let info: SchemaInfo = { schemaType: 'AVRO', @@ -1624,7 +1687,7 @@ describe('AvroSerializer', () => { } } let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig) - fieldEncryptionExecutor.client = dekClient + fieldEncryptionExecutor.executor.client = dekClient let obj2 = await deser.deserialize(topic, bytes) expect(obj2.otherField.intField).toEqual(nested.intField); expect(obj2.otherField.doubleField).toBeCloseTo(nested.doubleField, 0.001); @@ -1645,7 +1708,7 @@ describe('AvroSerializer', () => { } } let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig) - let dekClient = fieldEncryptionExecutor.client! + let dekClient = fieldEncryptionExecutor.executor.client! let encRule: Rule = { name: 'test-encrypt', @@ -1691,7 +1754,7 @@ describe('AvroSerializer', () => { } } let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig) - fieldEncryptionExecutor.client = dekClient + fieldEncryptionExecutor.executor.client = dekClient let obj2 = await deser.deserialize(topic, bytes) expect(obj2.intField).toEqual(obj.intField); expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001); @@ -1712,7 +1775,7 @@ describe('AvroSerializer', () => { } } let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig) - let dekClient = fieldEncryptionExecutor.client! + let dekClient = fieldEncryptionExecutor.executor.client! let encRule: Rule = { name: 'test-encrypt', @@ -1752,7 +1815,7 @@ describe('AvroSerializer', () => { } } let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig) - fieldEncryptionExecutor.client = dekClient + fieldEncryptionExecutor.executor.client = dekClient let obj2 = await deser.deserialize(topic, bytes) expect(obj2.arrayField).toEqual([ 'hello' ]); expect(obj2.mapField).toEqual({ 'key': 'world' }); @@ -1771,7 +1834,7 @@ describe('AvroSerializer', () => { } } let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig) - let dekClient = fieldEncryptionExecutor.client! + let dekClient = fieldEncryptionExecutor.executor.client! let encRule: Rule = { name: 'test-encrypt', @@ -1811,7 +1874,7 @@ describe('AvroSerializer', () => { } } let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig) - fieldEncryptionExecutor.client = dekClient + fieldEncryptionExecutor.executor.client = dekClient let obj2 = await deser.deserialize(topic, bytes) expect(obj2.arrayField).toEqual([ 'hello' ]); expect(obj2.mapField).toEqual({ 'key': 'world' }); @@ -1830,7 +1893,7 @@ describe('AvroSerializer', () => { } } let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig) - let dekClient = fieldEncryptionExecutor.client! + let dekClient = fieldEncryptionExecutor.executor.client! let encRule: Rule = { name: 'test-encrypt', @@ -1870,7 +1933,7 @@ describe('AvroSerializer', () => { } } let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig) - fieldEncryptionExecutor.client = dekClient + fieldEncryptionExecutor.executor.client = dekClient let obj2 = await deser.deserialize(topic, bytes) expect(obj2.emails[0].email).toEqual('john@acme.com'); }) diff --git a/schemaregistry/test/serde/json.spec.ts b/schemaregistry/test/serde/json.spec.ts index 45624944..81623752 100644 --- a/schemaregistry/test/serde/json.spec.ts +++ b/schemaregistry/test/serde/json.spec.ts @@ -15,7 +15,7 @@ import { SchemaRegistryClient } from "../../schemaregistry-client"; import {LocalKmsDriver} from "../../rules/encryption/localkms/local-driver"; -import {FieldEncryptionExecutor} from "../../rules/encryption/encrypt-executor"; +import {EncryptionExecutor, FieldEncryptionExecutor} from "../../rules/encryption/encrypt-executor"; import { JsonDeserializer, JsonDeserializerConfig, JsonSerializer, @@ -26,6 +26,7 @@ import stringify from "json-stringify-deterministic"; import {JsonataExecutor} from "@confluentinc/schemaregistry/rules/jsonata/jsonata-executor"; import {clearKmsClients} from "@confluentinc/schemaregistry/rules/encryption/kms-registry"; +const encryptionExecutor = EncryptionExecutor.register() const fieldEncryptionExecutor = FieldEncryptionExecutor.register() JsonataExecutor.register() LocalKmsDriver.register() @@ -436,7 +437,7 @@ describe('JsonSerializer', () => { } } let ser = new JsonSerializer(client, SerdeType.VALUE, serConfig) - let dekClient = fieldEncryptionExecutor.client! + let dekClient = fieldEncryptionExecutor.executor.client! let encRule: Rule = { name: 'test-encrypt', @@ -482,7 +483,7 @@ describe('JsonSerializer', () => { } } let deser = new JsonDeserializer(client, SerdeType.VALUE, deserConfig) - fieldEncryptionExecutor.client = dekClient + fieldEncryptionExecutor.executor.client = dekClient let obj2 = await deser.deserialize(topic, bytes) expect(obj2).toEqual(obj) @@ -493,6 +494,64 @@ describe('JsonSerializer', () => { obj2 = await deser.deserialize(topic, bytes) expect(obj2).not.toEqual(obj); }) + it('payload encryption', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let serConfig: JsonSerializerConfig = { + useLatestVersion: true, + ruleConfig: { + secret: 'mysecret' + } + } + let ser = new JsonSerializer(client, SerdeType.VALUE, serConfig) + let dekClient = encryptionExecutor.client! + + let encRule: Rule = { + name: 'test-encrypt', + kind: 'TRANSFORM', + mode: RuleMode.WRITEREAD, + type: 'ENCRYPT_PAYLOAD', + params: { + 'encrypt.kek.name': 'kek1', + 'encrypt.kms.type': 'local-kms', + 'encrypt.kms.key.id': 'mykey', + }, + onFailure: 'ERROR,NONE' + } + let ruleSet: RuleSet = { + encodingRules: [encRule] + } + + let info: SchemaInfo = { + schemaType: 'JSON', + schema: demoSchema, + ruleSet + } + + await client.register(subject, info, false) + + let obj = { + intField: 123, + doubleField: 45.67, + stringField: 'hi', + boolField: true, + bytesField: Buffer.from([0, 0, 0, 1]).toString('base64') + } + let bytes = await ser.serialize(topic, obj) + + let deserConfig: JsonDeserializerConfig = { + ruleConfig: { + secret: 'mysecret' + } + } + let deser = new JsonDeserializer(client, SerdeType.VALUE, deserConfig) + fieldEncryptionExecutor.executor.client = dekClient + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2).toEqual(obj) + }) it('basic encryption 2020-12', async () => { let conf: ClientConfig = { baseURLs: [baseURL], @@ -506,7 +565,7 @@ describe('JsonSerializer', () => { } } let ser = new JsonSerializer(client, SerdeType.VALUE, serConfig) - let dekClient = fieldEncryptionExecutor.client! + let dekClient = fieldEncryptionExecutor.executor.client! let encRule: Rule = { name: 'test-encrypt', @@ -552,7 +611,7 @@ describe('JsonSerializer', () => { } } let deser = new JsonDeserializer(client, SerdeType.VALUE, deserConfig) - fieldEncryptionExecutor.client = dekClient + fieldEncryptionExecutor.executor.client = dekClient let obj2 = await deser.deserialize(topic, bytes) expect(obj2).toEqual(obj) @@ -576,7 +635,7 @@ describe('JsonSerializer', () => { } } let ser = new JsonSerializer(client, SerdeType.VALUE, serConfig) - let dekClient = fieldEncryptionExecutor.client! + let dekClient = fieldEncryptionExecutor.executor.client! let encRule: Rule = { name: 'test-encrypt', @@ -623,7 +682,7 @@ describe('JsonSerializer', () => { } } let deser = new JsonDeserializer(client, SerdeType.VALUE, deserConfig) - fieldEncryptionExecutor.client = dekClient + fieldEncryptionExecutor.executor.client = dekClient let obj2 = await deser.deserialize(topic, bytes) expect(obj2).toEqual(obj) @@ -647,7 +706,7 @@ describe('JsonSerializer', () => { } } let ser = new JsonSerializer(client, SerdeType.VALUE, serConfig) - let dekClient = fieldEncryptionExecutor.client! + let dekClient = fieldEncryptionExecutor.executor.client! let encRule: Rule = { name: 'test-encrypt', @@ -693,7 +752,7 @@ describe('JsonSerializer', () => { } } let deser = new JsonDeserializer(client, SerdeType.VALUE, deserConfig) - fieldEncryptionExecutor.client = dekClient + fieldEncryptionExecutor.executor.client = dekClient let obj2 = await deser.deserialize(topic, bytes) expect(obj2).toEqual(obj) }) @@ -710,7 +769,7 @@ describe('JsonSerializer', () => { } } let ser = new JsonSerializer(client, SerdeType.VALUE, serConfig) - let dekClient = fieldEncryptionExecutor.client! + let dekClient = fieldEncryptionExecutor.executor.client! let info: SchemaInfo = { schemaType: 'JSON', @@ -769,7 +828,7 @@ describe('JsonSerializer', () => { } } let deser = new JsonDeserializer(client, SerdeType.VALUE, deserConfig) - fieldEncryptionExecutor.client = dekClient + fieldEncryptionExecutor.executor.client = dekClient let obj2 = await deser.deserialize(topic, bytes) expect(obj2).toEqual(obj) }) @@ -786,7 +845,7 @@ describe('JsonSerializer', () => { } } let ser = new JsonSerializer(client, SerdeType.VALUE, serConfig) - let dekClient = fieldEncryptionExecutor.client! + let dekClient = fieldEncryptionExecutor.executor.client! let encRule: Rule = { name: 'test-encrypt', @@ -826,7 +885,7 @@ describe('JsonSerializer', () => { } } let deser = new JsonDeserializer(client, SerdeType.VALUE, deserConfig) - fieldEncryptionExecutor.client = dekClient + fieldEncryptionExecutor.executor.client = dekClient let obj2 = await deser.deserialize(topic, bytes) expect(obj2.arrayField).toEqual([ 'hello' ]); expect(obj2.objectField.stringField).toEqual('world'); @@ -845,7 +904,7 @@ describe('JsonSerializer', () => { } } let ser = new JsonSerializer(client, SerdeType.VALUE, serConfig) - let dekClient = fieldEncryptionExecutor.client! + let dekClient = fieldEncryptionExecutor.executor.client! let encRule: Rule = { name: 'test-encrypt', @@ -885,7 +944,7 @@ describe('JsonSerializer', () => { } } let deser = new JsonDeserializer(client, SerdeType.VALUE, deserConfig) - fieldEncryptionExecutor.client = dekClient + fieldEncryptionExecutor.executor.client = dekClient let obj2 = await deser.deserialize(topic, bytes) expect(obj2.arrayField).toEqual([ 'hello' ]); expect(obj2.objectField.stringField).toEqual('world'); diff --git a/schemaregistry/test/serde/protobuf.spec.ts b/schemaregistry/test/serde/protobuf.spec.ts index 74806ee0..9aa19bb2 100644 --- a/schemaregistry/test/serde/protobuf.spec.ts +++ b/schemaregistry/test/serde/protobuf.spec.ts @@ -13,7 +13,7 @@ import { SchemaRegistryClient } from "../../schemaregistry-client"; import {LocalKmsDriver} from "../../rules/encryption/localkms/local-driver"; -import {FieldEncryptionExecutor} from "../../rules/encryption/encrypt-executor"; +import {EncryptionExecutor, FieldEncryptionExecutor} from "../../rules/encryption/encrypt-executor"; import {AuthorSchema, file_test_schemaregistry_serde_example, PizzaSchema} from "./test/example_pb"; import {create, toBinary} from "@bufbuild/protobuf"; import {FileDescriptorProtoSchema} from "@bufbuild/protobuf/wkt"; @@ -26,6 +26,7 @@ import {RuleRegistry} from "@confluentinc/schemaregistry/serde/rule-registry"; import {LinkedListSchema} from "./test/cycle_pb"; import {clearKmsClients} from "@confluentinc/schemaregistry/rules/encryption/kms-registry"; +const encryptionExecutor = EncryptionExecutor.register() const fieldEncryptionExecutor = FieldEncryptionExecutor.register() LocalKmsDriver.register() @@ -199,7 +200,7 @@ describe('ProtobufSerializer', () => { } let ser = new ProtobufSerializer(client, SerdeType.VALUE, serConfig) ser.registry.add(AuthorSchema) - let dekClient = fieldEncryptionExecutor.client! + let dekClient = fieldEncryptionExecutor.executor.client! let encRule: Rule = { name: 'test-encrypt', @@ -249,7 +250,7 @@ describe('ProtobufSerializer', () => { } } let deser = new ProtobufDeserializer(client, SerdeType.VALUE, deserConfig) - fieldEncryptionExecutor.client = dekClient + fieldEncryptionExecutor.executor.client = dekClient let obj2 = await deser.deserialize(topic, bytes) expect(obj2).toEqual(obj) @@ -260,4 +261,66 @@ describe('ProtobufSerializer', () => { obj2 = await deser.deserialize(topic, bytes) expect(obj2).not.toEqual(obj); }) + it('payload encryption', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let serConfig: ProtobufSerializerConfig = { + useLatestVersion: true, + ruleConfig: { + secret: 'mysecret' + } + } + let ser = new ProtobufSerializer(client, SerdeType.VALUE, serConfig) + ser.registry.add(AuthorSchema) + let dekClient = encryptionExecutor.client! + + let encRule: Rule = { + name: 'test-encrypt', + kind: 'TRANSFORM', + mode: RuleMode.WRITEREAD, + type: 'ENCRYPT_PAYLOAD', + params: { + 'encrypt.kek.name': 'kek1', + 'encrypt.kms.type': 'local-kms', + 'encrypt.kms.key.id': 'mykey', + }, + onFailure: 'ERROR,NONE' + } + let ruleSet: RuleSet = { + encodingRules: [encRule] + } + + let info: SchemaInfo = { + schemaType: 'PROTOBUF', + schema: Buffer.from(toBinary(FileDescriptorProtoSchema, file_test_schemaregistry_serde_example.proto)).toString('base64'), + ruleSet + } + + await client.register(subject, info, false) + + let obj = create(AuthorSchema, { + name: 'Kafka', + id: 123, + picture: Buffer.from([1, 2]), + works: ['The Castle', 'The Trial'], + piiOneof: { + case: 'oneofString', + value: 'oneof' + } + }) + let bytes = await ser.serialize(topic, obj) + + let deserConfig: ProtobufDeserializerConfig = { + ruleConfig: { + secret: 'mysecret' + } + } + let deser = new ProtobufDeserializer(client, SerdeType.VALUE, deserConfig) + fieldEncryptionExecutor.executor.client = dekClient + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2).toEqual(obj) + }) })