Skip to content

Commit

Permalink
fix: sync streams on ingest server start
Browse files Browse the repository at this point in the history
sync the streams that are present, when a new Ingest Server is started
  • Loading branch information
Eshanatnight committed Mar 19, 2024
1 parent 8717a26 commit 56015fd
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
2 changes: 1 addition & 1 deletion server/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl StreamInfo {

for stream in storage.list_streams().await? {
let alerts = storage.get_alerts(&stream.name).await?;
let schema = storage.get_schema(&stream.name).await?;
let schema = storage.get_schema_for_the_first_time(&stream.name).await?;
let meta = storage.get_stream_metadata(&stream.name).await?;

let schema = update_schema_from_staging(&stream.name, schema);
Expand Down
25 changes: 25 additions & 0 deletions server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ pub trait ObjectStorage: Sync + 'static {
.await
}

async fn get_schema_for_the_first_time(
&self,
stream_name: &str,
) -> Result<Schema, ObjectStorageError> {
let schema_path = RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME]);
let schema_map = self.get_object(&schema_path).await?;
Ok(serde_json::from_slice(&schema_map)?)
}

async fn get_schema(&self, stream_name: &str) -> Result<Schema, ObjectStorageError> {
let schema_map = self.get_object(&schema_path(stream_name)).await?;
Ok(serde_json::from_slice(&schema_map)?)
Expand Down Expand Up @@ -231,6 +240,22 @@ pub trait ObjectStorage: Sync + 'static {
self.put_object(&path, to_bytes(manifest)).await
}

/// for future use
async fn get_stats_for_first_time(
&self,
stream_name: &str,
) -> Result<Stats, ObjectStorageError> {
let path = RelativePathBuf::from_iter([stream_name, STREAM_METADATA_FILE_NAME]);
let stream_metadata = self.get_object(&path).await?;
let stream_metadata: Value =
serde_json::from_slice(&stream_metadata).expect("parseable config is valid json");
let stats = &stream_metadata["stats"];

let stats = serde_json::from_value(stats.clone()).unwrap_or_default();

Ok(stats)
}

async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError> {
let stream_metadata = self.get_object(&stream_json_path(stream_name)).await?;
let stream_metadata: Value =
Expand Down

0 comments on commit 56015fd

Please sign in to comment.