Skip to content

DGS-21268 Add support for full payload encryption #347

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 81 additions & 26 deletions schemaregistry/rules/encryption/encrypt-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
MAGIC_BYTE_V0,
RuleContext,
RuleError,
RuleExecutor,
} from "../../serde/serde";
import {RuleMode,} from "../../schemaregistry-client";
import {DekClient, Dek, DekRegistryClient, Kek} from "./dekregistry/dekregistry-client";
Expand Down Expand Up @@ -61,29 +62,29 @@ export class Clock {
}
}

export class FieldEncryptionExecutor extends FieldRuleExecutor {
export class EncryptionExecutor implements RuleExecutor {
config: Map<string, string> | null = null
client: DekClient | null = null
clock: Clock

/**
* Register the field encryption executor with the rule registry.
*/
static register(): FieldEncryptionExecutor {
static register(): EncryptionExecutor {
return this.registerWithClock(new Clock())
}

static registerWithClock(clock: Clock): FieldEncryptionExecutor {
const executor = new FieldEncryptionExecutor(clock)
static registerWithClock(clock: Clock): EncryptionExecutor {
const executor = new EncryptionExecutor(clock)
RuleRegistry.registerRuleExecutor(executor)
return executor
}

constructor(clock: Clock = new Clock()) {
super()
this.clock = clock
}

override configure(clientConfig: ClientConfig, config: Map<string, string>) {
configure(clientConfig: ClientConfig, config: Map<string, string>) {
if (this.client != null) {
if (!deepEqual(this.client.config(), clientConfig)) {
throw new RuleError('executor already configured')
Expand All @@ -110,20 +111,23 @@ export class FieldEncryptionExecutor extends FieldRuleExecutor {
}
}

override type(): string {
return 'ENCRYPT'
type(): string {
return 'ENCRYPT_PAYLOAD'
}

override newTransform(ctx: RuleContext): FieldTransform {
async transform(ctx: RuleContext, msg: any): Promise<any> {
const transform = this.newTransform(ctx)
return await transform.transform(ctx, FieldType.BYTES, msg)
}

newTransform(ctx: RuleContext): EncryptionExecutorTransform {
const cryptor = this.getCryptor(ctx)
const kekName = this.getKekName(ctx)
const dekExpiryDays = this.getDekExpiryDays(ctx)
const transform =
new FieldEncryptionExecutorTransform(this, cryptor, kekName, dekExpiryDays)
return transform
return new EncryptionExecutorTransform(this, cryptor, kekName, dekExpiryDays)
}

override async close(): Promise<void> {
async close(): Promise<void> {
if (this.client != null) {
await this.client.close()
}
Expand All @@ -135,8 +139,7 @@ export class FieldEncryptionExecutor extends FieldRuleExecutor {
if (dekAlgorithmStr != null) {
dekAlgorithm = DekFormat[dekAlgorithmStr as keyof typeof DekFormat]
}
const cryptor = new Cryptor(dekAlgorithm)
return cryptor
return new Cryptor(dekAlgorithm)
}

private getKekName(ctx: RuleContext): string {
Expand Down Expand Up @@ -269,15 +272,15 @@ export class Cryptor {
}
}

export class FieldEncryptionExecutorTransform implements FieldTransform {
private executor: FieldEncryptionExecutor
export class EncryptionExecutorTransform {
private executor: EncryptionExecutor
private cryptor: Cryptor
private kekName: string
private kek: Kek | null = null
private dekExpiryDays: number

constructor(
executor: FieldEncryptionExecutor,
executor: EncryptionExecutor,
cryptor: Cryptor,
kekName: string,
dekExpiryDays: number,
Expand Down Expand Up @@ -481,15 +484,15 @@ export class FieldEncryptionExecutorTransform implements FieldTransform {
(now - dek.ts!) / MILLIS_IN_DAY >= this.dekExpiryDays
}

async transform(ctx: RuleContext, fieldCtx: FieldContext, fieldValue: any): Promise<any> {
async transform(ctx: RuleContext, fieldType: FieldType, fieldValue: any): Promise<any> {
if (fieldValue == null) {
return null
}
switch (ctx.ruleMode) {
case RuleMode.WRITE: {
let plaintext = this.toBytes(fieldCtx.type, fieldValue)
let plaintext = this.toBytes(fieldType, fieldValue)
if (plaintext == null) {
throw new RuleError(`type ${fieldCtx.type} not supported for encryption`)
throw new RuleError(`type ${fieldType} not supported for encryption`)
}
let version: number | null = null
if (this.isDekRotated()) {
Expand All @@ -501,18 +504,18 @@ export class FieldEncryptionExecutorTransform implements FieldTransform {
if (this.isDekRotated()) {
ciphertext = this.prefixVersion(dek.version!, ciphertext)
}
if (fieldCtx.type === FieldType.STRING) {
if (fieldType === FieldType.STRING) {
return ciphertext.toString('base64')
} else {
return this.toObject(fieldCtx.type, ciphertext)
return this.toObject(fieldType, ciphertext)
}
}
case RuleMode.READ: {
let ciphertext
if (fieldCtx.type === FieldType.STRING) {
if (fieldType === FieldType.STRING) {
ciphertext = Buffer.from(fieldValue, 'base64')
} else {
ciphertext = this.toBytes(fieldCtx.type, fieldValue)
ciphertext = this.toBytes(fieldType, fieldValue)
}
if (ciphertext == null) {
return fieldValue
Expand All @@ -528,7 +531,7 @@ export class FieldEncryptionExecutorTransform implements FieldTransform {
let dek = await this.getOrCreateDek(ctx, version)
let keyMaterialBytes = await this.executor.client!.getDekKeyMaterialBytes(dek)
let plaintext = await this.cryptor.decrypt(keyMaterialBytes!, ciphertext)
return this.toObject(fieldCtx.type, plaintext)
return this.toObject(fieldType, plaintext)
}
default:
throw new RuleError(`unsupported rule mode ${ctx.ruleMode}`)
Expand Down Expand Up @@ -586,3 +589,55 @@ function getKmsClient(config: Map<string, string>, kek: Kek): KmsClient {
}
return kmsClient
}

export class FieldEncryptionExecutor extends FieldRuleExecutor {
executor: EncryptionExecutor

/**
* Register the field encryption executor with the rule registry.
*/
static register(): FieldEncryptionExecutor {
return this.registerWithClock(new Clock())
}

static registerWithClock(clock: Clock): FieldEncryptionExecutor {
const executor = new FieldEncryptionExecutor(clock)
RuleRegistry.registerRuleExecutor(executor)
return executor
}

constructor(clock: Clock = new Clock()) {
super()
this.executor = new EncryptionExecutor(clock)
}

override configure(clientConfig: ClientConfig, config: Map<string, string>) {
this.executor.configure(clientConfig, config)
}

override type(): string {
return 'ENCRYPT'
}

override newTransform(ctx: RuleContext): FieldTransform {
const executorTransform = this.executor.newTransform(ctx)
return new FieldEncryptionExecutorTransform(executorTransform)
}

override async close(): Promise<void> {
return this.executor.close()
}
}

export class FieldEncryptionExecutorTransform implements FieldTransform {
private executorTransform: EncryptionExecutorTransform

constructor(executorTransform: EncryptionExecutorTransform) {
this.executorTransform = executorTransform
}

async transform(ctx: RuleContext, fieldCtx: FieldContext, fieldValue: any): Promise<any> {
return await this.executorTransform.transform(ctx, fieldCtx.type, fieldValue)
}
}

7 changes: 7 additions & 0 deletions schemaregistry/schemaregistry-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ export interface Rule {
disabled?: boolean
}

export enum RulePhase {
MIGRATION = 'MIGRATION',
DOMAIN = 'DOMAIN',
ENCODING = 'ENCODING',
}

export enum RuleMode {
UPGRADE = 'UPGRADE',
DOWNGRADE = 'DOWNGRADE',
Expand Down Expand Up @@ -111,6 +117,7 @@ export interface Metadata {
export interface RuleSet {
migrationRules?: Rule[];
domainRules?: Rule[];
encodingRules?: Rule[];
}

/**
Expand Down
8 changes: 6 additions & 2 deletions schemaregistry/serde/avro.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
Serializer, SerializerConfig
} from "./serde";
import {
Client, RuleMode,
Client, RuleMode, RulePhase,
SchemaInfo
} from "../schemaregistry-client";
import avro, {ForSchemaOptions, Type, types} from "avsc";
Expand Down Expand Up @@ -93,7 +93,9 @@ export class AvroSerializer extends Serializer implements AvroSerde {
const subject = this.subjectName(topic, info)
msg = await this.executeRules(
subject, topic, RuleMode.WRITE, null, info, msg, getInlineTags(info, deps))
const msgBytes = avroType.toBuffer(msg)
let msgBytes = avroType.toBuffer(msg)
msgBytes = await this.executeRulesWithPhase(
subject, topic, RulePhase.ENCODING, RuleMode.WRITE, null, info, msgBytes, null)
return this.serializeSchemaId(topic, msgBytes, schemaId, headers)
}

Expand Down Expand Up @@ -186,6 +188,8 @@ export class AvroDeserializer extends Deserializer implements AvroSerde {
const [info, bytesRead] = await this.getWriterSchema(topic, payload, schemaId, headers)
payload = payload.subarray(bytesRead)
const subject = this.subjectName(topic, info)
payload = await this.executeRulesWithPhase(
subject, topic, RulePhase.ENCODING, RuleMode.READ, null, info, payload, null)
const readerMeta = await this.getReaderSchema(subject)
let migrations: Migration[] = []
if (readerMeta != null) {
Expand Down
8 changes: 6 additions & 2 deletions schemaregistry/serde/json.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
Serializer, SerializerConfig
} from "./serde";
import {
Client, RuleMode,
Client, RuleMode, RulePhase,
SchemaInfo
} from "../schemaregistry-client";
import Ajv, {ErrorObject} from "ajv";
Expand Down Expand Up @@ -109,13 +109,15 @@ export class JsonSerializer extends Serializer implements JsonSerde {
const [schemaId, info] = await this.getSchemaId(JSON_TYPE, topic, msg, schema)
const subject = this.subjectName(topic, info)
msg = await this.executeRules(subject, topic, RuleMode.WRITE, null, info, msg, null)
const msgBytes = Buffer.from(JSON.stringify(msg))
if ((this.conf as JsonSerdeConfig).validate) {
const validate = await this.toValidateFunction(info)
if (validate != null && !validate(msg)) {
throw new SerializationError('Invalid message')
}
}
let msgBytes = Buffer.from(JSON.stringify(msg))
msgBytes = await this.executeRulesWithPhase(
subject, topic, RulePhase.ENCODING, RuleMode.WRITE, null, info, msgBytes, null)
return this.serializeSchemaId(topic, msgBytes, schemaId, headers)
}

Expand Down Expand Up @@ -198,6 +200,8 @@ export class JsonDeserializer extends Deserializer implements JsonSerde {
const [info, bytesRead] = await this.getWriterSchema(topic, payload, schemaId, headers)
payload = payload.subarray(bytesRead)
const subject = this.subjectName(topic, info)
payload = await this.executeRulesWithPhase(
subject, topic, RulePhase.ENCODING, RuleMode.READ, null, info, payload, null)
const readerMeta = await this.getReaderSchema(subject)
let migrations: Migration[] = []
if (readerMeta != null) {
Expand Down
8 changes: 6 additions & 2 deletions schemaregistry/serde/protobuf.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
SerializerConfig
} from "./serde";
import {
Client, Reference, RuleMode,
Client, Reference, RuleMode, RulePhase,
SchemaInfo,
SchemaMetadata
} from "../schemaregistry-client";
Expand Down Expand Up @@ -165,7 +165,9 @@ export class ProtobufSerializer extends Serializer implements ProtobufSerde {
const subject = this.subjectName(topic, info)
msg = await this.executeRules(subject, topic, RuleMode.WRITE, null, info, msg, null)
schemaId.messageIndexes = this.toMessageIndexArray(messageDesc)
const msgBytes = Buffer.from(toBinary(messageDesc, msg))
let msgBytes = Buffer.from(toBinary(messageDesc, msg))
msgBytes = await this.executeRulesWithPhase(
subject, topic, RulePhase.ENCODING, RuleMode.WRITE, null, info, msgBytes, null)
return this.serializeSchemaId(topic, msgBytes, schemaId, headers)
}

Expand Down Expand Up @@ -381,6 +383,8 @@ export class ProtobufDeserializer extends Deserializer implements ProtobufSerde
const messageDesc = this.toMessageDescFromIndexes(fd, schemaId.messageIndexes!)

const subject = this.subjectName(topic, info)
payload = await this.executeRulesWithPhase(
subject, topic, RulePhase.ENCODING, RuleMode.READ, null, info, payload, null)
const readerMeta = await this.getReaderSchema(subject, 'serialized')

const msgBytes = payload
Expand Down
Loading