Skip to content

Commit

Permalink
feat(engine): add property and check for transaction isolation level …
Browse files Browse the repository at this point in the history
…on datasource initialisation

Related to #4516
  • Loading branch information
joaquinfelici committed Aug 9, 2024
1 parent e4c392c commit e1c9d54
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ public abstract class ProcessEngineConfiguration {
*/
public static final String AUTHORIZATION_CHECK_REVOKE_NEVER = "never";

/**
* The default transaction isolation level when no property is configured
*/
public static final String TRANSACTION_ISOLATION_LEVEL_DEFAULT = "readCommitted";

/**
* This mode only checks for {@link Authorization#AUTH_TYPE_REVOKE revoke} authorizations if at least
* one revoke authorization currently exits for the current user or one of the groups the user is a member
Expand Down Expand Up @@ -252,6 +257,7 @@ public abstract class ProcessEngineConfiguration {
protected boolean transactionsExternallyManaged = false;
/** the number of seconds the jdbc driver will wait for a response from the database */
protected Integer jdbcStatementTimeout;
protected String transactionIsolationLevel = TRANSACTION_ISOLATION_LEVEL_DEFAULT;
protected boolean jdbcBatchProcessing = true;

protected int defaultNumberOfRetries = JobEntity.DEFAULT_RETRIES;
Expand Down Expand Up @@ -567,6 +573,14 @@ public ProcessEngineConfiguration setDatabaseVendor(String databaseVendor) {
return this;
}

public String getTransactionIsolationLevel() {
return transactionIsolationLevel;
}

public void setTransactionIsolationLevel(String transactionIsolationLevel) {
this.transactionIsolationLevel = transactionIsolationLevel;
}

public String getDatabaseVersion() {
return databaseVersion;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,19 @@ public NotValidException logErrorNoTTLConfigured() {
+ "* Set a default historyTimeToLive as a global process engine configuration\n"
+ "* (Not recommended) Deactivate the enforceTTL config to disable this check"));
}

public void logNullIsolationLevel(String defaultIsolationLevel) {
logInfo("019",
"Property transactionIsolationLevel has a null or empty value and the global database configuration is different from '{}'."
+ "This may produce deadlocks or other unexpected behaviors.",
defaultIsolationLevel);
}

