
fix: add stream creation time in get stats api #632

Merged
98 changes: 75 additions & 23 deletions server/src/catalog.rs
@@ -18,7 +18,7 @@

use std::sync::Arc;

use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, Utc};
use relative_path::RelativePathBuf;

use crate::{
@@ -69,33 +69,33 @@ impl ManifestFile for manifest::File {
}
}

fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) {
match file
.columns()
.iter()
.find(|col| col.name == "p_timestamp")
.unwrap()
.stats
.clone()
.unwrap()
{
column::TypedStatistics::Int(stats) => (
NaiveDateTime::from_timestamp_millis(stats.min)
.unwrap()
.and_utc(),
NaiveDateTime::from_timestamp_millis(stats.max)
.unwrap()
.and_utc(),
),
_ => unreachable!(),
}
}

pub async fn update_snapshot(
storage: Arc<dyn ObjectStorage + Send>,
stream_name: &str,
change: manifest::File,
) -> Result<(), ObjectStorageError> {
fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) {
match file
.columns()
.iter()
.find(|col| col.name == "p_timestamp")
.unwrap()
.stats
.clone()
.unwrap()
{
column::TypedStatistics::Int(stats) => (
NaiveDateTime::from_timestamp_millis(stats.min)
.unwrap()
.and_utc(),
NaiveDateTime::from_timestamp_millis(stats.min)
.unwrap()
.and_utc(),
),
_ => unreachable!(),
}
}

// get current snapshot
let mut meta = storage.get_snapshot(stream_name).await?;
let manifests = &mut meta.manifest_list;
@@ -154,6 +154,58 @@ pub async fn update_snapshot(
Ok(())
}

pub async fn remove_manifest_from_snapshot(
storage: Arc<dyn ObjectStorage + Send>,
stream_name: &str,
dates: Vec<String>,
) -> Result<(), ObjectStorageError> {
// get current snapshot
let mut meta = storage.get_snapshot(stream_name).await?;
let manifests = &mut meta.manifest_list;

// Filter out items whose manifest_path contains any of the given dates
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));

storage.put_snapshot(stream_name, meta).await?;
Ok(())
}
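
For orientation, a hypothetical call mirroring how the retention task (later in this PR) uses this function; the stream name and the `date=YYYY-MM-DD` path fragments are illustrative assumptions:

```rust
// Hypothetical: prune snapshot entries for two retention-expired dates.
// Relies on the dates appearing verbatim in manifest paths, as retain() does.
let store = CONFIG.storage().get_object_store();
let expired = vec!["date=2024-01-01".to_string(), "date=2024-01-02".to_string()];
catalog::remove_manifest_from_snapshot(store, "demo_stream", expired).await?;
```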

pub async fn get_first_event(
storage: Arc<dyn ObjectStorage + Send>,
stream_name: &str,
) -> Result<Option<String>, ObjectStorageError> {
// get current snapshot
let mut meta = storage.get_snapshot(stream_name).await?;
let manifests = &mut meta.manifest_list;

if manifests.is_empty() {
log::info!("No manifest found for stream {stream_name}");
return Err(ObjectStorageError::Custom("No manifest found".to_string()));
}

let manifest = &manifests[0];

let path = partition_path(
stream_name,
manifest.time_lower_bound,
manifest.time_upper_bound,
);
let Some(manifest) = storage.get_manifest(&path).await? else {
return Err(ObjectStorageError::UnhandledError(
"Manifest found in snapshot but not in object-storage"
.to_string()
.into(),
));
};

if let Some(first_event) = manifest.files.first() {
let (lower_bound, _) = get_file_bounds(first_event);
let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
return Ok(Some(first_event_at));
}
Ok(None)
}
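
A minimal usage sketch, mirroring the ingest handler later in this PR; the stream name is a placeholder:

```rust
// Hypothetical caller: resolve the earliest event timestamp for a stream.
// get_first_event returns an RFC 3339 string in local time, or None when the
// first manifest exists but records no files yet.
let store = CONFIG.storage().get_object_store();
match catalog::get_first_event(store, "demo_stream").await {
    Ok(Some(first_event_at)) => log::info!("first event at {first_event_at}"),
    Ok(None) => log::info!("manifest present but no files recorded yet"),
    Err(err) => log::error!("could not resolve first event: {err:?}"),
}
```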

/// Partition the path to which this manifest belongs.
/// Useful when uploading the manifest file.
fn partition_path(
39 changes: 39 additions & 0 deletions server/src/handlers/http/ingest.rs
@@ -32,7 +32,9 @@ use crate::handlers::{
STREAM_NAME_HEADER_KEY,
};
use crate::metadata::STREAM_INFO;
use crate::option::CONFIG;
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
use crate::{catalog, metadata};

use super::kinesis;
use super::logstream::error::CreateStreamError;
@@ -49,6 +51,32 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError>
let stream_name = stream_name.to_str().unwrap().to_owned();
create_stream_if_not_exists(&stream_name).await?;

if first_event_at_empty(&stream_name) {
let store = CONFIG.storage().get_object_store();
if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await {
if let Err(err) = CONFIG
.storage()
.get_object_store()
.put_first_event_at(&stream_name, &first_event_at)
.await
{
log::error!(
"Failed to update first_event_at in metadata for stream {:?} {err:?}",
stream_name
);
}

if let Err(err) =
metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at))
{
log::error!(
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
stream_name
);
}
}
}

flatten_and_push_logs(req, body, stream_name).await?;
Ok(HttpResponse::Ok().finish())
} else {
@@ -144,6 +172,17 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostError>
Ok(())
}

// Check if the first_event_at is empty
pub fn first_event_at_empty(stream_name: &str) -> bool {
let hash_map = STREAM_INFO.read().unwrap();
if let Some(stream_info) = hash_map.get(stream_name) {
if let Some(first_event_at) = &stream_info.first_event_at {
return first_event_at.is_empty();
}
}
true
}
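
As a quick sketch of the intended semantics (stream name hypothetical): an unregistered stream, a stream whose `first_event_at` is `None`, and an empty string all report true, so the ingest path above attempts a backfill in each case.

```rust
// Hypothetical: a stream that was never registered reports true, which makes
// `ingest` try to backfill first_event_at from the catalog.
assert!(first_event_at_empty("stream_never_registered"));
```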

#[derive(Debug, thiserror::Error)]
pub enum PostError {
#[error("Stream {0} not found")]
17 changes: 16 additions & 1 deletion server/src/handlers/http/logstream.rs
@@ -266,10 +266,17 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
let stats = stats::get_current_stats(&stream_name, "json")
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;

let hash_map = STREAM_INFO.read().unwrap();
let stream_meta = &hash_map
.get(&stream_name)
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;

let time = Utc::now();

let stats = serde_json::json!({
"stream": stream_name,
"creation_time": &stream_meta.created_at,
"first_event_at": Some(&stream_meta.first_event_at),
"time": time,
"ingestion": {
"count": stats.events,
@@ -305,7 +312,15 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError>
if let Err(err) = storage.create_stream(&stream_name).await {
return Err(CreateStreamError::Storage { stream_name, err });
}
metadata::STREAM_INFO.add_stream(stream_name.to_string());

let stream_meta = CONFIG
.storage()
.get_object_store()
.get_stream_metadata(&stream_name)
.await;
let created_at = stream_meta.unwrap().created_at;

metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at);

Ok(())
}
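
For reference, a hypothetical response from the stats endpoint after this change; the endpoint path, stream name, and values are illustrative, and the payload is truncated to the fields visible in this diff:

```json
{
  "stream": "demo_stream",
  "creation_time": "2024-01-10T09:30:00.000000+05:30",
  "first_event_at": "2024-01-10T09:31:12.000000+05:30",
  "time": "2024-01-15T12:00:00Z",
  "ingestion": {
    "count": 42
  }
}
```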
25 changes: 24 additions & 1 deletion server/src/metadata.rs
@@ -18,6 +18,7 @@

use arrow_array::RecordBatch;
use arrow_schema::{Field, Fields, Schema};
use chrono::Local;
use itertools::Itertools;
use once_cell::sync::Lazy;
use std::collections::HashMap;
@@ -43,6 +44,8 @@ pub struct LogStreamMetadata {
pub schema: HashMap<String, Arc<Field>>,
pub alerts: Alerts,
pub cache_enabled: bool,
pub created_at: String,
pub first_event_at: Option<String>,
}

// It is very unlikely that panic will occur when dealing with metadata.
@@ -126,9 +129,27 @@ impl StreamInfo {
})
}

pub fn add_stream(&self, stream_name: String) {
pub fn set_first_event_at(
&self,
stream_name: &str,
first_event_at: Option<String>,
) -> Result<(), MetadataError> {
let mut map = self.write().expect(LOCK_EXPECT);
map.get_mut(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
.map(|metadata| {
metadata.first_event_at = first_event_at;
})
}

pub fn add_stream(&self, stream_name: String, created_at: String) {
let mut map = self.write().expect(LOCK_EXPECT);
let metadata = LogStreamMetadata {
created_at: if created_at.is_empty() {
Local::now().to_rfc3339()
} else {
created_at.clone()
},
..Default::default()
};
map.insert(stream_name, metadata);
@@ -162,6 +183,8 @@ impl StreamInfo {
schema,
alerts,
cache_enabled: meta.cache_enabled,
created_at: meta.created_at,
first_event_at: meta.first_event_at,
};

let mut map = self.write().expect(LOCK_EXPECT);
3 changes: 3 additions & 0 deletions server/src/storage.rs
@@ -69,6 +69,8 @@ pub struct ObjectStoreFormat {
pub objectstore_format: String,
#[serde(rename = "created-at")]
pub created_at: String,
#[serde(rename = "first-event-at")]
pub first_event_at: Option<String>,
pub owner: Owner,
pub permissions: Vec<Permisssion>,
pub stats: Stats,
@@ -113,6 +115,7 @@ impl Default for ObjectStoreFormat {
version: CURRENT_SCHEMA_VERSION.to_string(),
objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(),
created_at: Local::now().to_rfc3339(),
first_event_at: None,
owner: Owner::new("".to_string(), "".to_string()),
permissions: vec![Permisssion::new("parseable".to_string())],
stats: Stats::default(),
17 changes: 17 additions & 0 deletions server/src/storage/object_storage.rs
@@ -161,6 +161,23 @@ pub trait ObjectStorage: Sync + 'static {
self.put_object(&path, to_bytes(&stream_metadata)).await
}

async fn put_first_event_at(
&self,
stream_name: &str,
first_event_at: &str,
) -> Result<(), ObjectStorageError> {
let path = stream_json_path(stream_name);
let stream_metadata = self.get_object(&path).await?;
let first_event_ts =
serde_json::to_value(first_event_at).expect("first_event_at is perfectly serializable");
let mut stream_metadata: serde_json::Value =
serde_json::from_slice(&stream_metadata).expect("parseable config is valid json");

stream_metadata["first-event-at"] = first_event_ts;

self.put_object(&path, to_bytes(&stream_metadata)).await
}

async fn put_metadata(
&self,
parseable_metadata: &StorageMetadata,
36 changes: 35 additions & 1 deletion server/src/storage/retention.rs
@@ -187,7 +187,11 @@ mod action {
use itertools::Itertools;
use relative_path::RelativePathBuf;

use crate::option::CONFIG;
use crate::{
catalog::{self, remove_manifest_from_snapshot},
metadata,
option::CONFIG,
};

pub(super) async fn delete(stream_name: String, days: u32) {
log::info!("running retention task - delete for stream={stream_name}");
@@ -206,6 +210,7 @@
.into_iter()
.filter(|date| string_to_date(date) < retain_until)
.collect_vec();
let dates = dates_to_delete.clone();

let delete_tasks = FuturesUnordered::new();
for date in dates_to_delete {
@@ -226,6 +231,35 @@
log::error!("Failed to run delete task {err:?}")
}
}

let store = CONFIG.storage().get_object_store();
let res = remove_manifest_from_snapshot(store.clone(), &stream_name, dates).await;
if let Err(err) = res {
log::error!("Failed to update manifest list in the snapshot {err:?}")
}

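// Retention may have deleted the oldest manifests, so re-resolve the earliest
// remaining event and persist the refreshed timestamp (see the review thread below).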
if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await {
Member: Why do we need to do this here?

Contributor Author: This is here to update the first event timestamp after cleanup, because after retention cleanup the first event timestamp would be different.

if let Err(err) = CONFIG
.storage()
.get_object_store()
.put_first_event_at(&stream_name, &first_event_at)
.await
{
log::error!(
"Failed to update first_event_at in metadata for stream {:?} {err:?}",
stream_name
);
}

if let Err(err) =
metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at))
{
log::error!(
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
stream_name
);
}
}
}

fn get_retain_until(current_date: NaiveDate, days: u64) -> NaiveDate {