diff --git a/server/src/catalog.rs b/server/src/catalog.rs index f8adad1ca..3ffdd21a1 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -18,7 +18,7 @@ use std::sync::Arc; -use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc}; +use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, Utc}; use relative_path::RelativePathBuf; use crate::{ @@ -69,33 +69,33 @@ impl ManifestFile for manifest::File { } } +fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) { + match file + .columns() + .iter() + .find(|col| col.name == "p_timestamp") + .unwrap() + .stats + .clone() + .unwrap() + { + column::TypedStatistics::Int(stats) => ( + NaiveDateTime::from_timestamp_millis(stats.min) + .unwrap() + .and_utc(), + NaiveDateTime::from_timestamp_millis(stats.max) + .unwrap() + .and_utc(), + ), + _ => unreachable!(), + } +} + pub async fn update_snapshot( storage: Arc<dyn ObjectStorage>, stream_name: &str, change: manifest::File, ) -> Result<(), ObjectStorageError> { - fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) { - match file - .columns() - .iter() - .find(|col| col.name == "p_timestamp") - .unwrap() - .stats - .clone() - .unwrap() - { - column::TypedStatistics::Int(stats) => ( - NaiveDateTime::from_timestamp_millis(stats.min) - .unwrap() - .and_utc(), - NaiveDateTime::from_timestamp_millis(stats.min) - .unwrap() - .and_utc(), - ), - _ => unreachable!(), - } - } - // get current snapshot let mut meta = storage.get_snapshot(stream_name).await?; let manifests = &mut meta.manifest_list; @@ -154,6 +154,58 @@ pub async fn update_snapshot( Ok(()) } +pub async fn remove_manifest_from_snapshot( + storage: Arc<dyn ObjectStorage>, + stream_name: &str, + dates: Vec<String>, +) -> Result<(), ObjectStorageError> { + // get current snapshot + let mut meta = storage.get_snapshot(stream_name).await?; + let manifests = &mut meta.manifest_list; + + // Filter out items whose manifest_path contains any of the dates_to_delete + manifests.retain(|item| !dates.iter().any(|date| 
item.manifest_path.contains(date))); + + storage.put_snapshot(stream_name, meta).await?; + Ok(()) +} + +pub async fn get_first_event( + storage: Arc<dyn ObjectStorage>, + stream_name: &str, +) -> Result<Option<String>, ObjectStorageError> { + // get current snapshot + let mut meta = storage.get_snapshot(stream_name).await?; + let manifests = &mut meta.manifest_list; + + if manifests.is_empty() { + log::info!("No manifest found for stream {stream_name}"); + return Err(ObjectStorageError::Custom("No manifest found".to_string())); + } + + let manifest = &manifests[0]; + + let path = partition_path( + stream_name, + manifest.time_lower_bound, + manifest.time_upper_bound, + ); + let Some(manifest) = storage.get_manifest(&path).await? else { + return Err(ObjectStorageError::UnhandledError( + "Manifest found in snapshot but not in object-storage" + .to_string() + .into(), + )); + }; + + if let Some(first_event) = manifest.files.first() { + let (lower_bound, _) = get_file_bounds(first_event); + let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339(); + return Ok(Some(first_event_at)); + } + Ok(None) +} + /// Partition the path to which this manifest belongs. /// Useful when uploading the manifest file. 
fn partition_path( diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index afd057eda..541b19ae4 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -28,7 +28,7 @@ use crate::metadata::STREAM_INFO; use crate::option::CONFIG; use crate::storage::retention::Retention; use crate::storage::{LogStream, StorageDir}; -use crate::{event, stats}; +use crate::{catalog, event, stats}; use crate::{metadata, validator}; use self::error::{CreateStreamError, StreamError}; @@ -263,13 +263,46 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError> return Err(StreamError::StreamNotFound(stream_name)); } + if first_event_at_empty(&stream_name) { + let store = CONFIG.storage().get_object_store(); + if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await { + if let Err(err) = CONFIG + .storage() + .get_object_store() + .put_first_event_at(&stream_name, &first_event_at) + .await + { + log::error!( + "Failed to update first_event_at in metadata for stream {:?} {err:?}", + stream_name + ); + } + + if let Err(err) = + metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at)) + { + log::error!( + "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", + stream_name + ); + } + } + } + let stats = stats::get_current_stats(&stream_name, "json") .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; + let hash_map = STREAM_INFO.read().unwrap(); + let stream_meta = &hash_map + .get(&stream_name) + .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; + let time = Utc::now(); let stats = serde_json::json!({ "stream": stream_name, + "creation_time": &stream_meta.created_at, + "first_event_at": Some(&stream_meta.first_event_at), "time": time, "ingestion": { "count": stats.events, @@ -285,6 +318,17 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError> Ok((web::Json(stats), StatusCode::OK)) } +// Check if the first_event_at is empty +pub fn 
first_event_at_empty(stream_name: &str) -> bool { + let hash_map = STREAM_INFO.read().unwrap(); + if let Some(stream_info) = hash_map.get(stream_name) { + if let Some(first_event_at) = &stream_info.first_event_at { + return first_event_at.is_empty(); + } + } + true +} + fn remove_id_from_alerts(value: &mut Value) { if let Some(Value::Array(alerts)) = value.get_mut("alerts") { alerts @@ -305,7 +349,15 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError> if let Err(err) = storage.create_stream(&stream_name).await { return Err(CreateStreamError::Storage { stream_name, err }); } - metadata::STREAM_INFO.add_stream(stream_name.to_string()); + + let stream_meta = CONFIG + .storage() + .get_object_store() + .get_stream_metadata(&stream_name) + .await; + let created_at = stream_meta.unwrap().created_at; + + metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at); Ok(()) } diff --git a/server/src/metadata.rs b/server/src/metadata.rs index b57cc710a..e8a250719 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -18,6 +18,7 @@ use arrow_array::RecordBatch; use arrow_schema::{Field, Fields, Schema}; +use chrono::Local; use itertools::Itertools; use once_cell::sync::Lazy; use std::collections::HashMap; @@ -43,6 +44,8 @@ pub struct LogStreamMetadata { pub schema: HashMap<String, Arc<Field>>, pub alerts: Alerts, pub cache_enabled: bool, + pub created_at: String, + pub first_event_at: Option<String>, } // It is very unlikely that panic will occur when dealing with metadata. 
@@ -126,9 +129,27 @@ impl StreamInfo { }) } - pub fn add_stream(&self, stream_name: String) { + pub fn set_first_event_at( + &self, + stream_name: &str, + first_event_at: Option<String>, + ) -> Result<(), MetadataError> { + let mut map = self.write().expect(LOCK_EXPECT); + map.get_mut(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| { + metadata.first_event_at = first_event_at; + }) + } + + pub fn add_stream(&self, stream_name: String, created_at: String) { let mut map = self.write().expect(LOCK_EXPECT); let metadata = LogStreamMetadata { + created_at: if created_at.is_empty() { + Local::now().to_rfc3339() + } else { + created_at.clone() + }, ..Default::default() }; map.insert(stream_name, metadata); @@ -162,6 +183,8 @@ impl StreamInfo { schema, alerts, cache_enabled: meta.cache_enabled, + created_at: meta.created_at, + first_event_at: meta.first_event_at, }; let mut map = self.write().expect(LOCK_EXPECT); diff --git a/server/src/storage.rs b/server/src/storage.rs index 975fcf445..63e9577c1 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -69,6 +69,8 @@ pub struct ObjectStoreFormat { pub objectstore_format: String, #[serde(rename = "created-at")] pub created_at: String, + #[serde(rename = "first-event-at")] + pub first_event_at: Option<String>, pub owner: Owner, pub permissions: Vec<Permisssion>, pub stats: Stats, @@ -113,6 +115,7 @@ impl Default for ObjectStoreFormat { version: CURRENT_SCHEMA_VERSION.to_string(), objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(), created_at: Local::now().to_rfc3339(), + first_event_at: None, owner: Owner::new("".to_string(), "".to_string()), permissions: vec![Permisssion::new("parseable".to_string())], stats: Stats::default(), diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 7494d16e1..0d867da94 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -161,6 +161,23 @@ pub trait ObjectStorage: Sync 
+ 'static { self.put_object(&path, to_bytes(&stream_metadata)).await } + async fn put_first_event_at( + &self, + stream_name: &str, + first_event_at: &str, + ) -> Result<(), ObjectStorageError> { + let path = stream_json_path(stream_name); + let stream_metadata = self.get_object(&path).await?; + let first_event_ts = + serde_json::to_value(first_event_at).expect("first_event_at is perfectly serializable"); + let mut stream_metadata: serde_json::Value = + serde_json::from_slice(&stream_metadata).expect("parseable config is valid json"); + + stream_metadata["first-event-at"] = first_event_ts; + + self.put_object(&path, to_bytes(&stream_metadata)).await + } + async fn put_metadata( &self, parseable_metadata: &StorageMetadata, diff --git a/server/src/storage/retention.rs b/server/src/storage/retention.rs index 82062575a..aedae6a4c 100644 --- a/server/src/storage/retention.rs +++ b/server/src/storage/retention.rs @@ -187,7 +187,11 @@ mod action { use itertools::Itertools; use relative_path::RelativePathBuf; - use crate::option::CONFIG; + use crate::{ + catalog::{self, remove_manifest_from_snapshot}, + metadata, + option::CONFIG, + }; pub(super) async fn delete(stream_name: String, days: u32) { log::info!("running retention task - delete for stream={stream_name}"); @@ -206,6 +210,7 @@ mod action { .into_iter() .filter(|date| string_to_date(date) < retain_until) .collect_vec(); + let dates = dates_to_delete.clone(); let delete_tasks = FuturesUnordered::new(); for date in dates_to_delete { @@ -226,6 +231,35 @@ mod action { log::error!("Failed to run delete task {err:?}") } } + + let store = CONFIG.storage().get_object_store(); + let res = remove_manifest_from_snapshot(store.clone(), &stream_name, dates).await; + if let Err(err) = res { + log::error!("Failed to update manifest list in the snapshot {err:?}") + } + + if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await { + if let Err(err) = CONFIG + .storage() + .get_object_store() + 
.put_first_event_at(&stream_name, &first_event_at) + .await + { + log::error!( + "Failed to update first_event_at in metadata for stream {:?} {err:?}", + stream_name + ); + } + + if let Err(err) = + metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at)) + { + log::error!( + "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", + stream_name + ); + } + } } fn get_retain_until(current_date: NaiveDate, days: u64) -> NaiveDate {