Skip to content

Add concurrent and sequential performance tests with and without CSFLE #154

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions schemaregistry-examples/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { BasicAuthCredentials } from '@confluentinc/schemaregistry';
import { BasicAuthCredentials, BearerAuthCredentials } from '@confluentinc/schemaregistry';

const issuerEndpointUrl = '<your-issuer-endpoint-url>'; // e.g. 'https://dev-123456.okta.com/oauth2/default/v1/token';
const oauthClientId = '<your-client-id>';
Expand All @@ -22,7 +22,17 @@ const basicAuthCredentials: BasicAuthCredentials = {
userInfo: '<client-id>:<client-secret>',
};

const bearerAuthCredentials: BearerAuthCredentials = {
credentialsSource: 'OAUTHBEARER',
issuerEndpointUrl: issuerEndpointUrl,
clientId: oauthClientId,
clientSecret: oauthClientSecret,
scope: scope,
identityPoolId: identityPoolId,
logicalCluster: schemaRegistryLogicalCluster
};

export {
issuerEndpointUrl, oauthClientId, oauthClientSecret, scope, identityPoolId, kafkaLogicalCluster, schemaRegistryLogicalCluster,
baseUrl, clusterBootstrapUrl, clusterApiKey, clusterApiSecret, basicAuthCredentials, localAuthCredentials
baseUrl, clusterBootstrapUrl, clusterApiKey, clusterApiSecret, basicAuthCredentials, localAuthCredentials, bearerAuthCredentials
};
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
import {
AvroSerializer, AvroDeserializer, AvroSerializerConfig, SerdeType, Serializer, Deserializer,
JsonSerializer, JsonDeserializer, JsonSerializerConfig,
ClientConfig, SchemaRegistryClient, SchemaInfo
ClientConfig, SchemaRegistryClient, SchemaInfo, Rule, RuleMode, RuleSet, FieldEncryptionExecutor, AwsKmsDriver
} from "@confluentinc/schemaregistry";
import { localAuthCredentials } from "../constants";
import { bearerAuthCredentials } from "../constants";
import { v4 } from "uuid";
import { beforeEach, describe, it } from '@jest/globals';

FieldEncryptionExecutor.register();
AwsKmsDriver.register();

const clientConfig: ClientConfig = {
baseURLs: ['http://localhost:8081'],
baseURLs: ["your-base-url"],
isForward: false,
cacheCapacity: 512,
cacheLatestTtlSecs: 60,
basicAuthCredentials: localAuthCredentials,
bearerAuthCredentials: bearerAuthCredentials,
};

