From 56048858365253e50986a8cb205b8fe41b5e4ebf Mon Sep 17 00:00:00 2001 From: soumilsuri Date: Tue, 23 Sep 2025 11:58:19 +0000 Subject: [PATCH 1/7] Add configurable archivalDays (30/60/90) with UI, backend, proto, and cron support --- .../threat_detection/ThreatConfiguration.java | 1 + .../ArchivalConfigComponent.jsx | 85 ++++++ .../ThreatConfiguration.jsx | 6 + .../java/com/akto/threat/backend/Main.java | 9 + .../cron/ArchiveOldMaliciousEventsCron.java | 265 ++++++++++++++++++ .../backend/service/ThreatActorService.java | 25 ++ .../dashboard_service/v1/service.proto | 1 + 7 files changed, 392 insertions(+) create mode 100644 apps/dashboard/web/polaris_web/web/src/apps/dashboard/pages/settings/threat_configuration/ArchivalConfigComponent.jsx create mode 100644 apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java diff --git a/apps/dashboard/src/main/java/com/akto/action/threat_detection/ThreatConfiguration.java b/apps/dashboard/src/main/java/com/akto/action/threat_detection/ThreatConfiguration.java index 81a3065cce..40cad38b0c 100644 --- a/apps/dashboard/src/main/java/com/akto/action/threat_detection/ThreatConfiguration.java +++ b/apps/dashboard/src/main/java/com/akto/action/threat_detection/ThreatConfiguration.java @@ -10,6 +10,7 @@ public class ThreatConfiguration { private Actor actor; private RatelimitConfig ratelimitConfig; + private Integer archivalDays; @lombok.Getter @lombok.Setter diff --git a/apps/dashboard/web/polaris_web/web/src/apps/dashboard/pages/settings/threat_configuration/ArchivalConfigComponent.jsx b/apps/dashboard/web/polaris_web/web/src/apps/dashboard/pages/settings/threat_configuration/ArchivalConfigComponent.jsx new file mode 100644 index 0000000000..bf8da94e39 --- /dev/null +++ b/apps/dashboard/web/polaris_web/web/src/apps/dashboard/pages/settings/threat_configuration/ArchivalConfigComponent.jsx @@ -0,0 +1,85 @@ +import { useState, useEffect } from "react"; +import func from "@/util/func" +import { LegacyCard, VerticalStack, Divider, Text, Button, Box } from "@shopify/polaris"; +import api from "../../../pages/threat_detection/api.js"; +import Dropdown from "../../../components/layouts/Dropdown.jsx"; + +const ArchivalConfigComponent = ({ title, description }) => { + const [archivalDays, setArchivalDays] = useState(60); + const [isSaveDisabled, setIsSaveDisabled] = useState(true); + + const fetchData = async () => { + const response = await api.fetchThreatConfiguration(); + const days = response?.threatConfiguration?.archivalDays; + const value = days === 30 || days === 60 || days === 90 ? 
days : 60; + setArchivalDays(value); + setIsSaveDisabled(true); + }; + + const onSave = async () => { + const payload = { + archivalDays: archivalDays + }; + await api.modifyThreatConfiguration(payload).then(() => { + try { + func.setToast(true, false, "Archival time saved successfully"); + fetchData() + } catch (error) { + func.setToast(true, true, "Error saving archival time"); + } + }); + }; + + useEffect(() => { + fetchData(); + }, []); + + const options = [ + { value: 30, label: "30 days" }, + { value: 60, label: "60 days" }, + { value: 90, label: "90 days" }, + ]; + + function TitleComponent({ title, description }) { + return ( + + {title} + + {description} + + + ) + } + + const onChange = (val) => { + setArchivalDays(val); + setIsSaveDisabled(false); + }; + + return ( + } + primaryFooterAction={{ + content: 'Save', + onAction: onSave, + loading: false, + disabled: isSaveDisabled + }} + > + + + + onChange(val)} + label="Archival Time" + initial={() => archivalDays} + /> + + + + ); +}; + +export default ArchivalConfigComponent; + + diff --git a/apps/dashboard/web/polaris_web/web/src/apps/dashboard/pages/settings/threat_configuration/ThreatConfiguration.jsx b/apps/dashboard/web/polaris_web/web/src/apps/dashboard/pages/settings/threat_configuration/ThreatConfiguration.jsx index fc8f256711..706974be45 100644 --- a/apps/dashboard/web/polaris_web/web/src/apps/dashboard/pages/settings/threat_configuration/ThreatConfiguration.jsx +++ b/apps/dashboard/web/polaris_web/web/src/apps/dashboard/pages/settings/threat_configuration/ThreatConfiguration.jsx @@ -1,6 +1,7 @@ import PageWithMultipleCards from "../../../components/layouts/PageWithMultipleCards" import ThreatActorConfigComponent from "./ThreatActorConfig.jsx" import RatelimitConfigComponent from "./RatelimitConfigComponent.jsx" +import ArchivalConfigComponent from "./ArchivalConfigComponent.jsx" function ThreatConfiguration() { @@ -14,6 +15,11 @@ function ThreatConfiguration() { title={"Rate Limit Configuration"} description={"Configure rate limiting rules to protect your APIs from abuse."} key={"ratelimitConfig"} + />, + ]; diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/Main.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/Main.java index acac265e58..50c006e74c 100644 --- a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/Main.java +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/Main.java @@ -14,6 +14,7 @@ import com.akto.threat.backend.service.ThreatActorService; import com.akto.threat.backend.service.ThreatApiService; import com.akto.threat.backend.tasks.FlushMessagesToDB; +import com.akto.threat.backend.cron.ArchiveOldMaliciousEventsCron; import com.mongodb.ConnectionString; import com.mongodb.MongoClientSettings; import com.mongodb.ReadPreference; @@ -73,6 +74,14 @@ public static void main(String[] args) throws Exception { ApiDistributionDataService apiDistributionDataService = new ApiDistributionDataService(threatProtectionMongo); new BackendVerticle(maliciousEventService, threatActorService, threatApiService, apiDistributionDataService).start(); + + try { + logger.infoAndAddToDb("Starting ArchiveOldMaliciousEventsCron for all databases", LoggerMaker.LogDb.RUNTIME); + new ArchiveOldMaliciousEventsCron(threatProtectionMongo).cron(); + logger.infoAndAddToDb("Scheduled ArchiveOldMaliciousEventsCron", LoggerMaker.LogDb.RUNTIME); + } catch (Exception e) { + logger.errorAndAddToDb("Error starting ArchiveOldMaliciousEventsCron: " + 
e.getMessage(), LoggerMaker.LogDb.RUNTIME); + } } } diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java new file mode 100644 index 0000000000..fdf320dde1 --- /dev/null +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java @@ -0,0 +1,265 @@ +package com.akto.threat.backend.cron; + +import com.akto.log.LoggerMaker; +import com.akto.dao.context.Context; +import com.mongodb.MongoBulkWriteException; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.Filters; +import com.mongodb.client.model.ReplaceOneModel; +import com.mongodb.client.model.ReplaceOptions; +import com.mongodb.client.model.Sorts; +import com.mongodb.client.model.WriteModel; +import org.bson.Document; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class ArchiveOldMaliciousEventsCron implements Runnable { + + private static final LoggerMaker logger = new LoggerMaker(ArchiveOldMaliciousEventsCron.class); + + private static final String SOURCE_COLLECTION = "malicious_events"; + private static final String DEST_COLLECTION = "archived_malicious_events"; + private static final int BATCH_SIZE = 5000; + private static final long DEFAULT_RETENTION_DAYS = 60L; // default, can be overridden from DB + private static final long MAX_SOURCE_DOCS = 400_000L; // cap size + + private final MongoClient mongoClient; + private final ScheduledExecutorService scheduler; + + public ArchiveOldMaliciousEventsCron(MongoClient mongoClient) { + this.mongoClient = mongoClient; + this.scheduler = Executors.newScheduledThreadPool(1); + } + + public void cron() { + long initialDelaySeconds = 0; + long periodSeconds = Duration.ofHours(6).getSeconds(); + scheduler.scheduleAtFixedRate(this, initialDelaySeconds, periodSeconds, TimeUnit.SECONDS); + logger.infoAndAddToDb("Scheduled ArchiveOldMaliciousEventsCron every 6 hours", LoggerMaker.LogDb.RUNTIME); + } + + @Override + public void run() { + try { + runOnce(); + } catch (Throwable t) { + logger.errorAndAddToDb("Archive cron failed unexpectedly: " + t.getMessage(), LoggerMaker.LogDb.RUNTIME); + } + } + + public void runOnce() { + long nowSeconds = System.currentTimeMillis() / 1000L; // epoch seconds + + try (MongoCursor dbNames = mongoClient.listDatabaseNames().cursor()) { + while (dbNames.hasNext()) { + String dbName = dbNames.next(); + if (shouldSkipDatabase(dbName)) continue; + + Integer accId = null; + try { + try { + accId = Integer.parseInt(dbName); + Context.accountId.set(accId); + } catch (Exception ignore) { + // leave context unset for non-numeric db names + } + processDatabase(dbName, nowSeconds); + } catch (Exception e) { + logger.errorAndAddToDb("Error processing database: " + dbName + " : " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); + } finally { + Context.resetContextThreadLocals(); + } + } + } + } + + private boolean shouldSkipDatabase(String dbName) { + return dbName == null || dbName.isEmpty() + || "admin".equals(dbName) + || "local".equals(dbName) + || "config".equals(dbName); + } + + 
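+    // Flow implemented below (summary comment): fetch the per-account retention
+    // window from threat_configuration (falling back to 60 days), compute the
+    // detectedAt cutoff, then repeatedly read the oldest BATCH_SIZE matching
+    // events, hand them to the scheduler for an async archive write, and delete
+    // them from malicious_events by _id.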
private void processDatabase(String dbName, long nowSeconds) { + MongoDatabase db = mongoClient.getDatabase(dbName); + ensureCollectionExists(db, DEST_COLLECTION); + + long retentionDays = fetchRetentionDays(db); + if (!(retentionDays == 30L || retentionDays == 60L || retentionDays == 90L)) { + retentionDays = DEFAULT_RETENTION_DAYS; + } + long threshold = nowSeconds - (retentionDays * 24 * 60 * 60); + + MongoCollection source = db.getCollection(SOURCE_COLLECTION, Document.class); + MongoCollection dest = db.getCollection(DEST_COLLECTION, Document.class); + + int totalMoved = 0; + + while (true) { + List batch = new ArrayList<>(BATCH_SIZE); + try (MongoCursor cursor = source + .find(Filters.lte("detectedAt", threshold)) + .sort(Sorts.ascending("detectedAt")) + .limit(BATCH_SIZE) + .cursor()) { + while (cursor.hasNext()) { + batch.add(cursor.next()); + } + } + + if (batch.isEmpty()) break; + + List> writes = new ArrayList<>(batch.size()); + Set ids = new HashSet<>(); + for (Document doc : batch) { + Object id = doc.get("_id"); + ids.add(id); + writes.add(new ReplaceOneModel<>( + Filters.eq("_id", id), + doc, + new ReplaceOptions().upsert(true) + )); + } + + // Insert into archive asynchronously, deletion proceeds synchronously + final List> writeSnapshot = new ArrayList<>(writes); + this.scheduler.submit(() -> { + try { + dest.bulkWrite(writeSnapshot); + } catch (MongoBulkWriteException bwe) { + logger.errorAndAddToDb("Async bulk write error archiving to " + DEST_COLLECTION + " in db " + dbName + ": " + bwe.getMessage(), LoggerMaker.LogDb.RUNTIME); + } catch (Exception e) { + logger.errorAndAddToDb("Async error writing archive batch in db " + dbName + ": " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); + } + }); + + try { + long deleted = source.deleteMany(Filters.in("_id", ids)).getDeletedCount(); + totalMoved += (int) deleted; + logger.infoAndAddToDb("Archived and deleted " + deleted + " events from db " + dbName, LoggerMaker.LogDb.RUNTIME); + } catch (Exception e) { + logger.errorAndAddToDb("Failed to delete archived documents from source in db " + dbName + ": " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); + } + + if (batch.size() < BATCH_SIZE) { + break; + } + } + + if (totalMoved > 0) { + logger.infoAndAddToDb("Completed archiving for db " + dbName + ", total moved: " + totalMoved, LoggerMaker.LogDb.RUNTIME); + } + + // Enforce collection size cap by trimming oldest docs beyond 400k. 
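+        // Worked example: at ~450k documents with MAX_SOURCE_DOCS = 400_000,
+        // trimCollectionIfExceedsCap moves the ~50k oldest events (by
+        // detectedAt) into the archive in BATCH_SIZE (5000) chunks, i.e.
+        // roughly ten find/insert/delete rounds.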
+ try { + trimCollectionIfExceedsCap(dbName, source, dest); + } catch (Exception e) { + logger.errorAndAddToDb("Error trimming collection to cap in db " + dbName + ": " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); + } + } + + private long fetchRetentionDays(MongoDatabase db) { + try { + MongoCollection cfg = db.getCollection("threat_configuration", Document.class); + Document doc = cfg.find().first(); + if (doc == null) return DEFAULT_RETENTION_DAYS; + Object val = doc.get("archivalDays"); + if (val instanceof Number) { + long days = ((Number) val).longValue(); + if (days == 30L || days == 60L || days == 90L) return days; + } + } catch (Exception e) { + logger.errorAndAddToDb("Failed fetching archivalDays from threat_configuration for db " + db.getName() + ": " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); + } + return DEFAULT_RETENTION_DAYS; + } + + private void ensureCollectionExists(MongoDatabase db, String collectionName) { + boolean exists = false; + try (MongoCursor it = db.listCollectionNames().cursor()) { + while (it.hasNext()) { + if (collectionName.equals(it.next())) { + exists = true; + break; + } + } + } + if (!exists) { + try { + db.createCollection(collectionName); + } catch (Exception e) { + logger.infoAndAddToDb("Tried creating collection: " + collectionName + " -> " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); + } + } + } + + private void trimCollectionIfExceedsCap(String dbName, MongoCollection source, MongoCollection dest) { + long approxCount = source.countDocuments(); + + if (approxCount <= MAX_SOURCE_DOCS) return; + + long toRemove = approxCount - MAX_SOURCE_DOCS; + long totalDeleted = 0L; + logger.infoAndAddToDb("Starting overflow trim in db " + dbName + ": approxCount=" + approxCount + ", toRemove=" + toRemove, LoggerMaker.LogDb.RUNTIME); + + while (toRemove > 0) { + int batch = (int) Math.min(BATCH_SIZE, toRemove); + + List oldestDocs = new ArrayList<>(batch); + try (MongoCursor cursor = source + .find() + .sort(Sorts.ascending("detectedAt")) + .limit(batch) + .cursor()) { + while (cursor.hasNext()) { + oldestDocs.add(cursor.next()); + } + } + + if (oldestDocs.isEmpty()) break; + + final List docsSnapshot = new ArrayList<>(oldestDocs); + this.scheduler.submit(() -> { + try { + dest.insertMany(docsSnapshot); + } catch (Exception e) { + logger.errorAndAddToDb("Async archive insert failed in db " + dbName + ": " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); + } + }); + + Set ids = new HashSet<>(); + for (Document d : oldestDocs) { + ids.add(d.get("_id")); + } + long deleted = 0L; + try { + deleted = source.deleteMany(Filters.in("_id", ids)).getDeletedCount(); + } catch (Exception e) { + logger.errorAndAddToDb("Overflow trim delete failed in db " + dbName + ": " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); + } + + totalDeleted += deleted; + toRemove -= deleted; + + if (deleted < batch) { + break; + } + } + + if (totalDeleted > 0) { + logger.infoAndAddToDb("Completed overflow trim in db " + dbName + ": deleted=" + totalDeleted, LoggerMaker.LogDb.RUNTIME); + } + } +} + + diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/ThreatActorService.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/ThreatActorService.java index 16ce6b6e81..9ffc0c9d9d 100644 --- a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/ThreatActorService.java +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/ThreatActorService.java @@ -85,6 +85,18 @@ public 
ThreatConfiguration fetchThreatConfiguration(String accountId) { if (rateLimitConfig != null) { builder.setRatelimitConfig(rateLimitConfig); } + + // Handle archival days + Integer archivalDays = null; + try { + Object val = doc.get("archivalDays"); + if (val instanceof Number) { + archivalDays = ((Number) val).intValue(); + } + } catch (Exception ignore) {} + if (archivalDays != null) { + builder.setArchivalDays(archivalDays); + } } return builder.build(); } @@ -120,6 +132,13 @@ public ThreatConfiguration modifyThreatConfiguration(String accountId, ThreatCon newDoc.append("ratelimitConfig", ratelimitConfigDoc.get("ratelimitConfig")); } } + + { + int val = updatedConfig.getArchivalDays(); + if (val == 30 || val == 60 || val == 90) { + newDoc.append("archivalDays", val); + } + } Document existingDoc = coll.find().first(); @@ -137,6 +156,12 @@ public ThreatConfiguration modifyThreatConfiguration(String accountId, ThreatCon if (updatedConfig.hasRatelimitConfig()) { builder.setRatelimitConfig(updatedConfig.getRatelimitConfig()); } + { + int val = updatedConfig.getArchivalDays(); + if (val == 30 || val == 60 || val == 90) { + builder.setArchivalDays(val); + } + } return builder.build(); } diff --git a/protobuf/threat_detection/service/dashboard_service/v1/service.proto b/protobuf/threat_detection/service/dashboard_service/v1/service.proto index e331dddcae..20be51004a 100644 --- a/protobuf/threat_detection/service/dashboard_service/v1/service.proto +++ b/protobuf/threat_detection/service/dashboard_service/v1/service.proto @@ -321,6 +321,7 @@ message RatelimitConfig { message ThreatConfiguration { Actor actor = 1; RatelimitConfig ratelimit_config = 2; + int32 archival_days = 3; } message ApiDistributionDataRequestPayload { From c30ced3e3b5693dd38bcb08d7c07e4343025460e Mon Sep 17 00:00:00 2001 From: soumilsuri Date: Tue, 23 Sep 2025 12:08:25 +0000 Subject: [PATCH 2/7] Add unit tests for ArchiveOldMaliciousEventsCron --- .../ArchiveOldMaliciousEventsCronTest.java | 134 ++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 apps/threat-detection-backend/src/test/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCronTest.java diff --git a/apps/threat-detection-backend/src/test/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCronTest.java b/apps/threat-detection-backend/src/test/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCronTest.java new file mode 100644 index 0000000000..460e33c761 --- /dev/null +++ b/apps/threat-detection-backend/src/test/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCronTest.java @@ -0,0 +1,134 @@ +package com.akto.threat.backend.cron; + +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import org.bson.Document; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.Silent.class) +public class ArchiveOldMaliciousEventsCronTest { + + @Mock + private MongoClient mongoClient; + + @Mock + private MongoDatabase mongoDatabase; + + @Mock + private MongoCollection cfgCollection; + + @Mock + private MongoCollection sourceCollection; + + @Mock + private 
MongoCollection destCollection; + + private ArchiveOldMaliciousEventsCron cron; + + @Before + public void setup() { + cron = new ArchiveOldMaliciousEventsCron(mongoClient); + } + + @Test + public void testFetchRetentionDays_DefaultWhenMissing() throws Exception { + when(mongoDatabase.getCollection(eq("threat_configuration"), eq(Document.class))).thenReturn(cfgCollection); + when(cfgCollection.find()).thenReturn(mock(com.mongodb.client.FindIterable.class)); + when(cfgCollection.find().first()).thenReturn(null); + + Method m = ArchiveOldMaliciousEventsCron.class.getDeclaredMethod("fetchRetentionDays", MongoDatabase.class); + m.setAccessible(true); + long days = (long) m.invoke(cron, mongoDatabase); + assertEquals(60L, days); + } + + @Test + public void testFetchRetentionDays_Valid90() throws Exception { + when(mongoDatabase.getCollection(eq("threat_configuration"), eq(Document.class))).thenReturn(cfgCollection); + when(cfgCollection.find()).thenReturn(mock(com.mongodb.client.FindIterable.class)); + Document cfg = new Document("archivalDays", 90); + when(cfgCollection.find().first()).thenReturn(cfg); + + Method m = ArchiveOldMaliciousEventsCron.class.getDeclaredMethod("fetchRetentionDays", MongoDatabase.class); + m.setAccessible(true); + long days = (long) m.invoke(cron, mongoDatabase); + assertEquals(90L, days); + } + + @Test + public void testFetchRetentionDays_InvalidFallsBack() throws Exception { + when(mongoDatabase.getCollection(eq("threat_configuration"), eq(Document.class))).thenReturn(cfgCollection); + when(cfgCollection.find()).thenReturn(mock(com.mongodb.client.FindIterable.class)); + Document cfg = new Document("archivalDays", 45); + when(cfgCollection.find().first()).thenReturn(cfg); + + Method m = ArchiveOldMaliciousEventsCron.class.getDeclaredMethod("fetchRetentionDays", MongoDatabase.class); + m.setAccessible(true); + long days = (long) m.invoke(cron, mongoDatabase); + assertEquals(60L, days); + } + + @Test + public void testTrimCollectionIfExceedsCap_DeletesOldest() throws Exception { + when(sourceCollection.countDocuments()).thenReturn(410_000L); + + // Build a small batch of oldest docs + List oldest = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + oldest.add(new Document("_id", "id-" + i).append("detectedAt", i)); + } + + // Mock find().sort().limit().cursor() + com.mongodb.client.FindIterable findIt = mock(com.mongodb.client.FindIterable.class); + when(sourceCollection.find()).thenReturn(findIt); + when(findIt.sort(any())).thenReturn(findIt); + when(findIt.limit(anyInt())).thenReturn(findIt); + MongoCursor cursor = mock(MongoCursor.class); + when(findIt.cursor()).thenReturn(cursor); + // Cursor iteration + when(cursor.hasNext()).thenAnswer(inv -> { + int idx = (int) cursorState[0]; + return idx < oldest.size(); + }); + when(cursor.next()).thenAnswer(inv -> { + int idx = (int) cursorState[0]; + Document d = oldest.get(idx); + cursorState[0] = idx + 1; + return d; + }); + + // deleteMany returns count + when(sourceCollection.deleteMany(any())).thenReturn(new com.mongodb.client.result.DeleteResult() { + @Override public boolean wasAcknowledged() { return true; } + @Override public long getDeletedCount() { return oldest.size(); } + }); + + Method m = ArchiveOldMaliciousEventsCron.class.getDeclaredMethod( + "trimCollectionIfExceedsCap", String.class, MongoCollection.class, MongoCollection.class); + m.setAccessible(true); + // reset cursor state before invoke + cursorState[0] = 0; + m.invoke(cron, "1000", sourceCollection, destCollection); + + verify(sourceCollection, 
atLeastOnce()).deleteMany(any()); + } + + // mutable state for cursor index in answers + private final Object[] cursorState = new Object[]{0}; +} + + From e8f2f3b02df9f5adcf7db18568b53c5d78343023 Mon Sep 17 00:00:00 2001 From: soumilsuri Date: Wed, 24 Sep 2025 11:17:15 +0000 Subject: [PATCH 3/7] Refactor code --- .../ArchivalConfigComponent.jsx | 14 +- .../cron/ArchiveOldMaliciousEventsCron.java | 131 +++++++++--------- .../backend/service/ThreatActorService.java | 4 +- 3 files changed, 79 insertions(+), 70 deletions(-) diff --git a/apps/dashboard/web/polaris_web/web/src/apps/dashboard/pages/settings/threat_configuration/ArchivalConfigComponent.jsx b/apps/dashboard/web/polaris_web/web/src/apps/dashboard/pages/settings/threat_configuration/ArchivalConfigComponent.jsx index bf8da94e39..2739fafc15 100644 --- a/apps/dashboard/web/polaris_web/web/src/apps/dashboard/pages/settings/threat_configuration/ArchivalConfigComponent.jsx +++ b/apps/dashboard/web/polaris_web/web/src/apps/dashboard/pages/settings/threat_configuration/ArchivalConfigComponent.jsx @@ -68,12 +68,14 @@ const ArchivalConfigComponent = ({ title, description }) => { - onChange(val)} - label="Archival Time" - initial={() => archivalDays} - /> + + onChange(val)} + label="Archival Time" + initial={() => archivalDays} + /> + diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java index fdf320dde1..76830f20d2 100644 --- a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java @@ -31,6 +31,8 @@ public class ArchiveOldMaliciousEventsCron implements Runnable { private static final String DEST_COLLECTION = "archived_malicious_events"; private static final int BATCH_SIZE = 5000; private static final long DEFAULT_RETENTION_DAYS = 60L; // default, can be overridden from DB + private static final long MIN_RETENTION_DAYS = 30L; + private static final long MAX_RETENTION_DAYS = 90L; private static final long MAX_SOURCE_DOCS = 400_000L; // cap size private final MongoClient mongoClient; @@ -73,7 +75,11 @@ public void runOnce() { } catch (Exception ignore) { // leave context unset for non-numeric db names } - processDatabase(dbName, nowSeconds); + if (accId != null) { + processDatabase(dbName, nowSeconds); + } else { + logger.infoAndAddToDb("Skipping archive for db as context wasn't set: " + dbName, LoggerMaker.LogDb.RUNTIME); + } } catch (Exception e) { logger.errorAndAddToDb("Error processing database: " + dbName + " : " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); } finally { @@ -92,12 +98,12 @@ private boolean shouldSkipDatabase(String dbName) { private void processDatabase(String dbName, long nowSeconds) { MongoDatabase db = mongoClient.getDatabase(dbName); - ensureCollectionExists(db, DEST_COLLECTION); + if (!ensureCollectionExists(db, DEST_COLLECTION)) { + logger.infoAndAddToDb("Archive collection missing, skipping db: " + dbName, LoggerMaker.LogDb.RUNTIME); + return; + } long retentionDays = fetchRetentionDays(db); - if (!(retentionDays == 30L || retentionDays == 60L || retentionDays == 90L)) { - retentionDays = DEFAULT_RETENTION_DAYS; - } long threshold = nowSeconds - (retentionDays * 24 * 60 * 60); MongoCollection source = db.getCollection(SOURCE_COLLECTION, Document.class); @@ -106,6 +112,7 @@ private 
void processDatabase(String dbName, long nowSeconds) { int totalMoved = 0; while (true) { + long iterationStartNanos = System.nanoTime(); List batch = new ArrayList<>(BATCH_SIZE); try (MongoCursor cursor = source .find(Filters.lte("detectedAt", threshold)) @@ -119,37 +126,18 @@ private void processDatabase(String dbName, long nowSeconds) { if (batch.isEmpty()) break; - List> writes = new ArrayList<>(batch.size()); Set ids = new HashSet<>(); for (Document doc : batch) { - Object id = doc.get("_id"); - ids.add(id); - writes.add(new ReplaceOneModel<>( - Filters.eq("_id", id), - doc, - new ReplaceOptions().upsert(true) - )); + ids.add(doc.get("_id")); } - // Insert into archive asynchronously, deletion proceeds synchronously - final List> writeSnapshot = new ArrayList<>(writes); - this.scheduler.submit(() -> { - try { - dest.bulkWrite(writeSnapshot); - } catch (MongoBulkWriteException bwe) { - logger.errorAndAddToDb("Async bulk write error archiving to " + DEST_COLLECTION + " in db " + dbName + ": " + bwe.getMessage(), LoggerMaker.LogDb.RUNTIME); - } catch (Exception e) { - logger.errorAndAddToDb("Async error writing archive batch in db " + dbName + ": " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); - } - }); + asyncUpsertToArchive(batch, dest, dbName); - try { - long deleted = source.deleteMany(Filters.in("_id", ids)).getDeletedCount(); - totalMoved += (int) deleted; - logger.infoAndAddToDb("Archived and deleted " + deleted + " events from db " + dbName, LoggerMaker.LogDb.RUNTIME); - } catch (Exception e) { - logger.errorAndAddToDb("Failed to delete archived documents from source in db " + dbName + ": " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); - } + long deleted = deleteByIds(source, ids, dbName); + totalMoved += (int) deleted; + + long iterationElapsedMs = (System.nanoTime() - iterationStartNanos) / 1_000_000L; + logger.infoAndAddToDb("Archive loop iteration in db " + dbName + ": batch=" + batch.size() + ", deleted=" + deleted + ", tookMs=" + iterationElapsedMs, LoggerMaker.LogDb.RUNTIME); if (batch.size() < BATCH_SIZE) { break; @@ -176,7 +164,10 @@ private long fetchRetentionDays(MongoDatabase db) { Object val = doc.get("archivalDays"); if (val instanceof Number) { long days = ((Number) val).longValue(); - if (days == 30L || days == 60L || days == 90L) return days; + if (days < MIN_RETENTION_DAYS || days > MAX_RETENTION_DAYS) { + return DEFAULT_RETENTION_DAYS; + } + return days; } } catch (Exception e) { logger.errorAndAddToDb("Failed fetching archivalDays from threat_configuration for db " + db.getName() + ": " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); @@ -184,22 +175,15 @@ private long fetchRetentionDays(MongoDatabase db) { return DEFAULT_RETENTION_DAYS; } - private void ensureCollectionExists(MongoDatabase db, String collectionName) { - boolean exists = false; - try (MongoCursor it = db.listCollectionNames().cursor()) { - while (it.hasNext()) { - if (collectionName.equals(it.next())) { - exists = true; - break; - } - } - } - if (!exists) { - try { - db.createCollection(collectionName); - } catch (Exception e) { - logger.infoAndAddToDb("Tried creating collection: " + collectionName + " -> " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); - } + private boolean ensureCollectionExists(MongoDatabase db, String collectionName) { + try { + Document first = db.listCollections(Document.class) + .filter(Filters.eq("name", collectionName)) + .first(); + return first != null; + } catch (Exception e) { + logger.errorAndAddToDb("Error checking existence for collection: " + collectionName + " in 
db " + db.getName() + ": " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); + return false; } } @@ -228,25 +212,13 @@ private void trimCollectionIfExceedsCap(String dbName, MongoCollection if (oldestDocs.isEmpty()) break; - final List docsSnapshot = new ArrayList<>(oldestDocs); - this.scheduler.submit(() -> { - try { - dest.insertMany(docsSnapshot); - } catch (Exception e) { - logger.errorAndAddToDb("Async archive insert failed in db " + dbName + ": " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); - } - }); + asyncUpsertToArchive(oldestDocs, dest, dbName); Set ids = new HashSet<>(); for (Document d : oldestDocs) { ids.add(d.get("_id")); } - long deleted = 0L; - try { - deleted = source.deleteMany(Filters.in("_id", ids)).getDeletedCount(); - } catch (Exception e) { - logger.errorAndAddToDb("Overflow trim delete failed in db " + dbName + ": " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); - } + long deleted = deleteByIds(source, ids, dbName); totalDeleted += deleted; toRemove -= deleted; @@ -260,6 +232,41 @@ private void trimCollectionIfExceedsCap(String dbName, MongoCollection logger.infoAndAddToDb("Completed overflow trim in db " + dbName + ": deleted=" + totalDeleted, LoggerMaker.LogDb.RUNTIME); } } + + private void asyncUpsertToArchive(List docs, MongoCollection dest, String dbName) { + if (docs == null || docs.isEmpty()) return; + List> writes = new ArrayList<>(docs.size()); + for (Document doc : docs) { + Object id = doc.get("_id"); + writes.add(new ReplaceOneModel<>( + Filters.eq("_id", id), + doc, + new ReplaceOptions().upsert(true) + )); + } + final List> writeSnapshot = new ArrayList<>(writes); + this.scheduler.submit(() -> { + try { + dest.bulkWrite(writeSnapshot); + } catch (MongoBulkWriteException bwe) { + logger.errorAndAddToDb("Async bulk write error archiving to " + DEST_COLLECTION + " in db " + dbName + ": " + bwe.getMessage(), LoggerMaker.LogDb.RUNTIME); + } catch (Exception e) { + logger.errorAndAddToDb("Async error writing archive batch in db " + dbName + ": " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); + } + }); + } + + private long deleteByIds(MongoCollection source, Set ids, String dbName) { + if (ids == null || ids.isEmpty()) return 0L; + try { + long deleted = source.deleteMany(Filters.in("_id", ids)).getDeletedCount(); + logger.infoAndAddToDb("Deleted " + deleted + " documents from source in db " + dbName, LoggerMaker.LogDb.RUNTIME); + return deleted; + } catch (Exception e) { + logger.errorAndAddToDb("Failed to delete documents from source in db " + dbName + ": " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); + return 0L; + } + } } diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/ThreatActorService.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/ThreatActorService.java index 9ffc0c9d9d..8a9d87906d 100644 --- a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/ThreatActorService.java +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/ThreatActorService.java @@ -135,7 +135,7 @@ public ThreatConfiguration modifyThreatConfiguration(String accountId, ThreatCon { int val = updatedConfig.getArchivalDays(); - if (val == 30 || val == 60 || val == 90) { + if (val >= 30 && val <= 90) { newDoc.append("archivalDays", val); } } @@ -158,7 +158,7 @@ public ThreatConfiguration modifyThreatConfiguration(String accountId, ThreatCon } { int val = updatedConfig.getArchivalDays(); - if (val == 30 || val == 60 || val == 90) { + if (val >= 30 && val <= 90) { 
builder.setArchivalDays(val); } } From 2127982018cfc9c4cd07eafaffdf176fd5b5f2a7 Mon Sep 17 00:00:00 2001 From: soumilsuri Date: Wed, 24 Sep 2025 11:23:06 +0000 Subject: [PATCH 4/7] update unit test --- .../threat/backend/cron/ArchiveOldMaliciousEventsCronTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/threat-detection-backend/src/test/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCronTest.java b/apps/threat-detection-backend/src/test/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCronTest.java index 460e33c761..a238802367 100644 --- a/apps/threat-detection-backend/src/test/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCronTest.java +++ b/apps/threat-detection-backend/src/test/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCronTest.java @@ -73,7 +73,7 @@ public void testFetchRetentionDays_Valid90() throws Exception { public void testFetchRetentionDays_InvalidFallsBack() throws Exception { when(mongoDatabase.getCollection(eq("threat_configuration"), eq(Document.class))).thenReturn(cfgCollection); when(cfgCollection.find()).thenReturn(mock(com.mongodb.client.FindIterable.class)); - Document cfg = new Document("archivalDays", 45); + Document cfg = new Document("archivalDays", 15); when(cfgCollection.find().first()).thenReturn(cfg); Method m = ArchiveOldMaliciousEventsCron.class.getDeclaredMethod("fetchRetentionDays", MongoDatabase.class); From 5475d8145bc343a20eaf22b1ff3e6cb1580492fb Mon Sep 17 00:00:00 2001 From: soumilsuri Date: Wed, 24 Sep 2025 11:49:36 +0000 Subject: [PATCH 5/7] Add cap for deletes in ArchiveOldMaliciousEventsCron --- .../cron/ArchiveOldMaliciousEventsCron.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java index 76830f20d2..f14bd3b82a 100644 --- a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java @@ -34,6 +34,7 @@ public class ArchiveOldMaliciousEventsCron implements Runnable { private static final long MIN_RETENTION_DAYS = 30L; private static final long MAX_RETENTION_DAYS = 90L; private static final long MAX_SOURCE_DOCS = 400_000L; // cap size + private static final long MAX_DELETES_PER_ITERATION = 100_000L; // cap per cron iteration private final MongoClient mongoClient; private final ScheduledExecutorService scheduler; @@ -110,6 +111,7 @@ private void processDatabase(String dbName, long nowSeconds) { MongoCollection dest = db.getCollection(DEST_COLLECTION, Document.class); int totalMoved = 0; + long deletesThisIteration = 0L; while (true) { long iterationStartNanos = System.nanoTime(); @@ -135,6 +137,7 @@ private void processDatabase(String dbName, long nowSeconds) { long deleted = deleteByIds(source, ids, dbName); totalMoved += (int) deleted; + deletesThisIteration += deleted; long iterationElapsedMs = (System.nanoTime() - iterationStartNanos) / 1_000_000L; logger.infoAndAddToDb("Archive loop iteration in db " + dbName + ": batch=" + batch.size() + ", deleted=" + deleted + ", tookMs=" + iterationElapsedMs, LoggerMaker.LogDb.RUNTIME); @@ -142,6 +145,11 @@ private void processDatabase(String dbName, long nowSeconds) { if (batch.size() < BATCH_SIZE) { break; } + + if 
(deletesThisIteration >= MAX_DELETES_PER_ITERATION) { + logger.infoAndAddToDb("Reached delete cap (" + MAX_DELETES_PER_ITERATION + ") for this iteration in db " + dbName + ", stopping further deletes", LoggerMaker.LogDb.RUNTIME); + break; + } } if (totalMoved > 0) { @@ -150,7 +158,11 @@ private void processDatabase(String dbName, long nowSeconds) { // Enforce collection size cap by trimming oldest docs beyond 400k. try { - trimCollectionIfExceedsCap(dbName, source, dest); + if (deletesThisIteration < MAX_DELETES_PER_ITERATION) { + trimCollectionIfExceedsCap(dbName, source, dest); + } else { + logger.infoAndAddToDb("Skipping trim step as delete cap reached in db " + dbName, LoggerMaker.LogDb.RUNTIME); + } } catch (Exception e) { logger.errorAndAddToDb("Error trimming collection to cap in db " + dbName + ": " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); } From e37740282ef3591271a568371a565ae73f2d106e Mon Sep 17 00:00:00 2001 From: soumilsuri Date: Wed, 24 Sep 2025 18:24:10 +0000 Subject: [PATCH 6/7] enhance overflow trimming logic --- .../cron/ArchiveOldMaliciousEventsCron.java | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java index f14bd3b82a..339585464f 100644 --- a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java @@ -77,7 +77,7 @@ public void runOnce() { // leave context unset for non-numeric db names } if (accId != null) { - processDatabase(dbName, nowSeconds); + archiveOldMaliciousEvents(dbName, nowSeconds); } else { logger.infoAndAddToDb("Skipping archive for db as context wasn't set: " + dbName, LoggerMaker.LogDb.RUNTIME); } @@ -97,7 +97,7 @@ private boolean shouldSkipDatabase(String dbName) { || "config".equals(dbName); } - private void processDatabase(String dbName, long nowSeconds) { + private void archiveOldMaliciousEvents(String dbName, long nowSeconds) { MongoDatabase db = mongoClient.getDatabase(dbName); if (!ensureCollectionExists(db, DEST_COLLECTION)) { logger.infoAndAddToDb("Archive collection missing, skipping db: " + dbName, LoggerMaker.LogDb.RUNTIME); @@ -204,12 +204,11 @@ private void trimCollectionIfExceedsCap(String dbName, MongoCollection if (approxCount <= MAX_SOURCE_DOCS) return; - long toRemove = approxCount - MAX_SOURCE_DOCS; long totalDeleted = 0L; - logger.infoAndAddToDb("Starting overflow trim in db " + dbName + ": approxCount=" + approxCount + ", toRemove=" + toRemove, LoggerMaker.LogDb.RUNTIME); + logger.infoAndAddToDb("Starting overflow trim in db " + dbName + ": approxCount=" + approxCount + ", overCap=" + (approxCount - MAX_SOURCE_DOCS), LoggerMaker.LogDb.RUNTIME); - while (toRemove > 0) { - int batch = (int) Math.min(BATCH_SIZE, toRemove); + while (true) { + int batch = BATCH_SIZE; List oldestDocs = new ArrayList<>(batch); try (MongoCursor cursor = source @@ -233,11 +232,9 @@ private void trimCollectionIfExceedsCap(String dbName, MongoCollection long deleted = deleteByIds(source, ids, dbName); totalDeleted += deleted; - toRemove -= deleted; - if (deleted < batch) { - break; - } + if (deleted < batch) break; + if (totalDeleted >= MAX_DELETES_PER_ITERATION) break; } if (totalDeleted > 0) { From f0a09bb2544d0462f87c822283a1d33cbd943acf Mon 
Sep 17 00:00:00 2001
From: soumilsuri
Date: Thu, 25 Sep 2025 08:16:03 +0000
Subject: [PATCH 7/7] Refactor archive logic to use InsertOneModel for bulk writes

---
 .../backend/cron/ArchiveOldMaliciousEventsCron.java | 13 ++++---------
 1 file changed, 4 insertions(+), 9 deletions(-)

diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java
index 339585464f..c8d0516d28 100644
--- a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java
+++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java
@@ -8,8 +8,8 @@
 import com.mongodb.client.MongoCursor;
 import com.mongodb.client.MongoDatabase;
 import com.mongodb.client.model.Filters;
-import com.mongodb.client.model.ReplaceOneModel;
-import com.mongodb.client.model.ReplaceOptions;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.BulkWriteOptions;
 import com.mongodb.client.model.Sorts;
 import com.mongodb.client.model.WriteModel;
 import org.bson.Document;
@@ -246,17 +246,12 @@ private void asyncUpsertToArchive(List<Document> docs, MongoCollection<Document> dest, String dbName) {
         if (docs == null || docs.isEmpty()) return;
         List<WriteModel<Document>> writes = new ArrayList<>(docs.size());
         for (Document doc : docs) {
-            Object id = doc.get("_id");
-            writes.add(new ReplaceOneModel<>(
-                Filters.eq("_id", id),
-                doc,
-                new ReplaceOptions().upsert(true)
-            ));
+            writes.add(new InsertOneModel<>(doc));
         }
         final List<WriteModel<Document>> writeSnapshot = new ArrayList<>(writes);
         this.scheduler.submit(() -> {
             try {
-                dest.bulkWrite(writeSnapshot);
+                dest.bulkWrite(writeSnapshot, new BulkWriteOptions().ordered(false));
             } catch (MongoBulkWriteException bwe) {
                 logger.errorAndAddToDb("Async bulk write error archiving to " + DEST_COLLECTION + " in db " + dbName + ": " + bwe.getMessage(), LoggerMaker.LogDb.RUNTIME);
             } catch (Exception e) {
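A note on the final patch: replacing upsert-style ReplaceOneModel writes with plain InsertOneModel inserts means a re-archived batch (for example, after a failed delete) can now hit duplicate-_id errors. Because the bulk write is unordered, the driver still attempts every insert and reports the failures in MongoBulkWriteException, which the cron only logs. Below is a minimal sketch of that behavior using the same driver APIs; the class and helper name are illustrative, not part of the patch.

import com.mongodb.MongoBulkWriteException;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.WriteModel;
import org.bson.Document;

import java.util.ArrayList;
import java.util.List;

class UnorderedArchiveWriteSketch {
    // Inserts docs into the archive, tolerating duplicate-_id failures on
    // retried batches. With ordered(false) the driver attempts every insert,
    // so documents archived by an earlier attempt are skipped rather than
    // blocking the rest of the batch.
    static int insertIgnoringDuplicates(MongoCollection<Document> dest, List<Document> docs) {
        List<WriteModel<Document>> writes = new ArrayList<>(docs.size());
        for (Document doc : docs) {
            writes.add(new InsertOneModel<>(doc));
        }
        try {
            return dest.bulkWrite(writes, new BulkWriteOptions().ordered(false)).getInsertedCount();
        } catch (MongoBulkWriteException bwe) {
            // Partial success: the write result still reports how many of the
            // non-duplicate documents were inserted (duplicates fail with error 11000).
            return bwe.getWriteResult().getInsertedCount();
        }
    }
}

Under these assumptions, ordered(false) trades strict write ordering for forward progress on retried batches, which suits an idempotent archive copy where each document is keyed by its original _id.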