Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: CorrelationKeys, start process, custom ObjectMapper #2

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package io.holunda.axon.camunda

import com.fasterxml.jackson.databind.ObjectMapper
import io.holunda.axon.camunda.ingress.CamundaEventCorrelatingJobHandler
import io.holunda.axon.camunda.ingress.CamundaEventMessageHandler
import org.axonframework.config.Configurer
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl
import org.camunda.bpm.engine.impl.jobexecutor.JobHandlerConfiguration
import org.camunda.bpm.engine.impl.persistence.entity.MessageEntity
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.ComponentScan
Expand All @@ -22,3 +27,13 @@ class AxonCamundaConfiguration {
}
}

fun ProcessEngineConfigurationImpl.createCustomJob(configuration: JobHandlerConfiguration, objectMapper: ObjectMapper): Unit = this.commandExecutorTxRequired.execute { context ->
context.jobManager.send(
MessageEntity()
.apply {
this.jobHandlerConfigurationRaw = objectMapper.writeValueAsString(configuration)
this.jobHandlerType = CamundaEventCorrelatingJobHandler.TYPE
}
)
}

Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
package io.holunda.axon.camunda

import com.fasterxml.jackson.databind.ObjectMapper
import io.holunda.axon.camunda.ingress.CamundaEventCorrelatingJobHandler
import mu.KLogging
import org.camunda.bpm.engine.impl.cfg.AbstractProcessEnginePlugin
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl
import org.springframework.stereotype.Component

