Skip to content

Commit

Permalink
fix: add stream creation time in get stats api
Browse files Browse the repository at this point in the history
  • Loading branch information
gurjotkaur20 committed Jan 22, 2024
1 parent 3d8ddeb commit adbae20
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 2 deletions.
17 changes: 16 additions & 1 deletion server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,17 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
let stats = stats::get_current_stats(&stream_name, "json")
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;

let hash_map = STREAM_INFO.read().unwrap();
let stream_creation_time = &hash_map
.get(&stream_name)
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?
.created_at;

let time = Utc::now();

let stats = serde_json::json!({
"stream": stream_name,
"creation_time": stream_creation_time,
"time": time,
"ingestion": {
"count": stats.events,
Expand Down Expand Up @@ -307,7 +314,15 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError>
if let Err(err) = storage.create_stream(&stream_name).await {
return Err(CreateStreamError::Storage { stream_name, err });
}
metadata::STREAM_INFO.add_stream(stream_name.to_string());

let stream_meta = CONFIG
.storage()
.get_object_store()
.get_stream_metadata(&stream_name)
.await;
let created_at = stream_meta.unwrap().created_at;

metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at);

Ok(())
}
Expand Down
10 changes: 9 additions & 1 deletion server/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

use arrow_array::RecordBatch;
use arrow_schema::{Field, Fields, Schema};
use chrono::Local;
use itertools::Itertools;
use once_cell::sync::Lazy;
use std::collections::HashMap;
Expand All @@ -43,6 +44,7 @@ pub struct LogStreamMetadata {
pub schema: HashMap<String, Arc<Field>>,
pub alerts: Alerts,
pub cache_enabled: bool,
pub created_at: String,
}

// It is very unlikely that panic will occur when dealing with metadata.
Expand Down Expand Up @@ -126,9 +128,14 @@ impl StreamInfo {
})
}

pub fn add_stream(&self, stream_name: String) {
pub fn add_stream(&self, stream_name: String, created_at: String) {
let mut map = self.write().expect(LOCK_EXPECT);
let metadata = LogStreamMetadata {
created_at: if created_at.is_empty() {
Local::now().to_rfc3339()
} else {
created_at.clone()
},
..Default::default()
};
map.insert(stream_name, metadata);
Expand Down Expand Up @@ -162,6 +169,7 @@ impl StreamInfo {
schema,
alerts,
cache_enabled: meta.cache_enabled,
created_at: meta.created_at,
};

let mut map = self.write().expect(LOCK_EXPECT);
Expand Down

0 comments on commit adbae20

Please sign in to comment.