Skip to content

update: add custom field p_format_verified #1303

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion resources/formats.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
"name": "alb_log",
"regex": [
{
"pattern": "^(?<type>http|https|h2|ws|wss) (?<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?<elb>[^ ]+) (?<client_ip>[\\w\\.:]+):(?<client_port>\\d+) (?<target_ip>[\\w\\.:]+):(?<target_port>\\d+) (?<request_processing_time>[-\\d\\.]+) (?<target_processing_time>[-\\d\\.]+) (?<response_processing_time>[-\\d\\.]+) (?<elb_status_code>\\d+|-) (?<target_status_code>\\d+|-) (?<received_bytes>\\d+) (?<sent_bytes>\\d+) (?<cs_method>POST|GET|PUT|DELETE|HEAD|OPTIONS|CONNECT|TRACE|PATCH) (?<cs_uri_whole>[^ ]+) (?<cs_version>[^ ]+) (?<user_agent>[^ \\(]+) (?:\\([^\\)]+\\))? (?<ssl_cipher>[\\w-]+) (?<ssl_protocol>[\\w\\.-]+) (?<target_group_arn>[^ ]+) (?<trace_id>[^ ]+) (?<domain_name>[^ ]+) (?<chosen_cert_arn>[^ ]+) (?<action_executed>\\d+) (?<request_creation_time>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?<redirect_url>[^ ]+) (?<redirect_proto>[^ ]+) (?<redirect_port>[^ ]+) (?<target_ip_port>[\\d\\.:]+) (?<target_status_desc>\\d+|-) (?<classification>[^ ]+) (?<classification_reason>[^ ]+) (?<track_id>[^ ]+)$",
"pattern": "^(?<type>http|https|h2|ws|wss) (?<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?<elb>[^ ]+) (?<client_ip>[\\w\\.:]+):(?<client_port>\\d+) (?<target_ip>[\\w\\.:]+):(?<target_port>\\d+) (?<request_processing_time>[-\\d\\.]+) (?<target_processing_time>[-\\d\\.]+) (?<response_processing_time>[-\\d\\.]+) (?<elb_status_code>\\d+|-) (?<target_status_code>\\d+|-) (?<received_bytes>\\d+) (?<sent_bytes>\\d+) (?<cs_method>POST|GET|PUT|DELETE|HEAD|OPTIONS|CONNECT|TRACE|PATCH) (?<cs_uri_whole>[^ ]+) (?<cs_version>[^ ]+) (?<user_agent>.+?) (?<ssl_cipher>[^ ]+) (?<ssl_protocol>[^ ]+) (?<target_group_arn>[^ ]+) (?<trace_id>[^ ]+) (?<domain_name>[^ ]+) (?<chosen_cert_arn>[^ ]+) (?<action_executed>\\d+) (?<request_creation_time>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?<redirect_url>[^ ]+) (?<redirect_proto>[^ ]+) (?<redirect_port>[^ ]+) (?<target_ip_port>[\\d\\.:]+) (?<target_status_desc>\\d+|-) (?<classification>[^ ]+) (?<classification_reason>[^ ]+) (?<track_id>[^ ]+)$",
"fields": [
"type",
"timestamp",
Expand Down Expand Up @@ -109,6 +109,54 @@
"classification_reason",
"track_id"
]
},
{
"pattern": "^(?<type>http|https|h2|ws|wss) (?<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?<elb>[^ ]+) (?<client_ip>[^:]+):(?<client_port>\\d+) (?<target_ip>[^ ]+) (?<request_processing_time>[^ ]+) (?<target_processing_time>[^ ]+) (?<response_processing_time>[^ ]+) (?<elb_status_code>[^ ]+) (?<target_status_code>[^ ]+) (?<received_bytes>[^ ]+) (?<sent_bytes>[^ ]+) (?<cs_method>[^ ]+) (?<cs_uri_whole>[^ ]+) (?<cs_version>[^ ]+) (?<user_agent>.*?) (?<ssl_cipher>[^ ]+) (?<ssl_protocol>[^ ]+) (?<target_group_arn>[^ ]+) (?<trace_id>[^ ]+) (?<domain_name>[^ ]+) (?<chosen_cert_arn>[^ ]+) (?<action_executed>[^ ]+) (?<request_creation_time>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z|[^ ]+) (?<redirect_url>[^ ]+) (?<redirect_proto>[^ ]+) (?<redirect_port>[^ ]+) (?<target_ip_port>[^ ]+|[^ ]*) (?<target_status_desc>[^ ]+|[^ ]*) (?<classification>[^ ]+|[^ ]*) (?<classification_reason>[^ ]+|[^ ]*) (?<track_id>TID_[a-f0-9]+)$",
"fields": [
"type",
"timestamp",
"elb",
"client_ip",
"client_port",
"target_ip",
"request_processing_time",
"target_processing_time",
"response_processing_time",
"elb_status_code",
"target_status_code",
"received_bytes",
"sent_bytes",
"cs_method",
"cs_uri_whole",
"cs_version",
"user_agent",
"ssl_cipher",
"ssl_protocol",
"target_group_arn",
"trace_id",
"domain_name",
"chosen_cert_arn",
"action_executed",
"request_creation_time",
"redirect_url",
"redirect_proto",
"redirect_port",
"target_ip_port",
"target_status_desc",
"classification",
"classification_reason",
"track_id"
]
},
{
"pattern": "^(?<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?<client_ip>[\\d\\.]+) (?<client_port>\\d+) (?<target_port>\\d+) (- ){7}(?<track_id>TID_[a-f0-9]+)$",
"fields": [
"timestamp",
"client_ip",
"client_port",
"target_port",
"track_id"
]
}
]
},
Expand Down
21 changes: 18 additions & 3 deletions src/event/format/known_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use serde::{Deserialize, Deserializer};
use serde_json::{Map, Value};
use tracing::error;

