diff --git a/apps/dashboard/src/main/java/com/akto/action/threat_detection/ThreatConfiguration.java b/apps/dashboard/src/main/java/com/akto/action/threat_detection/ThreatConfiguration.java index 81a3065cce..40cad38b0c 100644 --- a/apps/dashboard/src/main/java/com/akto/action/threat_detection/ThreatConfiguration.java +++ b/apps/dashboard/src/main/java/com/akto/action/threat_detection/ThreatConfiguration.java @@ -10,6 +10,7 @@ public class ThreatConfiguration { private Actor actor; private RatelimitConfig ratelimitConfig; + private Integer archivalDays; @lombok.Getter @lombok.Setter diff --git a/apps/dashboard/web/polaris_web/web/src/apps/dashboard/pages/settings/threat_configuration/ArchivalConfigComponent.jsx b/apps/dashboard/web/polaris_web/web/src/apps/dashboard/pages/settings/threat_configuration/ArchivalConfigComponent.jsx new file mode 100644 index 0000000000..2739fafc15 --- /dev/null +++ b/apps/dashboard/web/polaris_web/web/src/apps/dashboard/pages/settings/threat_configuration/ArchivalConfigComponent.jsx @@ -0,0 +1,87 @@ +import { useState, useEffect } from "react"; +import func from "@/util/func" +import { LegacyCard, VerticalStack, Divider, Text, Button, Box } from "@shopify/polaris"; +import api from "../../../pages/threat_detection/api.js"; +import Dropdown from "../../../components/layouts/Dropdown.jsx"; + +const ArchivalConfigComponent = ({ title, description }) => { + const [archivalDays, setArchivalDays] = useState(60); + const [isSaveDisabled, setIsSaveDisabled] = useState(true); + + const fetchData = async () => { + const response = await api.fetchThreatConfiguration(); + const days = response?.threatConfiguration?.archivalDays; + const value = days === 30 || days === 60 || days === 90 ? 
days : 60; + setArchivalDays(value); + setIsSaveDisabled(true); + }; + + const onSave = async () => { + const payload = { + archivalDays: archivalDays + }; + /* await inside try so API failures surface the error toast (previous .then-wrapped try/catch could never catch a rejected API call) */ + try { + await api.modifyThreatConfiguration(payload); + func.setToast(true, false, "Archival time saved successfully"); + fetchData() + } catch (error) { + func.setToast(true, true, "Error saving archival time"); + } + }; + + useEffect(() => { + fetchData(); + }, []); + + const options = [ + { value: 30, label: "30 days" }, + { value: 60, label: "60 days" }, + { value: 90, label: "90 days" }, + ]; + + function TitleComponent({ title, description }) { + return ( + + {title} + + {description} + + + ) + } + + const onChange = (val) => { + setArchivalDays(val); + setIsSaveDisabled(false); + }; + + return ( + } + primaryFooterAction={{ + content: 'Save', + onAction: onSave, + loading: false, + disabled: isSaveDisabled + }} + > + + + + + onChange(val)} + label="Archival Time" + initial={() => archivalDays} + /> + + + + + ); +}; + +export default ArchivalConfigComponent; + + diff --git a/apps/dashboard/web/polaris_web/web/src/apps/dashboard/pages/settings/threat_configuration/ThreatConfiguration.jsx b/apps/dashboard/web/polaris_web/web/src/apps/dashboard/pages/settings/threat_configuration/ThreatConfiguration.jsx index fc8f256711..706974be45 100644 --- a/apps/dashboard/web/polaris_web/web/src/apps/dashboard/pages/settings/threat_configuration/ThreatConfiguration.jsx +++ b/apps/dashboard/web/polaris_web/web/src/apps/dashboard/pages/settings/threat_configuration/ThreatConfiguration.jsx @@ -1,6 +1,7 @@ import PageWithMultipleCards from "../../../components/layouts/PageWithMultipleCards" import ThreatActorConfigComponent from "./ThreatActorConfig.jsx" import RatelimitConfigComponent from "./RatelimitConfigComponent.jsx" +import ArchivalConfigComponent from "./ArchivalConfigComponent.jsx" function ThreatConfiguration() { @@ -14,6 +15,11 @@ function ThreatConfiguration() { title={"Rate Limit 
Configuration"} description={"Configure rate limiting rules to protect your APIs from abuse."} key={"ratelimitConfig"} + />, + ]; diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/Main.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/Main.java index acac265e58..50c006e74c 100644 --- a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/Main.java +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/Main.java @@ -14,6 +14,7 @@ import com.akto.threat.backend.service.ThreatActorService; import com.akto.threat.backend.service.ThreatApiService; import com.akto.threat.backend.tasks.FlushMessagesToDB; +import com.akto.threat.backend.cron.ArchiveOldMaliciousEventsCron; import com.mongodb.ConnectionString; import com.mongodb.MongoClientSettings; import com.mongodb.ReadPreference; @@ -73,6 +74,14 @@ public static void main(String[] args) throws Exception { ApiDistributionDataService apiDistributionDataService = new ApiDistributionDataService(threatProtectionMongo); new BackendVerticle(maliciousEventService, threatActorService, threatApiService, apiDistributionDataService).start(); + + try { + logger.infoAndAddToDb("Starting ArchiveOldMaliciousEventsCron for all databases", LoggerMaker.LogDb.RUNTIME); + new ArchiveOldMaliciousEventsCron(threatProtectionMongo).cron(); + logger.infoAndAddToDb("Scheduled ArchiveOldMaliciousEventsCron", LoggerMaker.LogDb.RUNTIME); + } catch (Exception e) { + logger.errorAndAddToDb("Error starting ArchiveOldMaliciousEventsCron: " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); + } } } diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java new file mode 100644 index 0000000000..c8d0516d28 --- /dev/null +++ 
b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCron.java @@ -0,0 +1,276 @@ +package com.akto.threat.backend.cron; + +import com.akto.log.LoggerMaker; +import com.akto.dao.context.Context; +import com.mongodb.MongoBulkWriteException; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.Filters; +import com.mongodb.client.model.InsertOneModel; +import com.mongodb.client.model.BulkWriteOptions; +import com.mongodb.client.model.Sorts; +import com.mongodb.client.model.WriteModel; +import org.bson.Document; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class ArchiveOldMaliciousEventsCron implements Runnable { + + private static final LoggerMaker logger = new LoggerMaker(ArchiveOldMaliciousEventsCron.class); + + private static final String SOURCE_COLLECTION = "malicious_events"; + private static final String DEST_COLLECTION = "archived_malicious_events"; + private static final int BATCH_SIZE = 5000; + private static final long DEFAULT_RETENTION_DAYS = 60L; // default, can be overridden from DB + private static final long MIN_RETENTION_DAYS = 30L; + private static final long MAX_RETENTION_DAYS = 90L; + private static final long MAX_SOURCE_DOCS = 400_000L; // cap size + private static final long MAX_DELETES_PER_ITERATION = 100_000L; // cap per cron iteration + + private final MongoClient mongoClient; + private final ScheduledExecutorService scheduler; + + public ArchiveOldMaliciousEventsCron(MongoClient mongoClient) { + this.mongoClient = mongoClient; + this.scheduler = Executors.newScheduledThreadPool(1); + } + + public void cron() { + long 
initialDelaySeconds = 0; + long periodSeconds = Duration.ofHours(6).getSeconds(); + scheduler.scheduleAtFixedRate(this, initialDelaySeconds, periodSeconds, TimeUnit.SECONDS); + logger.infoAndAddToDb("Scheduled ArchiveOldMaliciousEventsCron every 6 hours", LoggerMaker.LogDb.RUNTIME); + } + + @Override + public void run() { + try { + runOnce(); + } catch (Throwable t) { + logger.errorAndAddToDb("Archive cron failed unexpectedly: " + t.getMessage(), LoggerMaker.LogDb.RUNTIME); + } + } + + public void runOnce() { + long nowSeconds = System.currentTimeMillis() / 1000L; // epoch seconds + + try (MongoCursor dbNames = mongoClient.listDatabaseNames().cursor()) { + while (dbNames.hasNext()) { + String dbName = dbNames.next(); + if (shouldSkipDatabase(dbName)) continue; + + Integer accId = null; + try { + try { + accId = Integer.parseInt(dbName); + Context.accountId.set(accId); + } catch (Exception ignore) { + // leave context unset for non-numeric db names + } + if (accId != null) { + archiveOldMaliciousEvents(dbName, nowSeconds); + } else { + logger.infoAndAddToDb("Skipping archive for db as context wasn't set: " + dbName, LoggerMaker.LogDb.RUNTIME); + } + } catch (Exception e) { + logger.errorAndAddToDb("Error processing database: " + dbName + " : " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); + } finally { + Context.resetContextThreadLocals(); + } + } + } + } + + private boolean shouldSkipDatabase(String dbName) { + return dbName == null || dbName.isEmpty() + || "admin".equals(dbName) + || "local".equals(dbName) + || "config".equals(dbName); + } + + private void archiveOldMaliciousEvents(String dbName, long nowSeconds) { + MongoDatabase db = mongoClient.getDatabase(dbName); + if (!ensureCollectionExists(db, DEST_COLLECTION)) { + logger.infoAndAddToDb("Archive collection missing, skipping db: " + dbName, LoggerMaker.LogDb.RUNTIME); + return; + } + + long retentionDays = fetchRetentionDays(db); + long threshold = nowSeconds - (retentionDays * 24 * 60 * 60); + + 
MongoCollection source = db.getCollection(SOURCE_COLLECTION, Document.class); + MongoCollection dest = db.getCollection(DEST_COLLECTION, Document.class); + + int totalMoved = 0; + long deletesThisIteration = 0L; + + while (true) { + long iterationStartNanos = System.nanoTime(); + List batch = new ArrayList<>(BATCH_SIZE); + try (MongoCursor cursor = source + .find(Filters.lte("detectedAt", threshold)) + .sort(Sorts.ascending("detectedAt")) + .limit(BATCH_SIZE) + .cursor()) { + while (cursor.hasNext()) { + batch.add(cursor.next()); + } + } + + if (batch.isEmpty()) break; + + Set ids = new HashSet<>(); + for (Document doc : batch) { + ids.add(doc.get("_id")); + } + + asyncUpsertToArchive(batch, dest, dbName); + + long deleted = deleteByIds(source, ids, dbName); + totalMoved += (int) deleted; + deletesThisIteration += deleted; + + long iterationElapsedMs = (System.nanoTime() - iterationStartNanos) / 1_000_000L; + logger.infoAndAddToDb("Archive loop iteration in db " + dbName + ": batch=" + batch.size() + ", deleted=" + deleted + ", tookMs=" + iterationElapsedMs, LoggerMaker.LogDb.RUNTIME); + + if (batch.size() < BATCH_SIZE) { + break; + } + + if (deletesThisIteration >= MAX_DELETES_PER_ITERATION) { + logger.infoAndAddToDb("Reached delete cap (" + MAX_DELETES_PER_ITERATION + ") for this iteration in db " + dbName + ", stopping further deletes", LoggerMaker.LogDb.RUNTIME); + break; + } + } + + if (totalMoved > 0) { + logger.infoAndAddToDb("Completed archiving for db " + dbName + ", total moved: " + totalMoved, LoggerMaker.LogDb.RUNTIME); + } + + // Enforce collection size cap by trimming oldest docs beyond 400k. 
+ try { + if (deletesThisIteration < MAX_DELETES_PER_ITERATION) { + trimCollectionIfExceedsCap(dbName, source, dest); + } else { + logger.infoAndAddToDb("Skipping trim step as delete cap reached in db " + dbName, LoggerMaker.LogDb.RUNTIME); + } + } catch (Exception e) { + logger.errorAndAddToDb("Error trimming collection to cap in db " + dbName + ": " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); + } + } + + private long fetchRetentionDays(MongoDatabase db) { + try { + MongoCollection cfg = db.getCollection("threat_configuration", Document.class); + Document doc = cfg.find().first(); + if (doc == null) return DEFAULT_RETENTION_DAYS; + Object val = doc.get("archivalDays"); + if (val instanceof Number) { + long days = ((Number) val).longValue(); + if (days < MIN_RETENTION_DAYS || days > MAX_RETENTION_DAYS) { + return DEFAULT_RETENTION_DAYS; + } + return days; + } + } catch (Exception e) { + logger.errorAndAddToDb("Failed fetching archivalDays from threat_configuration for db " + db.getName() + ": " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); + } + return DEFAULT_RETENTION_DAYS; + } + + private boolean ensureCollectionExists(MongoDatabase db, String collectionName) { + try { + Document first = db.listCollections(Document.class) + .filter(Filters.eq("name", collectionName)) + .first(); + return first != null; + } catch (Exception e) { + logger.errorAndAddToDb("Error checking existence for collection: " + collectionName + " in db " + db.getName() + ": " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); + return false; + } + } + + private void trimCollectionIfExceedsCap(String dbName, MongoCollection source, MongoCollection dest) { + long approxCount = source.countDocuments(); + + if (approxCount <= MAX_SOURCE_DOCS) return; + + long totalDeleted = 0L; + logger.infoAndAddToDb("Starting overflow trim in db " + dbName + ": approxCount=" + approxCount + ", overCap=" + (approxCount - MAX_SOURCE_DOCS), LoggerMaker.LogDb.RUNTIME); + + while (true) { + int batch = BATCH_SIZE; + + 
List oldestDocs = new ArrayList<>(batch); + try (MongoCursor cursor = source + .find() + .sort(Sorts.ascending("detectedAt")) + .limit(batch) + .cursor()) { + while (cursor.hasNext()) { + oldestDocs.add(cursor.next()); + } + } + + if (oldestDocs.isEmpty()) break; + + asyncUpsertToArchive(oldestDocs, dest, dbName); + + Set ids = new HashSet<>(); + for (Document d : oldestDocs) { + ids.add(d.get("_id")); + } + long deleted = deleteByIds(source, ids, dbName); + + totalDeleted += deleted; + + if (deleted < batch) break; + if (totalDeleted >= MAX_DELETES_PER_ITERATION) break; + } + + if (totalDeleted > 0) { + logger.infoAndAddToDb("Completed overflow trim in db " + dbName + ": deleted=" + totalDeleted, LoggerMaker.LogDb.RUNTIME); + } + } + + private void asyncUpsertToArchive(List docs, MongoCollection dest, String dbName) { + if (docs == null || docs.isEmpty()) return; + List> writes = new ArrayList<>(docs.size()); + for (Document doc : docs) { + writes.add(new InsertOneModel<>(doc)); + } + final List> writeSnapshot = new ArrayList<>(writes); + /* FIX(review): write synchronously. The old scheduler.submit queued the insert on the same single-thread scheduler that is busy running this cron, so callers deleted source docs before the archive write ever ran (data loss on crash/restart). */ + try { + dest.bulkWrite(writeSnapshot, new BulkWriteOptions().ordered(false)); + } catch (MongoBulkWriteException bwe) { + /* partial failure, typically duplicate _ids from a prior partially-completed run; acknowledged inserts persisted, so proceeding to delete is safe */ + logger.errorAndAddToDb("Bulk write error archiving to " + DEST_COLLECTION + " in db " + dbName + ": " + bwe.getMessage(), LoggerMaker.LogDb.RUNTIME); + } catch (Exception e) { + logger.errorAndAddToDb("Error writing archive batch in db " + dbName + ": " + e.getMessage(), LoggerMaker.LogDb.RUNTIME); + /* abort before the caller deletes source docs that were never archived; both call sites sit inside logging catch blocks */ + throw new RuntimeException("Archive write failed for db " + dbName + ", aborting before source delete", e); + } + } + + private long deleteByIds(MongoCollection source, Set ids, String dbName) { + if (ids == null || ids.isEmpty()) return 0L; + try { + long deleted = source.deleteMany(Filters.in("_id", ids)).getDeletedCount(); + logger.infoAndAddToDb("Deleted " + deleted + " documents from source in db " + dbName, LoggerMaker.LogDb.RUNTIME); + return deleted; + } catch (Exception e) { + logger.errorAndAddToDb("Failed to delete documents from source in db " + dbName + ": " + 
e.getMessage(), LoggerMaker.LogDb.RUNTIME); + return 0L; + } + } +} + + diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/ThreatActorService.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/ThreatActorService.java index 16ce6b6e81..8a9d87906d 100644 --- a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/ThreatActorService.java +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/ThreatActorService.java @@ -85,6 +85,18 @@ public ThreatConfiguration fetchThreatConfiguration(String accountId) { if (rateLimitConfig != null) { builder.setRatelimitConfig(rateLimitConfig); } + + // Handle archival days + Integer archivalDays = null; + try { + Object val = doc.get("archivalDays"); + if (val instanceof Number) { + archivalDays = ((Number) val).intValue(); + } + } catch (Exception ignore) {} + if (archivalDays != null) { + builder.setArchivalDays(archivalDays); + } } return builder.build(); } @@ -120,6 +132,13 @@ public ThreatConfiguration modifyThreatConfiguration(String accountId, ThreatCon newDoc.append("ratelimitConfig", ratelimitConfigDoc.get("ratelimitConfig")); } } + + { + int val = updatedConfig.getArchivalDays(); + if (val >= 30 && val <= 90) { + newDoc.append("archivalDays", val); + } + } Document existingDoc = coll.find().first(); @@ -137,6 +156,12 @@ public ThreatConfiguration modifyThreatConfiguration(String accountId, ThreatCon if (updatedConfig.hasRatelimitConfig()) { builder.setRatelimitConfig(updatedConfig.getRatelimitConfig()); } + { + int val = updatedConfig.getArchivalDays(); + if (val >= 30 && val <= 90) { + builder.setArchivalDays(val); + } + } return builder.build(); } diff --git a/apps/threat-detection-backend/src/test/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCronTest.java b/apps/threat-detection-backend/src/test/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCronTest.java new file mode 100644 
index 0000000000..a238802367 --- /dev/null +++ b/apps/threat-detection-backend/src/test/java/com/akto/threat/backend/cron/ArchiveOldMaliciousEventsCronTest.java @@ -0,0 +1,134 @@ +package com.akto.threat.backend.cron; + +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import org.bson.Document; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.Silent.class) +public class ArchiveOldMaliciousEventsCronTest { + + @Mock + private MongoClient mongoClient; + + @Mock + private MongoDatabase mongoDatabase; + + @Mock + private MongoCollection cfgCollection; + + @Mock + private MongoCollection sourceCollection; + + @Mock + private MongoCollection destCollection; + + private ArchiveOldMaliciousEventsCron cron; + + @Before + public void setup() { + cron = new ArchiveOldMaliciousEventsCron(mongoClient); + } + + @Test + public void testFetchRetentionDays_DefaultWhenMissing() throws Exception { + when(mongoDatabase.getCollection(eq("threat_configuration"), eq(Document.class))).thenReturn(cfgCollection); + when(cfgCollection.find()).thenReturn(mock(com.mongodb.client.FindIterable.class)); + when(cfgCollection.find().first()).thenReturn(null); + + Method m = ArchiveOldMaliciousEventsCron.class.getDeclaredMethod("fetchRetentionDays", MongoDatabase.class); + m.setAccessible(true); + long days = (long) m.invoke(cron, mongoDatabase); + assertEquals(60L, days); + } + + @Test + public void testFetchRetentionDays_Valid90() throws Exception { + when(mongoDatabase.getCollection(eq("threat_configuration"), 
eq(Document.class))).thenReturn(cfgCollection); + when(cfgCollection.find()).thenReturn(mock(com.mongodb.client.FindIterable.class)); + Document cfg = new Document("archivalDays", 90); + when(cfgCollection.find().first()).thenReturn(cfg); + + Method m = ArchiveOldMaliciousEventsCron.class.getDeclaredMethod("fetchRetentionDays", MongoDatabase.class); + m.setAccessible(true); + long days = (long) m.invoke(cron, mongoDatabase); + assertEquals(90L, days); + } + + @Test + public void testFetchRetentionDays_InvalidFallsBack() throws Exception { + when(mongoDatabase.getCollection(eq("threat_configuration"), eq(Document.class))).thenReturn(cfgCollection); + when(cfgCollection.find()).thenReturn(mock(com.mongodb.client.FindIterable.class)); + Document cfg = new Document("archivalDays", 15); + when(cfgCollection.find().first()).thenReturn(cfg); + + Method m = ArchiveOldMaliciousEventsCron.class.getDeclaredMethod("fetchRetentionDays", MongoDatabase.class); + m.setAccessible(true); + long days = (long) m.invoke(cron, mongoDatabase); + assertEquals(60L, days); + } + + @Test + public void testTrimCollectionIfExceedsCap_DeletesOldest() throws Exception { + when(sourceCollection.countDocuments()).thenReturn(410_000L); + + // Build a small batch of oldest docs + List oldest = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + oldest.add(new Document("_id", "id-" + i).append("detectedAt", i)); + } + + // Mock find().sort().limit().cursor() + com.mongodb.client.FindIterable findIt = mock(com.mongodb.client.FindIterable.class); + when(sourceCollection.find()).thenReturn(findIt); + when(findIt.sort(any())).thenReturn(findIt); + when(findIt.limit(anyInt())).thenReturn(findIt); + MongoCursor cursor = mock(MongoCursor.class); + when(findIt.cursor()).thenReturn(cursor); + // Cursor iteration + when(cursor.hasNext()).thenAnswer(inv -> { + int idx = (int) cursorState[0]; + return idx < oldest.size(); + }); + when(cursor.next()).thenAnswer(inv -> { + int idx = (int) cursorState[0]; + 
Document d = oldest.get(idx); + cursorState[0] = idx + 1; + return d; + }); + + // deleteMany returns count + when(sourceCollection.deleteMany(any())).thenReturn(new com.mongodb.client.result.DeleteResult() { + @Override public boolean wasAcknowledged() { return true; } + @Override public long getDeletedCount() { return oldest.size(); } + }); + + Method m = ArchiveOldMaliciousEventsCron.class.getDeclaredMethod( + "trimCollectionIfExceedsCap", String.class, MongoCollection.class, MongoCollection.class); + m.setAccessible(true); + // reset cursor state before invoke + cursorState[0] = 0; + m.invoke(cron, "1000", sourceCollection, destCollection); + + verify(sourceCollection, atLeastOnce()).deleteMany(any()); + } + + // mutable state for cursor index in answers + private final Object[] cursorState = new Object[]{0}; +} + + diff --git a/protobuf/threat_detection/service/dashboard_service/v1/service.proto b/protobuf/threat_detection/service/dashboard_service/v1/service.proto index e331dddcae..20be51004a 100644 --- a/protobuf/threat_detection/service/dashboard_service/v1/service.proto +++ b/protobuf/threat_detection/service/dashboard_service/v1/service.proto @@ -321,6 +321,7 @@ message RatelimitConfig { message ThreatConfiguration { Actor actor = 1; RatelimitConfig ratelimit_config = 2; + int32 archival_days = 3; } message ApiDistributionDataRequestPayload {