This package contains the KafkaMessage__c sObject and the related Apex logic for receiving JSON payloads that represent changes from the Kafka CDC pipeline.
A trigger on the KafkaMessage__c sObject enqueues asynchronous processing requests through the asynchronous processing framework that is part of the crm-platform-base package.
The framework depends on two custom metadata objects that dynamically instruct the application how to handle the message payload:
- AsyncRequestHandlerBinding__mdt: binds the asynchronous processing request (AsyncRequest__c) type created by this package to the KafkaMessageAsyncJob class, so that the asynchronous processing framework calls KafkaMessageAsyncJob to handle requests originating from this package.
- KafkaMessageHandlerBinding__mdt: binds a KafkaMessage__c.Topic__c value to an Apex handler class, so that the application knows how to handle a message payload related to a specific Kafka topic.
- An external application inserts a record or a batch of records into the KafkaMessage__c sObject.
- A trigger on the KafkaMessage__c object inserts one record into the AsyncRequest__c object for each batch of up to 200 KafkaMessage__c records created in a single transaction. Each AsyncRequest__c record represents a request for asynchronous processing of the new messages.
- When the asynchronous processing framework processes the request, the custom metadata binding AsyncRequestHandlerBinding__mdt instructs the application to handle the request using the KafkaMessageAsyncJob Apex class.
  - If no AsyncRequestHandlerBinding__mdt record is found corresponding to the "Kafka Message" AsyncRequestType__c value, the AsyncRequest__c record is updated with an error.
- KafkaMessageAsyncJob queries for the relevant KafkaMessage__c records by the Ids stored in the async processing request, and queries the KafkaMessageHandlerBinding__mdt custom metadata object for registered bindings between KafkaMessage__c.Topic__c values and the Apex classes that handle the corresponding payloads.
  - If no KafkaMessageHandlerBinding__mdt record is found corresponding to the Topic__c value, the relevant KafkaMessage__c record is updated with an error. The message can then be retried after the error has been addressed.
  - If KafkaMessageHandlerBinding__mdt.SandboxOverrideTopic__c is set, its value is the one matched against KafkaMessage__c.Topic__c in scratch orgs and sandboxes. KafkaMessageHandlerBinding__mdt.Topic__c remains unused in that case.
- The Apex class registered by the KafkaMessageHandlerBinding__mdt binding executes the business logic corresponding to the Topic__c value. If an exception occurs, the relevant KafkaMessage__c record is updated with an error. The message can then be retried after the error has been addressed.
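
The topic lookup described in the flow above, including the sandbox override, can be illustrated with a minimal Apex sketch. This is not the package's actual implementation: the class name KafkaHandlerResolverSketch and the field ApexClass__c (assumed to hold the handler class name) are hypothetical, and the caller is assumed to supply the isSandbox flag and the list of bindings.

```apex
// Illustrative sketch only, not the package's implementation.
// ApexClass__c is a HYPOTHETICAL field name for the registered handler class.
public with sharing class KafkaHandlerResolverSketch {

    // Returns the handler class name bound to the given topic,
    // or null when no binding matches.
    public static String resolve(
        String topic,
        Boolean isSandbox,
        List<KafkaMessageHandlerBinding__mdt> bindings
    ) {
        for (KafkaMessageHandlerBinding__mdt binding : bindings) {
            // In sandboxes and scratch orgs a populated override replaces
            // Topic__c as the value to match against.
            String effectiveTopic =
                (isSandbox && String.isNotBlank(binding.SandboxOverrideTopic__c))
                    ? binding.SandboxOverrideTopic__c
                    : binding.Topic__c;

            // Note: == on Apex strings is case-insensitive.
            if (effectiveTopic == topic) {
                return binding.ApexClass__c;
            }
        }
        return null;
    }
}
```

A caller could instantiate the resolved class with the standard Type.forName(className).newInstance() call, and record an error on the KafkaMessage__c record when the lookup returns null.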
To process incoming Kafka messages in a synchronous context, follow this pattern:
- Define a separate platform event with the exact data model as defined, for example, here.
- Create a trigger and a separate trigger handler to process the incoming events.
- Implement the processing itself using the IKafkaMessageConsumer interface, so that error handling can be performed easily by storing failed events as KafkaMessage__c records. An example of this can be viewed here, where doEventTransform performs the transformation from the custom event to the KafkaMessage__c model and the failed events are stored as KafkaMessage__c records in an error status.