public void logNonOptimalIsolationLevel(String propertyValue, String defaultIsolationLevel) {
logWarn("020",
"Property transactionIsolationLevel has value '{}' which is different from '{}'. "
+ "This may produce deadlocks or other unexpected behaviors.",
propertyValue,
defaultIsolationLevel);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.ParseException;
import java.util.ArrayList;
Expand All @@ -47,6 +45,7 @@
import javax.sql.DataSource;
import org.apache.ibatis.builder.xml.XMLConfigBuilder;
import org.apache.ibatis.datasource.pooled.PooledDataSource;
import org.apache.ibatis.datasource.unpooled.UnpooledDataSource;
import org.apache.ibatis.mapping.Environment;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.ExecutorType;
Expand Down Expand Up @@ -130,7 +129,6 @@
import org.camunda.bpm.engine.impl.cfg.multitenancy.TenantCommandChecker;
import org.camunda.bpm.engine.impl.cfg.multitenancy.TenantIdProvider;
import org.camunda.bpm.engine.impl.cfg.standalone.StandaloneTransactionContextFactory;
import org.camunda.bpm.engine.impl.cmd.HistoryCleanupCmd;
import org.camunda.bpm.engine.impl.cmmn.CaseServiceImpl;
import org.camunda.bpm.engine.impl.cmmn.deployer.CmmnDeployer;
import org.camunda.bpm.engine.impl.cmmn.entity.repository.CaseDefinitionManager;
Expand Down Expand Up @@ -395,7 +393,7 @@
*/
public abstract class ProcessEngineConfigurationImpl extends ProcessEngineConfiguration {

protected final static ConfigurationLogger LOG = ConfigurationLogger.CONFIG_LOGGER;
protected static ConfigurationLogger LOG = ConfigurationLogger.CONFIG_LOGGER;

public static final String DB_SCHEMA_UPDATE_CREATE = "create";
public static final String DB_SCHEMA_UPDATE_DROP_CREATE = "drop-create";
Expand All @@ -416,6 +414,11 @@ public abstract class ProcessEngineConfigurationImpl extends ProcessEngineConfig
protected static final Map<Object, Object> DEFAULT_BEANS_MAP = new HashMap<>();

protected static final String PRODUCT_NAME = "Camunda BPM Runtime";
protected static final Map<String, Integer> TRANSACTION_ISOLATION_LEVEL_MAPPING = Map.of(
"readCommitted", Connection.TRANSACTION_READ_COMMITTED,
"readUncommitted", Connection.TRANSACTION_READ_UNCOMMITTED,
"repeatableRead", Connection.TRANSACTION_REPEATABLE_READ,
"serializable", Connection.TRANSACTION_SERIALIZABLE);

public static SqlSessionFactory cachedSqlSessionFactory;

Expand All @@ -439,7 +442,7 @@ public abstract class ProcessEngineConfigurationImpl extends ProcessEngineConfig

// Command executor and interceptor stack
/**
* the configurable list which will be {@link #initInterceptorChain(java.util.List) processed} to build the {@link #commandExecutorTxRequired}
* the configurable list which will be {@link #initInterceptorChain(List) processed} to build the {@link #commandExecutorTxRequired}
*/
protected List<CommandInterceptor> customPreCommandInterceptorsTxRequired;
protected List<CommandInterceptor> customPostCommandInterceptorsTxRequired;
Expand Down Expand Up @@ -749,7 +752,7 @@ public abstract class ProcessEngineConfigurationImpl extends ProcessEngineConfig
protected DmnHistoryEventProducer dmnHistoryEventProducer;

/**
* As an instance of {@link org.camunda.bpm.engine.impl.history.handler.CompositeHistoryEventHandler}
* As an instance of {@link CompositeHistoryEventHandler}
* it contains all the provided history event handlers that process history events.
*/
protected HistoryEventHandler historyEventHandler;
Expand Down Expand Up @@ -1261,7 +1264,7 @@ public void initHistoryCleanup() {
//validate number of threads
if (historyCleanupDegreeOfParallelism < 1 || historyCleanupDegreeOfParallelism > MAX_THREADS_NUMBER) {
throw LOG.invalidPropertyValue("historyCleanupDegreeOfParallelism", String.valueOf(historyCleanupDegreeOfParallelism),
String.format("value for number of threads for history cleanup should be between 1 and %s", HistoryCleanupCmd.MAX_THREADS_NUMBER));
String.format("value for number of threads for history cleanup should be between 1 and %s", MAX_THREADS_NUMBER));
}

if (historyCleanupBatchWindowStartTime != null) {
Expand Down Expand Up @@ -1708,12 +1711,52 @@ protected void initDataSource() {
((PooledDataSource) dataSource).forceCloseAll();
}
}
configureDefaultIsolationLevel();

if (databaseType == null) {
initDatabaseType();
}
}

protected void configureDefaultIsolationLevel() {
if (transactionIsolationLevel == null || transactionIsolationLevel.isBlank()) {
int currentIsolationLevel = getCurrentTransactionIsolationLevel();
if (currentIsolationLevel != Connection.TRANSACTION_READ_COMMITTED) {
LOG.logNullIsolationLevel(TRANSACTION_ISOLATION_LEVEL_DEFAULT);
}
} else if (!TRANSACTION_ISOLATION_LEVEL_MAPPING.containsKey(transactionIsolationLevel)) {
throw LOG.invalidPropertyValue("transactionIsolationLevel", transactionIsolationLevel);
} else {
final int isolationLevel = TRANSACTION_ISOLATION_LEVEL_MAPPING.get(transactionIsolationLevel);
if (isolationLevel != Connection.TRANSACTION_READ_COMMITTED) {
LOG.logNonOptimalIsolationLevel(transactionIsolationLevel, TRANSACTION_ISOLATION_LEVEL_DEFAULT);
}
setDefaultTransactionIsolationLevel(isolationLevel);
}
}

protected Integer getCurrentTransactionIsolationLevel() {
try (Connection connection = dataSource.getConnection()) {
return connection.getTransactionIsolation();
} catch (SQLException e) {
throw LOG.databaseConnectionAccessException(e);
}
}

protected void setDefaultTransactionIsolationLevel(int isolationLevel) {
if (dataSource instanceof PooledDataSource) {
((PooledDataSource) dataSource).setDefaultTransactionIsolationLevel(isolationLevel);
} else if (dataSource instanceof UnpooledDataSource){
((UnpooledDataSource) dataSource).setDefaultTransactionIsolationLevel(isolationLevel);
} else {
try (Connection connection = dataSource.getConnection()) {
connection.setTransactionIsolation(isolationLevel);
} catch (SQLException e) {
throw LOG.databaseConnectionAccessException(e);
}
}
}

protected static Properties databaseTypeMappings = getDefaultDatabaseTypeMappings();
protected static final String MY_SQL_PRODUCT_NAME = "MySQL";
protected static final String MARIA_DB_PRODUCT_NAME = "MariaDB";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,39 @@
package org.camunda.bpm.engine.impl.cfg;

import static org.assertj.core.api.Assertions.*;
import static org.camunda.bpm.engine.ProcessEngineConfiguration.TRANSACTION_ISOLATION_LEVEL_DEFAULT;
import static org.camunda.bpm.engine.impl.ProcessEngineLogger.CONFIG_LOGGER;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.sql.Connection;
import org.camunda.bpm.engine.ProcessEngineConfiguration;
import org.camunda.bpm.engine.ProcessEngineException;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;

public class ProcessEngineConfigurationTest {

private ProcessEngineConfigurationImpl engineConfiguration;
private ConfigurationLogger logger;

@Before
public void setUp() {
this.engineConfiguration = (ProcessEngineConfigurationImpl) ProcessEngineConfiguration.createProcessEngineConfigurationFromResourceDefault();
this.logger = mock(ConfigurationLogger.class);
ProcessEngineConfigurationImpl.LOG = logger;
}

@AfterClass
public static void cleanUp() {
ProcessEngineConfigurationImpl.LOG = CONFIG_LOGGER;
}

@Test
public void shouldEnableStandaloneTasksByDefault() {
// when
Expand All @@ -39,4 +67,81 @@ public void shouldEnableImplicitUpdatesDetectionByDefault() {
// then
assertThat(engineConfiguration.isImplicitVariableUpdateDetectionEnabled()).isTrue();
}

@Test
public void validIsolationLevelPropertyIsSetCorrectly() {
//given
engineConfiguration.setTransactionIsolationLevel("readUncommitted");

//when
engineConfiguration.initDataSource();

//then
assertEquals(Connection.TRANSACTION_READ_UNCOMMITTED, engineConfiguration.getCurrentTransactionIsolationLevel().intValue());
verify(logger).logNonOptimalIsolationLevel("readUncommitted", TRANSACTION_ISOLATION_LEVEL_DEFAULT);
}

@Test
public void validIsolationLevelPropertyFromFileIsSetCorrectly() {
//given
ProcessEngineConfigurationImpl engineConfiguration = (ProcessEngineConfigurationImpl) ProcessEngineConfiguration.createProcessEngineConfigurationFromResource("camunda.cfg.customIsolationLevel.xml");

//when
engineConfiguration.initDataSource();

//then
assertEquals(Connection.TRANSACTION_SERIALIZABLE, engineConfiguration.getCurrentTransactionIsolationLevel().intValue());
verify(logger).logNonOptimalIsolationLevel("serializable", TRANSACTION_ISOLATION_LEVEL_DEFAULT);
}

@Test
public void unsetIsolationLevelPropertyDefaultsToExpectedValue() {
//when
engineConfiguration.initDataSource();

//then
assertEquals("readCommitted", engineConfiguration.getTransactionIsolationLevel());
assertEquals(Connection.TRANSACTION_READ_COMMITTED, engineConfiguration.getCurrentTransactionIsolationLevel().intValue());
verify(logger, times(0)).logNonOptimalIsolationLevel(anyString(), anyString());
}

@Test
public void nullIsolationLevelPropertyIsHandledCorrectly() {
//given
engineConfiguration.setTransactionIsolationLevel(null);

//when
engineConfiguration.initDataSource();

//then
assertEquals(Connection.TRANSACTION_READ_COMMITTED, engineConfiguration.getCurrentTransactionIsolationLevel().intValue());
verify(logger, times(0)).logNullIsolationLevel(TRANSACTION_ISOLATION_LEVEL_DEFAULT);
}

@Test
public void blankIsolationLevelPropertyIsHandledCorrectly() {
//given
engineConfiguration.setTransactionIsolationLevel(" ");

//when
engineConfiguration.initDataSource();

//then
assertEquals(Connection.TRANSACTION_READ_COMMITTED, engineConfiguration.getCurrentTransactionIsolationLevel().intValue());
verify(logger, times(0)).logNullIsolationLevel(TRANSACTION_ISOLATION_LEVEL_DEFAULT);
}

@Test
public void invalidIsolationLevelPropertyIsHandledCorrectly() {
//given
String propertyName = "transactionIsolationLevel";
String propertyValue = "invalid";
ProcessEngineException expectedException = CONFIG_LOGGER.invalidPropertyValue(propertyName, propertyValue);
engineConfiguration.setTransactionIsolationLevel(propertyValue);
when(logger.invalidPropertyValue(anyString(), anyString())).thenReturn(expectedException);

//when then
assertThatThrownBy(() -> engineConfiguration.initDataSource()).isEqualTo(expectedException);
verify(logger).invalidPropertyValue(propertyName, propertyValue);
}
}
52 changes: 52 additions & 0 deletions engine/src/test/resources/camunda.cfg.customIsolationLevel.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

<bean id="processEngineConfiguration" class="org.camunda.bpm.engine.impl.cfg.StandaloneInMemProcessEngineConfiguration">

<property name="jdbcUrl" value="${database.url}" />
<property name="jdbcDriver" value="${database.driver}" />
<property name="jdbcUsername" value="${database.username}" />
<property name="jdbcPassword" value="${database.password}" />

<!-- Database configurations -->
<property name="databaseSchemaUpdate" value="true" />

<!-- Empty beans map to for testing purpose -->
<property name="beans">
<map/>
</property>

<!-- job executor configurations -->
<property name="jobExecutorActivate" value="false" />

<property name="bpmnStacktraceVerbose" value="false" />

<!-- turn off metrics reporter -->
<property name="dbMetricsReporterActivate" value="false" />
<property name="telemetryReporterActivate" value="false" />
<property name="taskMetricsEnabled" value="false" />

<!-- mail server configurations -->
<property name="mailServerPort" value="${mail.server.port}" />
<property name="history" value="${history.level}" />

<property name="authorizationCheckRevokes" value="${authorizationCheckRevokes}"/>

<property name="jdbcBatchProcessing" value="${jdbcBatchProcessing}"/>
<property name="transactionIsolationLevel" value="serializable"/>

<!--<property name="idGenerator" ref="uuidGenerator" />-->

<!-- telemetry configuration -->
<property name="telemetryEndpoint" value="http://localhost:8081/pings"/>

<property name="enforceHistoryTimeToLive" value="false" />

</bean>

<!--<bean id="uuidGenerator" class="org.camunda.bpm.engine.impl.persistence.StrongUuidGenerator" />-->

</beans>

0 comments on commit e1c9d54

Please sign in to comment.