diff --git a/libs/utils/src/main/java/com/akto/jobs/JobExecutor.java b/libs/utils/src/main/java/com/akto/jobs/JobExecutor.java
index 7cd519e2ec..b9ce6c69ed 100644
--- a/libs/utils/src/main/java/com/akto/jobs/JobExecutor.java
+++ b/libs/utils/src/main/java/com/akto/jobs/JobExecutor.java
@@ -2,22 +2,37 @@
 import com.akto.dao.context.Context;
 import com.akto.dao.jobs.JobsDao;
+import com.akto.dao.notifications.SlackWebhooksDao;
 import com.akto.dto.jobs.Job;
 import com.akto.dto.jobs.JobParams;
 import com.akto.dto.jobs.JobStatus;
 import com.akto.dto.jobs.ScheduleType;
+import com.akto.dto.notifications.SlackWebhook;
 import com.akto.jobs.exception.RetryableJobException;
 import com.akto.log.LoggerMaker;
+import com.akto.notifications.slack.CustomTextAlert;
 import com.mongodb.client.model.Filters;
 import com.mongodb.client.model.FindOneAndUpdateOptions;
 import com.mongodb.client.model.ReturnDocument;
 import com.mongodb.client.model.Updates;
+import com.slack.api.Slack;
+import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.ArrayList;
+import java.util.List;
 import org.bson.types.ObjectId;
+import org.slf4j.helpers.MessageFormatter;
+import org.slf4j.event.Level;
 
 public abstract class JobExecutor<T extends JobParams> {
 
     private static final LoggerMaker logger = new LoggerMaker(JobExecutor.class);
     protected final Class<T> paramClass;
+    private static final Slack SLACK_INSTANCE = Slack.getInstance();
+    private static final ExecutorService SLACK_EXECUTOR = Executors.newSingleThreadExecutor();
+
+    private final List<String> jobLogs = new ArrayList<>();
 
     public JobExecutor(Class<T> paramClass) {
         this.paramClass = paramClass;
@@ -27,6 +42,7 @@ public final void execute(Job job) {
         ObjectId jobId = job.getId();
         logger.info("Executing job: {}", job);
         Job executedJob;
+        String errorMessage = null;
         try {
             runJob(job);
             executedJob = logSuccess(jobId);
@@ -35,10 +51,21 @@ public final void execute(Job job) {
             executedJob = reScheduleJob(job);
             logger.error("Error occurred while executing the job. Re-scheduling the job. {}", executedJob, rex);
         } catch (Exception e) {
+            errorMessage = e.getMessage();
             executedJob = logFailure(jobId, e);
             logger.error("Error occurred while executing the job. Not re-scheduling. {}", executedJob, e);
         }
         handleRecurringJob(executedJob);
+        String capturedLogs = "";
+        if (!jobLogs.isEmpty()) {
+            StringBuilder sb = new StringBuilder();
+            for (String line : jobLogs) {
+                sb.append(line).append('\n');
+            }
+            capturedLogs = sb.toString();
+            jobLogs.clear();
+        }
+        sendSlackAlert(job, errorMessage, capturedLogs);
     }
 
     private Job logSuccess(ObjectId id) {
@@ -100,6 +127,32 @@ protected void updateJobParams(Job job, T params) {
         );
     }
 
+    protected void logAndCollect(Level level, String message, Object... vars) {
+        try {
+            switch (level) {
+                case ERROR:
+                    logger.error(message, vars);
+                    break;
+                case WARN:
+                    logger.warn(message, vars);
+                    break;
+                case DEBUG:
+                    logger.debug(message, vars);
+                    break;
+                default:
+                    logger.info(message, vars);
+                    break;
+            }
+            if (level == Level.ERROR && MessageFormatter.getThrowableCandidate(vars) != null) {
+                vars = MessageFormatter.trimmedCopy(vars);
+            }
+            String formatted = MessageFormatter.arrayFormat(message, vars).getMessage();
+            jobLogs.add("[" + level + "]: " + formatted);
+        } catch (Exception e) {
+            logger.error("Error logging message: " + message, e);
+        }
+    }
+
     private Job reScheduleJob(Job job) {
         int now = Context.now();
         return JobsDao.instance.getMCollection().findOneAndUpdate(
@@ -133,6 +186,38 @@ private void handleRecurringJob(Job job) {
         );
     }
 
+    private void sendSlackAlert(Job job, String errorMessage, String capturedLogs) {
+        int targetAccountId = job.getAccountId();
+        SLACK_EXECUTOR.submit(() -> {
+            Context.accountId.set(1000000);
+            SlackWebhook slackWebhook = SlackWebhooksDao.instance.findOne(Filters.empty());
+            if (targetAccountId == 1723492815 && slackWebhook != null) { // send slack alerts only for this specific account
+                try {
+                    StringBuilder message = new StringBuilder();
+
+                    message.append("Job ")
+                        .append(errorMessage == null ? "completed successfully. " : "failed. ")
+                        .append("Name: ").append(job.getJobParams().getJobType())
+                        .append(" | Account: ").append(targetAccountId)
+                        .append(" | JobId: ").append(job.getId().toHexString());
+
+                    if (errorMessage != null) {
+                        message.append(" | Error: ").append(errorMessage);
+                    }
+
+                    if (capturedLogs != null && !capturedLogs.isEmpty()) {
+                        message.append("\n\nLogs:\n").append(capturedLogs);
+                    }
+
+                    CustomTextAlert customTextAlert = new CustomTextAlert(message.toString());
+                    SLACK_INSTANCE.send(slackWebhook.getWebhook(), customTextAlert.toJson());
+                } catch (Exception e) {
+                    logger.error("Error sending slack alert", e);
+                }
+            }
+        });
+    }
+
     protected abstract void runJob(Job job) throws Exception;
 }
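
A note on the logging helper above: logAndCollect() writes through the normal logger and also buffers a pre-formatted line into jobLogs, which execute() later drains into the Slack alert. The buffered line is built with the MessageFormatter helpers the patch imports; since trimmedCopy() and getThrowableCandidate() are only public in slf4j-api 2.x, that version is assumed here. The snippet below is a minimal standalone sketch of the formatting behavior the buffer relies on; the LogCaptureDemo class and its sample messages are illustrative and not part of the patch.

import org.slf4j.event.Level;
import org.slf4j.helpers.MessageFormatter;

public class LogCaptureDemo {
    public static void main(String[] args) {
        // Plain case: every var fills a {} placeholder, as in the buffered lines.
        String info = MessageFormatter
            .arrayFormat("Updated {} issues to {}", new Object[] {3, "OPEN"})
            .getMessage();
        System.out.println("[" + Level.INFO + "]: " + info);   // [INFO]: Updated 3 issues to OPEN

        // Error case: arrayFormat() treats a trailing Throwable as the log
        // exception, so it is never rendered into the formatted text.
        Object[] vars = {"sync failed", new IllegalStateException("boom")};
        String error = MessageFormatter.arrayFormat("Error: {}", vars).getMessage();
        System.out.println("[" + Level.ERROR + "]: " + error); // [ERROR]: Error: sync failed
    }
}

Because trimmedCopy() drops the last array element unconditionally, the ERROR branch in the patch guards the trim with getThrowableCandidate(); without that guard, a call whose final argument is a plain String, such as e.getMessage(), would lose that argument in the captured line.
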
diff --git a/libs/utils/src/main/java/com/akto/jobs/executors/TicketSyncJobExecutor.java b/libs/utils/src/main/java/com/akto/jobs/executors/TicketSyncJobExecutor.java
index 2cdd466d5d..f332e82243 100644
--- a/libs/utils/src/main/java/com/akto/jobs/executors/TicketSyncJobExecutor.java
+++ b/libs/utils/src/main/java/com/akto/jobs/executors/TicketSyncJobExecutor.java
@@ -13,6 +13,7 @@
 import com.akto.jobs.JobExecutor;
 import com.akto.jobs.utils.JobConstants;
 import com.akto.log.LoggerMaker;
+import org.slf4j.event.Level;
 import com.akto.util.Constants;
 import com.akto.util.enums.GlobalEnums.TestRunIssueStatus;
 import com.akto.util.http_util.CoreHTTPClient;
@@ -75,14 +76,14 @@ protected void runJob(Job job) throws Exception {
         List<TestingRunIssues> eligibleAktoIssues = TestingRunIssuesDao.instance.findAll(filter);
 
         if (eligibleJiraTickets.isEmpty() && eligibleAktoIssues.isEmpty()) {
-            logger.info("No eligible issues found for syncing");
+            logAndCollect(Level.INFO, "No eligible issues found for syncing");
             params.setLastSyncedAt(Context.now());
             updateJobParams(job, params);
             return;
         }
 
         if (eligibleJiraTickets.isEmpty()) {
-            logger.info("No Akto issues to be updated. Updating {} Jira tickets.",
+            logAndCollect(Level.INFO, "No Akto issues to be updated. Updating {} Jira tickets.",
                 eligibleAktoIssues.size());
             updateJiraTickets(jira, eligibleAktoIssues, aktoToJiraStatusMappings, job);
             params.setLastSyncedAt(Context.now());
@@ -91,7 +92,7 @@ protected void runJob(Job job) throws Exception {
         if (eligibleAktoIssues.isEmpty()) {
-            logger.info("No Jira Tickets to be updated. Updating {} Akto issues.",
+            logAndCollect(Level.INFO, "No Jira tickets to be updated. Updating {} Akto issues.",
                 eligibleJiraTickets.size());
             Bson query = Filters.and(
                 Filters.eq(TestingRunIssues.TICKET_PROJECT_KEY, projectKey),
@@ -134,7 +135,7 @@ protected void runJob(Job job) throws Exception {
         // Handle remaining Jira tickets that don't have corresponding Akto issues
         if (!eligibleJiraTickets.isEmpty()) {
-            logger.info("Found {} Jira tickets without corresponding Akto issues", eligibleJiraTickets.size());
+            logAndCollect(Level.INFO, "Found {} Jira tickets without corresponding Akto issues", eligibleJiraTickets.size());
             // Fetch Akto issues by ticket IDs
             List<String> ticketIds = new ArrayList<>(eligibleJiraTickets.keySet());
             Bson query = Filters.and(
@@ -145,7 +146,7 @@ protected void runJob(Job job) throws Exception {
             List<TestingRunIssues> remainingAktoIssues = TestingRunIssuesDao.instance.findAll(query);
 
             if (!remainingAktoIssues.isEmpty()) {
-                logger.info("Found {} Akto issues for remaining Jira tickets", remainingAktoIssues.size());
+                logAndCollect(Level.INFO, "Found {} Akto issues for remaining Jira tickets", remainingAktoIssues.size());
                 updateAktoIssues(remainingAktoIssues, eligibleJiraTickets, jiraToAktoStatusMappings, job);
             }
         }
@@ -177,11 +178,11 @@ private void updateJiraTickets(JiraIntegration jira, List<TestingRunIssues> issu
             // Get the corresponding Jira statuses for this Akto status
             List<String> jiraStatuses = aktoToJiraStatusMappings.get(aktoStatus);
             if (jiraStatuses == null || jiraStatuses.isEmpty()) {
-                logger.warn("No Jira status mapping found for Akto status: {}", aktoStatus);
+                logAndCollect(Level.WARN, "No Jira status mapping found for Akto status: {}", aktoStatus);
                 continue;
             }
 
-            logger.debug("Found {} statues mapped with akto status {}. Using the first one", jiraStatuses.size(),
+            logAndCollect(Level.DEBUG, "Found {} Jira statuses mapped with Akto status {}. Using the first one", jiraStatuses.size(),
                 aktoStatus);
 
             // Use the first mapped status as the target
@@ -204,8 +205,8 @@ private void updateJiraTickets(JiraIntegration jira, List<TestingRunIssues> issu
                 updateJobHeartbeat(job);
 
                 if (transitionsMap.isEmpty()) {
-                    logger.info("No transitions found for issues with Akto status: {} to Jira status: {}",
-                        aktoStatus, targetJiraStatus);
+                    logAndCollect(Level.INFO, "No transitions found for issues with Akto status: {} to Jira status: {}",
+                        aktoStatus, targetJiraStatus);
                     continue;
                 }
@@ -224,21 +225,22 @@ private void updateJiraTickets(JiraIntegration jira, List<TestingRunIssues> issu
                     }
                     retryCount++;
                     if (retryCount < maxRetries) {
-                        logger.info("Retrying bulk transition (attempt {}/{})", retryCount + 1, maxRetries);
+                        logAndCollect(Level.INFO, "Retrying bulk transition (attempt {}/{})", retryCount + 1, maxRetries);
                         Thread.sleep(2000L * retryCount); // Exponential backoff
                     }
                 } catch (Exception e) {
                     retryCount++;
                     if (retryCount < maxRetries) {
-                        logger.error("Error during bulk transition, retrying (attempt {}/{}): {}",
-                            retryCount + 1, maxRetries, e.getMessage(), e);
+                        logAndCollect(Level.ERROR,
+                            "Error during bulk transition, retrying (attempt {}/{}): {}", retryCount + 1,
+                            maxRetries, e.getMessage(), e);
                         Thread.sleep(2000L * retryCount); // Exponential backoff
                     }
                 }
             }
 
             if (success) {
-                logger.info("Successfully transitioned {} Jira tickets to status: {}. ticketIds: {}",
+                logAndCollect(Level.DEBUG, "Successfully transitioned {} Jira tickets to status: {}. ticketIds: {}",
                     issueKeys.size(), targetJiraStatus, issueKeys);
                 // Update last updated timestamp in Akto
                 List<WriteModel<TestingRunIssues>> writeModels = new ArrayList<>();
@@ -252,19 +254,18 @@ private void updateJiraTickets(JiraIntegration jira, List<TestingRunIssues> issu
                 if (!writeModels.isEmpty()) {
                     TestingRunIssuesDao.instance.getMCollection().bulkWrite(writeModels);
-                    logger.info("Updated last updated timestamp for {} Akto issues", writeModels.size());
                 }
             } else {
-                logger.error("Failed to transition Jira tickets to status: {}. ticketIds: {}", targetJiraStatus,
+                logAndCollect(Level.ERROR, "Failed to transition Jira tickets to status: {}. ticketIds: {}", targetJiraStatus,
                     issueKeys);
             }
         } catch (Exception e) {
-            logger.error("Error getting transitions or performing bulk transition for Akto status: {} to Jira status: {}",
-                aktoStatus, targetJiraStatus, e);
+            logAndCollect(Level.ERROR, "Error getting transitions or performing bulk transition for Akto status: {} to Jira status: {}",
+                aktoStatus, targetJiraStatus, e);
         }
     }
 } catch (Exception e) {
-    logger.error("Error updating Jira tickets: {}", e.getMessage(), e);
+    logAndCollect(Level.ERROR, "Error updating Jira tickets: {}", e.getMessage(), e);
 }
@@ -281,32 +282,32 @@ private void updateAktoIssues(List<TestingRunIssues> aktoIssues, Map
                     writeModelList.add(new UpdateOneModel<>(query, update));
                 } catch (Exception e) {
-                    logger.error("Error processing Akto issue {}: {}", issue.getId(), e.getMessage());
+                    logAndCollect(Level.ERROR, "Error processing Akto issue {}: {}", issue.getId(), e.getMessage());
                 }
             }
 
             if (!writeModelList.isEmpty()) {
                 TestingRunIssuesDao.instance.getMCollection().bulkWrite(writeModelList);
-                logger.info("Updated {} Akto issues out of {}", writeModelList.size(), aktoIssues.size());
+                logAndCollect(Level.INFO, "Updated {} Akto issues out of {}", writeModelList.size(), aktoIssues.size());
             } else {
-                logger.info("No Akto issues to update");
+                logAndCollect(Level.INFO, "No Akto issues to update");
            }
        } catch (Exception e) {
-            logger.error("Error updating Akto issues: {}", e.getMessage(), e);
+            logAndCollect(Level.ERROR, "Error updating Akto issues: {}", e.getMessage(), e);
        }
        updateJobHeartbeat(job);
@@ -366,7 +367,7 @@ private Map<String, List<String>> getIssueStatusMappings(JiraIntegration jiraInt
         // Validate that all TestRunIssueStatus values have mappings
         for (TestRunIssueStatus status : TestRunIssueStatus.values()) {
             if (!statusMappings.containsKey(status.name())) {
-                logger.warn("No Jira status mapping found for Akto status: {}", status.name());
+                logAndCollect(Level.WARN, "No Jira status mapping found for Akto status: {}", status.name());
             }
         }
@@ -396,7 +397,7 @@ private Map<String, String> fetchJiraTicketsWithPagination(JiraIntegratio
                     updatedAfter);
                 allResults.putAll(pageResults);
             } catch (Exception e) {
-                logger.error("Error fetching Jira tickets.", e);
+                logAndCollect(Level.ERROR, "Error fetching Jira tickets.", e);
                 throw e;
             }