diff --git a/axon-camunda/src/main/kotlin/io/holunda/axon/camunda/AxonCamundaConfiguration.kt b/axon-camunda/src/main/kotlin/io/holunda/axon/camunda/AxonCamundaConfiguration.kt index 7587851..aa1c400 100644 --- a/axon-camunda/src/main/kotlin/io/holunda/axon/camunda/AxonCamundaConfiguration.kt +++ b/axon-camunda/src/main/kotlin/io/holunda/axon/camunda/AxonCamundaConfiguration.kt @@ -1,7 +1,12 @@ package io.holunda.axon.camunda +import com.fasterxml.jackson.databind.ObjectMapper +import io.holunda.axon.camunda.ingress.CamundaEventCorrelatingJobHandler import io.holunda.axon.camunda.ingress.CamundaEventMessageHandler import org.axonframework.config.Configurer +import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl +import org.camunda.bpm.engine.impl.jobexecutor.JobHandlerConfiguration +import org.camunda.bpm.engine.impl.persistence.entity.MessageEntity import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.ComponentScan @@ -22,3 +27,13 @@ class AxonCamundaConfiguration { } } +fun ProcessEngineConfigurationImpl.createCustomJob(configuration: JobHandlerConfiguration, objectMapper: ObjectMapper): Unit = this.commandExecutorTxRequired.execute { context -> + context.jobManager.send( + MessageEntity() + .apply { + this.jobHandlerConfigurationRaw = objectMapper.writeValueAsString(configuration) + this.jobHandlerType = CamundaEventCorrelatingJobHandler.TYPE + } + ) +} + diff --git a/axon-camunda/src/main/kotlin/io/holunda/axon/camunda/AxonCamundaEnginePlugin.kt b/axon-camunda/src/main/kotlin/io/holunda/axon/camunda/AxonCamundaEnginePlugin.kt index c740c2a..b63933b 100644 --- a/axon-camunda/src/main/kotlin/io/holunda/axon/camunda/AxonCamundaEnginePlugin.kt +++ b/axon-camunda/src/main/kotlin/io/holunda/axon/camunda/AxonCamundaEnginePlugin.kt @@ -1,5 +1,6 @@ package io.holunda.axon.camunda +import com.fasterxml.jackson.databind.ObjectMapper import io.holunda.axon.camunda.ingress.CamundaEventCorrelatingJobHandler import mu.KLogging import org.camunda.bpm.engine.impl.cfg.AbstractProcessEnginePlugin @@ -7,12 +8,15 @@ import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl import org.springframework.stereotype.Component @Component -class AxonCamundaEnginePlugin : AbstractProcessEnginePlugin() { +class AxonCamundaEnginePlugin( + val objectMapper: ObjectMapper +) : AbstractProcessEnginePlugin() { companion object : KLogging() override fun preInit(processEngineConfiguration: ProcessEngineConfigurationImpl) { logger.info { "AXON-CAMUNDA-001: Axon Camunda Plugin initialized." } - processEngineConfiguration.customJobHandlers = (processEngineConfiguration.customJobHandlers ?: mutableListOf()) + CamundaEventCorrelatingJobHandler() + processEngineConfiguration.customJobHandlers = + (processEngineConfiguration.customJobHandlers ?: mutableListOf()) + CamundaEventCorrelatingJobHandler(objectMapper) } } diff --git a/axon-camunda/src/main/kotlin/io/holunda/axon/camunda/ingress/CamundaEventCorrelatingJobHandler.kt b/axon-camunda/src/main/kotlin/io/holunda/axon/camunda/ingress/CamundaEventCorrelatingJobHandler.kt index 7661ed9..be6dd56 100644 --- a/axon-camunda/src/main/kotlin/io/holunda/axon/camunda/ingress/CamundaEventCorrelatingJobHandler.kt +++ b/axon-camunda/src/main/kotlin/io/holunda/axon/camunda/ingress/CamundaEventCorrelatingJobHandler.kt @@ -1,16 +1,21 @@ package io.holunda.axon.camunda.ingress +import com.fasterxml.jackson.databind.ObjectMapper import io.holunda.axon.camunda.ingress.CamundaEventCorrelatingJobHandlerConfiguration.Companion.fromCanonicalString import mu.KLogging +import org.camunda.bpm.engine.RuntimeService import org.camunda.bpm.engine.impl.interceptor.CommandContext import org.camunda.bpm.engine.impl.jobexecutor.JobHandler import org.camunda.bpm.engine.impl.persistence.entity.ExecutionEntity import org.camunda.bpm.engine.impl.persistence.entity.JobEntity +import org.camunda.bpm.engine.runtime.ProcessInstance /** * Job handler for asynchronous delivery of events (as a separate job). */ -class CamundaEventCorrelatingJobHandler : JobHandler { +class CamundaEventCorrelatingJobHandler( + private val objectMapper: ObjectMapper +) : JobHandler { companion object : KLogging() { const val TYPE = "axon-event-correlation" @@ -30,31 +35,47 @@ class CamundaEventCorrelatingJobHandler : JobHandler(configuration.correlationVariableName to configuration.correlationId)) - .let { - if (configuration.local) { - it.setVariablesLocal(configuration.variables) - } else { - it.setVariables(configuration.variables) - } - } - .correlate() - } else { - logger.warn { "Skipping correlation of ${configuration.eventName}: id ${configuration.correlationId}, found ${instances.size} instances for ${configuration.processDefinitionKey} instead of 1." } + + correlate(instances, configuration, runtimeService) + } else if (configuration.correlationKeys.isNotEmpty()) { + val query = runtimeService + .createProcessInstanceQuery() + .processDefinitionKey(configuration.processDefinitionKey) + + configuration.correlationKeys.forEach { + query.variableValueEquals(it.key, it.value) } - } else { + correlate(query.list(), configuration, runtimeService) + } else { val subscriptions = runtimeService .createEventSubscriptionQuery() .eventName(configuration.eventName) @@ -62,7 +83,6 @@ class CamundaEventCorrelatingJobHandler : JobHandler 0) { - runtimeService .createSignalEvent(configuration.eventName) .setVariables(configuration.variables) @@ -73,7 +93,31 @@ class CamundaEventCorrelatingJobHandler : JobHandler, + configuration: CamundaEventCorrelatingJobHandlerConfiguration, + runtimeService: RuntimeService + ) { + if (instances.size == 1) { + logger.debug { "Correlation found: keys ${configuration.correlationKeys}, correlating ${configuration.eventName} message using it with instance of ${configuration.processDefinitionKey}." } + runtimeService + .createMessageCorrelation(configuration.eventName) + .processInstanceVariablesEqual(configuration.correlationKeys) + .let { + if (configuration.local) { + it.setVariablesLocal(configuration.variables) + } else { + it.setVariables(configuration.variables) + } + } + .correlate() + } else { + logger.warn { "Skipping correlation of ${configuration.eventName}: keys ${configuration.correlationKeys}, found ${instances.size} instances for ${configuration.processDefinitionKey} instead of 1." } + } + } + + override fun newConfiguration(canonicalString: String): CamundaEventCorrelatingJobHandlerConfiguration = + fromCanonicalString(objectMapper = objectMapper, value = canonicalString) override fun onDelete(configuration: CamundaEventCorrelatingJobHandlerConfiguration, jobEntity: JobEntity) { diff --git a/axon-camunda/src/main/kotlin/io/holunda/axon/camunda/ingress/CamundaEventCorrelatingJobHandlerConfiguration.kt b/axon-camunda/src/main/kotlin/io/holunda/axon/camunda/ingress/CamundaEventCorrelatingJobHandlerConfiguration.kt index 72bd7c0..05b3610 100644 --- a/axon-camunda/src/main/kotlin/io/holunda/axon/camunda/ingress/CamundaEventCorrelatingJobHandlerConfiguration.kt +++ b/axon-camunda/src/main/kotlin/io/holunda/axon/camunda/ingress/CamundaEventCorrelatingJobHandlerConfiguration.kt @@ -9,19 +9,18 @@ data class CamundaEventCorrelatingJobHandlerConfiguration( val processDefinitionKey: String, val eventName: String, val local: Boolean = false, - val variables: Map, - val correlationVariableName: String?, - val correlationId: Any?, + val variables: Map = mapOf(), + val businessKey: String? = null, + val correlationKeys: Map = mapOf(), + val startProcess: Boolean = false ) : JobHandlerConfiguration { companion object { - val objectMapper: ObjectMapper = jacksonObjectMapper() - - fun fromCanonicalString(value: String): CamundaEventCorrelatingJobHandlerConfiguration { + fun fromCanonicalString(objectMapper: ObjectMapper = jacksonObjectMapper(), value: String): CamundaEventCorrelatingJobHandlerConfiguration { return objectMapper.readValue(value) } } - override fun toCanonicalString(): String = objectMapper.writeValueAsString(this) + override fun toCanonicalString(): String = throw UnsupportedOperationException() } diff --git a/axon-camunda/src/main/kotlin/io/holunda/axon/camunda/ingress/CamundaEventMessageHandler.kt b/axon-camunda/src/main/kotlin/io/holunda/axon/camunda/ingress/CamundaEventMessageHandler.kt index b2ee8cf..53c51a4 100644 --- a/axon-camunda/src/main/kotlin/io/holunda/axon/camunda/ingress/CamundaEventMessageHandler.kt +++ b/axon-camunda/src/main/kotlin/io/holunda/axon/camunda/ingress/CamundaEventMessageHandler.kt @@ -1,15 +1,15 @@ package io.holunda.axon.camunda.ingress +import com.fasterxml.jackson.databind.ObjectMapper import io.holunda.axon.camunda.AxonCamundaProperties import io.holunda.axon.camunda.config.CamundaAxonEventCommandFactoryRegistry import io.holunda.axon.camunda.config.CamundaEvent +import io.holunda.axon.camunda.createCustomJob import io.holunda.axon.camunda.extractCorrelationId -import io.holunda.axon.camunda.ingress.CamundaEventCorrelatingJobHandler.Companion.TYPE import mu.KLogging import org.axonframework.eventhandling.EventMessage import org.axonframework.eventhandling.EventMessageHandler import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl -import org.camunda.bpm.engine.impl.persistence.entity.MessageEntity import org.springframework.stereotype.Component /** @@ -19,7 +19,8 @@ import org.springframework.stereotype.Component class CamundaEventMessageHandler( private val registry: CamundaAxonEventCommandFactoryRegistry, private val axonCamundaProperties: AxonCamundaProperties, - private val processEngineConfigurationImpl: ProcessEngineConfigurationImpl + private val processEngineConfigurationImpl: ProcessEngineConfigurationImpl, + private val objectMapper: ObjectMapper ) : EventMessageHandler { companion object : KLogging() @@ -55,9 +56,7 @@ class CamundaEventMessageHandler( processDefinitionKey = processDefinitionKey, eventName = camundaEvent.name, variables = camundaEvent.variables, - local = camundaEvent.local, - correlationVariableName = null, - correlationId = null + local = camundaEvent.local ) ) } @@ -70,8 +69,7 @@ class CamundaEventMessageHandler( eventName = camundaEvent.name, variables = camundaEvent.variables, local = camundaEvent.local, - correlationVariableName = camundaEvent.correlationVariableName, - correlationId = correlationId + correlationKeys = mapOf(Pair(camundaEvent.correlationVariableName, correlationId)) ) ) } else { @@ -80,16 +78,7 @@ class CamundaEventMessageHandler( } private fun createMessageJob(configuration: CamundaEventCorrelatingJobHandlerConfiguration) { - processEngineConfigurationImpl.commandExecutorTxRequired.execute { context -> - context.jobManager.send( - MessageEntity() - .apply { - this.jobHandlerConfiguration = configuration - this.jobHandlerType = TYPE - } - ) - } - + processEngineConfigurationImpl.createCustomJob(configuration, objectMapper) } }