diff --git a/schemaregistry-examples/src/constants.ts b/schemaregistry-examples/src/constants.ts index a30f5881..7e524efc 100644 --- a/schemaregistry-examples/src/constants.ts +++ b/schemaregistry-examples/src/constants.ts @@ -1,4 +1,4 @@ -import { BasicAuthCredentials } from '@confluentinc/schemaregistry'; +import { BasicAuthCredentials, BearerAuthCredentials } from '@confluentinc/schemaregistry'; const issuerEndpointUrl = ''; // e.g. 'https://dev-123456.okta.com/oauth2/default/v1/token'; const oauthClientId = ''; @@ -22,7 +22,17 @@ const basicAuthCredentials: BasicAuthCredentials = { userInfo: ':', }; +const bearerAuthCredentials: BearerAuthCredentials = { + credentialsSource: 'OAUTHBEARER', + issuerEndpointUrl: issuerEndpointUrl, + clientId: oauthClientId, + clientSecret: oauthClientSecret, + scope: scope, + identityPoolId: identityPoolId, + logicalCluster: schemaRegistryLogicalCluster +}; + export { issuerEndpointUrl, oauthClientId, oauthClientSecret, scope, identityPoolId, kafkaLogicalCluster, schemaRegistryLogicalCluster, - baseUrl, clusterBootstrapUrl, clusterApiKey, clusterApiSecret, basicAuthCredentials, localAuthCredentials + baseUrl, clusterBootstrapUrl, clusterApiKey, clusterApiSecret, basicAuthCredentials, localAuthCredentials, bearerAuthCredentials }; \ No newline at end of file diff --git a/schemaregistry-examples/src/performance/schemaregistry-serialization.spec.ts b/schemaregistry-examples/src/performance/schemaregistry-serialization.spec.ts index a83c3564..cc979c81 100644 --- a/schemaregistry-examples/src/performance/schemaregistry-serialization.spec.ts +++ b/schemaregistry-examples/src/performance/schemaregistry-serialization.spec.ts @@ -1,17 +1,21 @@ import { AvroSerializer, AvroDeserializer, AvroSerializerConfig, SerdeType, Serializer, Deserializer, JsonSerializer, JsonDeserializer, JsonSerializerConfig, - ClientConfig, SchemaRegistryClient, SchemaInfo + ClientConfig, SchemaRegistryClient, SchemaInfo, Rule, RuleMode, RuleSet, FieldEncryptionExecutor, AwsKmsDriver } from "@confluentinc/schemaregistry"; -import { localAuthCredentials } from "../constants"; +import { bearerAuthCredentials } from "../constants"; import { v4 } from "uuid"; import { beforeEach, describe, it } from '@jest/globals'; +FieldEncryptionExecutor.register(); +AwsKmsDriver.register(); + const clientConfig: ClientConfig = { - baseURLs: ['http://localhost:8081'], + baseURLs: ["your-base-url"], + isForward: false, cacheCapacity: 512, cacheLatestTtlSecs: 60, - basicAuthCredentials: localAuthCredentials, + bearerAuthCredentials: bearerAuthCredentials, }; const avroSchemaString: string = JSON.stringify({ @@ -24,6 +28,19 @@ const avroSchemaString: string = JSON.stringify({ ], }); +const avroSchemaStringWithTags: string = JSON.stringify({ + type: 'record', + name: 'User', + fields: [ + { name: 'name', type: 'string' }, + { name: 'age', type: 'int' }, + { + name: 'address', type: 'string', + "confluent:tags": ["PII"] + } + ], +}); + const jsonSchemaString: string = JSON.stringify({ "$schema": "http://json-schema.org/draft-07/schema#", "title": "User", @@ -36,12 +53,49 @@ const jsonSchemaString: string = JSON.stringify({ "type": "integer" }, "address": { + "type": "string", + } + }, + "required": ["name", "age", "address"] +}); + +const jsonSchemaStringWithTags: string = JSON.stringify({ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "User", + "type": "object", + "properties": { + "name": { "type": "string" + }, + "age": { + "type": "integer" + }, + "address": { + "type": "string", + "confluent:tags": [ "PII" ] } }, "required": ["name", "age", "address"] }); +let encRule: Rule = { + name: 'EncryptionDemo', + kind: 'TRANSFORM', + mode: RuleMode.WRITEREAD, + type: 'ENCRYPT', + tags: ['PII'], + params: { + 'encrypt.kek.name': 'schemaregistrydemo', + 'encrypt.kms.type': 'aws-kms', + 'encrypt.kms.key.id': 'your-kms-key', + }, + onFailure: 'ERROR,NONE' +}; + +let ruleSet: RuleSet = { + domainRules: [encRule] +}; + const avroSchemaInfo: SchemaInfo = { schema: avroSchemaString, schemaType: 'AVRO' @@ -52,7 +106,19 @@ const jsonSchemaInfo: SchemaInfo = { schemaType: 'JSON' }; -const data: { name: string; age: number; address: string; }[] = []; +const avroSchemaInfoWithRules: SchemaInfo = { + schema: avroSchemaStringWithTags, + schemaType: 'AVRO', + ruleSet: ruleSet +}; + +const jsonSchemaInfoWithRules: SchemaInfo = { + schema: jsonSchemaStringWithTags, + schemaType: 'JSON', + ruleSet: ruleSet +}; + +let data: { name: string; age: number; address: string; }[]; let schemaRegistryClient: SchemaRegistryClient; @@ -66,10 +132,10 @@ function generateData(numRecords: number) { } } -generateData(10000); +const numRecords = 1000; async function serializeAndDeserializeSchemas(serializer: Serializer, deserializer: Deserializer, topic: string) { - Promise.all( + await Promise.all( data.map(async (record) => { const serialized = await serializer.serialize(topic, record); await deserializer.deserialize(topic, serialized); @@ -77,10 +143,12 @@ async function serializeAndDeserializeSchemas(serializer: Serializer, deserializ ); } -describe('Serialization Performance Test', () => { +describe('Concurrent Serialization Performance Test', () => { beforeEach(async () => { schemaRegistryClient = new SchemaRegistryClient(clientConfig); + data = []; + generateData(numRecords); }); it("Should measure serialization and deserialization performance for JSON", async () => { @@ -91,11 +159,14 @@ describe('Serialization Performance Test', () => { const jsonSerializer: JsonSerializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, jsonSerializerConfig); const jsonDeserializer: JsonDeserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + const firstSerialized = await jsonSerializer.serialize(topic, data[0]); + await jsonDeserializer.deserialize(topic, firstSerialized); + const start = performance.now(); await serializeAndDeserializeSchemas(jsonSerializer, jsonDeserializer, topic); const end = performance.now(); - console.log(`JSON serialization and deserialization took ${end - start} ms`); + console.log(`Concurrent JSON serialization and deserialization took ${end - start} ms`); }); it("Should measure serialization and deserialization performance for Avro", async () => { @@ -103,14 +174,17 @@ describe('Serialization Performance Test', () => { await schemaRegistryClient.register(topic + "-value", avroSchemaInfo); const avroSerializerConfig: AvroSerializerConfig = { useLatestVersion: true }; - const serializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig); - const deserializer: AvroDeserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + const avroSerializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig); + const avroDeserializer: AvroDeserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + + const firstSerialized = await avroSerializer.serialize(topic, data[0]); + await avroDeserializer.deserialize(topic, firstSerialized); const start = performance.now(); - await serializeAndDeserializeSchemas(serializer, deserializer, topic); + await serializeAndDeserializeSchemas(avroSerializer, avroDeserializer, topic); const end = performance.now(); - console.log(`Avro serialization and deserialization took ${end - start} ms`); + console.log(`Concurrent Avro serialization and deserialization took ${end - start} ms`); }); // it("Should measure serialization and deserialization performance for Protobuf", async () => { @@ -125,3 +199,148 @@ describe('Serialization Performance Test', () => { // console.log(`Protobuf serialization and deserialization took ${end - start} ms`); // }); }); + +describe('Concurrent Serialization Performance Test with Rules', () => { + beforeEach(async () => { + schemaRegistryClient = new SchemaRegistryClient(clientConfig); + data = []; + generateData(numRecords); + }); + + it("Should measure serialization and deserialization performance for JSON with rules", async () => { + const topic = v4(); + await schemaRegistryClient.register(topic + "-value", jsonSchemaInfoWithRules); + + const jsonSerializerConfig: JsonSerializerConfig = { useLatestVersion: true }; + const jsonSerializer: JsonSerializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, jsonSerializerConfig); + const jsonDeserializer: JsonDeserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + + const firstSerialized = await jsonSerializer.serialize(topic, data[0]); + await jsonDeserializer.deserialize(topic, firstSerialized); + + const start = performance.now(); + await serializeAndDeserializeSchemas(jsonSerializer, jsonDeserializer, topic); + const end = performance.now(); + + console.log(`Concurrent JSON serialization and deserialization with rules took ${end - start} ms`); + }); + + it("Should measure serialization and deserialization performance for Avro with rules", async () => { + const topic = v4(); + await schemaRegistryClient.register(topic + "-value", avroSchemaInfoWithRules); + + const avroSerializerConfig: AvroSerializerConfig = { useLatestVersion: true }; + const avroSerializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig); + const avroDeserializer: AvroDeserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + + const firstSerialized = await avroSerializer.serialize(topic, data[0]); + await avroDeserializer.deserialize(topic, firstSerialized); + + const start = performance.now(); + await serializeAndDeserializeSchemas(avroSerializer, avroDeserializer, topic); + const end = performance.now(); + + console.log(`Concurrent Avro serialization and deserialization with rules took ${end - start} ms`); + }); +}); + +describe("Sequential Serialization Performance Test", () => { + beforeEach(async () => { + schemaRegistryClient = new SchemaRegistryClient(clientConfig); + data = []; + generateData(numRecords); + }); + + it("Should measure serialization and deserialization performance for JSON", async () => { + const topic = v4(); + await schemaRegistryClient.register(topic + "-value", jsonSchemaInfo); + + const jsonSerializerConfig: JsonSerializerConfig = { useLatestVersion: true }; + const jsonSerializer: JsonSerializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, jsonSerializerConfig); + const jsonDeserializer: JsonDeserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + + const firstSerialized = await jsonSerializer.serialize(topic, data[0]); + await jsonDeserializer.deserialize(topic, firstSerialized); + + const start = performance.now(); + for (let i = 0; i < numRecords; i++) { + const serialized = await jsonSerializer.serialize(topic, data[i]); + await jsonDeserializer.deserialize(topic, serialized); + } + const end = performance.now(); + + console.log(`Sequential JSON serialization and deserialization took ${end - start} ms`); + }); + + it("Should measure serialization and deserialization performance for Avro", async () => { + const topic = v4(); + await schemaRegistryClient.register(topic + "-value", avroSchemaInfo); + + const avroSerializerConfig: AvroSerializerConfig = { useLatestVersion: true }; + const avroSerializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig); + const avroDeserializer: AvroDeserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + + const firstSerialized = await avroSerializer.serialize(topic, data[0]); + await avroDeserializer.deserialize(topic, firstSerialized); + + const start = performance.now(); + for (let i = 0; i < numRecords; i++) { + const serialized = await avroSerializer.serialize(topic, data[i]); + await avroDeserializer.deserialize(topic, serialized); + } + const end = performance.now(); + + console.log(`Sequential Avro serialization and deserialization took ${end - start} ms`); + }); +}); + +describe("Sequential Serialization Performance Test with Rules", () => { + beforeEach(async () => { + schemaRegistryClient = new SchemaRegistryClient(clientConfig); + data = []; + generateData(numRecords); + }); + + it("Should measure serialization and deserialization performance for JSON with rules", async () => { + const topic = v4(); + await schemaRegistryClient.register(topic + "-value", jsonSchemaInfoWithRules); + + const jsonSerializerConfig: JsonSerializerConfig = { useLatestVersion: true }; + const jsonSerializer: JsonSerializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, jsonSerializerConfig); + const jsonDeserializer: JsonDeserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + + const firstSerialized = await jsonSerializer.serialize(topic, data[0]); + await jsonDeserializer.deserialize(topic, firstSerialized); + + const start = performance.now(); + for (let i = 0; i < numRecords; i++) { + const serialized = await jsonSerializer.serialize(topic, data[i]); + await jsonDeserializer.deserialize(topic, serialized); + } + const end = performance.now(); + + console.log(`Sequential JSON serialization and deserialization with rules took ${end - start} ms`); + }); + + it("Should measure serialization and deserialization performance for Avro with rules", async () => { + const topic = v4(); + await schemaRegistryClient.register(topic + "-value", avroSchemaInfoWithRules); + + const avroSerializerConfig: AvroSerializerConfig = { useLatestVersion: true }; + const avroSerializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig); + const avroDeserializer: AvroDeserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + + const firstSerialized = await avroSerializer.serialize(topic, data[0]); + await avroDeserializer.deserialize(topic, firstSerialized); + + const start = performance.now(); + for (let i = 0; i < numRecords; i++) { + const serialized = await avroSerializer.serialize(topic, data[i]); + await avroDeserializer.deserialize(topic, serialized); + } + const end = performance.now(); + + console.log(`Sequential Avro serialization and deserialization with rules took ${end - start} ms`); + }); + +});