@Component
class AxonCamundaEnginePlugin : AbstractProcessEnginePlugin() {
class AxonCamundaEnginePlugin(
val objectMapper: ObjectMapper
) : AbstractProcessEnginePlugin() {

companion object : KLogging()

override fun preInit(processEngineConfiguration: ProcessEngineConfigurationImpl) {
logger.info { "AXON-CAMUNDA-001: Axon Camunda Plugin initialized." }
processEngineConfiguration.customJobHandlers = (processEngineConfiguration.customJobHandlers ?: mutableListOf()) + CamundaEventCorrelatingJobHandler()
processEngineConfiguration.customJobHandlers =
(processEngineConfiguration.customJobHandlers ?: mutableListOf()) + CamundaEventCorrelatingJobHandler(objectMapper)
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package io.holunda.axon.camunda.ingress

import com.fasterxml.jackson.databind.ObjectMapper
import io.holunda.axon.camunda.ingress.CamundaEventCorrelatingJobHandlerConfiguration.Companion.fromCanonicalString
import mu.KLogging
import org.camunda.bpm.engine.RuntimeService
import org.camunda.bpm.engine.impl.interceptor.CommandContext
import org.camunda.bpm.engine.impl.jobexecutor.JobHandler
import org.camunda.bpm.engine.impl.persistence.entity.ExecutionEntity
import org.camunda.bpm.engine.impl.persistence.entity.JobEntity
import org.camunda.bpm.engine.runtime.ProcessInstance

/**
* Job handler for asynchronous delivery of events (as a separate job).
*/
class CamundaEventCorrelatingJobHandler : JobHandler<CamundaEventCorrelatingJobHandlerConfiguration> {
class CamundaEventCorrelatingJobHandler(
private val objectMapper: ObjectMapper
) : JobHandler<CamundaEventCorrelatingJobHandlerConfiguration> {

companion object : KLogging() {
const val TYPE = "axon-event-correlation"
Expand All @@ -30,39 +35,54 @@ class CamundaEventCorrelatingJobHandler : JobHandler<CamundaEventCorrelatingJobH
) {
val runtimeService = commandContext.processEngineConfiguration.runtimeService

if (configuration.correlationId != null && configuration.correlationVariableName != null) {
if (configuration.startProcess) {
requireNotNull(configuration.businessKey)

val subscriptions = runtimeService.createEventSubscriptionQuery()
.eventName(configuration.eventName)
.eventType("message")
.list()

logger.debug { "Found ${subscriptions.size} subscriptions for message ${configuration.eventName}" }

if (subscriptions.size == 1) {
logger.debug { "Starting process by message ${configuration.eventName} with businessKey ${configuration.businessKey} and variables ${configuration.variables}" }

runtimeService.startProcessInstanceByMessage(
configuration.eventName,
configuration.businessKey,
configuration.variables
)
} else {
logger.warn { "Skipping correlation of ${configuration.eventName}: keys ${configuration.correlationKeys}, found ${subscriptions.size} subscriptions instead of 1." }
}

} else if (configuration.businessKey != null) {
val instances = runtimeService
.createProcessInstanceQuery()
.processDefinitionKey(configuration.processDefinitionKey)
.variableValueEquals(configuration.correlationVariableName, configuration.correlationId)
.processInstanceBusinessKey(configuration.businessKey)
.list()
if (instances.size == 1) {
logger.debug { "Correlation id found ${configuration.correlationId}, correlating ${configuration.eventName} message using it with instance of ${configuration.processDefinitionKey}." }
runtimeService
.createMessageCorrelation(configuration.eventName)
.processInstanceVariablesEqual(mutableMapOf<String, Any>(configuration.correlationVariableName to configuration.correlationId))
.let {
if (configuration.local) {
it.setVariablesLocal(configuration.variables)
} else {
it.setVariables(configuration.variables)
}
}
.correlate()
} else {
logger.warn { "Skipping correlation of ${configuration.eventName}: id ${configuration.correlationId}, found ${instances.size} instances for ${configuration.processDefinitionKey} instead of 1." }

correlate(instances, configuration, runtimeService)
} else if (configuration.correlationKeys.isNotEmpty()) {
val query = runtimeService
.createProcessInstanceQuery()
.processDefinitionKey(configuration.processDefinitionKey)

configuration.correlationKeys.forEach {
query.variableValueEquals(it.key, it.value)
}
} else {

correlate(query.list(), configuration, runtimeService)
} else {
val subscriptions = runtimeService
.createEventSubscriptionQuery()
.eventName(configuration.eventName)
.eventType("signal")
.list()

if (subscriptions.size > 0) {

runtimeService
.createSignalEvent(configuration.eventName)
.setVariables(configuration.variables)
Expand All @@ -73,7 +93,31 @@ class CamundaEventCorrelatingJobHandler : JobHandler<CamundaEventCorrelatingJobH
}
}

override fun newConfiguration(canonicalString: String) = fromCanonicalString(canonicalString)
private fun correlate(
instances: MutableList<ProcessInstance>,
configuration: CamundaEventCorrelatingJobHandlerConfiguration,
runtimeService: RuntimeService
) {
if (instances.size == 1) {
logger.debug { "Correlation found: keys ${configuration.correlationKeys}, correlating ${configuration.eventName} message using it with instance of ${configuration.processDefinitionKey}." }
runtimeService
.createMessageCorrelation(configuration.eventName)
.processInstanceVariablesEqual(configuration.correlationKeys)
.let {
if (configuration.local) {
it.setVariablesLocal(configuration.variables)
} else {
it.setVariables(configuration.variables)
}
}
.correlate()
} else {
logger.warn { "Skipping correlation of ${configuration.eventName}: keys ${configuration.correlationKeys}, found ${instances.size} instances for ${configuration.processDefinitionKey} instead of 1." }
}
}

override fun newConfiguration(canonicalString: String): CamundaEventCorrelatingJobHandlerConfiguration =
fromCanonicalString(objectMapper = objectMapper, value = canonicalString)


override fun onDelete(configuration: CamundaEventCorrelatingJobHandlerConfiguration, jobEntity: JobEntity) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,18 @@ data class CamundaEventCorrelatingJobHandlerConfiguration(
val processDefinitionKey: String,
val eventName: String,
val local: Boolean = false,
val variables: Map<String, Any>,
val correlationVariableName: String?,
val correlationId: Any?,
val variables: Map<String, Any> = mapOf(),
val businessKey: String? = null,
val correlationKeys: Map<String, Any> = mapOf(),
val startProcess: Boolean = false
) : JobHandlerConfiguration {

companion object {

val objectMapper: ObjectMapper = jacksonObjectMapper()

fun fromCanonicalString(value: String): CamundaEventCorrelatingJobHandlerConfiguration {
fun fromCanonicalString(objectMapper: ObjectMapper = jacksonObjectMapper(), value: String): CamundaEventCorrelatingJobHandlerConfiguration {
return objectMapper.readValue(value)
}
}

override fun toCanonicalString(): String = objectMapper.writeValueAsString(this)
override fun toCanonicalString(): String = throw UnsupportedOperationException()
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package io.holunda.axon.camunda.ingress

import com.fasterxml.jackson.databind.ObjectMapper
import io.holunda.axon.camunda.AxonCamundaProperties
import io.holunda.axon.camunda.config.CamundaAxonEventCommandFactoryRegistry
import io.holunda.axon.camunda.config.CamundaEvent
import io.holunda.axon.camunda.createCustomJob
import io.holunda.axon.camunda.extractCorrelationId
import io.holunda.axon.camunda.ingress.CamundaEventCorrelatingJobHandler.Companion.TYPE
import mu.KLogging
import org.axonframework.eventhandling.EventMessage
import org.axonframework.eventhandling.EventMessageHandler
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl
import org.camunda.bpm.engine.impl.persistence.entity.MessageEntity
import org.springframework.stereotype.Component

/**
Expand All @@ -19,7 +19,8 @@ import org.springframework.stereotype.Component
class CamundaEventMessageHandler(
private val registry: CamundaAxonEventCommandFactoryRegistry,
private val axonCamundaProperties: AxonCamundaProperties,
private val processEngineConfigurationImpl: ProcessEngineConfigurationImpl
private val processEngineConfigurationImpl: ProcessEngineConfigurationImpl,
private val objectMapper: ObjectMapper
) : EventMessageHandler {

companion object : KLogging()
Expand Down Expand Up @@ -55,9 +56,7 @@ class CamundaEventMessageHandler(
processDefinitionKey = processDefinitionKey,
eventName = camundaEvent.name,
variables = camundaEvent.variables,
local = camundaEvent.local,
correlationVariableName = null,
correlationId = null
local = camundaEvent.local
)
)
}
Expand All @@ -70,8 +69,7 @@ class CamundaEventMessageHandler(
eventName = camundaEvent.name,
variables = camundaEvent.variables,
local = camundaEvent.local,
correlationVariableName = camundaEvent.correlationVariableName,
correlationId = correlationId
correlationKeys = mapOf(Pair(camundaEvent.correlationVariableName, correlationId))
)
)
} else {
Expand All @@ -80,16 +78,7 @@ class CamundaEventMessageHandler(
}

private fun createMessageJob(configuration: CamundaEventCorrelatingJobHandlerConfiguration) {
processEngineConfigurationImpl.commandExecutorTxRequired.execute { context ->
context.jobManager.send(
MessageEntity()
.apply {
this.jobHandlerConfiguration = configuration
this.jobHandlerType = TYPE
}
)
}

processEngineConfigurationImpl.createCustomJob(configuration, objectMapper)
}
}