This package contains the `KafkaMessage__c` sObject and related Apex logic for receiving JSON payloads that represent changes from the Kafka CDC pipeline. A trigger on the `KafkaMessage__c` sObject enqueues an asynchronous processing request through the asynchronous processing framework that is part of the crm-platform-base package.
The framework depends on two custom metadata objects that dynamically instruct the application how to handle the message payload:

- A binding between the asynchronous processing request (`AsyncRequest__c`) type created by this package and the `KafkaMessageAsyncJob` class. It instructs the asynchronous processing framework to call `KafkaMessageAsyncJob` to handle requests originating from this package.
- A binding between the `KafkaMessage__c.Topic__c` field and an Apex handler class for a given topic. It instructs the application how to handle a message payload related to a specific Kafka topic.

- An external application inserts a record or a batch of records into the `KafkaMessage__c` sObject.
- A trigger on the `KafkaMessage__c` object inserts one record into the `AsyncRequest__c` object for each batch of up to 200 `KafkaMessage__c` records created in a single transaction, representing a request for asynchronous processing of the new messages.
- When the asynchronous processing framework processes the request, the custom metadata binding `AsyncRequestHandlerBinding__mdt` instructs the application to handle the request using the `KafkaMessageAsyncJob` Apex class.
  - If no `AsyncRequestHandlerBinding__mdt` record is found corresponding to the "Kafka Message" `AsyncRequestType__c` value, the `AsyncRequest__c` record is updated with an error.
- `KafkaMessageAsyncJob` queries for the relevant `KafkaMessage__c` records by the Ids stored in the async processing request, and queries the `KafkaMessageHandlerBinding__mdt` custom metadata object for registered bindings between `KafkaMessage__c.Topic__c` values and the Apex classes that handle payloads for those `Topic__c` values.
  - If no `KafkaMessageHandlerBinding__mdt` record is found corresponding to the `Topic__c` value, the relevant `KafkaMessage__c` record is updated with an error. The message can then be retried after the error has been addressed.
- The Apex class registered by the `KafkaMessageHandlerBinding__mdt` binding executes the business logic corresponding to the `Topic__c` value.
  - If an exception occurs, the relevant `KafkaMessage__c` record is updated with an error. The message can then be retried after the error has been addressed.

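The flow above can be modelled outside Salesforce as a rough, language-neutral sketch. The Java below is not the package's Apex code: `Message`, `Handler`, the status strings, and the in-memory `bindings` map are hypothetical stand-ins for `KafkaMessage__c`, the topic handler classes, the record's status values, and `KafkaMessageHandlerBinding__mdt`. It shows only the batching of up to 200 messages per request and the per-message error handling described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class KafkaFlowSketch {
    /** Hypothetical stand-in for a KafkaMessage__c record. */
    static final class Message {
        final String topic;
        final String payload;
        String status = "Pending"; // assumed status values, not taken from the package
        String error;

        Message(String topic, String payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    /** Hypothetical stand-in for an Apex handler class bound to a topic. */
    interface Handler {
        void handle(Message message) throws Exception;
    }

    /** Batch size described above: one request per batch of up to 200 records. */
    static final int BATCH_SIZE = 200;

    /** Models the trigger: one async request (here, a list) per batch. */
    static List<List<Message>> toRequests(List<Message> inserted) {
        List<List<Message>> requests = new ArrayList<>();
        for (int i = 0; i < inserted.size(); i += BATCH_SIZE) {
            int end = Math.min(i + BATCH_SIZE, inserted.size());
            requests.add(new ArrayList<>(inserted.subList(i, end)));
        }
        return requests;
    }

    /** Models the async job: resolve a handler per topic, record errors per message. */
    static void process(List<Message> request, Map<String, Handler> bindings) {
        for (Message message : request) {
            Handler handler = bindings.get(message.topic);
            if (handler == null) {
                // No binding registered for this topic: mark the message, keep going.
                message.status = "Error";
                message.error = "No handler binding for topic " + message.topic;
                continue;
            }
            try {
                handler.handle(message);
                message.status = "Processed";
                message.error = null;
            } catch (Exception e) {
                // A handler failure affects only this message; it can be retried later.
                message.status = "Error";
                message.error = e.getMessage();
            }
        }
    }
}
```

In this model a retry is simply a second call to `process` with the same messages once the missing binding has been registered or the failing handler has been fixed.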
To process incoming Kafka messages in a synchronous context, follow this pattern:

- Define a separate platform event with the exact data model, e.g. as defined here.
- Create a trigger and a separate trigger handler to process the incoming events.
- Implement the processing itself using the `IKafkaMessageConsumer` interface, so that error handling can be performed easily by storing failed events as `KafkaMessage__c` records. An example of this can be viewed here, where `doEventTransform` performs the transformation from the custom event to the `KafkaMessage__c` model and the failed events are stored as `KafkaMessage__c` records in an error status.
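
The synchronous pattern can be illustrated with a minimal Java sketch. The signature of `IKafkaMessageConsumer` is not shown in this document, so the `Consumer` interface below is an assumption, as are the `Event` and `Message` types and the status strings. Only the name `doEventTransform` comes from the text above. The sketch shows one idea: events are transformed and consumed inline, and only the failures are returned so they can be stored as error records.

```java
import java.util.ArrayList;
import java.util.List;

final class SyncConsumerSketch {
    /** Hypothetical stand-in for the custom platform event. */
    static final class Event {
        final String topic;
        final String value;

        Event(String topic, String value) {
            this.topic = topic;
            this.value = value;
        }
    }

    /** Hypothetical stand-in for a KafkaMessage__c record. */
    static final class Message {
        final String topic;
        final String payload;
        String status = "Pending"; // assumed status values
        String error;

        Message(String topic, String payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    /** Assumed shape of a consumer; the real IKafkaMessageConsumer may differ. */
    interface Consumer {
        void consume(Message message) throws Exception;
    }

    /** Maps the custom event onto the message model. */
    static Message doEventTransform(Event event) {
        return new Message(event.topic, event.value);
    }

    /** Processes events synchronously and returns the failed ones for storage. */
    static List<Message> handleEvents(List<Event> events, Consumer consumer) {
        List<Message> failed = new ArrayList<>();
        for (Event event : events) {
            Message message = doEventTransform(event);
            try {
                consumer.consume(message);
            } catch (Exception e) {
                // Only failures are kept, in an error status, so they can be retried.
                message.status = "Error";
                message.error = e.getMessage();
                failed.add(message);
            }
        }
        return failed;
    }
}
```

Successfully processed events leave no record behind in this model. The returned list represents what a trigger handler would insert as `KafkaMessage__c` records.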