From 7f65217e43634a1bdaf067ba2f5f0493438e47c2 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 25 Apr 2025 14:32:11 -0400 Subject: [PATCH 1/4] update: add p_format_verified for schema detection if server detects known format, add `p_format_verified=true` to the event if schema detection fails, add `p_format_verified=false` to the event --- src/event/format/known_schema.rs | 21 +++++++++++-- src/event/mod.rs | 1 + src/handlers/http/ingest.rs | 30 ++++++++++++------- src/handlers/http/modal/utils/ingest_utils.rs | 10 +++---- 4 files changed, 44 insertions(+), 18 deletions(-) diff --git a/src/event/format/known_schema.rs b/src/event/format/known_schema.rs index 56aa5f6ee..3d6353cd8 100644 --- a/src/event/format/known_schema.rs +++ b/src/event/format/known_schema.rs @@ -24,6 +24,8 @@ use serde::{Deserialize, Deserializer}; use serde_json::{Map, Value}; use tracing::error; +use crate::event::FORMAT_VERIFY_KEY; + /// Predefined JSON with known textual logging formats const FORMATS_JSON: &str = include_str!("../../../resources/formats.json"); @@ -120,11 +122,21 @@ impl SchemaDefinition { } } + // add `P_FORMAT_VERIFY_KEY` to the object + obj.insert( + FORMAT_VERIFY_KEY.to_string(), + Value::String("true".to_string()), + ); + obj.extend(extracted_fields); return Some(format.fields.clone()); } - + // add `P_FORMAT_VERIFY_KEY` to the object + obj.insert( + FORMAT_VERIFY_KEY.to_string(), + Value::String("false".to_string()), + ); None } } @@ -180,6 +192,7 @@ impl EventProcessor { pub fn extract_from_inline_log( &self, json: &mut Value, + p_custom_fields: &mut HashMap, log_source: &str, extract_log: Option<&str>, ) -> Result, Error> { @@ -197,7 +210,8 @@ impl EventProcessor { if let Some(known_fields) = schema.check_or_extract(event, extract_log) { fields.extend(known_fields); } else { - return Err(Error::Unacceptable(log_source.to_owned())); + // add `P_FORMAT_VERIFY_KEY` to the object + p_custom_fields.insert(FORMAT_VERIFY_KEY.to_string(), "false".to_string()); } } } @@ -205,7 +219,8 @@ impl EventProcessor { if let Some(known_fields) = schema.check_or_extract(event, extract_log) { return Ok(known_fields); } else { - return Err(Error::Unacceptable(log_source.to_owned())); + // add `P_FORMAT_VERIFY_KEY` to the object + p_custom_fields.insert(FORMAT_VERIFY_KEY.to_string(), "false".to_string()); } } _ => unreachable!("We don't accept events of the form: {json}"), diff --git a/src/event/mod.rs b/src/event/mod.rs index b641643cb..c60f0d057 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -38,6 +38,7 @@ pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp"; pub const USER_AGENT_KEY: &str = "p_user_agent"; pub const SOURCE_IP_KEY: &str = "p_src_ip"; pub const FORMAT_KEY: &str = "p_format"; +pub const FORMAT_VERIFY_KEY: &str = "p_format_verified"; #[derive(Clone)] pub struct Event { diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 02bab9e97..a742057f9 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -81,10 +81,15 @@ pub async fn ingest( return Err(PostError::OtelNotSupported); } + let mut p_custom_fields = get_custom_fields_from_header(&req); + let fields = match &log_source { - LogSource::Custom(src) => { - KNOWN_SCHEMA_LIST.extract_from_inline_log(&mut json, src, extract_log)? - } + LogSource::Custom(src) => KNOWN_SCHEMA_LIST.extract_from_inline_log( + &mut json, + &mut p_custom_fields, + src, + extract_log, + )?, _ => HashSet::new(), }; @@ -114,7 +119,7 @@ pub async fn ingest( PARSEABLE .add_update_log_source(&stream_name, log_source_entry) .await?; - let p_custom_fields = get_custom_fields_from_header(req); + flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?; Ok(HttpResponse::Ok().finish()) @@ -198,7 +203,7 @@ pub async fn handle_otel_logs_ingestion( .add_update_log_source(&stream_name, log_source_entry) .await?; - let p_custom_fields = get_custom_fields_from_header(req); + let p_custom_fields = get_custom_fields_from_header(&req); flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?; @@ -256,7 +261,7 @@ pub async fn handle_otel_metrics_ingestion( .add_update_log_source(&stream_name, log_source_entry) .await?; - let p_custom_fields = get_custom_fields_from_header(req); + let p_custom_fields = get_custom_fields_from_header(&req); flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?; @@ -315,7 +320,7 @@ pub async fn handle_otel_traces_ingestion( .add_update_log_source(&stream_name, log_source_entry) .await?; - let p_custom_fields = get_custom_fields_from_header(req); + let p_custom_fields = get_custom_fields_from_header(&req); flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?; @@ -363,13 +368,18 @@ pub async fn post_event( .headers() .get(EXTRACT_LOG_KEY) .and_then(|h| h.to_str().ok()); - + let mut p_custom_fields = get_custom_fields_from_header(&req); match &log_source { LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => { return Err(PostError::OtelNotSupported) } LogSource::Custom(src) => { - KNOWN_SCHEMA_LIST.extract_from_inline_log(&mut json, src, extract_log)?; + KNOWN_SCHEMA_LIST.extract_from_inline_log( + &mut json, + &mut p_custom_fields, + src, + extract_log, + )?; } _ => {} } @@ -386,7 +396,7 @@ pub async fn post_event( }) .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?; } - let p_custom_fields = get_custom_fields_from_header(req); + flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?; Ok(HttpResponse::Ok().finish()) diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 7e6029699..9b32b924b 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -145,7 +145,7 @@ async fn push_logs( Ok(()) } -pub fn get_custom_fields_from_header(req: HttpRequest) -> HashMap { +pub fn get_custom_fields_from_header(req: &HttpRequest) -> HashMap { let user_agent = req .headers() .get(USER_AGENT) @@ -217,7 +217,7 @@ mod tests { .insert_header(("x-p-environment", "dev")) .to_http_request(); - let custom_fields = get_custom_fields_from_header(req); + let custom_fields = get_custom_fields_from_header(&req); assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), "TestUserAgent"); assert_eq!(custom_fields.get("environment").unwrap(), "dev"); @@ -230,7 +230,7 @@ mod tests { .insert_header((STREAM_NAME_HEADER_KEY, "teststream")) .to_http_request(); - let custom_fields = get_custom_fields_from_header(req); + let custom_fields = get_custom_fields_from_header(&req); assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), "TestUserAgent"); assert!(!custom_fields.contains_key(STREAM_NAME_HEADER_KEY)); @@ -243,7 +243,7 @@ mod tests { .insert_header((LOG_SOURCE_KEY, "otel-logs")) .to_http_request(); - let custom_fields = get_custom_fields_from_header(req); + let custom_fields = get_custom_fields_from_header(&req); assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), "TestUserAgent"); assert_eq!(custom_fields.get(FORMAT_KEY).unwrap(), "otel-logs"); @@ -255,7 +255,7 @@ mod tests { .insert_header(("x-p-", "empty")) .to_http_request(); - let custom_fields = get_custom_fields_from_header(req); + let custom_fields = get_custom_fields_from_header(&req); assert_eq!(custom_fields.len(), 2); assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), ""); From 1675057d208a735c203bd68d68772711a350cf70 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sat, 26 Apr 2025 02:35:34 -0400 Subject: [PATCH 2/4] update regex for alb-log --- resources/formats.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resources/formats.json b/resources/formats.json index 5965aa4e1..d2ce815b0 100644 --- a/resources/formats.json +++ b/resources/formats.json @@ -72,7 +72,7 @@ "name": "alb_log", "regex": [ { - "pattern": "^(?http|https|h2|ws|wss) (?\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?[^ ]+) (?[\\w\\.:]+):(?\\d+) (?[\\w\\.:]+):(?\\d+) (?[-\\d\\.]+) (?[-\\d\\.]+) (?[-\\d\\.]+) (?\\d+|-) (?\\d+|-) (?\\d+) (?\\d+) (?POST|GET|PUT|DELETE|HEAD|OPTIONS|CONNECT|TRACE|PATCH) (?[^ ]+) (?[^ ]+) (?[^ \\(]+) (?:\\([^\\)]+\\))? (?[\\w-]+) (?[\\w\\.-]+) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?\\d+) (?\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?[\\d\\.:]+) (?\\d+|-) (?[^ ]+) (?[^ ]+) (?[^ ]+)$", + "pattern": "^(?http|https|h2|ws|wss) (?\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?[^ ]+) (?[\\w\\.:]+):(?\\d+) (?[\\w\\.:]+):(?\\d+) (?[-\\d\\.]+) (?[-\\d\\.]+) (?[-\\d\\.]+) (?\\d+|-) (?\\d+|-) (?\\d+) (?\\d+) (?POST|GET|PUT|DELETE|HEAD|OPTIONS|CONNECT|TRACE|PATCH) (?[^ ]+) (?[^ ]+) (?.+?) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?\\d+) (?\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?[\\d\\.:]+) (?\\d+|-) (?[^ ]+) (?[^ ]+) (?[^ ]+)$", "fields": [ "type", "timestamp", From f217b3d8ffd561dabcb28fe5c3d8866c56a7aa88 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sat, 26 Apr 2025 04:52:56 -0400 Subject: [PATCH 3/4] add regex for alb-log --- resources/formats.json | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/resources/formats.json b/resources/formats.json index d2ce815b0..251c6c566 100644 --- a/resources/formats.json +++ b/resources/formats.json @@ -109,6 +109,44 @@ "classification_reason", "track_id" ] + }, + { + "pattern": "^(?http|https|h2|ws|wss) (?\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?[^ ]+) (?[^:]+):(?\\d+) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?.*?) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z|[^ ]+) (?[^ ]+) (?[^ ]+) (?[^ ]+) (?[^ ]+|[^ ]*) (?[^ ]+|[^ ]*) (?[^ ]+|[^ ]*) (?[^ ]+|[^ ]*) (?TID_[a-f0-9]+)$", + "fields": [ + "type", + "timestamp", + "elb", + "client_ip", + "client_port", + "target_ip", + "request_processing_time", + "target_processing_time", + "response_processing_time", + "elb_status_code", + "target_status_code", + "received_bytes", + "sent_bytes", + "cs_method", + "cs_uri_whole", + "cs_version", + "user_agent", + "ssl_cipher", + "ssl_protocol", + "target_group_arn", + "trace_id", + "domain_name", + "chosen_cert_arn", + "action_executed", + "request_creation_time", + "redirect_url", + "redirect_proto", + "redirect_port", + "target_ip_port", + "target_status_desc", + "classification", + "classification_reason", + "track_id" + ] } ] }, From 226bb5f1dbe6ffea0a3bc3d4f4cdbce4979161b4 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sat, 26 Apr 2025 09:13:12 -0400 Subject: [PATCH 4/4] add regex for alb-log --- resources/formats.json | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/resources/formats.json b/resources/formats.json index 251c6c566..faeb1f7b8 100644 --- a/resources/formats.json +++ b/resources/formats.json @@ -147,6 +147,16 @@ "classification_reason", "track_id" ] + }, + { + "pattern": "^(?\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?[\\d\\.]+) (?\\d+) (?\\d+) (- ){7}(?TID_[a-f0-9]+)$", + "fields": [ + "timestamp", + "client_ip", + "client_port", + "target_port", + "track_id" + ] } ] },