const avroSchemaString: string = JSON.stringify({
Expand All @@ -24,6 +28,19 @@ const avroSchemaString: string = JSON.stringify({
],
});

const avroSchemaStringWithTags: string = JSON.stringify({
type: 'record',
name: 'User',
fields: [
{ name: 'name', type: 'string' },
{ name: 'age', type: 'int' },
{
name: 'address', type: 'string',
"confluent:tags": ["PII"]
}
],
});

const jsonSchemaString: string = JSON.stringify({
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "User",
Expand All @@ -36,12 +53,49 @@ const jsonSchemaString: string = JSON.stringify({
"type": "integer"
},
"address": {
"type": "string",
}
},
"required": ["name", "age", "address"]
});

const jsonSchemaStringWithTags: string = JSON.stringify({
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "User",
"type": "object",
"properties": {
"name": {
"type": "string"
},
"age": {
"type": "integer"
},
"address": {
"type": "string",
"confluent:tags": [ "PII" ]
}
},
"required": ["name", "age", "address"]
});

let encRule: Rule = {
name: 'EncryptionDemo',
kind: 'TRANSFORM',
mode: RuleMode.WRITEREAD,
type: 'ENCRYPT',
tags: ['PII'],
params: {
'encrypt.kek.name': 'schemaregistrydemo',
'encrypt.kms.type': 'aws-kms',
'encrypt.kms.key.id': 'your-kms-key',
},
onFailure: 'ERROR,NONE'
};

let ruleSet: RuleSet = {
domainRules: [encRule]
};

const avroSchemaInfo: SchemaInfo = {
schema: avroSchemaString,
schemaType: 'AVRO'
Expand All @@ -52,7 +106,19 @@ const jsonSchemaInfo: SchemaInfo = {
schemaType: 'JSON'
};

const data: { name: string; age: number; address: string; }[] = [];
const avroSchemaInfoWithRules: SchemaInfo = {
schema: avroSchemaStringWithTags,
schemaType: 'AVRO',
ruleSet: ruleSet
};

const jsonSchemaInfoWithRules: SchemaInfo = {
schema: jsonSchemaStringWithTags,
schemaType: 'JSON',
ruleSet: ruleSet
};

let data: { name: string; age: number; address: string; }[];

let schemaRegistryClient: SchemaRegistryClient;

Expand All @@ -66,21 +132,23 @@ function generateData(numRecords: number) {
}
}

generateData(10000);
const numRecords = 1000;

async function serializeAndDeserializeSchemas(serializer: Serializer, deserializer: Deserializer, topic: string) {
Promise.all(
await Promise.all(
data.map(async (record) => {
const serialized = await serializer.serialize(topic, record);
await deserializer.deserialize(topic, serialized);
})
);
}

describe('Serialization Performance Test', () => {
describe('Concurrent Serialization Performance Test', () => {

beforeEach(async () => {
schemaRegistryClient = new SchemaRegistryClient(clientConfig);
data = [];
generateData(numRecords);
});

it("Should measure serialization and deserialization performance for JSON", async () => {
Expand All @@ -91,26 +159,32 @@ describe('Serialization Performance Test', () => {
const jsonSerializer: JsonSerializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, jsonSerializerConfig);
const jsonDeserializer: JsonDeserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {});

const firstSerialized = await jsonSerializer.serialize(topic, data[0]);
await jsonDeserializer.deserialize(topic, firstSerialized);

const start = performance.now();
await serializeAndDeserializeSchemas(jsonSerializer, jsonDeserializer, topic);
const end = performance.now();

console.log(`JSON serialization and deserialization took ${end - start} ms`);
console.log(`Concurrent JSON serialization and deserialization took ${end - start} ms`);
});

it("Should measure serialization and deserialization performance for Avro", async () => {
const topic = v4();
await schemaRegistryClient.register(topic + "-value", avroSchemaInfo);

const avroSerializerConfig: AvroSerializerConfig = { useLatestVersion: true };
const serializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig);
const deserializer: AvroDeserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {});
const avroSerializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig);
const avroDeserializer: AvroDeserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {});

const firstSerialized = await avroSerializer.serialize(topic, data[0]);
await avroDeserializer.deserialize(topic, firstSerialized);

const start = performance.now();
await serializeAndDeserializeSchemas(serializer, deserializer, topic);
await serializeAndDeserializeSchemas(avroSerializer, avroDeserializer, topic);
const end = performance.now();

console.log(`Avro serialization and deserialization took ${end - start} ms`);
console.log(`Concurrent Avro serialization and deserialization took ${end - start} ms`);
});

// it("Should measure serialization and deserialization performance for Protobuf", async () => {
Expand All @@ -125,3 +199,148 @@ describe('Serialization Performance Test', () => {
// console.log(`Protobuf serialization and deserialization took ${end - start} ms`);
// });
});

describe('Concurrent Serialization Performance Test with Rules', () => {
beforeEach(async () => {
schemaRegistryClient = new SchemaRegistryClient(clientConfig);
data = [];
generateData(numRecords);
});

it("Should measure serialization and deserialization performance for JSON with rules", async () => {
const topic = v4();
await schemaRegistryClient.register(topic + "-value", jsonSchemaInfoWithRules);

const jsonSerializerConfig: JsonSerializerConfig = { useLatestVersion: true };
const jsonSerializer: JsonSerializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, jsonSerializerConfig);
const jsonDeserializer: JsonDeserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {});

const firstSerialized = await jsonSerializer.serialize(topic, data[0]);
await jsonDeserializer.deserialize(topic, firstSerialized);

const start = performance.now();
await serializeAndDeserializeSchemas(jsonSerializer, jsonDeserializer, topic);
const end = performance.now();

console.log(`Concurrent JSON serialization and deserialization with rules took ${end - start} ms`);
});

it("Should measure serialization and deserialization performance for Avro with rules", async () => {
const topic = v4();
await schemaRegistryClient.register(topic + "-value", avroSchemaInfoWithRules);

const avroSerializerConfig: AvroSerializerConfig = { useLatestVersion: true };
const avroSerializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig);
const avroDeserializer: AvroDeserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {});

const firstSerialized = await avroSerializer.serialize(topic, data[0]);
await avroDeserializer.deserialize(topic, firstSerialized);

const start = performance.now();
await serializeAndDeserializeSchemas(avroSerializer, avroDeserializer, topic);
const end = performance.now();

console.log(`Concurrent Avro serialization and deserialization with rules took ${end - start} ms`);
});
});

