Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(job-executor): Adding helpful debug logs to improve job acquisition and job execution logging. Issue#2666. #4217

Open
wants to merge 10 commits into
base: master
Choose a base branch
from

Conversation

prakashpalanisamy
Copy link

@prakashpalanisamy prakashpalanisamy commented Mar 22, 2024

This code change adds additional logging to Camunda Job Executor and scoped to the Job Acquisition and Job Execution.
je

#2666

@CLAassistant
Copy link

CLAassistant commented Mar 22, 2024

CLA assistant check
All committers have signed the CLA.

@yanavasileva
Copy link
Member

Hi @prakashpalanisamy,

Thank you for raising this.
We will have a look at it and get back to you.

Best,
Yana

Copy link
Member

@yanavasileva yanavasileva left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@prakashpalanisamy, nice work!
I left a few comments for the code formatting (check all of the code for missing spaces) and covering the non-Spring use cases.

In addition to the changes, could you have have a look at adding a few test cases for the logging. The logging rule can be helpful for them. You can see examples here:

}
}

public void logJobExecutionInfo(ProcessEngineImpl engine, int executionQueueSize, int executionQueueCapacity, int maxExecutionThreads, int activeExecutionThreads) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ The code is called only for Spring use cases. Have you considered how to log the same data for non-Spring scenario? As I don't think the ticket is created only for Spring use cases.
Hint:

protected int queueSize = 3;
protected int corePoolSize = 3;
protected int maxPoolSize = 10;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@prakashpalanisamy
Copy link
Author

@yanavasileva , Thank you for taking your time to review. Ill look at the comments and update you. Thanks!!

@yanavasileva
Copy link
Member

Closing due to inactivity. The PR can be reopened again when you get back to the topic.

@prakashpalanisamy
Copy link
Author

Hi @yanavasileva Requesting you to please reopen the PR. I would like to continue the contribution to this PR. Thank you!

@tasso94 tasso94 reopened this Jun 28, 2024
@brianwarner brianwarner force-pushed the 2666-job-acquisition-execution-logging-improvement branch from b8380ab to 1622149 Compare July 11, 2024 16:13
@prakashpalanisamy
Copy link
Author

@yanavasileva , hope you are doing good. I have updated the PR with additional commits. Can you please take a look into it when you get some time. Thanks!!

@yanavasileva
Copy link
Member

I will try to have a look and provide input later this week.

@prakashpalanisamy
Copy link
Author

@yanavasileva Please let me know on the comments once you have finished reviewing, I will have them addressed. Thank you! :)

@yanavasileva yanavasileva self-requested a review August 14, 2024 13:21
Copy link
Member

@yanavasileva yanavasileva left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ Formatting is off almost everywhere. You can find the latest formatters here:
https://github.com/camunda/camunda-bpm-platform/tree/master/settings
👍 Changes look good so far, thank you for considering my feedback.
I added two more notes below. I will validate the added tests, run some further scenarios, and get back to you with my findings.

Thank you for your patience.

Comment on lines +256 to +266
public void availableThreadsCalculationError() {
logWarn(
"039",
"Arithmetic exception occurred while computing remaining available thread count for logging.");
}

public void totalQueueCapacityCalculationError() {
logWarn(
"040",
"Arithmetic exception occurred while computing total queue capacity for logging.");
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ Change severity to debug since the related logs are also logged with it + formatting:

Suggested change
public void availableThreadsCalculationError() {
logWarn(
"039",
"Arithmetic exception occurred while computing remaining available thread count for logging.");
}
public void totalQueueCapacityCalculationError() {
logWarn(
"040",
"Arithmetic exception occurred while computing total queue capacity for logging.");
}
public void availableThreadsCalculationError() {
logDebug("039", "Arithmetic exception occurred while computing remaining available thread count for logging.");
}
public void totalQueueCapacityCalculationError() {
logDebug("040", "Arithmetic exception occurred while computing total queue capacity for logging.");
}

public void numJobsInQueue(String processEngine, int numJobsInQueue, int maxQueueSize) {
logDebug(
"038",
"Jobs currently in queue to be executed for the process engine '{}' is {} out of the max queue size : {}", processEngine, numJobsInQueue, maxQueueSize);
Copy link
Member

@yanavasileva yanavasileva Aug 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔧 Adjust the wording for better readability:

Suggested change
"Jobs currently in queue to be executed for the process engine '{}' is {} out of the max queue size : {}", processEngine, numJobsInQueue, maxQueueSize);
"Jobs currently in queue to be executed for the process engine '{}': {} (out of the max queue size : {});

}
}

