From bc730eed9edef7673a48b4b59b6da07fc4bfe4d4 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha <131262146+nikhilsinhacloudsurfex@users.noreply.github.com>
Date: Thu, 7 Mar 2024 11:26:06 +0530
Subject: [PATCH] feat: allow historical data ingestion based on user defined time (#683)

This PR adds an enhancement that lets ingestion partition logs on a user-provided timestamp instead of the server time.

Users add the optional custom header X-P-Time-Partition at the stream creation API to allow ingestion and querying on a timestamp column from the log data instead of the server-time column p_timestamp.

This time_partition field name is stored in stream.json and in the in-memory STREAM_INFO used by the ingest API. The server checks that the timestamp column exists in the log event and throws an exception if it does not. It also checks that the timestamp value can be parsed into a datetime and throws an exception if it cannot.

The arrow file name gets the date, hour, and minute from the timestamp field (if defined for the stream); otherwise it gets them from the server time.

The parquet file name gets a random number attached to it, because many log events can share the same date, hour, and minute in the timestamp field; the random number keeps parquet files from being overwritten.

In the console, the query's from and to dates are matched against the value of the timestamp column of the log data (if defined for the stream); otherwise they are matched against the p_timestamp column.

Fixes #671
Fixes #685
---
 server/src/catalog.rs                      |  99 ++++++----
 server/src/catalog/manifest.rs             |   3 +-
 server/src/event.rs                        |  15 +-
 server/src/event/format.rs                 |   4 +-
 server/src/event/writer.rs                 |  29 +--
 server/src/event/writer/file_writer.rs     |  43 ++---
 server/src/handlers.rs                     |   2 +-
 server/src/handlers/http/ingest.rs         | 203 +++++++++++----------
 server/src/handlers/http/logstream.rs      |  36 +++-
 server/src/handlers/http/query.rs          |   3 +-
 server/src/metadata.rs                     |  11 +-
 server/src/query.rs                        |  83 ++++++---
 server/src/query/stream_schema_provider.rs |  98 ++++++----
 server/src/storage.rs                      |   3 +
 server/src/storage/object_storage.rs       |  21 ++-
 server/src/storage/staging.rs              |  30 +--
 server/src/utils/json.rs                   |  10 +
 17 files changed, 436 insertions(+), 257 deletions(-)

diff --git a/server/src/catalog.rs b/server/src/catalog.rs index 3ffdd21a1..f44159612 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -23,6 +23,7 @@ use relative_path::RelativePathBuf; use crate::{ catalog::manifest::Manifest, + event::DEFAULT_TIMESTAMP_KEY, query::PartialTimeFilter, storage::{ObjectStorage, ObjectStorageError}, }; @@ -69,25 +70,46 @@ impl ManifestFile for manifest::File { } } -fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) { - match file - .columns() - .iter() - .find(|col| col.name == "p_timestamp") - .unwrap() - .stats - .clone() - .unwrap() - { - column::TypedStatistics::Int(stats) => ( - NaiveDateTime::from_timestamp_millis(stats.min) - .unwrap() - .and_utc(), - NaiveDateTime::from_timestamp_millis(stats.max) - .unwrap() - .and_utc(), - ), - _ => unreachable!(), +fn get_file_bounds( + file: &manifest::File, + partition_column: String, +) -> (DateTime<Utc>, DateTime<Utc>) { + if partition_column == DEFAULT_TIMESTAMP_KEY { + match file + .columns() + .iter() + .find(|col| col.name == partition_column) + .unwrap() + .stats + .as_ref() + .unwrap() + { + column::TypedStatistics::Int(stats) => ( + NaiveDateTime::from_timestamp_millis(stats.min) + .unwrap() + .and_utc(), + NaiveDateTime::from_timestamp_millis(stats.max) + .unwrap() + .and_utc(), + ), + _ => unreachable!(), + } + } else { + match file + .columns()
.iter() + .find(|col| col.name == partition_column) + .unwrap() + .stats + .as_ref() + .unwrap() + { + column::TypedStatistics::String(stats) => ( + stats.min.parse::>().unwrap(), + stats.max.parse::>().unwrap(), + ), + _ => unreachable!(), + } } } @@ -97,10 +119,19 @@ pub async fn update_snapshot( change: manifest::File, ) -> Result<(), ObjectStorageError> { // get current snapshot - let mut meta = storage.get_snapshot(stream_name).await?; - let manifests = &mut meta.manifest_list; - - let (lower_bound, _) = get_file_bounds(&change); + let mut meta = storage.get_object_store_format(stream_name).await?; + let manifests = &mut meta.snapshot.manifest_list; + let time_partition = meta.time_partition; + let lower_bound = match time_partition { + Some(time_partition) => { + let (lower_bound, _) = get_file_bounds(&change, time_partition); + lower_bound + } + None => { + let (lower_bound, _) = get_file_bounds(&change, DEFAULT_TIMESTAMP_KEY.to_string()); + lower_bound + } + }; let pos = manifests.iter().position(|item| { item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound }); @@ -109,8 +140,10 @@ pub async fn update_snapshot( // This updates an existing file so there is no need to create a snapshot entry. if let Some(pos) = pos { let info = &mut manifests[pos]; - let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound); - let Some(mut manifest) = storage.get_manifest(&path).await? else { + let manifest_path = + partition_path(stream_name, info.time_lower_bound, info.time_upper_bound); + + let Some(mut manifest) = storage.get_manifest(&manifest_path).await? else { return Err(ObjectStorageError::UnhandledError( "Manifest found in snapshot but not in object-storage" .to_string() @@ -118,7 +151,7 @@ pub async fn update_snapshot( )); }; manifest.apply_change(change); - storage.put_manifest(&path, manifest).await?; + storage.put_manifest(&manifest_path, manifest).await?; } else { let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc(); let upper_bound = lower_bound @@ -148,7 +181,7 @@ pub async fn update_snapshot( time_upper_bound: upper_bound, }; manifests.push(new_snapshot_entriy); - storage.put_snapshot(stream_name, meta).await?; + storage.put_snapshot(stream_name, meta.snapshot).await?; } Ok(()) @@ -160,13 +193,13 @@ pub async fn remove_manifest_from_snapshot( dates: Vec, ) -> Result<(), ObjectStorageError> { // get current snapshot - let mut meta = storage.get_snapshot(stream_name).await?; - let manifests = &mut meta.manifest_list; + let mut meta = storage.get_object_store_format(stream_name).await?; + let manifests = &mut meta.snapshot.manifest_list; // Filter out items whose manifest_path contains any of the dates_to_delete manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date))); - storage.put_snapshot(stream_name, meta).await?; + storage.put_snapshot(stream_name, meta.snapshot).await?; Ok(()) } @@ -175,8 +208,8 @@ pub async fn get_first_event( stream_name: &str, ) -> Result, ObjectStorageError> { // get current snapshot - let mut meta = storage.get_snapshot(stream_name).await?; - let manifests = &mut meta.manifest_list; + let mut meta = storage.get_object_store_format(stream_name).await?; + let manifests = &mut meta.snapshot.manifest_list; if manifests.is_empty() { log::info!("No manifest found for stream {stream_name}"); @@ -199,7 +232,7 @@ pub async fn get_first_event( }; if let Some(first_event) = manifest.files.first() { - let (lower_bound, _) = get_file_bounds(first_event); + let 
(lower_bound, _) = get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string()); let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339(); return Ok(Some(first_event_at)); } diff --git a/server/src/catalog/manifest.rs b/server/src/catalog/manifest.rs index bafed3dd5..ad5b32422 100644 --- a/server/src/catalog/manifest.rs +++ b/server/src/catalog/manifest.rs @@ -112,7 +112,6 @@ pub fn create_from_parquet_file( let columns = column_statistics(row_groups); manifest_file.columns = columns.into_values().collect(); let mut sort_orders = sort_order(row_groups); - if let Some(last_sort_order) = sort_orders.pop() { if sort_orders .into_iter() @@ -155,7 +154,7 @@ fn sort_order( }) .collect_vec(); - sort_orders.push(sort_order) + sort_orders.push(sort_order); } sort_orders } diff --git a/server/src/event.rs b/server/src/event.rs index 62db832bf..c517cdbda 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -26,10 +26,10 @@ use itertools::Itertools; use std::sync::Arc; -use crate::metadata; - use self::error::EventError; pub use self::writer::STREAM_WRITERS; +use crate::metadata; +use chrono::NaiveDateTime; pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp"; pub const DEFAULT_TAGS_KEY: &str = "p_tags"; @@ -42,6 +42,7 @@ pub struct Event { pub origin_format: &'static str, pub origin_size: u64, pub is_first_event: bool, + pub parsed_timestamp: NaiveDateTime, } // Events holds the schema related to a each event for a single log stream @@ -54,7 +55,12 @@ impl Event { commit_schema(&self.stream_name, self.rb.schema())?; } - Self::process_event(&self.stream_name, &key, self.rb.clone())?; + Self::process_event( + &self.stream_name, + &key, + self.rb.clone(), + self.parsed_timestamp, + )?; metadata::STREAM_INFO.update_stats( &self.stream_name, @@ -81,8 +87,9 @@ impl Event { stream_name: &str, schema_key: &str, rb: RecordBatch, + parsed_timestamp: NaiveDateTime, ) -> Result<(), EventError> { - STREAM_WRITERS.append_to_local(stream_name, schema_key, rb)?; + STREAM_WRITERS.append_to_local(stream_name, schema_key, rb, parsed_timestamp)?; Ok(()) } } diff --git a/server/src/event/format.rs b/server/src/event/format.rs index 55f8e106a..cd11e440b 100644 --- a/server/src/event/format.rs +++ b/server/src/event/format.rs @@ -53,14 +53,14 @@ pub trait EventFormat: Sized { return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY)); }; - if get_field(&schema, DEFAULT_TAGS_KEY).is_some() { + if get_field(&schema, DEFAULT_METADATA_KEY).is_some() { return Err(anyhow!( "field {} is a reserved field", DEFAULT_METADATA_KEY )); }; - if get_field(&schema, DEFAULT_TAGS_KEY).is_some() { + if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() { return Err(anyhow!( "field {} is a reserved field", DEFAULT_TIMESTAMP_KEY diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index 2d1b46d4e..f1befd061 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -30,7 +30,7 @@ use crate::utils; use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter}; use arrow_array::{RecordBatch, TimestampMillisecondArray}; use arrow_schema::Schema; -use chrono::Utc; +use chrono::{NaiveDateTime, Utc}; use derive_more::{Deref, DerefMut}; use once_cell::sync::Lazy; @@ -48,6 +48,7 @@ impl Writer { stream_name: &str, schema_key: &str, rb: RecordBatch, + parsed_timestamp: NaiveDateTime, ) -> Result<(), StreamWriterError> { let rb = utils::arrow::replace_columns( rb.schema(), @@ -56,7 +57,8 @@ impl Writer { &[Arc::new(get_timestamp_array(rb.num_rows()))], ); - 
self.disk.push(stream_name, schema_key, &rb)?; + self.disk + .push(stream_name, schema_key, &rb, parsed_timestamp)?; self.mem.push(schema_key, rb); Ok(()) } @@ -72,15 +74,18 @@ impl WriterTable { stream_name: &str, schema_key: &str, record: RecordBatch, + parsed_timestamp: NaiveDateTime, ) -> Result<(), StreamWriterError> { let hashmap_guard = self.read().unwrap(); match hashmap_guard.get(stream_name) { Some(stream_writer) => { - stream_writer - .lock() - .unwrap() - .push(stream_name, schema_key, record)?; + stream_writer.lock().unwrap().push( + stream_name, + schema_key, + record, + parsed_timestamp, + )?; } None => { drop(hashmap_guard); @@ -88,13 +93,15 @@ impl WriterTable { // check for race condition // if map contains entry then just if let Some(writer) = map.get(stream_name) { - writer - .lock() - .unwrap() - .push(stream_name, schema_key, record)?; + writer.lock().unwrap().push( + stream_name, + schema_key, + record, + parsed_timestamp, + )?; } else { let mut writer = Writer::default(); - writer.push(stream_name, schema_key, record)?; + writer.push(stream_name, schema_key, record, parsed_timestamp)?; map.insert(stream_name.to_owned(), Mutex::new(writer)); } } diff --git a/server/src/event/writer/file_writer.rs b/server/src/event/writer/file_writer.rs index 9ff62c5c3..47590a119 100644 --- a/server/src/event/writer/file_writer.rs +++ b/server/src/event/writer/file_writer.rs @@ -17,13 +17,13 @@ * */ -use std::collections::HashMap; -use std::fs::{File, OpenOptions}; -use std::path::PathBuf; - use arrow_array::RecordBatch; use arrow_ipc::writer::StreamWriter; +use chrono::NaiveDateTime; use derive_more::{Deref, DerefMut}; +use std::collections::HashMap; +use std::fs::{File, OpenOptions}; +use std::path::PathBuf; use crate::storage::staging::StorageDir; @@ -44,27 +44,17 @@ impl FileWriter { stream_name: &str, schema_key: &str, record: &RecordBatch, + parsed_timestamp: NaiveDateTime, ) -> Result<(), StreamWriterError> { - match self.get_mut(schema_key) { - Some(writer) => { - writer - .writer - .write(record) - .map_err(StreamWriterError::Writer)?; - } - // entry is not present thus we create it - None => { - // this requires mutable borrow of the map so we drop this read lock and wait for write lock - let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, record)?; - self.insert( - schema_key.to_owned(), - ArrowWriter { - file_path: path, - writer, - }, - ); - } - }; + let (path, writer) = + init_new_stream_writer_file(stream_name, schema_key, record, parsed_timestamp)?; + self.insert( + schema_key.to_owned(), + ArrowWriter { + file_path: path, + writer, + }, + ); Ok(()) } @@ -80,10 +70,10 @@ fn init_new_stream_writer_file( stream_name: &str, schema_key: &str, record: &RecordBatch, + parsed_timestamp: NaiveDateTime, ) -> Result<(PathBuf, StreamWriter), StreamWriterError> { let dir = StorageDir::new(stream_name); - let path = dir.path_by_current_time(schema_key); - + let path = dir.path_by_current_time(schema_key, parsed_timestamp); std::fs::create_dir_all(dir.data_path)?; let file = OpenOptions::new().create(true).append(true).open(&path)?; @@ -94,6 +84,5 @@ fn init_new_stream_writer_file( stream_writer .write(record) .map_err(StreamWriterError::Writer)?; - Ok((path, stream_writer)) } diff --git a/server/src/handlers.rs b/server/src/handlers.rs index 81beea0bd..456f5fc68 100644 --- a/server/src/handlers.rs +++ b/server/src/handlers.rs @@ -23,7 +23,7 @@ const PREFIX_TAGS: &str = "x-p-tag-"; const PREFIX_META: &str = "x-p-meta-"; const STREAM_NAME_HEADER_KEY: &str = 
"x-p-stream"; const LOG_SOURCE_KEY: &str = "x-p-log-source"; - +const TIME_PARTITION_KEY: &str = "x-p-time-partition"; const AUTHORIZATION_KEY: &str = "authorization"; const SEPARATOR: char = '^'; diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 2a3281843..1f09e9a07 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -16,14 +16,6 @@ * */ -use actix_web::{http::header::ContentType, HttpRequest, HttpResponse}; -use arrow_schema::Field; -use bytes::Bytes; -use http::StatusCode; -use serde_json::Value; -use std::collections::{BTreeMap, HashMap}; -use std::sync::Arc; - use crate::event::error::EventError; use crate::event::format::EventFormat; use crate::event::{self, format}; @@ -32,11 +24,20 @@ use crate::handlers::{ STREAM_NAME_HEADER_KEY, }; use crate::metadata::STREAM_INFO; +use crate::option::CONFIG; use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; +use crate::utils::json::convert_array_to_object; +use actix_web::{http::header::ContentType, HttpRequest, HttpResponse}; +use arrow_schema::Field; +use bytes::Bytes; +use chrono::{DateTime, Utc}; +use http::StatusCode; +use serde_json::Value; +use std::collections::{BTreeMap, HashMap}; +use std::sync::Arc; use super::logstream::error::CreateStreamError; use super::{kinesis, otel}; - // Handler for POST /api/v1/ingest // ingests events by extracting stream name from header // creates if stream does not exist @@ -94,45 +95,109 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result Result<(), PostError> { - let (size, rb, is_first_event) = { - let hash_map = STREAM_INFO.read().unwrap(); - let schema = hash_map - .get(&stream_name) - .ok_or(PostError::StreamNotFound(stream_name.clone()))? - .schema - .clone(); - into_event_batch(req, body, schema)? 
- }; - - event::Event { - rb, - stream_name, - origin_format: "json", - origin_size: size as u64, - is_first_event, + let glob_storage = CONFIG.storage().get_object_store(); + let object_store_format = glob_storage + .get_object_store_format(&stream_name) + .await + .map_err(|_err| PostError::StreamNotFound(stream_name.clone()))?; + let time_partition = object_store_format.time_partition; + let body_val: Value = serde_json::from_slice(&body)?; + let size: usize = body.len(); + let mut parsed_timestamp = Utc::now().naive_utc(); + if time_partition.is_none() { + let stream = stream_name.clone(); + let (rb, is_first_event) = get_stream_schema(stream.clone(), req, body_val)?; + event::Event { + rb, + stream_name: stream, + origin_format: "json", + origin_size: size as u64, + is_first_event, + parsed_timestamp, + } + .process() + .await?; + } else { + let data = convert_array_to_object(body_val.clone())?; + for value in data { + let body_timestamp = value.get(&time_partition.clone().unwrap().to_string()); + if body_timestamp.is_some() { + if body_timestamp + .unwrap() + .to_owned() + .as_str() + .unwrap() + .parse::>() + .is_ok() + { + parsed_timestamp = body_timestamp + .unwrap() + .to_owned() + .as_str() + .unwrap() + .parse::>() + .unwrap() + .naive_utc(); + + let (rb, is_first_event) = + get_stream_schema(stream_name.clone(), req.clone(), value)?; + event::Event { + rb, + stream_name: stream_name.clone(), + origin_format: "json", + origin_size: size as u64, + is_first_event, + parsed_timestamp, + } + .process() + .await?; + } else { + return Err(PostError::Invalid(anyhow::Error::msg(format!( + "field {} is not in the correct datetime format", + body_timestamp.unwrap().to_owned().as_str().unwrap() + )))); + } + } else { + return Err(PostError::Invalid(anyhow::Error::msg(format!( + "ingestion failed as field {} is not part of the log", + time_partition.unwrap() + )))); + } + } } - .process() - .await?; - Ok(()) } +fn get_stream_schema( + stream_name: String, + req: HttpRequest, + body: Value, +) -> Result<(arrow_array::RecordBatch, bool), PostError> { + let hash_map = STREAM_INFO.read().unwrap(); + let schema = hash_map + .get(&stream_name) + .ok_or(PostError::StreamNotFound(stream_name))? 
+ .schema + .clone(); + into_event_batch(req, body, schema) +} + fn into_event_batch( req: HttpRequest, - body: Bytes, + body: Value, schema: HashMap>, -) -> Result<(usize, arrow_array::RecordBatch, bool), PostError> { +) -> Result<(arrow_array::RecordBatch, bool), PostError> { let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?; let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?; - let size = body.len(); - let body: Value = serde_json::from_slice(&body)?; + let event = format::json::Event { data: body, tags, metadata, }; let (rb, is_first) = event.into_recordbatch(schema)?; - Ok((size, rb, is_first)) + + Ok((rb, is_first)) } // Check if the stream exists and create a new stream if doesn't exist @@ -140,7 +205,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr if STREAM_INFO.stream_exists(stream_name) { return Ok(()); } - super::logstream::create_stream(stream_name.to_string()).await?; + super::logstream::create_stream(stream_name.to_string(), "").await?; Ok(()) } @@ -192,7 +257,6 @@ mod tests { types::Int64Type, ArrayRef, Float64Array, Int64Array, ListArray, StringArray, }; use arrow_schema::{DataType, Field}; - use bytes::Bytes; use serde_json::json; use crate::{ @@ -239,14 +303,8 @@ mod tests { .append_header((PREFIX_META.to_string() + "C", "meta1")) .to_http_request(); - let (size, rb, _) = into_event_batch( - req, - Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(), - ) - .unwrap(); + let (rb, ..) = into_event_batch(req, json, HashMap::default()).unwrap(); - assert_eq!(size, 28); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 6); assert_eq!( @@ -285,12 +343,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _) = into_event_batch( - req, - Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(), - ) - .unwrap(); + let (rb, ..) = into_event_batch(req, json, HashMap::default()).unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 5); @@ -322,8 +375,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _) = - into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap(); + let (rb, ..) = into_event_batch(req, json, schema).unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 5); @@ -355,10 +407,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - assert!( - into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,) - .is_err() - ); + assert!(into_event_batch(req, json, schema,).is_err()); } #[test] @@ -376,8 +425,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _) = - into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap(); + let (rb, ..) = into_event_batch(req, json, schema).unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 3); @@ -389,12 +437,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - assert!(into_event_batch( - req, - Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(), - ) - .is_err()) + assert!(into_event_batch(req, json, HashMap::default(),).is_err()) } #[test] @@ -417,12 +460,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _) = into_event_batch( - req, - Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(), - ) - .unwrap(); + let (rb, ..) 
= into_event_batch(req, json, HashMap::default()).unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 6); @@ -470,12 +508,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _) = into_event_batch( - req, - Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(), - ) - .unwrap(); + let (rb, ..) = into_event_batch(req, json, HashMap::default()).unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 6); @@ -523,8 +556,7 @@ mod tests { ); let req = TestRequest::default().to_http_request(); - let (_, rb, _) = - into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap(); + let (rb, ..) = into_event_batch(req, json, schema).unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 6); @@ -564,12 +596,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _) = into_event_batch( - req, - Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(), - ) - .unwrap(); + let (rb, ..) = into_event_batch(req, json, HashMap::default()).unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 5); @@ -614,10 +641,7 @@ mod tests { .into_iter(), ); - assert!( - into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,) - .is_err() - ); + assert!(into_event_batch(req, json, schema,).is_err()); } #[test] @@ -645,12 +669,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _) = into_event_batch( - req, - Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(), - ) - .unwrap(); + let (rb, ..) = into_event_batch(req, json, HashMap::default()).unwrap(); assert_eq!(rb.num_rows(), 4); assert_eq!(rb.num_columns(), 7); diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index e31a4d44d..20fd6330e 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -23,16 +23,15 @@ use actix_web::{web, HttpRequest, Responder}; use chrono::Utc; use serde_json::Value; +use self::error::{CreateStreamError, StreamError}; use crate::alerts::Alerts; +use crate::handlers::TIME_PARTITION_KEY; use crate::metadata::STREAM_INFO; use crate::option::CONFIG; -use crate::storage::retention::Retention; -use crate::storage::{LogStream, StorageDir}; +use crate::storage::{retention::Retention, LogStream, StorageDir}; use crate::{catalog, event, stats}; use crate::{metadata, validator}; -use self::error::{CreateStreamError, StreamError}; - pub async fn delete(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); @@ -110,6 +109,15 @@ pub async fn get_alert(req: HttpRequest) -> Result } pub async fn put_stream(req: HttpRequest) -> Result { + let time_partition = if let Some((_, time_partition_name)) = req + .headers() + .iter() + .find(|&(key, _)| key == TIME_PARTITION_KEY) + { + time_partition_name.to_str().unwrap() + } else { + "" + }; let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if metadata::STREAM_INFO.stream_exists(&stream_name) { @@ -121,7 +129,7 @@ pub async fn put_stream(req: HttpRequest) -> Result status: StatusCode::BAD_REQUEST, }); } else { - create_stream(stream_name).await?; + create_stream(stream_name, time_partition).await?; } Ok(("log stream created", StatusCode::OK)) @@ -328,13 +336,16 @@ fn remove_id_from_alerts(value: &mut Value) { } } -pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError> { +pub 
async fn create_stream( + stream_name: String, + time_partition: &str, +) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name validator::stream_name(&stream_name)?; // Proceed to create log stream if it doesn't exist let storage = CONFIG.storage().get_object_store(); - if let Err(err) = storage.create_stream(&stream_name).await { + if let Err(err) = storage.create_stream(&stream_name, time_partition).await { return Err(CreateStreamError::Storage { stream_name, err }); } @@ -343,9 +354,14 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError> .get_object_store() .get_stream_metadata(&stream_name) .await; - let created_at = stream_meta.unwrap().created_at; - - metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at); + let stream_meta = stream_meta.unwrap(); + let created_at = stream_meta.created_at; + + metadata::STREAM_INFO.add_stream( + stream_name.to_string(), + created_at, + time_partition.to_string(), + ); Ok(()) } diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index af5120c49..d7896b944 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -56,7 +56,6 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result, + pub time_partition: Option, } // It is very unlikely that panic will occur when dealing with metadata. @@ -142,13 +143,18 @@ impl StreamInfo { }) } - pub fn add_stream(&self, stream_name: String, created_at: String) { + pub fn add_stream(&self, stream_name: String, created_at: String, time_partition: String) { let mut map = self.write().expect(LOCK_EXPECT); let metadata = LogStreamMetadata { created_at: if created_at.is_empty() { Local::now().to_rfc3339() } else { - created_at.clone() + created_at + }, + time_partition: if time_partition.is_empty() { + None + } else { + Some(time_partition) }, ..Default::default() }; @@ -185,6 +191,7 @@ impl StreamInfo { cache_enabled: meta.cache_enabled, created_at: meta.created_at, first_event_at: meta.first_event_at, + time_partition: meta.time_partition, }; let mut map = self.write().expect(LOCK_EXPECT); diff --git a/server/src/query.rs b/server/src/query.rs index e3f9d8dbc..120143f6a 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -43,14 +43,15 @@ use crate::option::CONFIG; use crate::storage::{ObjectStorageProvider, StorageDir}; use self::error::ExecuteError; - use self::stream_schema_provider::GlobalSchemaProvider; pub use self::stream_schema_provider::PartialTimeFilter; +use crate::metadata::STREAM_INFO; pub static QUERY_SESSION: Lazy = Lazy::new(|| Query::create_session_context(CONFIG.storage())); // A query request by client +#[derive(Debug)] pub struct Query { pub raw_logical_plan: LogicalPlan, pub start: DateTime, @@ -102,9 +103,12 @@ impl Query { SessionContext::new_with_state(state) } - pub async fn execute(&self) -> Result<(Vec, Vec), ExecuteError> { + pub async fn execute( + &self, + stream_name: String, + ) -> Result<(Vec, Vec), ExecuteError> { let df = QUERY_SESSION - .execute_logical_plan(self.final_logical_plan()) + .execute_logical_plan(self.final_logical_plan(stream_name)) .await?; let fields = df @@ -120,7 +124,7 @@ impl Query { } /// return logical plan with all time filters applied through - fn final_logical_plan(&self) -> LogicalPlan { + fn final_logical_plan(&self, stream_name: String) -> LogicalPlan { let filters = self.filter_tag.clone().and_then(tag_filter); // see https://github.com/apache/arrow-datafusion/pull/8400 // this can be eliminated 
in later version of datafusion but with slight caveat @@ -133,6 +137,7 @@ impl Query { self.start.naive_utc(), self.end.naive_utc(), filters, + stream_name, ); LogicalPlan::Explain(Explain { verbose: plan.verbose, @@ -144,7 +149,13 @@ impl Query { logical_optimization_succeeded: plan.logical_optimization_succeeded, }) } - x => transform(x, self.start.naive_utc(), self.end.naive_utc(), filters), + x => transform( + x, + self.start.naive_utc(), + self.end.naive_utc(), + filters, + stream_name, + ), } } @@ -195,33 +206,63 @@ fn transform( start_time: NaiveDateTime, end_time: NaiveDateTime, filters: Option, + stream_name: String, ) -> LogicalPlan { plan.transform(&|plan| match plan { LogicalPlan::TableScan(table) => { + let hash_map = STREAM_INFO.read().unwrap(); + let time_partition = hash_map + .get(&stream_name) + .ok_or(DataFusionError::Execution(format!( + "stream not found {}", + stream_name.clone() + )))? + .time_partition + .clone(); + let mut new_filters = vec![]; if !table_contains_any_time_filters(&table) { - let start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included( - start_time, - )) - .binary_expr(Expr::Column(Column::new( - Some(table.table_name.to_owned_reference()), - event::DEFAULT_TIMESTAMP_KEY, - ))); - let end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)) - .binary_expr(Expr::Column(Column::new( - Some(table.table_name.to_owned_reference()), - event::DEFAULT_TIMESTAMP_KEY, - ))); - new_filters.push(start_time_filter); - new_filters.push(end_time_filter); + let mut _start_time_filter: Expr; + let mut _end_time_filter: Expr; + match time_partition { + Some(time_partition) => { + _start_time_filter = + PartialTimeFilter::Low(std::ops::Bound::Included(start_time)) + .binary_expr_timestamp_partition_key(Expr::Column(Column::new( + Some(table.table_name.to_owned_reference()), + time_partition.clone(), + ))); + _end_time_filter = + PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)) + .binary_expr_timestamp_partition_key(Expr::Column(Column::new( + Some(table.table_name.to_owned_reference()), + time_partition, + ))); + } + None => { + _start_time_filter = + PartialTimeFilter::Low(std::ops::Bound::Included(start_time)) + .binary_expr_default_timestamp_key(Expr::Column(Column::new( + Some(table.table_name.to_owned_reference()), + event::DEFAULT_TIMESTAMP_KEY, + ))); + _end_time_filter = + PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)) + .binary_expr_default_timestamp_key(Expr::Column(Column::new( + Some(table.table_name.to_owned_reference()), + event::DEFAULT_TIMESTAMP_KEY, + ))); + } + } + + new_filters.push(_start_time_filter); + new_filters.push(_end_time_filter); } if let Some(tag_filters) = filters.clone() { new_filters.push(tag_filters) } - let new_filter = new_filters.into_iter().reduce(and); - if let Some(new_filter) = new_filter { let filter = Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table))).unwrap(); diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index 874afca78..3a99b3623 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -40,7 +40,7 @@ use datafusion::{ optimizer::utils::conjunction, physical_expr::{create_physical_expr, PhysicalSortExpr}, physical_plan::{self, empty::EmptyExec, union::UnionExec, ExecutionPlan, Statistics}, - prelude::{Column, Expr}, + prelude::Expr, scalar::ScalarValue, }; use futures_util::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt}; @@ 
-287,13 +287,24 @@ impl TableProvider for StandardTableProvider { ) -> Result, DataFusionError> { let mut memory_exec = None; let mut cache_exec = None; + let object_store = state + .runtime_env() + .object_store_registry + .get_store(&self.url) + .unwrap(); + let glob_storage = CONFIG.storage().get_object_store(); - let time_filters = extract_primary_filter(filters); + let object_store_format = glob_storage + .get_object_store_format(&self.stream) + .await + .map_err(|err| DataFusionError::Plan(err.to_string()))?; + let time_partition = object_store_format.time_partition; + let time_filters = extract_primary_filter(filters, time_partition.clone()); if time_filters.is_empty() { return Err(DataFusionError::Plan("potentially unbounded query on time range. Table scanning requires atleast one time bound".to_string())); } - if include_now(filters) { + if include_now(filters, time_partition) { if let Some(records) = event::STREAM_WRITERS.recordbatches_cloned(&self.stream, &self.schema) { @@ -306,18 +317,8 @@ impl TableProvider for StandardTableProvider { } }; - let object_store = state - .runtime_env() - .object_store_registry - .get_store(&self.url) - .unwrap(); - let glob_storage = CONFIG.storage().get_object_store(); - // Fetch snapshot - let snapshot = glob_storage - .get_snapshot(&self.stream) - .await - .map_err(|err| DataFusionError::Plan(err.to_string()))?; + let snapshot = object_store_format.snapshot; // Is query timerange is overlapping with older data. if is_overlapping_query(&snapshot.manifest_list, &time_filters) { @@ -492,11 +493,11 @@ pub enum PartialTimeFilter { } impl PartialTimeFilter { - fn try_from_expr(expr: &Expr) -> Option { + fn try_from_expr(expr: &Expr, time_partition: Option) -> Option { let Expr::BinaryExpr(binexpr) = expr else { return None; }; - let (op, time) = extract_timestamp_bound(binexpr)?; + let (op, time) = extract_timestamp_bound(binexpr.clone(), time_partition)?; let value = match op { Operator::Gt => PartialTimeFilter::Low(Bound::Excluded(time)), Operator::GtEq => PartialTimeFilter::Low(Bound::Included(time)), @@ -509,7 +510,7 @@ impl PartialTimeFilter { Some(value) } - pub fn binary_expr(&self, left: Expr) -> Expr { + pub fn binary_expr_default_timestamp_key(&self, left: Expr) -> Expr { let (op, right) = match self { PartialTimeFilter::Low(Bound::Excluded(time)) => { (Operator::Gt, time.timestamp_millis()) @@ -537,6 +538,26 @@ impl PartialTimeFilter { )) } + pub fn binary_expr_timestamp_partition_key(&self, left: Expr) -> Expr { + let (op, right) = match self { + PartialTimeFilter::Low(Bound::Excluded(time)) => (Operator::Gt, time), + PartialTimeFilter::Low(Bound::Included(time)) => (Operator::GtEq, time), + PartialTimeFilter::High(Bound::Excluded(time)) => (Operator::Lt, time), + PartialTimeFilter::High(Bound::Included(time)) => (Operator::LtEq, time), + PartialTimeFilter::Eq(time) => (Operator::Eq, time), + _ => unimplemented!(), + }; + + Expr::BinaryExpr(BinaryExpr::new( + Box::new(left), + op, + Box::new(Expr::Literal(ScalarValue::Utf8(Some(format!( + "{:?}", + right + ))))), + )) + } + fn is_greater_than(&self, other: &NaiveDateTime) -> bool { match self { PartialTimeFilter::Low(Bound::Excluded(time)) => time >= other, @@ -566,14 +587,14 @@ fn is_overlapping_query( .all(|filter| filter.is_greater_than(&first_entry_upper_bound.naive_utc())) } -fn include_now(filters: &[Expr]) -> bool { +fn include_now(filters: &[Expr], time_partition: Option) -> bool { let current_minute = Utc::now() .with_second(0) .and_then(|x| x.with_nanosecond(0)) 
.expect("zeroed value is valid") .naive_utc(); - let time_filters = extract_primary_filter(filters); + let time_filters = extract_primary_filter(filters, time_partition); let upper_bound_matches = time_filters.iter().any(|filter| match filter { PartialTimeFilter::High(Bound::Excluded(time)) @@ -598,7 +619,7 @@ fn expr_in_boundary(filter: &Expr) -> bool { let Expr::BinaryExpr(binexpr) = filter else { return false; }; - let Some((op, time)) = extract_timestamp_bound(binexpr) else { + let Some((op, time)) = extract_timestamp_bound(binexpr.clone(), None) else { return false; }; @@ -612,11 +633,22 @@ fn expr_in_boundary(filter: &Expr) -> bool { ) } -fn extract_from_lit(expr: &Expr) -> Option { - if let Expr::Literal(value) = expr { +fn extract_from_lit(expr: BinaryExpr, time_partition: Option) -> Option { + let mut column_name: String = String::default(); + if let Expr::Column(column) = *expr.left { + column_name = column.name; + } + if let Expr::Literal(value) = *expr.right { match value { ScalarValue::TimestampMillisecond(Some(value), _) => { - Some(NaiveDateTime::from_timestamp_millis(*value).unwrap()) + Some(NaiveDateTime::from_timestamp_millis(value).unwrap()) + } + ScalarValue::Utf8(Some(str_value)) => { + if time_partition.is_some() && column_name == time_partition.unwrap() { + Some(str_value.parse::().unwrap()) + } else { + None + } } _ => None, } @@ -625,14 +657,11 @@ fn extract_from_lit(expr: &Expr) -> Option { } } -fn extract_timestamp_bound(binexpr: &BinaryExpr) -> Option<(Operator, NaiveDateTime)> { - if matches!(&*binexpr.left, Expr::Column(Column { name, .. }) if name == DEFAULT_TIMESTAMP_KEY) - { - let time = extract_from_lit(&binexpr.right)?; - Some((binexpr.op, time)) - } else { - None - } +fn extract_timestamp_bound( + binexpr: BinaryExpr, + time_partition: Option, +) -> Option<(Operator, NaiveDateTime)> { + Some((binexpr.op, extract_from_lit(binexpr, time_partition)?)) } async fn collect_manifest_files( @@ -658,11 +687,14 @@ async fn collect_manifest_files( } // extract start time and end time from filter preficate -fn extract_primary_filter(filters: &[Expr]) -> Vec { +fn extract_primary_filter( + filters: &[Expr], + time_partition: Option, +) -> Vec { let mut time_filters = Vec::new(); filters.iter().for_each(|expr| { let _ = expr.apply(&mut |expr| { - let time = PartialTimeFilter::try_from_expr(expr); + let time = PartialTimeFilter::try_from_expr(expr, time_partition.clone()); if let Some(time) = time { time_filters.push(time); Ok(VisitRecursion::Stop) diff --git a/server/src/storage.rs b/server/src/storage.rs index b6d17484f..5602b1984 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -82,6 +82,8 @@ pub struct ObjectStoreFormat { pub cache_enabled: bool, #[serde(skip_serializing_if = "Option::is_none")] pub retention: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub time_partition: Option, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -126,6 +128,7 @@ impl Default for ObjectStoreFormat { snapshot: Snapshot::default(), cache_enabled: false, retention: None, + time_partition: None, } } } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 7494d16e1..9f016f5a9 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -106,14 +106,22 @@ pub trait ObjectStorage: Sync + 'static { Ok(()) } - async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> { + async fn create_stream( + &self, + stream_name: &str, + 
time_partition: &str, + ) -> Result<(), ObjectStorageError> { let mut format = ObjectStoreFormat::default(); format.set_id(CONFIG.parseable.username.clone()); let permission = Permisssion::new(CONFIG.parseable.username.clone()); format.permissions = vec![permission]; + if time_partition.is_empty() { + format.time_partition = None; + } else { + format.time_partition = Some(time_partition.to_string()); + } let format_json = to_bytes(&format); - self.put_object(&schema_path(stream_name), to_bytes(&Schema::empty())) .await?; @@ -292,12 +300,13 @@ pub trait ObjectStorage: Sync + 'static { self.put_object(&path, to_bytes(&manifest)).await } - async fn get_snapshot(&self, stream: &str) -> Result { + async fn get_object_store_format( + &self, + stream: &str, + ) -> Result { let path = stream_json_path(stream); let bytes = self.get_object(&path).await?; - Ok(serde_json::from_slice::(&bytes) - .expect("snapshot is valid json") - .snapshot) + Ok(serde_json::from_slice::(&bytes).expect("snapshot is valid json")) } async fn put_snapshot( diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 31c5dffed..c07d0b24b 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -26,7 +26,7 @@ use std::{ }; use arrow_schema::{ArrowError, Schema}; -use chrono::{NaiveDateTime, Timelike, Utc}; +use chrono::{NaiveDateTime, Timelike}; use parquet::{ arrow::ArrowWriter, basic::Encoding, @@ -43,7 +43,7 @@ use crate::{ storage::OBJECT_STORE_DATA_GRANULARITY, utils::{self, arrow::merged_reader::MergedReverseRecordReader}, }; - +use rand::Rng; const ARROW_FILE_EXTENSION: &str = "data.arrows"; const PARQUET_FILE_EXTENSION: &str = "data.parquet"; @@ -76,14 +76,19 @@ impl StorageDir { ) } - fn filename_by_current_time(stream_hash: &str) -> String { - let datetime = Utc::now(); - Self::filename_by_time(stream_hash, datetime.naive_utc()) + fn filename_by_current_time(stream_hash: &str, parsed_timestamp: NaiveDateTime) -> String { + Self::filename_by_time(stream_hash, parsed_timestamp) } - pub fn path_by_current_time(&self, stream_hash: &str) -> PathBuf { - self.data_path - .join(Self::filename_by_current_time(stream_hash)) + pub fn path_by_current_time( + &self, + stream_hash: &str, + parsed_timestamp: NaiveDateTime, + ) -> PathBuf { + self.data_path.join(Self::filename_by_current_time( + stream_hash, + parsed_timestamp, + )) } pub fn arrow_files(&self) -> Vec { @@ -157,10 +162,13 @@ impl StorageDir { } fn arrow_path_to_parquet(path: &Path) -> PathBuf { - let filename = path.file_name().unwrap().to_str().unwrap(); - let (_, filename) = filename.split_once('.').unwrap(); + let file_stem = path.file_stem().unwrap().to_str().unwrap(); + let mut rng = rand::thread_rng(); + let random_number: u64 = rng.gen(); + let (_, filename) = file_stem.split_once('.').unwrap(); + let filename_with_random_number = format!("{}.{}.{}", filename, random_number, "arrows"); let mut parquet_path = path.to_owned(); - parquet_path.set_file_name(filename); + parquet_path.set_file_name(filename_with_random_number); parquet_path.set_extension("parquet"); parquet_path } diff --git a/server/src/utils/json.rs b/server/src/utils/json.rs index 0f18d4bf7..b448d6860 100644 --- a/server/src/utils/json.rs +++ b/server/src/utils/json.rs @@ -25,6 +25,16 @@ pub fn flatten_json_body(body: serde_json::Value) -> Result Result, anyhow::Error> { + let data = flatten_json_body(body)?; + let value_arr = match data { + Value::Array(arr) => arr, + value @ Value::Object(_) => vec![value], + _ => unreachable!("flatten would have 
failed beforehand"), + }; + Ok(value_arr) +} + pub fn convert_to_string(value: &Value) -> Value { match value { Value::Null => Value::String("null".to_owned()),
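
---

Reviewer note: a minimal client-side sketch of how the new header is intended to be used, not part of the patch itself. It assumes a Parseable server on localhost:8000 with admin/admin credentials; the stream name `demo`, the field name `source_time`, and the use of reqwest/tokio/serde_json are illustrative choices. Per the ingest changes above, every event must carry the partition column and its value must parse as a datetime (e.g. RFC 3339), otherwise ingestion is rejected.

```rust
// Illustrative sketch only. Assumed dependencies: reqwest (with the "json"
// feature), tokio (with "macros" and "rt-multi-thread"), and serde_json.
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    let base = "http://localhost:8000/api/v1"; // assumed local server address

    // Stream creation: the optional X-P-Time-Partition header names the log
    // field that replaces p_timestamp for partitioning and time-range queries.
    client
        .put(format!("{base}/logstream/demo"))
        .basic_auth("admin", Some("admin"))
        .header("X-P-Time-Partition", "source_time")
        .send()
        .await?
        .error_for_status()?;

    // Ingestion via POST /api/v1/ingest with the stream named in X-P-Stream.
    // The event carries the partition column as an RFC 3339 timestamp, so a
    // historical event can be backfilled under its original time.
    client
        .post(format!("{base}/ingest"))
        .basic_auth("admin", Some("admin"))
        .header("X-P-Stream", "demo")
        .json(&json!([
            {
                "source_time": "2024-03-06T10:15:00.000Z",
                "level": "info",
                "message": "backfilled event"
            }
        ]))
        .send()
        .await?
        .error_for_status()?;

    Ok(())
}
```

With such a stream, query from/to bounds would be matched against `source_time` rather than `p_timestamp`, per the query-path changes in this patch.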