diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 0cba267bb..ab9116eb7 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -29,7 +29,7 @@ use serde_json::Value; use std::{collections::HashMap, sync::Arc}; use tracing::error; -use super::{EventFormat, Metadata, Tags}; +use super::{EventFormat, LogSource, Metadata, Tags}; use crate::{ metadata::SchemaVersion, utils::{arrow::get_field, json::flatten_json_body}, @@ -52,7 +52,7 @@ impl EventFormat for Event { static_schema_flag: Option<&String>, time_partition: Option<&String>, schema_version: SchemaVersion, - log_source: &str, + log_source: &LogSource, ) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> { let data = flatten_json_body( self.data, diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index 0c65d1402..593e82f1e 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -43,6 +43,38 @@ type Tags = String; type Metadata = String; type EventSchema = Vec<Arc<Field>>; +/// Source of the logs, used to perform special processing for certain sources +#[derive(Default, Debug, Clone, PartialEq, Eq)] +pub enum LogSource { + // AWS Kinesis sends logs in the format of a json array + Kinesis, + // OpenTelemetry sends logs according to the specification as explained here + // https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/logs/v1 + OtelLogs, + // OpenTelemetry sends metrics according to the specification as explained here + // https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/metrics/v1 + OtelMetrics, + // OpenTelemetry sends traces according to the specification as explained here + // https://github.com/open-telemetry/opentelemetry-proto/blob/v1.0.0/opentelemetry/proto/trace/v1/trace.proto + OtelTraces, + #[default] + // Json object or array + Json, + Custom(String), +} + +impl From<&str> for LogSource { + fn from(s: &str) -> Self { + match s { + "kinesis" => 
LogSource::Kinesis, + "otel-logs" => LogSource::OtelLogs, + "otel-metrics" => LogSource::OtelMetrics, + "otel-traces" => LogSource::OtelTraces, + custom => LogSource::Custom(custom.to_owned()), + } + } +} + // Global Trait for event format // This trait is implemented by all the event formats pub trait EventFormat: Sized { @@ -54,7 +86,7 @@ pub trait EventFormat: Sized { static_schema_flag: Option<&String>, time_partition: Option<&String>, schema_version: SchemaVersion, - log_source: &str, + log_source: &LogSource, ) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>; fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>; @@ -65,7 +97,7 @@ pub trait EventFormat: Sized { static_schema_flag: Option<&String>, time_partition: Option<&String>, schema_version: SchemaVersion, - log_source: &str, + log_source: &LogSource, ) -> Result<(RecordBatch, bool), AnyError> { let (data, mut schema, is_first, tags, metadata) = self.to_data( storage_schema, diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 0961f6b6d..60d3eab93 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -20,16 +20,14 @@ use super::logstream::error::{CreateStreamError, StreamError}; use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs}; use super::users::dashboards::DashboardError; use super::users::filters::FiltersError; +use crate::event::format::LogSource; use crate::event::{ self, error::EventError, format::{self, EventFormat}, }; use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage; -use crate::handlers::{ - LOG_SOURCE_KEY, LOG_SOURCE_OTEL_LOGS, LOG_SOURCE_OTEL_METRICS, LOG_SOURCE_OTEL_TRACES, - STREAM_NAME_HEADER_KEY, -}; +use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY}; use crate::metadata::error::stream_info::MetadataError; use crate::metadata::{SchemaVersion, STREAM_INFO}; use crate::option::{Mode, CONFIG}; @@ -95,7 +93,7 @@ pub 
async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< metadata: String::default(), }; // For internal streams, use old schema - event.into_recordbatch(&schema, None, None, SchemaVersion::V0, "")? + event.into_recordbatch(&schema, None, None, SchemaVersion::V0, &LogSource::default())? }; event::Event { rb, @@ -127,8 +125,8 @@ pub async fn handle_otel_logs_ingestion( let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingLogSource)); }; - let log_source = log_source.to_str().unwrap(); - if log_source != LOG_SOURCE_OTEL_LOGS { + let log_source = LogSource::from(log_source.to_str().unwrap()); + if log_source != LogSource::OtelLogs { return Err(PostError::Invalid(anyhow::anyhow!( "Please use x-p-log-source: otel-logs for ingesting otel logs" ))); @@ -142,7 +140,7 @@ pub async fn handle_otel_logs_ingestion( let mut json = flatten_otel_logs(&logs); for record in json.iter_mut() { let body: Bytes = serde_json::to_vec(record).unwrap().into(); - push_logs(&stream_name, &req, &body, log_source).await?; + push_logs(&stream_name, &req, &body, &log_source).await?; } Ok(HttpResponse::Ok().finish()) @@ -161,8 +159,8 @@ pub async fn handle_otel_metrics_ingestion( let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingLogSource)); }; - let log_source = log_source.to_str().unwrap(); - if log_source != LOG_SOURCE_OTEL_METRICS { + let log_source = LogSource::from(log_source.to_str().unwrap()); + if log_source != LogSource::OtelMetrics { return Err(PostError::Invalid(anyhow::anyhow!( "Please use x-p-log-source: otel-metrics for ingesting otel metrics" ))); @@ -175,7 +173,7 @@ pub async fn handle_otel_metrics_ingestion( let mut json = flatten_otel_metrics(metrics); for record in json.iter_mut() { let body: Bytes = serde_json::to_vec(record).unwrap().into(); - push_logs(&stream_name, &req, &body, log_source).await?; + push_logs(&stream_name, 
&req, &body, &log_source).await?; } Ok(HttpResponse::Ok().finish()) @@ -195,8 +193,8 @@ pub async fn handle_otel_traces_ingestion( let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingLogSource)); }; - let log_source = log_source.to_str().unwrap(); - if log_source != LOG_SOURCE_OTEL_TRACES { + let log_source = LogSource::from(log_source.to_str().unwrap()); + if log_source != LogSource::OtelTraces { return Err(PostError::Invalid(anyhow::anyhow!( "Please use x-p-log-source: otel-traces for ingesting otel traces" ))); @@ -209,7 +207,7 @@ pub async fn handle_otel_traces_ingestion( let mut json = flatten_otel_traces(&traces); for record in json.iter_mut() { let body: Bytes = serde_json::to_vec(record).unwrap().into(); - push_logs(&stream_name, &req, &body, log_source).await?; + push_logs(&stream_name, &req, &body, &log_source).await?; } Ok(HttpResponse::Ok().finish()) @@ -371,7 +369,7 @@ mod tests { use serde_json::json; use crate::{ - event, + event::{self, format::LogSource}, handlers::{http::modal::utils::ingest_utils::into_event_batch, PREFIX_META, PREFIX_TAGS}, metadata::SchemaVersion, }; @@ -420,7 +418,7 @@ mod tests { None, None, SchemaVersion::V0, - "", + &LogSource::default(), ) .unwrap(); @@ -471,7 +469,7 @@ mod tests { None, None, SchemaVersion::V0, - "", + &LogSource::default(), ) .unwrap(); @@ -505,8 +503,16 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (rb, _) = - into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").unwrap(); + let (rb, _) = into_event_batch( + &req, + &json, + schema, + None, + None, + SchemaVersion::V0, + &LogSource::default(), + ) + .unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 5); @@ -538,7 +544,16 @@ mod tests { let req = TestRequest::default().to_http_request(); - assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").is_err()); + assert!(into_event_batch( + &req, + &json, + 
schema, + None, + None, + SchemaVersion::V0, + &LogSource::default() + ) + .is_err()); } #[test] @@ -556,8 +571,16 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (rb, _) = - into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").unwrap(); + let (rb, _) = into_event_batch( + &req, + &json, + schema, + None, + None, + SchemaVersion::V0, + &LogSource::default(), + ) + .unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 3); @@ -576,7 +599,7 @@ mod tests { None, None, SchemaVersion::V0, - "" + &LogSource::default() ) .is_err()) } @@ -608,7 +631,7 @@ mod tests { None, None, SchemaVersion::V0, - "", + &LogSource::default(), ) .unwrap(); @@ -665,7 +688,7 @@ mod tests { None, None, SchemaVersion::V0, - "", + &LogSource::default(), ) .unwrap(); @@ -715,8 +738,16 @@ mod tests { ); let req = TestRequest::default().to_http_request(); - let (rb, _) = - into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").unwrap(); + let (rb, _) = into_event_batch( + &req, + &json, + schema, + None, + None, + SchemaVersion::V0, + &LogSource::default(), + ) + .unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 6); @@ -765,7 +796,16 @@ mod tests { .into_iter(), ); - assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").is_err()); + assert!(into_event_batch( + &req, + &json, + schema, + None, + None, + SchemaVersion::V0, + &LogSource::default() + ) + .is_err()); } #[test] @@ -800,7 +840,7 @@ mod tests { None, None, SchemaVersion::V0, - "", + &LogSource::default(), ) .unwrap(); @@ -881,7 +921,7 @@ mod tests { None, None, SchemaVersion::V1, - "", + &LogSource::default(), ) .unwrap(); diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 3bafca463..1ddd207d0 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -27,12 +27,12 @@ use std::{collections::HashMap, 
sync::Arc}; use crate::{ event::{ - format::{self, EventFormat}, + format::{self, EventFormat, LogSource}, Event, }, handlers::{ http::{ingest::PostError, kinesis}, - LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, PREFIX_META, PREFIX_TAGS, SEPARATOR, + LOG_SOURCE_KEY, PREFIX_META, PREFIX_TAGS, SEPARATOR, }, metadata::{SchemaVersion, STREAM_INFO}, storage::StreamType, @@ -44,26 +44,30 @@ pub async fn flatten_and_push_logs( body: Bytes, stream_name: &str, ) -> Result<(), PostError> { - let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else { - push_logs(stream_name, &req, &body, "").await?; - return Ok(()); - }; - let log_source = log_source.to_str().unwrap(); - if log_source == LOG_SOURCE_KINESIS { - let json = kinesis::flatten_kinesis_logs(&body); - for record in json.iter() { - let body: Bytes = serde_json::to_vec(record).unwrap().into(); - push_logs(stream_name, &req, &body, "").await?; + let log_source = req + .headers() + .get(LOG_SOURCE_KEY) + .map(|h| h.to_str().unwrap_or("")) + .map(LogSource::from) + .unwrap_or_default(); + + match log_source { + LogSource::Kinesis => { + let json = kinesis::flatten_kinesis_logs(&body); + for record in json.iter() { + let body: Bytes = serde_json::to_vec(record).unwrap().into(); + push_logs(stream_name, &req, &body, &LogSource::default()).await?; + } + } + LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => { + return Err(PostError::Invalid(anyhow!( + "Please use endpoints `/v1/logs` for otel logs, `/v1/metrics` for otel metrics and `/v1/traces` for otel traces" + ))); + } + _ => { + push_logs(stream_name, &req, &body, &log_source).await?; } - } else if log_source.contains("otel") { - return Err(PostError::Invalid(anyhow!( - "Please use endpoints `/v1/logs` for otel logs, `/v1/metrics` for otel metrics and `/v1/traces` for otel traces" - ))); - } else { - tracing::warn!("Unknown log source: {}", log_source); - push_logs(stream_name, &req, &body, "").await?; } - Ok(()) } @@ -71,7 +75,7 @@ pub async fn 
push_logs( stream_name: &str, req: &HttpRequest, body: &Bytes, - log_source: &str, + log_source: &LogSource, ) -> Result<(), PostError> { let time_partition = STREAM_INFO.get_time_partition(stream_name)?; let time_partition_limit = STREAM_INFO.get_time_partition_limit(stream_name)?; @@ -143,7 +147,7 @@ pub fn into_event_batch( static_schema_flag: Option<&String>, time_partition: Option<&String>, schema_version: SchemaVersion, - log_source: &str, + log_source: &LogSource, ) -> Result<(arrow_array::RecordBatch, bool), PostError> { let tags = collect_labelled_headers(req, PREFIX_TAGS, SEPARATOR)?; let metadata = collect_labelled_headers(req, PREFIX_META, SEPARATOR)?; diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs index c5e7f2e6a..e18990800 100644 --- a/src/handlers/mod.rs +++ b/src/handlers/mod.rs @@ -45,17 +45,5 @@ const TRINO_USER: &str = "x-trino-user"; // constants for log Source values for known sources and formats const LOG_SOURCE_KINESIS: &str = "kinesis"; -// OpenTelemetry sends logs according to the specification as explained here -// https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/logs/v1 -const LOG_SOURCE_OTEL_LOGS: &str = "otel-logs"; - -// OpenTelemetry sends traces according to the specification as explained here -// https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/metrics/v1 -const LOG_SOURCE_OTEL_METRICS: &str = "otel-metrics"; - -// OpenTelemetry sends traces according to the specification as explained here -// https://github.com/open-telemetry/opentelemetry-proto/blob/v1.0.0/opentelemetry/proto/trace/v1/trace.proto -const LOG_SOURCE_OTEL_TRACES: &str = "otel-traces"; - // AWS Kinesis constants const KINESIS_COMMON_ATTRIBUTES_KEY: &str = "x-amz-firehose-common-attributes"; diff --git a/src/kafka.rs b/src/kafka.rs index bb35cc863..b917eca83 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -33,6 +33,7 @@ use std::sync::Arc; use std::{collections::HashMap, fmt::Debug}; use 
tracing::{debug, error, info, warn}; +use crate::event::format::LogSource; use crate::option::CONFIG; use crate::{ event::{ @@ -198,7 +199,7 @@ async fn ingest_message(msg: BorrowedMessage<'_>) -> Result<(), KafkaError> { static_schema_flag.as_ref(), time_partition.as_ref(), schema_version, - "", + &LogSource::default(), ) .map_err(|err| KafkaError::PostError(PostError::CustomError(err.to_string())))?; diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs index 46f81a193..fd6992c72 100644 --- a/src/utils/json/mod.rs +++ b/src/utils/json/mod.rs @@ -21,6 +21,7 @@ use std::num::NonZeroU32; use serde_json; use serde_json::Value; +use crate::event::format::LogSource; use crate::metadata::SchemaVersion; pub mod flatten; @@ -32,9 +33,13 @@ pub fn flatten_json_body( custom_partition: Option<&String>, schema_version: SchemaVersion, validation_required: bool, - log_source: &str, + log_source: &LogSource, ) -> Result<Value, anyhow::Error> { - let mut nested_value = if schema_version == SchemaVersion::V1 && !log_source.contains("otel") { + let mut nested_value = if schema_version == SchemaVersion::V1 + && !matches!( + log_source, + LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces + ) { flatten::generic_flattening(body)? } else { body @@ -58,7 +63,7 @@ pub fn convert_array_to_object( time_partition_limit: Option<NonZeroU32>, custom_partition: Option<&String>, schema_version: SchemaVersion, - log_source: &str, + log_source: &LogSource, ) -> Result<Vec<Value>, anyhow::Error> { let data = flatten_json_body( body,