public void logJobExecutionInfo(ProcessEngineImpl engine, int executionQueueSize, int executionQueueCapacity, int maxExecutionThreads, int activeExecutionThreads) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

@yanavasileva yanavasileva left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding some further adjustments related to the tests.

Comment on lines +188 to +189
LOG.numJobsInQueue(engine.getName(), executionQueueSize, executionQueueCapacity);
LOG.currentJobExecutions(engine.getName(), activeExecutionThreads);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔧 Let's switch the order, I think it better to log executions first:

Suggested change
LOG.numJobsInQueue(engine.getName(), executionQueueSize, executionQueueCapacity);
LOG.currentJobExecutions(engine.getName(), activeExecutionThreads);
LOG.currentJobExecutions(engine.getName(), activeExecutionThreads);
LOG.numJobsInQueue(engine.getName(), executionQueueSize, executionQueueCapacity);

Comment on lines +43 to +52
@After
public void tearDown() {
List<Job> jobs = managementService.createJobQuery().processDefinitionKey("simpleAsyncProcess").list();

// remove simple async process jobs
for (Job job : jobs) {
managementService.deleteJob(job.getId());
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ This is taken care of during the test destroy and deletion of the deployment.

Suggested change
@After
public void tearDown() {
List<Job> jobs = managementService.createJobQuery().processDefinitionKey("simpleAsyncProcess").list();
// remove simple async process jobs
for (Job job : jobs) {
managementService.deleteJob(job.getId());
}
}

Comment on lines +8 to +13
import org.camunda.bpm.engine.runtime.Job;
import org.camunda.bpm.engine.test.Deployment;
import org.camunda.bpm.engine.test.util.ProcessEngineTestRule;
import org.camunda.bpm.engine.test.util.ProvidedProcessEngineRule;
import org.camunda.commons.testing.ProcessEngineLoggingRule;
import org.junit.After;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ See above, not needed

Suggested change
import org.camunda.bpm.engine.runtime.Job;
import org.camunda.bpm.engine.test.Deployment;
import org.camunda.bpm.engine.test.util.ProcessEngineTestRule;
import org.camunda.bpm.engine.test.util.ProvidedProcessEngineRule;
import org.camunda.commons.testing.ProcessEngineLoggingRule;
import org.junit.After;
import org.camunda.bpm.engine.test.Deployment;
import org.camunda.bpm.engine.test.util.ProcessEngineTestRule;
import org.camunda.bpm.engine.test.util.ProvidedProcessEngineRule;
import org.camunda.commons.testing.ProcessEngineLoggingRule;

Comment on lines +60 to +62

// given max-job-acquisition threshold to three
processEngineConfiguration.getJobExecutor().setMaxJobsPerAcquisition(3);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ No need of this:

Suggested change
// given max-job-acquisition threshold to three
processEngineConfiguration.getJobExecutor().setMaxJobsPerAcquisition(3);

Comment on lines +32 to +41
private RuntimeService runtimeService;
private ManagementService managementService;
private ProcessEngineConfigurationImpl processEngineConfiguration;

@Before
public void init() {
runtimeService = engineRule.getRuntimeService();
processEngineConfiguration = engineRule.getProcessEngineConfiguration();
managementService = engineRule.getProcessEngine().getManagementService();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔧 Clean up:

Suggested change
private RuntimeService runtimeService;
private ManagementService managementService;
private ProcessEngineConfigurationImpl processEngineConfiguration;
@Before
public void init() {
runtimeService = engineRule.getRuntimeService();
processEngineConfiguration = engineRule.getProcessEngineConfiguration();
managementService = engineRule.getProcessEngine().getManagementService();
}
protected RuntimeService runtimeService;
protected ProcessEngineConfigurationImpl processEngineConfiguration;
@Before
public void init() {
runtimeService = engineRule.getRuntimeService();
processEngineConfiguration = engineRule.getProcessEngineConfiguration();
}


import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import org.camunda.bpm.engine.ManagementService;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import org.camunda.bpm.engine.ManagementService;

Comment on lines +5 to +15
import org.camunda.bpm.engine.ManagementService;
import org.camunda.bpm.engine.RuntimeService;
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.camunda.bpm.engine.impl.jobexecutor.CallerRunsRejectedJobsHandler;
import org.camunda.bpm.engine.impl.jobexecutor.DefaultJobExecutor;
import org.camunda.bpm.engine.runtime.Job;
import org.camunda.bpm.engine.test.Deployment;
import org.camunda.bpm.engine.test.util.ProcessEngineTestRule;
import org.camunda.bpm.engine.test.util.ProvidedProcessEngineRule;
import org.camunda.commons.testing.ProcessEngineLoggingRule;
import org.junit.After;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clean up:

Suggested change
import org.camunda.bpm.engine.ManagementService;
import org.camunda.bpm.engine.RuntimeService;
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.camunda.bpm.engine.impl.jobexecutor.CallerRunsRejectedJobsHandler;
import org.camunda.bpm.engine.impl.jobexecutor.DefaultJobExecutor;
import org.camunda.bpm.engine.runtime.Job;
import org.camunda.bpm.engine.test.Deployment;
import org.camunda.bpm.engine.test.util.ProcessEngineTestRule;
import org.camunda.bpm.engine.test.util.ProvidedProcessEngineRule;
import org.camunda.commons.testing.ProcessEngineLoggingRule;
import org.junit.After;
import org.camunda.bpm.engine.RuntimeService;
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.camunda.bpm.engine.impl.jobexecutor.CallerRunsRejectedJobsHandler;
import org.camunda.bpm.engine.impl.jobexecutor.DefaultJobExecutor;
import org.camunda.bpm.engine.test.Deployment;
import org.camunda.bpm.engine.test.util.ProcessEngineTestRule;
import org.camunda.bpm.engine.test.util.ProvidedProcessEngineRule;
import org.camunda.commons.testing.ProcessEngineLoggingRule;

import org.junit.rules.RuleChain;

import java.util.List;
import java.util.concurrent.*;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ Apply project's formatter:

Suggested change
import java.util.concurrent.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Comment on lines +36 to +60
private ManagementService managementService;
private ProcessEngineConfigurationImpl processEngineConfiguration;

@Before
public void init() {
runtimeService = engineRule.getRuntimeService();
processEngineConfiguration = engineRule.getProcessEngineConfiguration();
managementService = engineRule.getProcessEngine().getManagementService();
}

@After
public void tearDown() {
List<Job> simpleAsyncProcessJobs = managementService.createJobQuery().processDefinitionKey("simpleAsyncProcess").list();
List<Job> testProcessJobs = managementService.createJobQuery().processDefinitionKey("testProcess").list();

// remove simple async process jobs
for (Job job : simpleAsyncProcessJobs) {
managementService.deleteJob(job.getId());
}

// remove test process jobs
for (Job job : testProcessJobs) {
managementService.deleteJob(job.getId());
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ Removing of the jobs is taken care of the test rule during the deletion of the deployment:

Suggested change
private ManagementService managementService;
private ProcessEngineConfigurationImpl processEngineConfiguration;
@Before
public void init() {
runtimeService = engineRule.getRuntimeService();
processEngineConfiguration = engineRule.getProcessEngineConfiguration();
managementService = engineRule.getProcessEngine().getManagementService();
}
@After
public void tearDown() {
List<Job> simpleAsyncProcessJobs = managementService.createJobQuery().processDefinitionKey("simpleAsyncProcess").list();
List<Job> testProcessJobs = managementService.createJobQuery().processDefinitionKey("testProcess").list();
// remove simple async process jobs
for (Job job : simpleAsyncProcessJobs) {
managementService.deleteJob(job.getId());
}
// remove test process jobs
for (Job job : testProcessJobs) {
managementService.deleteJob(job.getId());
}
}
private ProcessEngineConfigurationImpl processEngineConfiguration;
@Before
public void init() {
runtimeService = engineRule.getRuntimeService();
processEngineConfiguration = engineRule.getProcessEngineConfiguration();
}

processEngineConfiguration.getJobExecutor().shutdown();

// look for filled queue logs
List<ILoggingEvent> filteredLogList = loggingRule.getFilteredLog("Jobs currently in queue to be executed for the process engine 'default' is 2 out of the max queue size : 2");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔧 Adjust wording:

Suggested change
List<ILoggingEvent> filteredLogList = loggingRule.getFilteredLog("Jobs currently in queue to be executed for the process engine 'default' is 2 out of the max queue size : 2");
List<ILoggingEvent> filteredLogList = loggingRule.getFilteredLog("Jobs currently in queue to be executed for the process engine 'default': 2 (out of the max queue size : 2)");

Copy link
Member

@yanavasileva yanavasileva left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ I have ran the tests a couple of times but they are unstable. Could you please iterate on them to make them reliable?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ Missing license headers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ Missing license headers.

List<ILoggingEvent> filteredLogList = loggingRule.getFilteredLog("Attempting to acquire 3 jobs for the process engine 'default'");

// then find the expected logs
assertThat(filteredLogList.size()).isGreaterThan(4);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check if flaky.

List<ILoggingEvent> filteredLogList = loggingRule.getFilteredLog("Jobs failed to Lock during Acquisition of jobs for the process engine 'default' : 0");

// then get logs that states there were no faults during job acquisition locks
assertThat(filteredLogList.size()).isGreaterThan(4);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check if flaky.

List<ILoggingEvent> filteredLogList = loggingRule.getFilteredLog("Jobs currently in queue to be executed for the process engine 'default' is 2 out of the max queue size : 2");

// then 3 instances of filled queue logs will be available as there is 2 additional threads possible to reach max-pool-size
assertThat(filteredLogList.size()).isEqualTo(3);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check if flaky.

List<ILoggingEvent> filteredLogList = loggingRule.getFilteredLog("Jobs currently in execution for the process engine 'default' : 3");

// then one count of 'three jobs in execution' will be available as 2 out of 5 jobs should be queued.
assertThat(filteredLogList.size()).isEqualTo(1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is flaky.

@yanavasileva
Copy link
Member

@prakashpalanisamy, heads up, we approach the code freeze of the minor release.
❗ If you want the feature to be included in 7.22.0, please apply the feedback by 16th of September.

@prakashpalanisamy
Copy link
Author

@prakashpalanisamy, heads up, we approach the code freeze of the minor release. ❗ If you want the feature to be included in 7.22.0, please apply the feedback by 16th of September.

Sure @yanavasileva I'll take this up on priority. Was tied up due to personal matters. I may need your inputs on the Tests though. I observed that some of the new logs and already present logs were getting into the log file while the JE gets loaded. sometime twice. So the number of occurrences were varying on my system as well. Hence I was opting for greater than checks. Will test that once more and reach out to you with specifics for directions. Thank you for assisting me with this PR :)

@prakashpalanisamy
Copy link
Author

@yanavasileva - quick question on the formatting. When I apply the formatting, it also suggest changes to prior code that other authors have contributed in the past, on the files that I have changes in. Do you suggest that I apply them along with my changes? else only to the lines that Im contributing and ignoring other formatting suggestions? Thanks!!

@yanavasileva
Copy link
Member

@prakashpalanisamy,

You can mark only your code and format only that. This will be easier to review by me.

@prakashpalanisamy
Copy link
Author

@yanavasileva - I need your directions for the ManagedJobExecutor changes. Since it will be using the app servers' thread pool, should I set it up in wildfly distribution with ManagedExecutorService configuration? if so, any pointers on how to do the configurations? If you could point to any reference links, it will help me.
Basically, once i put the code in, I want to get it up and running and test manually with some process of mine having async scenarios with java delegates.

@yanavasileva
Copy link
Member

I won't be able to provide input on this, we can create a follow up ticket to explore further ManagedJobExecutor. Feel free to apply the rest of the feedback.

For the tests, I also saw that the log entries can vary. Maybe you can do the assertion for minimum occurrences?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants