85 changes: 85 additions & 0 deletions libs/utils/src/main/java/com/akto/jobs/JobExecutor.java
@@ -2,22 +2,37 @@

import com.akto.dao.context.Context;
import com.akto.dao.jobs.JobsDao;
import com.akto.dao.notifications.SlackWebhooksDao;
import com.akto.dto.jobs.Job;
import com.akto.dto.jobs.JobParams;
import com.akto.dto.jobs.JobStatus;
import com.akto.dto.jobs.ScheduleType;
import com.akto.dto.notifications.SlackWebhook;
import com.akto.jobs.exception.RetryableJobException;
import com.akto.log.LoggerMaker;
import com.akto.notifications.slack.CustomTextAlert;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.ReturnDocument;
import com.mongodb.client.model.Updates;
import com.slack.api.Slack;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.ArrayList;
import java.util.List;
import org.bson.types.ObjectId;
import org.slf4j.helpers.MessageFormatter;
import org.slf4j.event.Level;

public abstract class JobExecutor<T extends JobParams> {
private static final LoggerMaker logger = new LoggerMaker(JobExecutor.class);

protected final Class<T> paramClass;
private static final Slack SLACK_INSTANCE = Slack.getInstance();
private static final ExecutorService SLACK_EXECUTOR = Executors.newSingleThreadExecutor();

private final List<String> jobLogs = new ArrayList<>();

public JobExecutor(Class<T> paramClass) {
this.paramClass = paramClass;
@@ -27,6 +42,7 @@ public final void execute(Job job) {
ObjectId jobId = job.getId();
logger.info("Executing job: {}", job);
Job executedJob;
String errorMessage = null;
try {
runJob(job);
executedJob = logSuccess(jobId);
@@ -35,10 +51,21 @@
executedJob = reScheduleJob(job);
logger.error("Error occurred while executing the job. Re-scheduling the job. {}", executedJob, rex);
} catch (Exception e) {
errorMessage = e.getMessage();
executedJob = logFailure(jobId, e);
logger.error("Error occurred while executing the job. Not re-scheduling. {}", executedJob, e);
}
handleRecurringJob(executedJob);
String capturedLogs = "";
if (!jobLogs.isEmpty()) {
StringBuilder sb = new StringBuilder();
for (String line : jobLogs) {
sb.append(line).append('\n');
}
capturedLogs = sb.toString();
jobLogs.clear();
}
sendSlackAlert(job, errorMessage, capturedLogs);
}
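
Reviewer note on the flow above: per-run log lines accumulate in the jobLogs instance buffer and are drained into a single string once the job finishes, whatever the outcome. Since jobLogs is an unsynchronized instance field, this assumes one job per executor instance at a time. A minimal standalone sketch of that capture-then-flush shape (class and method names here are illustrative, not Akto's API):

import java.util.ArrayList;
import java.util.List;

// Illustrative capture-then-flush buffer mirroring how execute() builds
// capturedLogs: append during the run, join and clear exactly once at the end.
class RunLogBuffer {
    private final List<String> lines = new ArrayList<>();

    void add(String line) {
        lines.add(line);
    }

    // Join with trailing newlines and reset, so the next run starts clean.
    String drain() {
        if (lines.isEmpty()) {
            return "";
        }
        StringBuilder sb = new StringBuilder();
        for (String line : lines) {
            sb.append(line).append('\n');
        }
        lines.clear();
        return sb.toString();
    }
}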

private Job logSuccess(ObjectId id) {
@@ -100,6 +127,32 @@ protected void updateJobParams(Job job, T params) {
);
}

protected void logAndCollect(Level level, String message, Object... vars) {
try {
switch (level) {
case ERROR:
logger.error(message, vars);
break;
case WARN:
logger.warn(message, vars);
break;
case DEBUG:
logger.debug(message, vars);
break;
default:
logger.info(message, vars);
break;
}
if (level == Level.ERROR) {
vars = MessageFormatter.trimmedCopy(vars);
}
String formatted = MessageFormatter.arrayFormat(message, vars).getMessage();
jobLogs.add("[" + level + "]: " + formatted);
} catch (Exception e) {
logger.error("Error logging message: " + message, e);
}
}
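
Two SLF4J details the method above leans on, worth confirming in review: MessageFormatter.arrayFormat treats a trailing Throwable as the log exception rather than a formatting argument, and trimmedCopy unconditionally drops the last array element (it also throws on an empty array, which the surrounding catch absorbs). A small self-contained check of the first behavior:

import org.slf4j.helpers.FormattingTuple;
import org.slf4j.helpers.MessageFormatter;

public class FormatterDemo {
    public static void main(String[] args) {
        Exception boom = new IllegalStateException("boom");

        // One placeholder, two arguments: the trailing Throwable is pulled out
        // as the exception candidate instead of being formatted into the text.
        FormattingTuple tuple = MessageFormatter.arrayFormat(
                "Job {} failed", new Object[]{"jira-sync", boom});

        System.out.println(tuple.getMessage());   // Job jira-sync failed
        System.out.println(tuple.getThrowable()); // java.lang.IllegalStateException: boom
    }
}

Given that arrayFormat already strips a trailing Throwable on its own, the ERROR-only trimmedCopy call mainly matters when an ERROR log passes no Throwable at all, in which case it silently drops the last real formatting argument; that edge case may deserve a look.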

private Job reScheduleJob(Job job) {
int now = Context.now();
return JobsDao.instance.getMCollection().findOneAndUpdate(
@@ -133,6 +186,38 @@ private void handleRecurringJob(Job job) {
);
}

private void sendSlackAlert(Job job, String errorMessage, String capturedLogs) {
int targetAccountId = job.getAccountId();
SLACK_EXECUTOR.submit(() -> {
Context.accountId.set(1000000);
SlackWebhook slackWebhook = SlackWebhooksDao.instance.findOne(Filters.empty());
if (targetAccountId == 1723492815 && slackWebhook != null) { // send slack alerts only for this account
try {
StringBuilder message = new StringBuilder();

message.append("Job ")
.append(errorMessage == null ? "completed successfully. " : "failed. ")
.append("Name: ").append(job.getJobParams().getJobType())
.append(" | Account: ").append(targetAccountId)
.append(" | JobId: ").append(job.getId().toHexString());

if (errorMessage != null) {
message.append(" | Error: ").append(errorMessage);
}

if (capturedLogs != null && !capturedLogs.isEmpty()) {
message.append("\n\nLogs:\n").append(capturedLogs);
}

CustomTextAlert customTextAlert = new CustomTextAlert(message.toString());
SLACK_INSTANCE.send(slackWebhook.getWebhook(), customTextAlert.toJson());
} catch (Exception e) {
logger.error("Error sending slack alert", e);
}
}
});
}
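
The alert hand-off above runs on a dedicated single-thread executor and is gated to one hardcoded account id before posting through Slack's Java SDK; fetching the first webhook with Filters.empty() assumes a single configured Slack webhook per deployment. A minimal sketch of the underlying webhook send, with a placeholder URL and hand-built JSON in place of Akto's CustomTextAlert wrapper:

import com.slack.api.Slack;
import com.slack.api.webhook.WebhookResponse;

public class WebhookSendDemo {
    public static void main(String[] args) throws Exception {
        Slack slack = Slack.getInstance();

        // Placeholder incoming-webhook URL; real values come from Slack app config.
        String webhookUrl = "https://hooks.slack.com/services/T000/B000/XXXX";

        // Plain text payload; the PR wraps the message in CustomTextAlert instead.
        String payload = "{\"text\":\"Job completed. JobId: 0123456789abcdef01234567\"}";

        WebhookResponse response = slack.send(webhookUrl, payload);
        System.out.println(response.getCode()); // 200 on success
    }
}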

protected abstract void runJob(Job job) throws Exception;

}
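
reScheduleJob and the other job-state helpers are only partially visible above, but the imports (FindOneAndUpdateOptions, ReturnDocument) point at the driver's atomic read-modify-write. A hedged sketch of that shape against a hypothetical jobs collection; the field names and the retry delay are assumptions, not the PR's exact values:

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.ReturnDocument;
import com.mongodb.client.model.Updates;
import org.bson.Document;
import org.bson.types.ObjectId;

public class RescheduleSketch {
    // Atomically mark a job as scheduled again and return the post-update
    // document: the findOneAndUpdate + ReturnDocument.AFTER pattern used for
    // job state transitions. Field names here are assumptions.
    static Document reschedule(MongoCollection<Document> jobs, ObjectId jobId, int now) {
        return jobs.findOneAndUpdate(
                Filters.eq("_id", jobId),
                Updates.combine(
                        Updates.set("jobStatus", "SCHEDULED"),
                        Updates.set("scheduledAt", now + 60)),
                new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER));
    }
}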
(second changed file in this PR: the Jira ticket sync job executor — its per-file header was not captured)
@@ -13,6 +13,7 @@
import com.akto.jobs.JobExecutor;
import com.akto.jobs.utils.JobConstants;
import com.akto.log.LoggerMaker;
import org.slf4j.event.Level;
import com.akto.util.Constants;
import com.akto.util.enums.GlobalEnums.TestRunIssueStatus;
import com.akto.util.http_util.CoreHTTPClient;
@@ -75,14 +76,14 @@ protected void runJob(Job job) throws Exception {
List<TestingRunIssues> eligibleAktoIssues = TestingRunIssuesDao.instance.findAll(filter);

if (eligibleJiraTickets.isEmpty() && eligibleAktoIssues.isEmpty()) {
logger.info("No eligible issues found for syncing");
logAndCollect(Level.INFO, "No eligible issues found for syncing");
params.setLastSyncedAt(Context.now());
updateJobParams(job, params);
return;
}

if (eligibleJiraTickets.isEmpty()) {
logger.info("No Akto issues to be updated. Updating {} Jira tickets.",
logAndCollect(Level.INFO, "No Akto issues to be updated. Updating {} Jira tickets.",
eligibleAktoIssues.size());
updateJiraTickets(jira, eligibleAktoIssues, aktoToJiraStatusMappings, job);
params.setLastSyncedAt(Context.now());
@@ -91,7 +92,7 @@
}

if (eligibleAktoIssues.isEmpty()) {
logger.info("No Jira Tickets to be updated. Updating {} Akto issues.",
logAndCollect(Level.INFO, "No Jira Tickets to be updated. Updating {} Akto issues.",
eligibleJiraTickets.size());
Bson query = Filters.and(
Filters.eq(TestingRunIssues.TICKET_PROJECT_KEY, projectKey),
@@ -134,7 +135,7 @@ protected void runJob(Job job) throws Exception {

// Handle remaining Jira tickets that don't have corresponding Akto issues
if (!eligibleJiraTickets.isEmpty()) {
logger.info("Found {} Jira tickets without corresponding Akto issues", eligibleJiraTickets.size());
logAndCollect(Level.INFO, "Found {} Jira tickets without corresponding Akto issues", eligibleJiraTickets.size());
// Fetch Akto issues by ticket IDs
List<String> ticketIds = new ArrayList<>(eligibleJiraTickets.keySet());
Bson query = Filters.and(
@@ -145,7 +146,7 @@ protected void runJob(Job job) throws Exception {
List<TestingRunIssues> remainingAktoIssues = TestingRunIssuesDao.instance.findAll(query);

if (!remainingAktoIssues.isEmpty()) {
logger.info("Found {} Akto issues for remaining Jira tickets", remainingAktoIssues.size());
logAndCollect(Level.INFO, "Found {} Akto issues for remaining Jira tickets", remainingAktoIssues.size());
updateAktoIssues(remainingAktoIssues, eligibleJiraTickets, jiraToAktoStatusMappings, job);
}
}
@@ -177,11 +178,11 @@ private void updateJiraTickets(JiraIntegration jira, List<TestingRunIssues> issu
// Get the corresponding Jira statuses for this Akto status
List<String> jiraStatuses = aktoToJiraStatusMappings.get(aktoStatus);
if (jiraStatuses == null || jiraStatuses.isEmpty()) {
logger.warn("No Jira status mapping found for Akto status: {}", aktoStatus);
logAndCollect(Level.INFO, "No Jira status mapping found for Akto status: {}", aktoStatus);
continue;
}

logger.debug("Found {} statues mapped with akto status {}. Using the first one", jiraStatuses.size(),
logAndCollect(Level.DEBUG, "Found {} jira statuses mapped with akto status {}. Using the first one", jiraStatuses.size(),
aktoStatus);

// Use the first mapped status as the target
@@ -204,8 +205,8 @@

updateJobHeartbeat(job);
if (transitionsMap.isEmpty()) {
logger.info("No transitions found for issues with Akto status: {} to Jira status: {}",
aktoStatus, targetJiraStatus);
logAndCollect(Level.INFO, "No transitions found for issues with Akto status: {} to Jira status: {}",
aktoStatus, targetJiraStatus);
continue;
}

@@ -224,21 +225,22 @@ private void updateJiraTickets(JiraIntegration jira, List<TestingRunIssues> issu
}
retryCount++;
if (retryCount < maxRetries) {
logger.info("Retrying bulk transition (attempt {}/{})", retryCount + 1, maxRetries);
logAndCollect(Level.INFO, "Retrying bulk transition (attempt {}/{})", retryCount + 1, maxRetries);
Thread.sleep(2000L * retryCount); // Exponential backoff
}
} catch (Exception e) {
retryCount++;
if (retryCount < maxRetries) {
logger.error("Error during bulk transition, retrying (attempt {}/{}): {}",
retryCount + 1, maxRetries, e.getMessage(), e);
logAndCollect(Level.ERROR,
"Error during bulk transition, retrying (attempt {}/{}): {}", retryCount + 1,
maxRetries, e.getMessage(), e);
Thread.sleep(2000L * retryCount); // Exponential backoff
}
}
}

if (success) {
logger.info("Successfully transitioned {} Jira tickets to status: {}. ticketIds: {}",
logAndCollect(Level.DEBUG, "Successfully transitioned {} Jira tickets to status: {}. ticketIds: {}",
issueKeys.size(), targetJiraStatus, issueKeys);
// Update last updated timestamp in Akto
List<WriteModel<TestingRunIssues>> writeModels = new ArrayList<>();
@@ -252,19 +254,18 @@ private void updateJiraTickets(JiraIntegration jira, List<TestingRunIssues> issu

if (!writeModels.isEmpty()) {
TestingRunIssuesDao.instance.getMCollection().bulkWrite(writeModels);
logger.info("Updated last updated timestamp for {} Akto issues", writeModels.size());
}
} else {
logger.error("Failed to transition Jira tickets to status: {}. ticketIds: {}", targetJiraStatus,
logAndCollect(Level.ERROR, "Failed to transition Jira tickets to status: {}. ticketIds: {}", targetJiraStatus,
issueKeys);
}
} catch (Exception e) {
logger.error("Error getting transitions or performing bulk transition for Akto status: {} to Jira status: {}",
aktoStatus, targetJiraStatus, e);
logAndCollect(Level.ERROR, "Error getting transitions or performing bulk transition for Akto status: {} to Jira status: {}",
aktoStatus, targetJiraStatus, e);
}
}
} catch (Exception e) {
logger.error("Error updating Jira tickets: {}", e.getMessage(), e);
logAndCollect(Level.ERROR, "Error updating Jira tickets: {}", e.getMessage(), e);
}
}
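
One nit on the retry loop above: the sleep grows as 2000L * retryCount (2s, 4s, 6s, ...), which is linear growth rather than the exponential backoff the inline comments claim. A self-contained sketch of the same bounded-retry shape, with the doubling variant noted (all names illustrative):

public class RetrySketch {
    interface Attempt {
        boolean run() throws Exception; // true on success
    }

    // Bounded retries with a growing delay, mirroring updateJiraTickets:
    // count both failed results and exceptions, sleep only if retries remain.
    static boolean runWithRetry(Attempt attempt, int maxRetries) throws InterruptedException {
        int retryCount = 0;
        while (retryCount < maxRetries) {
            try {
                if (attempt.run()) {
                    return true;
                }
                retryCount++;
            } catch (Exception e) {
                retryCount++;
            }
            if (retryCount < maxRetries) {
                // PR's shape: linear growth. True exponential backoff would be
                // Thread.sleep(2000L << (retryCount - 1)) or similar.
                Thread.sleep(2000L * retryCount);
            }
        }
        return false;
    }
}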

@@ -281,32 +282,32 @@ private void updateAktoIssues(List<TestingRunIssues> aktoIssues, Map<String, Bas
try {
BasicDBObject jiraIssue = jiraIssues.get(issue.getTicketId());
if (jiraIssue == null) {
logger.warn("No Jira issue found for ticket ID: {}", issue.getTicketId());
logAndCollect(Level.INFO, "No Jira issue found for ticket ID: {}", issue.getTicketId());
continue;
}

String jiraStatus = jiraIssue.getString("ticketStatus");
if (jiraStatus == null) {
logger.warn("No status found in Jira issue for ticket ID: {}", issue.getTicketId());
logAndCollect(Level.INFO, "No status found in Jira issue for ticket ID: {}", issue.getTicketId());
continue;
}

String aktoStatus = jiraToAktoStatusMapping.get(jiraStatus);
if (aktoStatus == null) {
logger.warn("No Akto status mapping found for Jira status: {} (ticket ID: {})",
jiraStatus, issue.getTicketId());
logAndCollect(Level.INFO, "No Akto status mapping found for Jira status: {} (ticket ID: {})",
jiraStatus, issue.getTicketId());
continue;
}

TestRunIssueStatus status = TestRunIssueStatus.valueOf(aktoStatus);

if (status == issue.getTestRunIssueStatus()) {
logger.info("Skipping update for issue: {}, ticketId: {} as status is already: {}", issue.getId(),
logAndCollect(Level.INFO, "Skipping update for issue: {}, ticketId: {} as status is already: {}", issue.getId(),
issue.getTicketId(), status);
continue;
}

logger.info("Updating issue: {}, ticketId: {} with status: {}. old status: {}", issue.getId(),
logAndCollect(Level.INFO, "Updating issue: {}, ticketId: {} with status: {}. old status: {}", issue.getId(),
issue.getTicketId(), status, issue.getTestRunIssueStatus());

int now = Context.now();
Expand All @@ -317,18 +318,18 @@ private void updateAktoIssues(List<TestingRunIssues> aktoIssues, Map<String, Bas
);
writeModelList.add(new UpdateOneModel<>(query, update));
} catch (Exception e) {
logger.error("Error processing Akto issue {}: {}", issue.getId(), e.getMessage());
logAndCollect(Level.ERROR, "Error processing Akto issue {}: {}", issue.getId(), e.getMessage());
}
}

if (!writeModelList.isEmpty()) {
TestingRunIssuesDao.instance.getMCollection().bulkWrite(writeModelList);
logger.info("Updated {} Akto issues out of {}", writeModelList.size(), aktoIssues.size());
logAndCollect(Level.INFO, "Updated {} Akto issues out of {}", writeModelList.size(), aktoIssues.size());
} else {
logger.info("No Akto issues to update");
logAndCollect(Level.INFO, "No Akto issues to update");
}
} catch (Exception e) {
logger.error("Error updating Akto issues: {}", e.getMessage(), e);
logAndCollect(Level.ERROR, "Error updating Akto issues: {}", e.getMessage(), e);
}

updateJobHeartbeat(job);
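
The Akto-side updates above are accumulated as UpdateOneModel entries and flushed with a single bulkWrite, one driver call instead of one round trip per issue. A minimal sketch of that pattern against a hypothetical collection (field names are assumptions, not Akto's schema):

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.Updates;
import com.mongodb.client.model.WriteModel;
import java.util.ArrayList;
import java.util.List;
import org.bson.Document;

public class BulkUpdateSketch {
    // Queue one UpdateOneModel per document, then flush the whole batch with
    // a single bulkWrite, mirroring how updateAktoIssues builds writeModelList.
    static void markSynced(MongoCollection<Document> issues, List<String> ticketIds, int now) {
        List<WriteModel<Document>> writes = new ArrayList<>();
        for (String ticketId : ticketIds) {
            writes.add(new UpdateOneModel<>(
                    Filters.eq("ticketId", ticketId),
                    Updates.set("ticketLastUpdatedAt", now)));
        }
        if (!writes.isEmpty()) {
            issues.bulkWrite(writes); // ordered by default; stops at first error
        }
    }
}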
Expand Down Expand Up @@ -366,7 +367,7 @@ private Map<String, List<String>> getIssueStatusMappings(JiraIntegration jiraInt
// Validate that all TestRunIssueStatus values have mappings
for (TestRunIssueStatus status : TestRunIssueStatus.values()) {
if (!statusMappings.containsKey(status.name())) {
logger.warn("No Jira status mapping found for Akto status: {}", status.name());
logAndCollect(Level.INFO, "No Jira status mapping found for Akto status: {}", status.name());
}
}
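
The check above warns on unmapped statuses rather than failing the job, which appears deliberate. The same enum-coverage idea in isolation (statuses here are illustrative, not Akto's full TestRunIssueStatus set):

import java.util.List;
import java.util.Map;

public class MappingCoverageSketch {
    enum Status { OPEN, FIXED, IGNORED }

    // Warn, but do not fail, when an enum constant has no configured mapping,
    // mirroring getIssueStatusMappings' validation pass.
    static void validate(Map<String, List<String>> statusMappings) {
        for (Status status : Status.values()) {
            if (!statusMappings.containsKey(status.name())) {
                System.err.println("No mapping found for status: " + status.name());
            }
        }
    }
}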

@@ -396,7 +397,7 @@ private Map<String, BasicDBObject> fetchJiraTicketsWithPagination(JiraIntegratio
updatedAfter);
allResults.putAll(pageResults);
} catch (Exception e) {
logger.error("Error fetching Jira tickets.", e);
logAndCollect(Level.ERROR, "Error fetching Jira tickets.", e);
throw e;
}
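
fetchJiraTicketsWithPagination is mostly cut off in this view; what survives shows page results merged into one map and fetch failures logged and rethrown. A generic sketch of the startAt/maxResults pagination shape Jira's search API uses, under the assumption that a short page marks the end:

import java.util.HashMap;
import java.util.Map;

public class PaginationSketch {
    interface PageFetcher {
        Map<String, Object> fetch(int startAt, int maxResults) throws Exception;
    }

    // Accumulate startAt/maxResults pages until a short page signals the end.
    static Map<String, Object> fetchAll(PageFetcher fetcher, int pageSize) throws Exception {
        Map<String, Object> all = new HashMap<>();
        int startAt = 0;
        while (true) {
            Map<String, Object> page = fetcher.fetch(startAt, pageSize);
            all.putAll(page);
            if (page.size() < pageSize) {
                break; // fewer results than requested: last page
            }
            startAt += pageSize;
        }
        return all;
    }
}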
