From 51d3a2b89de4aa5f81c97a58a58846c950571cd2 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sun, 5 May 2024 12:09:28 +0530 Subject: [PATCH] fix for time partition limit additional header to be provided X-P-Time-Partition-Limit with a value of unsigned integer with ending 'd' eg. 90d for 90 days if not provided, default constraint of 30 days will be applied using this, user can ingest logs older than 30 days as well fixes #752 --- server/src/event/format/json.rs | 2 +- server/src/handlers.rs | 1 + server/src/handlers/http/ingest.rs | 8 ++++- server/src/handlers/http/logstream.rs | 39 ++++++++++++++++++++-- server/src/metadata.rs | 11 +++++-- server/src/storage.rs | 10 ++++-- server/src/storage/object_storage.rs | 6 ++++ server/src/utils/json.rs | 12 +++++-- server/src/utils/json/flatten.rs | 47 +++++++++++++++++---------- 9 files changed, 108 insertions(+), 28 deletions(-) diff --git a/server/src/event/format/json.rs b/server/src/event/format/json.rs index eac24c1da..b2b9f88c9 100644 --- a/server/src/event/format/json.rs +++ b/server/src/event/format/json.rs @@ -48,7 +48,7 @@ impl EventFormat for Event { static_schema_flag: Option, time_partition: Option, ) -> Result<(Self::Data, Vec>, bool, Tags, Metadata), anyhow::Error> { - let data = flatten_json_body(self.data, None, false)?; + let data = flatten_json_body(self.data, None, None, false)?; let stream_schema = schema; // incoming event may be a single json or a json array diff --git a/server/src/handlers.rs b/server/src/handlers.rs index 57e4aebcb..d610011cf 100644 --- a/server/src/handlers.rs +++ b/server/src/handlers.rs @@ -24,6 +24,7 @@ const PREFIX_META: &str = "x-p-meta-"; const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; const LOG_SOURCE_KEY: &str = "x-p-log-source"; const TIME_PARTITION_KEY: &str = "x-p-time-partition"; +const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit"; const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag"; const AUTHORIZATION_KEY: &str = "authorization"; const SEPARATOR: char = '^'; diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index edfc75efa..7532ad59e 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -104,6 +104,7 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result .map_err(|_err| PostError::StreamNotFound(stream_name.clone()))?; let time_partition = object_store_format.time_partition; + let time_partition_limit = object_store_format.time_partition_limit; let static_schema_flag = object_store_format.static_schema_flag; let body_val: Value = serde_json::from_slice(&body)?; let size: usize = body.len(); @@ -129,7 +130,11 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result .process() .await?; } else { - let data = convert_array_to_object(body_val.clone(), time_partition.clone())?; + let data = convert_array_to_object( + body_val.clone(), + time_partition.clone(), + time_partition_limit, + )?; for value in data { let body_timestamp = value.get(&time_partition.clone().unwrap().to_string()); parsed_timestamp = body_timestamp @@ -210,6 +215,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr stream_name.to_string(), "", "", + "", Arc::new(Schema::empty()), ) .await?; diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 1496a403c..1fa5a10be 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -21,7 +21,7 @@ use super::base_path_without_preceding_slash; use super::cluster::fetch_stats_from_ingestors; use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}; use crate::alerts::Alerts; -use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY}; +use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY}; use crate::metadata::STREAM_INFO; use crate::option::{Mode, CONFIG}; use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema}; @@ -40,6 +40,7 @@ use itertools::Itertools; use serde_json::Value; use std::collections::HashMap; use std::fs; +use std::num::NonZeroU32; use std::sync::Arc; pub async fn delete(req: HttpRequest) -> Result { @@ -191,6 +192,29 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result().is_err() { + return Err(StreamError::Custom { + msg: "could not convert duration to an unsigned number".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } else { + time_partition_in_days = days; + } + } let static_schema_flag = if let Some((_, static_schema_flag)) = req .headers() .iter() @@ -235,7 +259,14 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result, ) -> Result<(), CreateStreamError> { @@ -528,6 +560,7 @@ pub async fn create_stream( .create_stream( &stream_name, time_partition, + time_partition_limit, static_schema_flag, schema.clone(), ) @@ -557,6 +590,7 @@ pub async fn create_stream( stream_name.to_string(), created_at, time_partition.to_string(), + time_partition_limit.to_string(), static_schema_flag.to_string(), static_schema, ); @@ -595,6 +629,7 @@ pub async fn get_stream_info(req: HttpRequest) -> Result, pub time_partition: Option, + pub time_partition_limit: Option, pub static_schema_flag: Option, } @@ -166,6 +166,7 @@ impl StreamInfo { stream_name: String, created_at: String, time_partition: String, + time_partition_limit: String, static_schema_flag: String, static_schema: HashMap>, ) { @@ -181,6 +182,11 @@ impl StreamInfo { } else { Some(time_partition) }, + time_partition_limit: if time_partition_limit.is_empty() { + None + } else { + Some(time_partition_limit) + }, static_schema_flag: if static_schema_flag != "true" { None } else { @@ -237,6 +243,7 @@ impl StreamInfo { created_at: meta.created_at, first_event_at: meta.first_event_at, time_partition: meta.time_partition, + time_partition_limit: meta.time_partition_limit, static_schema_flag: meta.static_schema_flag, }; diff --git a/server/src/storage.rs b/server/src/storage.rs index f1efb5da8..3de62ef4f 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -30,6 +30,8 @@ mod s3; pub mod staging; mod store_metadata; +use self::retention::Retention; +pub use self::staging::StorageDir; pub use localfs::FSConfig; pub use object_storage::{ObjectStorage, ObjectStorageProvider}; pub use s3::S3Config; @@ -37,9 +39,6 @@ pub use store_metadata::{ put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata, }; -use self::retention::Retention; -pub use self::staging::StorageDir; - // metadata file names in a Stream prefix pub const STREAM_METADATA_FILE_NAME: &str = ".stream.json"; pub const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json"; @@ -94,6 +93,8 @@ pub struct ObjectStoreFormat { #[serde(skip_serializing_if = "Option::is_none")] pub time_partition: Option, #[serde(skip_serializing_if = "Option::is_none")] + pub time_partition_limit: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub static_schema_flag: Option, } @@ -109,6 +110,8 @@ pub struct StreamInfo { #[serde(skip_serializing_if = "Option::is_none")] pub time_partition: Option, #[serde(skip_serializing_if = "Option::is_none")] + pub time_partition_limit: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub static_schema_flag: Option, } @@ -155,6 +158,7 @@ impl Default for ObjectStoreFormat { cache_enabled: false, retention: None, time_partition: None, + time_partition_limit: None, static_schema_flag: None, } } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 3487948bc..b0846d0d8 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -127,6 +127,7 @@ pub trait ObjectStorage: Sync + 'static { &self, stream_name: &str, time_partition: &str, + time_partition_limit: &str, static_schema_flag: &str, schema: Arc, ) -> Result<(), ObjectStorageError> { @@ -139,6 +140,11 @@ pub trait ObjectStorage: Sync + 'static { } else { format.time_partition = Some(time_partition.to_string()); } + if time_partition_limit.is_empty() { + format.time_partition_limit = None; + } else { + format.time_partition_limit = Some(time_partition_limit.to_string()); + } if static_schema_flag != "true" { format.static_schema_flag = None; } else { diff --git a/server/src/utils/json.rs b/server/src/utils/json.rs index c55ff362e..ad1572c72 100644 --- a/server/src/utils/json.rs +++ b/server/src/utils/json.rs @@ -24,16 +24,24 @@ pub mod flatten; pub fn flatten_json_body( body: serde_json::Value, time_partition: Option, + time_partition_limit: Option, validation_required: bool, ) -> Result { - flatten::flatten(body, "_", time_partition, validation_required) + flatten::flatten( + body, + "_", + time_partition, + time_partition_limit, + validation_required, + ) } pub fn convert_array_to_object( body: Value, time_partition: Option, + time_partition_limit: Option, ) -> Result, anyhow::Error> { - let data = flatten_json_body(body, time_partition, true)?; + let data = flatten_json_body(body, time_partition, time_partition_limit, true)?; let value_arr = match data { Value::Array(arr) => arr, value @ Value::Object(_) => vec![value], diff --git a/server/src/utils/json/flatten.rs b/server/src/utils/json/flatten.rs index b41cec228..82f74a532 100644 --- a/server/src/utils/json/flatten.rs +++ b/server/src/utils/json/flatten.rs @@ -26,6 +26,7 @@ pub fn flatten( nested_value: Value, separator: &str, time_partition: Option, + time_partition_limit: Option, validation_required: bool, ) -> Result { match nested_value { @@ -34,6 +35,7 @@ pub fn flatten( let validate_time_partition_result = validate_time_partition( Value::Object(nested_dict.clone()), time_partition.clone(), + time_partition_limit.clone(), ); if validate_time_partition_result.is_ok() { let mut map = Map::new(); @@ -52,8 +54,11 @@ pub fn flatten( for _value in &mut arr { let value: Value = _value.clone(); if validation_required { - let validate_time_partition_result = - validate_time_partition(value, time_partition.clone()); + let validate_time_partition_result = validate_time_partition( + value, + time_partition.clone(), + time_partition_limit.clone(), + ); if validate_time_partition_result.is_ok() { let value = std::mem::replace(_value, Value::Null); @@ -85,10 +90,16 @@ pub fn flatten( pub fn validate_time_partition( value: Value, time_partition: Option, + time_partition_limit: Option, ) -> Result { if time_partition.is_none() { Ok(true) } else { + let time_partition_limit: i64 = if let Some(time_partition_limit) = time_partition_limit { + time_partition_limit.parse().unwrap_or(30) + } else { + 30 + }; let body_timestamp = value.get(&time_partition.clone().unwrap().to_string()); if body_timestamp.is_some() { if body_timestamp @@ -108,12 +119,14 @@ pub fn validate_time_partition( .unwrap() .naive_utc(); - if parsed_timestamp >= Utc::now().naive_utc() - Duration::days(30) { + if parsed_timestamp >= Utc::now().naive_utc() - Duration::days(time_partition_limit) + { Ok(true) } else { Err(anyhow!(format!( - "field {} value is more than a month old", - time_partition.unwrap() + "field {} value is more than {} days old", + time_partition.unwrap(), + time_partition_limit ))) } } else { @@ -245,19 +258,19 @@ mod tests { #[test] fn flatten_single_key_string() { let obj = json!({"key": "value"}); - assert_eq!(obj.clone(), flatten(obj, "_", None, false).unwrap()); + assert_eq!(obj.clone(), flatten(obj, "_", None, None, false).unwrap()); } #[test] fn flatten_single_key_int() { let obj = json!({"key": 1}); - assert_eq!(obj.clone(), flatten(obj, "_", None, false).unwrap()); + assert_eq!(obj.clone(), flatten(obj, "_", None, None, false).unwrap()); } #[test] fn flatten_multiple_key_value() { let obj = json!({"key1": 1, "key2": "value2"}); - assert_eq!(obj.clone(), flatten(obj, "_", None, false).unwrap()); + assert_eq!(obj.clone(), flatten(obj, "_", None, None, false).unwrap()); } #[test] @@ -265,7 +278,7 @@ mod tests { let obj = json!({"key": "value", "nested_key": {"key":"value"}}); assert_eq!( json!({"key": "value", "nested_key.key": "value"}), - flatten(obj, ".", None, false).unwrap() + flatten(obj, ".", None, None, false).unwrap() ); } @@ -274,7 +287,7 @@ mod tests { let obj = json!({"key": "value", "nested_key": {"key1":"value1", "key2": "value2"}}); assert_eq!( json!({"key": "value", "nested_key.key1": "value1", "nested_key.key2": "value2"}), - flatten(obj, ".", None, false).unwrap() + flatten(obj, ".", None, None, false).unwrap() ); } @@ -283,7 +296,7 @@ mod tests { let obj = json!({"key": "value", "nested_key": {"key1":[1,2,3]}}); assert_eq!( json!({"key": "value", "nested_key.key1": [1,2,3]}), - flatten(obj, ".", None, false).unwrap() + flatten(obj, ".", None, None, false).unwrap() ); } @@ -292,7 +305,7 @@ mod tests { let obj = json!({"key": [{"a": "value0"}, {"a": "value1"}]}); assert_eq!( json!({"key.a": ["value0", "value1"]}), - flatten(obj, ".", None, false).unwrap() + flatten(obj, ".", None, None, false).unwrap() ); } @@ -301,7 +314,7 @@ mod tests { let obj = json!({"key": [{"a": "value0"}, {"a": "value1", "b": "value1"}]}); assert_eq!( json!({"key.a": ["value0", "value1"], "key.b": [null, "value1"]}), - flatten(obj, ".", None, false).unwrap() + flatten(obj, ".", None, None, false).unwrap() ); } @@ -310,7 +323,7 @@ mod tests { let obj = json!({"key": [{"a": "value0", "b": "value0"}, {"a": "value1"}]}); assert_eq!( json!({"key.a": ["value0", "value1"], "key.b": ["value0", null]}), - flatten(obj, ".", None, false).unwrap() + flatten(obj, ".", None, None, false).unwrap() ); } @@ -319,7 +332,7 @@ mod tests { let obj = json!({"key": [{"a": {"p": 0}, "b": "value0"}, {"b": "value1"}]}); assert_eq!( json!({"key.a.p": [0, null], "key.b": ["value0", "value1"]}), - flatten(obj, ".", None, false).unwrap() + flatten(obj, ".", None, None, false).unwrap() ); } @@ -328,14 +341,14 @@ mod tests { let obj = json!({"key": [{"a": [{"p": "value0", "q": "value0"}, {"p": "value1", "q": null}], "b": "value0"}, {"b": "value1"}]}); assert_eq!( json!({"key.a.p": [["value0", "value1"], null], "key.a.q": [["value0", null], null], "key.b": ["value0", "value1"]}), - flatten(obj, ".", None, false).unwrap() + flatten(obj, ".", None, None, false).unwrap() ); } #[test] fn flatten_mixed_object() { let obj = json!({"a": 42, "arr": ["1", {"key": "2"}, {"key": {"nested": "3"}}]}); - assert!(flatten(obj, ".", None, false).is_err()); + assert!(flatten(obj, ".", None, None, false).is_err()); } #[test]