diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs
index 094dbeb1b..4032c92fa 100644
--- a/src/event/format/mod.rs
+++ b/src/event/format/mod.rs
@@ -26,6 +26,7 @@ use anyhow::{anyhow, Error as AnyError};
 use arrow_array::RecordBatch;
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
 use chrono::DateTime;
+use serde::{Deserialize, Serialize};
 use serde_json::Value;
 
 use crate::{metadata::SchemaVersion, utils::arrow::get_field};
@@ -38,7 +39,7 @@ static TIME_FIELD_NAME_PARTS: [&str; 2] = ["time", "date"];
 type EventSchema = Vec<Arc<Field>>;
 
 /// Source of the logs, used to perform special processing for certain sources
-#[derive(Default, Debug, Clone, PartialEq, Eq)]
+#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub enum LogSource {
     // AWS Kinesis sends logs in the format of a json array
     Kinesis,
@@ -51,6 +52,8 @@ pub enum LogSource {
     // OpenTelemetry sends traces according to the specification as explained here
     // https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/metrics/v1
     OtelTraces,
+    // Internal Stream format
+    Pmeta,
     #[default]
     // Json object or array
     Json,
@@ -64,7 +67,8 @@ impl From<&str> for LogSource {
             "otel-logs" => LogSource::OtelLogs,
             "otel-metrics" => LogSource::OtelMetrics,
             "otel-traces" => LogSource::OtelTraces,
-            custom => LogSource::Custom(custom.to_owned()),
+            "pmeta" => LogSource::Pmeta,
+            _ => LogSource::Json,
         }
     }
 }
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 6f3cb3f07..98ceaf693 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -68,7 +68,12 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
-        create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
+        create_stream_if_not_exists(
+            &stream_name,
+            &StreamType::UserDefined.to_string(),
+            LogSource::default(),
+        )
+        .await?;
 
 pub async fn create_stream_if_not_exists(
     stream_name: &str,
     stream_type: &str,
+    log_source: LogSource,
 ) -> Result<bool, PostError> {
     let mut stream_exists = false;
     if STREAM_INFO.stream_exists(stream_name) {
@@ -288,6 +309,7 @@ pub async fn create_stream_if_not_exists(
         false,
         Arc::new(Schema::empty()),
         stream_type,
+        log_source,
     )
     .await?;
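Two behavioral notes on src/event/format/mod.rs: deriving Serialize/Deserialize is what lets LogSource be persisted with stream metadata in the later hunks, and the match in From<&str> no longer preserves unrecognized values as LogSource::Custom(..) — any unknown string now degrades to the default Json variant. A minimal standalone sketch of the new parsing behavior (enum trimmed to the variants visible in the hunk):

// Standalone sketch; the real enum lives in src/event/format/mod.rs.
#[allow(dead_code)]
#[derive(Default, Debug, Clone, PartialEq, Eq)]
enum LogSource {
    Kinesis,
    OtelLogs,
    OtelMetrics,
    OtelTraces,
    Pmeta,
    #[default]
    Json,
}

impl From<&str> for LogSource {
    fn from(s: &str) -> Self {
        match s {
            "kinesis" => LogSource::Kinesis,
            "otel-logs" => LogSource::OtelLogs,
            "otel-metrics" => LogSource::OtelMetrics,
            "otel-traces" => LogSource::OtelTraces,
            "pmeta" => LogSource::Pmeta,
            // Unknown values are no longer wrapped in Custom; they fall back to Json.
            _ => LogSource::Json,
        }
    }
}

fn main() {
    assert_eq!(LogSource::from("pmeta"), LogSource::Pmeta);
    assert_eq!(LogSource::from("teststream"), LogSource::Json); // silent fallback
    assert_eq!(LogSource::default(), LogSource::Json);
}

The silent fallback is the one observable API change here: a client that previously relied on a custom source string round-tripping will now see it reported as Json.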
diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs
index cafe56190..ce7517c3a 100644
--- a/src/handlers/http/logstream.rs
+++ b/src/handlers/http/logstream.rs
@@ -26,7 +26,7 @@ use super::modal::utils::logstream_utils::{
 use super::query::update_schema_when_distributed;
 use crate::alerts::Alerts;
 use crate::catalog::get_first_event;
-use crate::event::format::override_data_type;
+use crate::event::format::{override_data_type, LogSource};
 use crate::handlers::STREAM_TYPE_KEY;
 use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
 use crate::metadata::{SchemaVersion, STREAM_INFO};
@@ -35,8 +35,8 @@ use crate::option::{Mode, CONFIG};
 use crate::rbac::role::Action;
 use crate::rbac::Users;
 use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
-use crate::storage::StreamType;
-use crate::storage::{retention::Retention, StorageDir, StreamInfo};
+use crate::storage::{retention::Retention, StorageDir};
+use crate::storage::{StreamInfo, StreamType};
 use crate::utils::actix::extract_session_key_from_req;
 use crate::{event, stats};
 
@@ -484,6 +484,7 @@ fn remove_id_from_alerts(value: &mut Value) {
     }
 }
 
+#[allow(clippy::too_many_arguments)]
 pub async fn create_stream(
     stream_name: String,
     time_partition: &str,
@@ -492,6 +493,7 @@ pub async fn create_stream(
     static_schema_flag: bool,
     schema: Arc<Schema>,
     stream_type: &str,
+    log_source: LogSource,
 ) -> Result<(), CreateStreamError> {
     // fail to proceed if invalid stream name
     if stream_type != StreamType::Internal.to_string() {
@@ -509,6 +511,7 @@ pub async fn create_stream(
             static_schema_flag,
             schema.clone(),
             stream_type,
+            log_source.clone(),
         )
         .await
     {
@@ -533,6 +536,7 @@ pub async fn create_stream(
             static_schema,
             stream_type,
             SchemaVersion::V1, // New stream
+            log_source,
         );
     }
     Err(err) => {
@@ -583,6 +587,7 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamError> {
 
 pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
-    if let Ok(stream_exists) =
-        create_stream_if_not_exists(INTERNAL_STREAM_NAME, &StreamType::Internal.to_string()).await
+    if let Ok(stream_exists) = create_stream_if_not_exists(
+        INTERNAL_STREAM_NAME,
+        &StreamType::Internal.to_string(),
+        LogSource::Pmeta,
+    )
+    .await
     {
         if stream_exists {
             return Ok(());
@@ -894,9 +903,9 @@ pub mod error {
 mod tests {
     use crate::handlers::http::logstream::error::StreamError;
     use crate::handlers::http::logstream::get_stats;
+    use crate::handlers::http::modal::utils::logstream_utils::fetch_headers_from_put_stream_request;
     use actix_web::test::TestRequest;
     use anyhow::bail;
-
     #[actix_web::test]
     #[should_panic]
     async fn get_stats_panics_without_logstream() {
@@ -915,4 +924,41 @@ mod tests {
             _ => bail!("expected StreamNotFound error"),
         }
     }
+
+    #[actix_web::test]
+    async fn header_without_log_source() {
+        let req = TestRequest::default().to_http_request();
+        let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req);
+        assert_eq!(log_source, crate::event::format::LogSource::Json);
+    }
+
+    #[actix_web::test]
+    async fn header_with_known_log_source() {
+        let mut req = TestRequest::default()
+            .insert_header(("X-P-Log-Source", "pmeta"))
+            .to_http_request();
+        let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req);
+        assert_eq!(log_source, crate::event::format::LogSource::Pmeta);
+
+        req = TestRequest::default()
+            .insert_header(("X-P-Log-Source", "otel-logs"))
+            .to_http_request();
+        let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req);
+        assert_eq!(log_source, crate::event::format::LogSource::OtelLogs);
+
+        req = TestRequest::default()
+            .insert_header(("X-P-Log-Source", "kinesis"))
+            .to_http_request();
+        let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req);
+        assert_eq!(log_source, crate::event::format::LogSource::Kinesis);
+    }
+
+    #[actix_web::test]
+    async fn header_with_unknown_log_source() {
+        let req = TestRequest::default()
+            .insert_header(("X-P-Log-Source", "teststream"))
+            .to_http_request();
+        let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req);
+        assert_eq!(log_source, crate::event::format::LogSource::Json);
+    }
 }
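A small ergonomics observation on the section above: the new tests have to destructure a seven-element tuple just to reach log_source, and both create_stream variants now carry #[allow(clippy::too_many_arguments)]. A possible follow-up, not part of this patch, is to return a named struct from fetch_headers_from_put_stream_request so each new header stays self-describing (PutStreamHeaders is a hypothetical name; LogSource is stubbed to keep the sketch self-contained):

// Hypothetical refactor sketch, not in the patch.
#[derive(Debug, Default, PartialEq, Eq)]
enum LogSource {
    #[default]
    Json,
}

// Named replacement for the (String, String, String, bool, bool, String, LogSource) tuple.
#[allow(dead_code)]
#[derive(Debug, Default)]
struct PutStreamHeaders {
    time_partition: String,
    time_partition_limit: String,
    custom_partition: String,
    static_schema_flag: bool,
    update_stream_flag: bool,
    stream_type: String,
    log_source: LogSource,
}

fn main() {
    // Call sites read by field name instead of tuple position:
    let headers = PutStreamHeaders::default();
    assert_eq!(headers.log_source, LogSource::Json);
    assert!(!headers.update_stream_flag);
}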
diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs
index 3543198d0..61a92cd76 100644
--- a/src/handlers/http/modal/utils/logstream_utils.rs
+++ b/src/handlers/http/modal/utils/logstream_utils.rs
@@ -24,10 +24,11 @@ use bytes::Bytes;
 use http::StatusCode;
 
 use crate::{
+    event::format::LogSource,
     handlers::{
         http::logstream::error::{CreateStreamError, StreamError},
-        CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TIME_PARTITION_KEY,
-        TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY,
+        CUSTOM_PARTITION_KEY, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY,
+        TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY,
     },
     metadata::{self, SchemaVersion, STREAM_INFO},
     option::{Mode, CONFIG},
@@ -48,6 +49,7 @@ pub async fn create_update_stream(
         static_schema_flag,
         update_stream_flag,
         stream_type,
+        log_source,
     ) = fetch_headers_from_put_stream_request(req);
 
     if metadata::STREAM_INFO.stream_exists(stream_name) && !update_stream_flag {
@@ -113,6 +115,7 @@ pub async fn create_update_stream(
         static_schema_flag,
         schema,
         &stream_type,
+        log_source,
     )
     .await?;
 
@@ -167,13 +170,14 @@ async fn validate_and_update_custom_partition(
 pub fn fetch_headers_from_put_stream_request(
     req: &HttpRequest,
-) -> (String, String, String, bool, bool, String) {
+) -> (String, String, String, bool, bool, String, LogSource) {
     let mut time_partition = String::default();
     let mut time_partition_limit = String::default();
     let mut custom_partition = String::default();
     let mut static_schema_flag = false;
     let mut update_stream_flag = false;
     let mut stream_type = StreamType::UserDefined.to_string();
+    let mut log_source = LogSource::default();
     req.headers().iter().for_each(|(key, value)| {
         if key == TIME_PARTITION_KEY {
             time_partition = value.to_str().unwrap().to_string();
@@ -193,6 +197,9 @@ pub fn fetch_headers_from_put_stream_request(
         if key == STREAM_TYPE_KEY {
             stream_type = value.to_str().unwrap().to_string();
         }
+        if key == LOG_SOURCE_KEY {
+            log_source = LogSource::from(value.to_str().unwrap());
+        }
     });
 
     (
@@ -202,6 +209,7 @@ pub fn fetch_headers_from_put_stream_request(
         static_schema_flag,
         update_stream_flag,
         stream_type,
+        log_source,
     )
 }
 
@@ -378,6 +386,7 @@ pub async fn update_custom_partition_in_stream(
     Ok(())
 }
 
+#[allow(clippy::too_many_arguments)]
 pub async fn create_stream(
     stream_name: String,
     time_partition: &str,
@@ -386,6 +395,7 @@ pub async fn create_stream(
     static_schema_flag: bool,
     schema: Arc<Schema>,
     stream_type: &str,
+    log_source: LogSource,
 ) -> Result<(), CreateStreamError> {
     // fail to proceed if invalid stream name
     if stream_type != StreamType::Internal.to_string() {
@@ -403,6 +413,7 @@ pub async fn create_stream(
             static_schema_flag,
             schema.clone(),
             stream_type,
+            log_source.clone(),
         )
         .await
     {
@@ -427,6 +438,7 @@ pub async fn create_stream(
             static_schema,
             stream_type,
             SchemaVersion::V1, // New stream
+            log_source,
         );
     }
     Err(err) => {
@@ -476,7 +488,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<bool, StreamError> {
     let static_schema_flag = stream_metadata.static_schema_flag;
     let stream_type = stream_metadata.stream_type.as_deref().unwrap_or("");
     let schema_version = stream_metadata.schema_version;
-
+    let log_source = stream_metadata.log_source;
     metadata::STREAM_INFO.add_stream(
         stream_name.to_string(),
         stream_metadata.created_at,
@@ -487,6 +499,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<bool, StreamError> {
         static_schema,
         stream_type,
         schema_version,
+        log_source,
     );
     } else {
         return Ok(false);
diff --git a/src/kafka.rs b/src/kafka.rs
index 45c7d5220..ec8d6c244 100644
--- a/src/kafka.rs
+++ b/src/kafka.rs
@@ -34,6 +34,7 @@ use std::{collections::HashMap, fmt::Debug};
 use tracing::{debug, error, info, warn};
 
 use crate::audit::AuditLogBuilder;
+use crate::event::format::LogSource;
 use crate::option::CONFIG;
 use crate::{
     event::{
@@ -180,7 +181,12 @@ async fn ingest_message(msg: BorrowedMessage<'_>) -> Result<(), KafkaError> {
     let stream_name = msg.topic();
 
     // stream should get created only if there is an incoming event, not before that
-    create_stream_if_not_exists(stream_name, &StreamType::UserDefined.to_string()).await?;
+    create_stream_if_not_exists(
+        stream_name,
+        &StreamType::UserDefined.to_string(),
+        LogSource::default(),
+    )
+    .await?;
 
     let schema = resolve_schema(stream_name)?;
     let event = format::json::Event {
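One robustness note on the header loop in fetch_headers_from_put_stream_request (and the pre-existing branches it extends): HeaderValue::to_str() returns an error when a value contains bytes outside visible ASCII, so value.to_str().unwrap() can panic on a malformed X-P-Log-Source header. A sketch of a non-panicking fallback — parse_log_source is a hypothetical helper, with LogSource trimmed to two variants:

use actix_web::http::header::HeaderValue;

#[derive(Debug, Default, PartialEq, Eq)]
enum LogSource {
    Pmeta,
    #[default]
    Json,
}

impl From<&str> for LogSource {
    fn from(s: &str) -> Self {
        match s {
            "pmeta" => LogSource::Pmeta,
            _ => LogSource::Json,
        }
    }
}

// to_str() fails on non-ASCII header bytes; mapping the error to the default
// avoids the panic that value.to_str().unwrap() would cause.
fn parse_log_source(value: &HeaderValue) -> LogSource {
    value.to_str().map(LogSource::from).unwrap_or_default()
}

fn main() {
    assert_eq!(parse_log_source(&HeaderValue::from_static("pmeta")), LogSource::Pmeta);

    // 0xFF is a legal opaque header byte but not visible ASCII, so to_str() errors.
    let opaque = HeaderValue::from_bytes(b"\xff").unwrap();
    assert_eq!(parse_log_source(&opaque), LogSource::Json);
}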
diff --git a/src/metadata.rs b/src/metadata.rs
index 94954c4c0..c3ff0fce7 100644
--- a/src/metadata.rs
+++ b/src/metadata.rs
@@ -30,6 +30,7 @@ use std::sync::{Arc, RwLock};
 use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
 use crate::alerts::Alerts;
 use crate::catalog::snapshot::ManifestItem;
+use crate::event::format::LogSource;
 use crate::metrics::{
     fetch_stats_from_storage, EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE,
     EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED,
@@ -75,6 +76,7 @@ pub struct LogStreamMetadata {
     pub static_schema_flag: bool,
     pub hot_tier_enabled: Option<bool>,
     pub stream_type: Option<String>,
+    pub log_source: LogSource,
 }
 
 // It is very unlikely that panic will occur when dealing with metadata.
@@ -271,6 +273,7 @@ impl StreamInfo {
         static_schema: HashMap<String, Arc<Field>>,
         stream_type: &str,
         schema_version: SchemaVersion,
+        log_source: LogSource,
     ) {
         let mut map = self.write().expect(LOCK_EXPECT);
         let metadata = LogStreamMetadata {
@@ -298,6 +301,7 @@ impl StreamInfo {
             },
             stream_type: Some(stream_type.to_string()),
             schema_version,
+            log_source,
             ..Default::default()
         };
         map.insert(stream_name, metadata);
@@ -430,6 +434,7 @@ pub async fn load_stream_metadata_on_server_start(
         static_schema_flag,
         hot_tier_enabled,
         stream_type,
+        log_source,
         ..
     } = if !stream_metadata_value.is_null() {
         serde_json::from_slice(&serde_json::to_vec(&stream_metadata_value).unwrap()).unwrap()
@@ -466,6 +471,7 @@ pub async fn load_stream_metadata_on_server_start(
         static_schema_flag,
         hot_tier_enabled,
         stream_type,
+        log_source,
     };
 
     let mut map = STREAM_INFO.write().expect(LOCK_EXPECT);
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 573800812..21c651aff 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -18,6 +18,7 @@
 use crate::{
     catalog::snapshot::Snapshot,
+    event::format::LogSource,
     metadata::{error::stream_info::MetadataError, SchemaVersion},
     stats::FullStats,
     utils::json::{deserialize_string_as_true, serialize_bool_as_true},
@@ -116,6 +117,8 @@ pub struct ObjectStoreFormat {
     #[serde(skip_serializing_if = "Option::is_none")]
     pub hot_tier_enabled: Option<bool>,
     pub stream_type: Option<String>,
+    #[serde(default)]
+    pub log_source: LogSource,
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -139,6 +142,7 @@ pub struct StreamInfo {
     )]
     pub static_schema_flag: bool,
     pub stream_type: Option<String>,
+    pub log_source: LogSource,
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)]
@@ -205,6 +209,7 @@ impl Default for ObjectStoreFormat {
             custom_partition: None,
             static_schema_flag: false,
             hot_tier_enabled: None,
+            log_source: LogSource::default(),
         }
     }
 }
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 7252cfaf0..c9ede4871 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -27,6 +27,7 @@ use super::{
 };
 
 use crate::correlation::{CorrelationConfig, CorrelationError};
+use crate::event::format::LogSource;
 use crate::handlers::http::modal::ingest_server::INGESTOR_META;
 use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
 use crate::metadata::SchemaVersion;
@@ -155,6 +156,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         static_schema_flag: bool,
         schema: Arc<Schema>,
         stream_type: &str,
+        log_source: LogSource,
     ) -> Result<String, ObjectStorageError> {
         let format = ObjectStoreFormat {
             created_at: Local::now().to_rfc3339(),
@@ -169,6 +171,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
                 id: CONFIG.parseable.username.clone(),
                 group: CONFIG.parseable.username.clone(),
             },
+            log_source,
             ..Default::default()
         };
         let format_json = to_bytes(&format);
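Compatibility notes on the storage format: #[serde(default)] on ObjectStoreFormat::log_source means a stream.json written before this change still deserializes, with log_source falling back to Json, and the field itself is stored as a bare string because serde writes unit variants of an externally tagged enum that way. StreamInfo gains the same field without #[serde(default)], which is only safe as long as that struct is serialized for responses and never parsed from older payloads. A self-contained sketch of both serde behaviors (structs trimmed to the relevant fields; serde and serde_json as dependencies):

use serde::{Deserialize, Serialize};

#[derive(Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
enum LogSource {
    Pmeta,
    #[default]
    Json,
}

// Trimmed stand-in for ObjectStoreFormat: just the fields relevant here.
#[allow(dead_code)]
#[derive(Serialize, Deserialize)]
struct ObjectStoreFormat {
    stream_type: Option<String>,
    #[serde(default)]
    log_source: LogSource,
}

fn main() -> serde_json::Result<()> {
    // Unit variants of an externally tagged enum serialize as bare strings.
    assert_eq!(serde_json::to_string(&LogSource::Pmeta)?, "\"Pmeta\"");

    // A stream.json written before this patch has no log_source key;
    // #[serde(default)] fills in LogSource::Json instead of failing.
    let old = r#"{ "stream_type": "UserDefined" }"#;
    let parsed: ObjectStoreFormat = serde_json::from_str(old)?;
    assert_eq!(parsed.log_source, LogSource::Json);
    Ok(())
}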