use crate::event::FORMAT_VERIFY_KEY;

/// Predefined JSON with known textual logging formats
const FORMATS_JSON: &str = include_str!("../../../resources/formats.json");

Expand Down Expand Up @@ -120,11 +122,21 @@ impl SchemaDefinition {
}
}

// add `P_FORMAT_VERIFY_KEY` to the object
obj.insert(
FORMAT_VERIFY_KEY.to_string(),
Value::String("true".to_string()),
);

obj.extend(extracted_fields);

return Some(format.fields.clone());
}

// add `P_FORMAT_VERIFY_KEY` to the object
obj.insert(
FORMAT_VERIFY_KEY.to_string(),
Value::String("false".to_string()),
);
None
}
}
Expand Down Expand Up @@ -180,6 +192,7 @@ impl EventProcessor {
pub fn extract_from_inline_log(
&self,
json: &mut Value,
p_custom_fields: &mut HashMap<String, String>,
log_source: &str,
extract_log: Option<&str>,
) -> Result<HashSet<String>, Error> {
Expand All @@ -197,15 +210,17 @@ impl EventProcessor {
if let Some(known_fields) = schema.check_or_extract(event, extract_log) {
fields.extend(known_fields);
} else {
return Err(Error::Unacceptable(log_source.to_owned()));
// add `P_FORMAT_VERIFY_KEY` to the object
p_custom_fields.insert(FORMAT_VERIFY_KEY.to_string(), "false".to_string());
}
}
}
Value::Object(event) => {
if let Some(known_fields) = schema.check_or_extract(event, extract_log) {
return Ok(known_fields);
} else {
return Err(Error::Unacceptable(log_source.to_owned()));
// add `P_FORMAT_VERIFY_KEY` to the object
p_custom_fields.insert(FORMAT_VERIFY_KEY.to_string(), "false".to_string());
}
}
_ => unreachable!("We don't accept events of the form: {json}"),
Expand Down
1 change: 1 addition & 0 deletions src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
pub const USER_AGENT_KEY: &str = "p_user_agent";
pub const SOURCE_IP_KEY: &str = "p_src_ip";
pub const FORMAT_KEY: &str = "p_format";
pub const FORMAT_VERIFY_KEY: &str = "p_format_verified";

#[derive(Clone)]
pub struct Event {
Expand Down
30 changes: 20 additions & 10 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,15 @@ pub async fn ingest(
return Err(PostError::OtelNotSupported);
}

let mut p_custom_fields = get_custom_fields_from_header(&req);

let fields = match &log_source {
LogSource::Custom(src) => {
KNOWN_SCHEMA_LIST.extract_from_inline_log(&mut json, src, extract_log)?
}
LogSource::Custom(src) => KNOWN_SCHEMA_LIST.extract_from_inline_log(
&mut json,
&mut p_custom_fields,
src,
extract_log,
)?,
_ => HashSet::new(),
};

Expand Down Expand Up @@ -114,7 +119,7 @@ pub async fn ingest(
PARSEABLE
.add_update_log_source(&stream_name, log_source_entry)
.await?;
let p_custom_fields = get_custom_fields_from_header(req);

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

Ok(HttpResponse::Ok().finish())
Expand Down Expand Up @@ -198,7 +203,7 @@ pub async fn handle_otel_logs_ingestion(
.add_update_log_source(&stream_name, log_source_entry)
.await?;

let p_custom_fields = get_custom_fields_from_header(req);
let p_custom_fields = get_custom_fields_from_header(&req);

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

Expand Down Expand Up @@ -256,7 +261,7 @@ pub async fn handle_otel_metrics_ingestion(
.add_update_log_source(&stream_name, log_source_entry)
.await?;

let p_custom_fields = get_custom_fields_from_header(req);
let p_custom_fields = get_custom_fields_from_header(&req);

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

Expand Down Expand Up @@ -315,7 +320,7 @@ pub async fn handle_otel_traces_ingestion(
.add_update_log_source(&stream_name, log_source_entry)
.await?;

let p_custom_fields = get_custom_fields_from_header(req);
let p_custom_fields = get_custom_fields_from_header(&req);

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

Expand Down Expand Up @@ -363,13 +368,18 @@ pub async fn post_event(
.headers()
.get(EXTRACT_LOG_KEY)
.and_then(|h| h.to_str().ok());

let mut p_custom_fields = get_custom_fields_from_header(&req);
match &log_source {
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => {
return Err(PostError::OtelNotSupported)
}
LogSource::Custom(src) => {
KNOWN_SCHEMA_LIST.extract_from_inline_log(&mut json, src, extract_log)?;
KNOWN_SCHEMA_LIST.extract_from_inline_log(
&mut json,
&mut p_custom_fields,
src,
extract_log,
)?;
}
_ => {}
}
Expand All @@ -386,7 +396,7 @@ pub async fn post_event(
})
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
}
let p_custom_fields = get_custom_fields_from_header(req);

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

Ok(HttpResponse::Ok().finish())
Expand Down
10 changes: 5 additions & 5 deletions src/handlers/http/modal/utils/ingest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ async fn push_logs(
Ok(())
}

pub fn get_custom_fields_from_header(req: HttpRequest) -> HashMap<String, String> {
pub fn get_custom_fields_from_header(req: &HttpRequest) -> HashMap<String, String> {
let user_agent = req
.headers()
.get(USER_AGENT)
Expand Down Expand Up @@ -217,7 +217,7 @@ mod tests {
.insert_header(("x-p-environment", "dev"))
.to_http_request();

let custom_fields = get_custom_fields_from_header(req);
let custom_fields = get_custom_fields_from_header(&req);

assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), "TestUserAgent");
assert_eq!(custom_fields.get("environment").unwrap(), "dev");
Expand All @@ -230,7 +230,7 @@ mod tests {
.insert_header((STREAM_NAME_HEADER_KEY, "teststream"))
.to_http_request();

let custom_fields = get_custom_fields_from_header(req);
let custom_fields = get_custom_fields_from_header(&req);

assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), "TestUserAgent");
assert!(!custom_fields.contains_key(STREAM_NAME_HEADER_KEY));
Expand All @@ -243,7 +243,7 @@ mod tests {
.insert_header((LOG_SOURCE_KEY, "otel-logs"))
.to_http_request();

let custom_fields = get_custom_fields_from_header(req);
let custom_fields = get_custom_fields_from_header(&req);

assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), "TestUserAgent");
assert_eq!(custom_fields.get(FORMAT_KEY).unwrap(), "otel-logs");
Expand All @@ -255,7 +255,7 @@ mod tests {
.insert_header(("x-p-", "empty"))
.to_http_request();

let custom_fields = get_custom_fields_from_header(req);
let custom_fields = get_custom_fields_from_header(&req);

assert_eq!(custom_fields.len(), 2);
assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), "");
Expand Down
Loading