describe("Sequential Serialization Performance Test", () => {
beforeEach(async () => {
schemaRegistryClient = new SchemaRegistryClient(clientConfig);
data = [];
generateData(numRecords);
});

it("Should measure serialization and deserialization performance for JSON", async () => {
const topic = v4();
await schemaRegistryClient.register(topic + "-value", jsonSchemaInfo);

const jsonSerializerConfig: JsonSerializerConfig = { useLatestVersion: true };
const jsonSerializer: JsonSerializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, jsonSerializerConfig);
const jsonDeserializer: JsonDeserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {});

const firstSerialized = await jsonSerializer.serialize(topic, data[0]);
await jsonDeserializer.deserialize(topic, firstSerialized);

const start = performance.now();
for (let i = 0; i < numRecords; i++) {
const serialized = await jsonSerializer.serialize(topic, data[i]);
await jsonDeserializer.deserialize(topic, serialized);
}
const end = performance.now();

console.log(`Sequential JSON serialization and deserialization took ${end - start} ms`);
});

it("Should measure serialization and deserialization performance for Avro", async () => {
const topic = v4();
await schemaRegistryClient.register(topic + "-value", avroSchemaInfo);

const avroSerializerConfig: AvroSerializerConfig = { useLatestVersion: true };
const avroSerializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig);
const avroDeserializer: AvroDeserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {});

const firstSerialized = await avroSerializer.serialize(topic, data[0]);
await avroDeserializer.deserialize(topic, firstSerialized);

const start = performance.now();
for (let i = 0; i < numRecords; i++) {
const serialized = await avroSerializer.serialize(topic, data[i]);
await avroDeserializer.deserialize(topic, serialized);
}
const end = performance.now();

console.log(`Sequential Avro serialization and deserialization took ${end - start} ms`);
});
});

describe("Sequential Serialization Performance Test with Rules", () => {
beforeEach(async () => {
schemaRegistryClient = new SchemaRegistryClient(clientConfig);
data = [];
generateData(numRecords);
});

it("Should measure serialization and deserialization performance for JSON with rules", async () => {
const topic = v4();
await schemaRegistryClient.register(topic + "-value", jsonSchemaInfoWithRules);

const jsonSerializerConfig: JsonSerializerConfig = { useLatestVersion: true };
const jsonSerializer: JsonSerializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, jsonSerializerConfig);
const jsonDeserializer: JsonDeserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {});

const firstSerialized = await jsonSerializer.serialize(topic, data[0]);
await jsonDeserializer.deserialize(topic, firstSerialized);

const start = performance.now();
for (let i = 0; i < numRecords; i++) {
const serialized = await jsonSerializer.serialize(topic, data[i]);
await jsonDeserializer.deserialize(topic, serialized);
}
const end = performance.now();

console.log(`Sequential JSON serialization and deserialization with rules took ${end - start} ms`);
});

it("Should measure serialization and deserialization performance for Avro with rules", async () => {
const topic = v4();
await schemaRegistryClient.register(topic + "-value", avroSchemaInfoWithRules);

const avroSerializerConfig: AvroSerializerConfig = { useLatestVersion: true };
const avroSerializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig);
const avroDeserializer: AvroDeserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {});

const firstSerialized = await avroSerializer.serialize(topic, data[0]);
await avroDeserializer.deserialize(topic, firstSerialized);

const start = performance.now();
for (let i = 0; i < numRecords; i++) {
const serialized = await avroSerializer.serialize(topic, data[i]);
await avroDeserializer.deserialize(topic, serialized);
}
const end = performance.now();

console.log(`Sequential Avro serialization and deserialization with rules took ${end - start} ms`);
});

});