fix: stats response
FIX: stats now update correctly after migrating from Standalone mode
CHORE: Remove Dead Code
Eshanatnight committed Mar 25, 2024
1 parent 6167e49 commit c37b093
Showing 4 changed files with 28 additions and 33 deletions.
24 changes: 15 additions & 9 deletions server/src/handlers/http/logstream.rs
@@ -33,7 +33,7 @@ use crate::{metadata, validator};

use self::error::{CreateStreamError, StreamError};

-use super::modal::query_server::{self, IngestionStats, QueriedStats, StorageStats};
+use super::modal::query_server::{self, IngestionStats, QueriedStats, QueryServer, StorageStats};

pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
@@ -284,19 +284,19 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
let stats = stats::get_current_stats(&stream_name, "json")
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;

-    if CONFIG.parseable.mode == Mode::Query {
-        let stats = query_server::QueryServer::fetch_stats_from_ingesters(&stream_name).await?;
-        let stats = serde_json::to_value(stats).unwrap();
-        return Ok((web::Json(stats), StatusCode::OK));
-    }
+    let ingestor_stats = if CONFIG.parseable.mode == Mode::Query {
+        Some(query_server::QueryServer::fetch_stats_from_ingesters(&stream_name).await?)
+    } else {
+        None
+    };

let hash_map = STREAM_INFO.read().unwrap();
let stream_meta = &hash_map
.get(&stream_name)
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;

let time = Utc::now();
-    let qstats = match &stream_meta.first_event_at {
+    let stats = match &stream_meta.first_event_at {
Some(first_event_at) => {
let ingestion_stats = IngestionStats::new(
stats.events,
@@ -336,10 +336,16 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
)
}
};
+    let stats = if let Some(mut ingestor_stats) = ingestor_stats {
+        ingestor_stats.push(stats);
+        QueryServer::merge_quried_stats(ingestor_stats)
+    } else {
+        stats
+    };

-    let out_stats = serde_json::to_value(qstats).unwrap();
+    let stats = serde_json::to_value(stats).unwrap();

-    Ok((web::Json(out_stats), StatusCode::OK))
+    Ok((web::Json(stats), StatusCode::OK))
}

// Check if the first_event_at is empty
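Taken together, the logstream.rs hunks above change get_stats so that a Query-mode server first collects per-ingester stats and then merges them with its own. The following is a condensed, illustrative sketch of the resulting flow, not the exact handler body; build_local_stats is a hypothetical stand-in for the IngestionStats/StorageStats assembly that the real handler performs inline.

    // Illustrative sketch of the post-change get_stats flow.
    let ingestor_stats = if CONFIG.parseable.mode == Mode::Query {
        // Distributed deployment: ask every ingester for its stats first.
        Some(query_server::QueryServer::fetch_stats_from_ingesters(&stream_name).await?)
    } else {
        // Standalone deployment: nothing to fetch.
        None
    };

    let local_stats: QueriedStats = build_local_stats(&stream_name)?; // hypothetical helper

    let stats = match ingestor_stats {
        Some(mut all) => {
            // Fold this server's own stats into the ingester list, then merge once.
            all.push(local_stats);
            QueryServer::merge_quried_stats(all)
        }
        None => local_stats,
    };

    Ok((web::Json(serde_json::to_value(stats).unwrap()), StatusCode::OK))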
17 changes: 1 addition & 16 deletions server/src/handlers/http/modal/query_server.rs
@@ -235,7 +235,6 @@ impl QueryServer

migration::run_migration(&CONFIG).await?;

-        // when do we do this
let storage = CONFIG.storage().get_object_store();
if let Err(e) = metadata::STREAM_INFO.load(&*storage).await {
log::warn!("could not populate local metadata. {:?}", e);
@@ -253,25 +252,12 @@
analytics::init_analytics_scheduler();
}

-        // spawn the sync thread
-        // tokio::spawn(Self::sync_ingester_metadata());

self.start(prometheus, CONFIG.parseable.openid.clone())
.await?;

Ok(())
}

-    #[allow(dead_code)]
-    async fn sync_ingester_metadata() {
-        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60 / 10));
-        loop {
-            interval.tick().await;
-            // dbg!("Tick");
-            Self::get_ingester_info().await.unwrap();
-        }
-    }

// forward the request to all ingesters to keep them in sync
pub async fn sync_streams_with_ingesters(stream_name: &str) -> Result<(), StreamError> {
let ingester_infos = Self::get_ingester_info().await.map_err(|err| {
@@ -326,7 +312,7 @@ impl QueryServer
/// get the cumulative stats from all ingesters
pub async fn fetch_stats_from_ingesters(
stream_name: &str,
-    ) -> Result<QueriedStats, StreamError> {
+    ) -> Result<Vec<QueriedStats>, StreamError> {
let mut stats = Vec::new();

let ingester_infos = Self::get_ingester_info().await.map_err(|err| {
@@ -373,7 +359,6 @@ impl QueryServer
}
}
}
-        let stats = Self::merge_quried_stats(stats);

Ok(stats)
}
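With the signature change above, fetch_stats_from_ingesters no longer merges internally; it returns the raw per-ingester QueriedStats and leaves the merge to the caller, which folds in its own stats exactly once. A minimal caller-side sketch, where local_stats stands in for whatever QueriedStats the caller has already built:

    // Caller adapts to the new Vec<QueriedStats> return type.
    let mut all_stats = QueryServer::fetch_stats_from_ingesters(&stream_name).await?;
    all_stats.push(local_stats); // the caller's own stats, built elsewhere
    let merged = QueryServer::merge_quried_stats(all_stats);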
2 changes: 1 addition & 1 deletion server/src/metadata.rs
@@ -168,7 +168,7 @@ impl StreamInfo

for stream in storage.list_streams().await? {
let alerts = storage.get_alerts(&stream.name).await?;
-            let schema = storage.get_schema_for_the_first_time(&stream.name).await?;
+            let schema = storage.get_schema_on_server_start(&stream.name).await?;
let meta = storage.get_stream_metadata(&stream.name).await?;

let schema = update_schema_from_staging(&stream.name, schema);
18 changes: 11 additions & 7 deletions server/src/storage/object_storage.rs
@@ -173,7 +173,7 @@ pub trait ObjectStorage: Sync + 'static {
.await
}

-    async fn get_schema_for_the_first_time(
+    async fn get_schema_on_server_start(
&self,
stream_name: &str,
) -> Result<Schema, ObjectStorageError> {
@@ -218,12 +218,16 @@ pub trait ObjectStorage: Sync + 'static {
STREAM_METADATA_FILE_NAME,
]))
.await?;
-                self.put_stream_manifest(
-                    stream_name,
-                    &serde_json::from_slice::<ObjectStoreFormat>(&bytes)
-                        .expect("parseable config is valid json"),
-                )
-                .await?;
+
+                let mut config = serde_json::from_slice::<ObjectStoreFormat>(&bytes)
+                    .expect("parseable config is valid json");
+
+                if CONFIG.parseable.mode == Mode::Ingest {
+                    config.stats = Stats::default();
+                    config.snapshot.manifest_list = vec![];
+                }
+
+                self.put_stream_manifest(stream_name, &config).await?;
bytes
}
};
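This last hunk is what makes the stats correct after a Standalone-to-distributed migration: when an Ingest-mode server copies the pre-existing stream.json, it must not inherit the standalone stats or manifest list, otherwise the query server would count that data a second time when merging ingester stats. A short annotated sketch of the intent, with field semantics as the diff implies:

    // Ingest nodes start the copied stream metadata from a clean slate so that
    // per-ingester stats only reflect data this ingester itself has written.
    if CONFIG.parseable.mode == Mode::Ingest {
        config.stats = Stats::default();        // drop stats inherited from standalone mode
        config.snapshot.manifest_list = vec![]; // drop manifest entries inherited from standalone mode
    }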
