diff --git a/apps/mini-runtime/pom.xml b/apps/mini-runtime/pom.xml
index 8760947e57..c843712ea3 100644
--- a/apps/mini-runtime/pom.xml
+++ b/apps/mini-runtime/pom.xml
@@ -53,6 +53,16 @@
             <artifactId>protobuf-java</artifactId>
             <version>4.29.2</version>
         </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-server</artifactId>
+            <version>9.4.53.v20231009</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-servlet</artifactId>
+            <version>9.4.53.v20231009</version>
+        </dependency>
         <dependency>
             <groupId>com.akto.libs.utils</groupId>
diff --git a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/Main.java b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/Main.java
index 288f9aa109..27b4be8473 100644
--- a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/Main.java
+++ b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/Main.java
@@ -39,13 +39,69 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.BufferedReader;
+
 // Import protobuf classes
 import com.akto.proto.generated.threat_detection.message.http_response_param.v1.HttpResponseParam;
 import com.akto.proto.generated.threat_detection.message.http_response_param.v1.StringList;
 
 public class Main {
-    private Consumer<String, String> consumer;
+    public static class DataUploadServlet extends HttpServlet {
+        @Override
+        protected void doPost(HttpServletRequest request, HttpServletResponse response)
+                throws ServletException, IOException {
+
+            response.setContentType("application/json");
+            response.setHeader("Access-Control-Allow-Origin", "*");
+            response.setHeader("Access-Control-Allow-Methods", "POST");
+            response.setHeader("Access-Control-Allow-Headers", "Content-Type");
+
+            try {
+                StringBuilder jsonBuffer = new StringBuilder();
+                BufferedReader reader = request.getReader();
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    jsonBuffer.append(line);
+                }
+
+                String jsonData = jsonBuffer.toString();
+                if (jsonData == null || jsonData.trim().isEmpty()) {
+                    response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+                    response.getWriter().write("{\"error\":\"Empty request body\"}");
+                    return;
+                }
+
+                processUploadedData(jsonData);
+
+                response.setStatus(HttpServletResponse.SC_OK);
+                response.getWriter().write("{\"status\":\"success\",\"message\":\"Data processed successfully\"}");
+
+            } catch (Exception e) {
+                loggerMaker.errorAndAddToDb(e, "Error processing uploaded data: " + e.getMessage());
+                response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+                response.getWriter().write("{\"error\":\"Internal server error\"}");
+            }
+        }
+
+        @Override
+        protected void doOptions(HttpServletRequest request, HttpServletResponse response)
+                throws ServletException, IOException {
+            response.setHeader("Access-Control-Allow-Origin", "*");
+            response.setHeader("Access-Control-Allow-Methods", "POST");
+            response.setHeader("Access-Control-Allow-Headers", "Content-Type");
+            response.setStatus(HttpServletResponse.SC_OK);
+        }
+    }
+
     public static final String GROUP_NAME = "group_name";
     public static final String VXLAN_ID = "vxlanId";
     public static final String VPC_CIDR = "vpc_cidr";
@@ -107,7 +163,57 @@ public static boolean tryForCollectionName(String message) {
         return ret;
     }
 
+    private static void processUploadedData(String jsonData) {
+        try {
+            if (tryForCollectionName(jsonData)) {
+                loggerMaker.info("Processed collection name data");
+                return;
+            }
+
+            Gson gson = new Gson();
+            Map<String, Object> map = gson.fromJson(jsonData, Map.class);
+            List<Map<String, Object>> batchData = (List<Map<String, Object>>) map.get("batchData");
+
+            for (Map<String, Object> batchDataItem : batchData) {
+                String json = gson.toJson(batchDataItem, Map.class);
+
+                HttpResponseParams httpResponseParams = HttpCallParser.parseKafkaMessage(json);
+                if (httpResponseParams == null) {
+                    loggerMaker.error("Failed to parse uploaded data - invalid JSON format");
+                    return;
+                }
+
+                List<HttpResponseParams> dataList = Collections.singletonList(httpResponseParams);
+                processData(dataList);
+                loggerMaker.info("Successfully processed uploaded data");
+            }
+        } catch (Exception e) {
+            loggerMaker.errorAndAddToDb(e, "Error in processUploadedData: " + e.getMessage());
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void startHttpServer() {
+        try {
+            int port = Integer.parseInt(System.getenv().getOrDefault("HTTP_PORT", "8080"));
+            Server server = new Server(port);
+
+            ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+            context.setContextPath("/");
+            server.setHandler(context);
+            context.addServlet(new ServletHolder(new DataUploadServlet()), "/upload");
+
+            server.start();
+            loggerMaker.infoAndAddToDb("HTTP server started on port " + port);
+            loggerMaker.infoAndAddToDb("Data upload endpoint available at: /upload");
+
+            server.join();
+        } catch (Exception e) {
+            loggerMaker.errorAndAddToDb(e, "Failed to start HTTP server: " + e.getMessage());
+            throw new RuntimeException(e);
+        }
+    }
 
     public static void insertRuntimeFilters() {
         RuntimeFilterDao.instance.initialiseFilters();
@@ -253,7 +359,6 @@ public static String getLogTopicName() {
 
     // REFERENCE: https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html (But how do we Exit?)
     public static void main(String[] args) {
-        //String mongoURI = System.getenv("AKTO_MONGO_CONN");;
         String configName = System.getenv("AKTO_CONFIG_NAME");
         String topicName = getTopicName();
         String kafkaBrokerUrl = System.getenv().getOrDefault("AKTO_KAFKA_BROKER_URL","kafka1:19092");
@@ -263,21 +368,6 @@ public static void main(String[] args) {
             kafkaBrokerUrl = "127.0.0.1:29092";
         }
         final String brokerUrlFinal = kafkaBrokerUrl;
-        String groupIdConfig = System.getenv("AKTO_KAFKA_GROUP_ID_CONFIG") != null
-                ? System.getenv("AKTO_KAFKA_GROUP_ID_CONFIG")
-                : "asdf";
-        boolean syncImmediately = false;
-        boolean fetchAllSTI = true;
-        Map accountInfoMap = new HashMap<>();
-
-        boolean isDashboardInstance = false;
-        if (isDashboardInstance) {
-            syncImmediately = true;
-            fetchAllSTI = false;
-        }
-        int maxPollRecordsConfig = Integer.parseInt(System.getenv("AKTO_KAFKA_MAX_POLL_RECORDS_CONFIG") != null
-                ? System.getenv("AKTO_KAFKA_MAX_POLL_RECORDS_CONFIG")
System.getenv("AKTO_KAFKA_MAX_POLL_RECORDS_CONFIG") - : "100"); AccountSettings aSettings = dataActor.fetchAccountSettings(); if (aSettings == null) { @@ -301,8 +391,6 @@ public static void main(String[] args) { initializeRuntime(); - String centralKafkaTopicName = AccountSettings.DEFAULT_CENTRAL_KAFKA_TOPIC_NAME; - buildKafka(); buildProtobufKafkaProducer(brokerUrlFinal); scheduler.scheduleAtFixedRate(new Runnable() { @@ -330,26 +418,8 @@ public void run() { apiConfig = new APIConfig(configName,"access-token", 1, 10_000_000, sync_threshold_time); // this sync threshold time is used for deleting sample data } - final Main main = new Main(); - Properties properties = Main.configProperties(kafkaBrokerUrl, groupIdConfig, maxPollRecordsConfig); - main.consumer = new KafkaConsumer<>(properties); - - final Thread mainThread = Thread.currentThread(); - final AtomicBoolean exceptionOnCommitSync = new AtomicBoolean(false); - Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { - main.consumer.wakeup(); - try { - if (!exceptionOnCommitSync.get()) { - mainThread.join(); - } - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (Error e){ - loggerMaker.errorAndAddToDb("Error in main thread: "+ e.getMessage()); - } - // Close protobuf producer if (protobufKafkaProducer != null) { protobufKafkaProducer.close(); @@ -361,33 +431,6 @@ public void run() { scheduler.scheduleAtFixedRate(()-> { try { - - Map metrics = main.consumer.metrics(); - - for (Map.Entry entry : metrics.entrySet()) { - MetricName key = entry.getKey(); - Metric value = entry.getValue(); - - if(key.name().equals("records-lag-max")){ - double val = value.metricValue().equals(Double.NaN) ? 0d: (double) value.metricValue(); - AllMetrics.instance.setKafkaRecordsLagMax((float) val); - } - if(key.name().equals("records-consumed-rate")){ - double val = value.metricValue().equals(Double.NaN) ? 0d: (double) value.metricValue(); - AllMetrics.instance.setKafkaRecordsConsumedRate((float) val); - } - - if(key.name().equals("fetch-latency-avg")){ - double val = value.metricValue().equals(Double.NaN) ? 0d: (double) value.metricValue(); - AllMetrics.instance.setKafkaFetchAvgLatency((float) val); - } - - if(key.name().equals("bytes-consumed-rate")){ - double val = value.metricValue().equals(Double.NaN) ? 
-                        AllMetrics.instance.setKafkaBytesConsumedRate((float) val);
-                    }
-                }
-
                 if (checkPg) {
                     long dbSizeInMb = clientLayer.fetchTotalSize();
                     AllMetrics.instance.setPgDataSizeInMb(dbSizeInMb);
@@ -399,75 +442,6 @@ public void run() {
         }, 0, 1, TimeUnit.MINUTES);
 
-        Map httpCallParserMap = new HashMap<>();
-
-        // sync infra metrics thread
-        // ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
-        // KafkaHealthMetricSyncTask task = new KafkaHealthMetricSyncTask(main.consumer);
-        // executor.scheduleAtFixedRate(task, 2, 60, TimeUnit.SECONDS);
-
-        long lastSyncOffset = 0;
-
-        String kafkaUrl = kafkaBrokerUrl;
-        executorService.schedule(new Runnable() {
-            public void run() {
-                try {
-                    loggerMaker.infoAndAddToDb("Starting traffic log consumer");
-                    String logTopicName = getLogTopicName();
-
-                    Properties logConsumerProps = Main.configProperties(kafkaUrl, groupIdConfig + LOG_GROUP_ID,maxPollRecordsConfig);
-                    KafkaConsumer<String, String> logConsumer = new KafkaConsumer<>(logConsumerProps);
-                    long lastLogSyncOffset = 0;
-                    try {
-                        logConsumer.subscribe(Collections.singletonList(logTopicName));
-                        loggerMaker.infoAndAddToDb("Second consumer subscribed to " + logTopicName);
-                        while (true) {
-                            ConsumerRecords<String, String> records = logConsumer.poll(Duration.ofMillis(10000));
-                            try {
-                                logConsumer.commitSync();
-                            } catch (Exception e) {
-                                throw e;
-                            }
-                            for (ConsumerRecord<String, String> record : records) {
-                                try {
-                                    lastLogSyncOffset++;
-                                    TrafficProducerLog trafficProducerLog = SampleParser.parseLogMessage(record.value());
-                                    if (trafficProducerLog == null || trafficProducerLog.getMessage() == null) {
-                                        loggerMaker.errorAndAddToDb("Traffic producer log is null");
-                                        continue;
-                                    }
-                                    String message = String.format("[TRAFFIC_PRODUCER] [%s] %s", trafficProducerLog.getSource(), trafficProducerLog.getMessage());
-
-                                    if (trafficProducerLog.getLogType() != null && trafficProducerLog.getLogType().equalsIgnoreCase("ERROR")) {
-                                        loggerMaker.errorAndAddToDb(message);
-                                    } else if (trafficProducerLog.getLogType() != null && trafficProducerLog.getLogType().equalsIgnoreCase("DEBUG")) {
-                                        loggerMaker.debug(message);
-                                    } else {
-                                        loggerMaker.infoAndAddToDb(message);
-                                    }
-
-                                    if (lastLogSyncOffset % 100 == 0) {
-                                        loggerMaker.info("Committing log offset at position: " + lastLogSyncOffset);
-                                    }
-
-                                } catch (Exception e) {
-                                    loggerMaker.errorAndAddToDb(e, "Error while parsing traffic producer log kafka message " + e);
-                                    continue;
-                                }
-                            }
-                        }
-                    } catch (WakeupException ignored) {
-                        // Shutdown
-                    } catch (Exception e) {
-                        loggerMaker.errorAndAddToDb(e, "Error in second topic consumer");
-                    } finally {
-                        logConsumer.close();
-                    }
-                } catch (Exception e) {
-                    loggerMaker.errorAndAddToDb(e, "Error while starting traffic log consumer");
-                }
-            }
-        }, 0, TimeUnit.SECONDS);
 
         // schedule MCP sync job for 24 hours
         loggerMaker.info("Scheduling MCP Sync Job");
@@ -500,8 +474,8 @@ public void run() {
         if(isDbMergingModeEnabled()){
             runDBMaintenanceJob(apiConfig);
         }else{
-            kafkaSubscribeAndProcess(topicName, syncImmediately, fetchAllSTI, accountInfoMap, isDashboardInstance, centralKafkaTopicName,
-                apiConfig, main, exceptionOnCommitSync, httpCallParserMap, lastSyncOffset);
+            loggerMaker.infoAndAddToDb("Starting HTTP server instead of Kafka consumer...");
+            startHttpServer();
         }
     }
 
@@ -509,104 +483,8 @@ public static boolean isDbMergingModeEnabled(){
         return System.getenv().getOrDefault("DB_MERGING_MODE", "false").equalsIgnoreCase("true");
     }
 
-    /**
-     * Main method of mini runtime where traffic kafka topic consumer does processing.
-     */
-    private static void kafkaSubscribeAndProcess(String topicName, boolean syncImmediately, boolean fetchAllSTI,
-            Map accountInfoMap, boolean isDashboardInstance, String centralKafkaTopicName,
-            APIConfig apiConfig, final Main main, final AtomicBoolean exceptionOnCommitSync,
-            Map httpCallParserMap, long lastSyncOffset) {
-        try {
-            main.consumer.subscribe(Arrays.asList(topicName));
-            loggerMaker.infoAndAddToDb("Consumer subscribed to topic: " + topicName);
-            while (true) {
-                ConsumerRecords<String, String> records = main.consumer.poll(Duration.ofMillis(10000));
-                try {
-                    main.consumer.commitSync();
-                } catch (Exception e) {
-                    loggerMaker.errorAndAddToDb(e, "Error while committing offset: " + e.getMessage());
-                    throw e;
-                }
-                long start = System.currentTimeMillis();
-                // TODO: what happens if exception
-                Map<String, List<HttpResponseParams>> responseParamsToAccountMap = new HashMap<>();
-                bulkParseTrafficToResponseParams(lastSyncOffset, records, responseParamsToAccountMap);
-
-                handleResponseParams(responseParamsToAccountMap,
-                    accountInfoMap,
-                    isDashboardInstance,
-                    httpCallParserMap,
-                    apiConfig,
-                    fetchAllSTI,
-                    syncImmediately,
-                    centralKafkaTopicName);
-                AllMetrics.instance.setRuntimeProcessLatency(System.currentTimeMillis()-start);
-                loggerMaker.info("Processed " + responseParamsToAccountMap.size() + " accounts in " + (System.currentTimeMillis()-start) + " ms");
-            }
-
-        } catch (WakeupException ignored) {
-            // nothing to catch. This exception is called from the shutdown hook.
-            loggerMaker.error("Kafka consumer closed due to wakeup exception");
-        } catch (Exception e) {
-            exceptionOnCommitSync.set(true);
-            printL(e);
-            loggerMaker.errorAndAddToDb(e, "Error in main runtime: " + e.getMessage());
-            e.printStackTrace();
-            System.exit(0);
-        } finally {
-            loggerMaker.warn("Closing kafka consumer for topic: " + topicName);
-            main.consumer.close();
-        }
-    }
-
-    private static void bulkParseTrafficToResponseParams(long lastSyncOffset, ConsumerRecords<String, String> records,
-            Map<String, List<HttpResponseParams>> responseParamsToAccountMap) {
-        for (ConsumerRecord<String, String> r: records) {
-            HttpResponseParams httpResponseParams;
-            try {
-
-                printL(r.value());
-                AllMetrics.instance.setRuntimeKafkaRecordCount(1);
-                AllMetrics.instance.setRuntimeKafkaRecordSize(r.value().length());
-
-                lastSyncOffset++;
-                if (DataControlFetcher.stopIngestionFromKafka()) {
-                    continue;
-                }
-
-                if (lastSyncOffset % 100 == 0) {
-                    loggerMaker.info("Committing offset at position: " + lastSyncOffset);
-                }
-
-                if (tryForCollectionName(r.value())) {
-                    continue;
-                }
-                httpResponseParams = HttpCallParser.parseKafkaMessage(r.value());
-                if (httpResponseParams == null) {
-                    loggerMaker.error("httpresponse params was skipped due to invalid json requestBody");
-                    continue;
-                }
-                HttpRequestParams requestParams = httpResponseParams.getRequestParams();
-                String debugHost = Utils.printDebugHostLog(httpResponseParams);
-                if (debugHost != null) {
-                    loggerMaker.infoAndAddToDb("Found debug host: " + debugHost + " in url: " + requestParams.getMethod() + " " + requestParams.getURL());
-                }
-                if (Utils.printDebugUrlLog(requestParams.getURL())) {
-                    loggerMaker.infoAndAddToDb("Found debug url: " + requestParams.getURL());
-                }
-            } catch (Exception e) {
-                loggerMaker.errorAndAddToDb(e, "Error while parsing kafka message " + e);
-                continue;
-            }
-            String accountId = httpResponseParams.getAccountId();
-            if (!responseParamsToAccountMap.containsKey(accountId)) {
-                responseParamsToAccountMap.put(accountId, new ArrayList<>());
-            }
-            responseParamsToAccountMap.get(accountId).add(httpResponseParams);
-        }
-    }
 
     /**
      * This method is used to run the postgres db sample data merging job.
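
A minimal usage sketch for the new /upload endpoint added above, assuming the runtime is started with DB_MERGING_MODE disabled and the default HTTP_PORT of 8080 from startHttpServer(). The "batchData" envelope is read by processUploadedData(); the path and method fields inside each record are hypothetical placeholders, since the real field names are whatever HttpCallParser.parseKafkaMessage() accepts. Only the JDK 11+ java.net.http client is used.

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class UploadClientExample {
        public static void main(String[] args) throws Exception {
            // Hypothetical payload: a "batchData" array as unwrapped by processUploadedData();
            // per-record fields must match what HttpCallParser.parseKafkaMessage() expects.
            String payload = "{\"batchData\":[{\"path\":\"/api/books\",\"method\":\"GET\"}]}";

            HttpClient client = HttpClient.newHttpClient();
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://localhost:8080/upload")) // HTTP_PORT defaults to 8080
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(payload))
                    .build();

            // Expect {"status":"success",...} on 200, or an error JSON on 400/500.
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        }
    }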