Skip to content

Commit

Permalink
feat: implement distributed system with ingest and query modes (#730)
Browse files Browse the repository at this point in the history
  • Loading branch information
Eshanatnight authored Apr 2, 2024
1 parent c11641f commit 5c2be32
Show file tree
Hide file tree
Showing 39 changed files with 4,012 additions and 1,098 deletions.
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ serde_repr = "0.1.17"
hashlru = { version = "0.11.0", features = ["serde"] }
path-clean = "1.0.1"
prost = "0.12.3"
prometheus-parse = "0.2.5"

[build-dependencies]
cargo_toml = "0.15"
Expand Down
8 changes: 4 additions & 4 deletions server/src/about.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ pub fn print_about(
eprint!(
"
{}
Version: \"v{}\"",
Version:\t\t\t\t\t\"v{}\"",
"About:".to_string().bold(),
current_version,
);
); // " " " "

if let Some(latest_release) = latest_release {
if latest_release.version > current_version {
Expand All @@ -103,8 +103,8 @@ pub fn print_about(

eprintln!(
"
Commit: \"{commit_hash}\"
Docs: \"https://logg.ing/docs\""
Commit:\t\t\t\t\t\t\"{commit_hash}\"
Docs:\t\t\t\t\t\t\"https://logg.ing/docs\""
);
}

Expand Down
2 changes: 1 addition & 1 deletion server/src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl Report {
cpu_count,
memory_total_bytes: mem_total,
platform: platform().to_string(),
mode: CONFIG.mode_string().to_string(),
mode: CONFIG.get_storage_mode_string().to_string(),
version: current().released_version.to_string(),
commit_hash: current().commit_hash,
metrics: build_metrics(),
Expand Down
32 changes: 17 additions & 15 deletions server/src/banner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ pub async fn print(config: &Config, meta: &StorageMetadata) {

fn print_ascii_art() {
let ascii_name = r#"
`7MM"""Mq. *MM `7MM
MM `MM. MM MM
MM ,M9 ,6"Yb. `7Mb,od8 ,pP"Ybd .gP"Ya ,6"Yb. MM,dMMb. MM .gP"Ya
MMmmdM9 8) MM MM' "' 8I `" ,M' Yb 8) MM MM `Mb MM ,M' Yb
MM ,pm9MM MM `YMMMa. 8M"""""" ,pm9MM MM M8 MM 8M""""""
MM 8M MM MM L. I8 YM. , 8M MM MM. ,M9 MM YM. ,
.JMML. `Moo9^Yo..JMML. M9mmmP' `Mbmmd' `Moo9^Yo. P^YbmdP' .JMML. `Mbmmd'
`7MM"""Mq. *MM `7MM
MM `MM. MM MM
MM ,M9 ,6"Yb. `7Mb,od8 ,pP"Ybd .gP"Ya ,6"Yb. MM,dMMb. MM .gP"Ya
MMmmdM9 8) MM MM' "' 8I `" ,M' Yb 8) MM MM `Mb MM ,M' Yb
MM ,pm9MM MM `YMMMa. 8M"""""" ,pm9MM MM M8 MM 8M""""""
MM 8M MM MM L. I8 YM. , 8M MM MM. ,M9 MM YM. ,
.JMML. `Moo9^Yo..JMML. M9mmmP' `Mbmmd' `Moo9^Yo. P^YbmdP' .JMML. `Mbmmd'
"#;

eprint!("{ascii_name}");
Expand Down Expand Up @@ -77,12 +77,14 @@ fn status_info(config: &Config, scheme: &str, id: Uid) {
eprintln!(
"
{}
Address: {}
Credentials: {}
LLM Status: \"{}\"",
Address:\t\t\t\t\t{}
Credentials:\t\t\t\t\t{}
Server Mode:\t\t\t\t\t\"{}\"
LLM Status:\t\t\t\t\t\"{}\"",
"Server:".to_string().bold(),
address,
credentials,
config.parseable.mode.to_str(),
llm_status
);
}
Expand All @@ -99,10 +101,10 @@ async fn storage_info(config: &Config) {
eprintln!(
"
{}
Mode: \"{}\"
Staging: \"{}\"",
Storage Mode:\t\t\t\t\t\"{}\"
Staging Path:\t\t\t\t\t\"{}\"",
"Storage:".to_string().bold(),
config.mode_string(),
config.get_storage_mode_string(),
config.staging_dir().to_string_lossy(),
);

Expand All @@ -114,7 +116,7 @@ async fn storage_info(config: &Config) {

eprintln!(
"\
{:8}Cache: \"{}\", (size: {})",
{:8}Cache:\t\t\t\t\t\"{}\", (size: {})",
"",
path.display(),
size
Expand All @@ -123,7 +125,7 @@ async fn storage_info(config: &Config) {

eprintln!(
"\
{:8}Store: \"{}\", (latency: {:?})",
{:8}Store:\t\t\t\t\t\t\"{}\", (latency: {:?})",
"",
storage.get_endpoint(),
latency
Expand Down
72 changes: 61 additions & 11 deletions server/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use relative_path::RelativePathBuf;
use crate::{
catalog::manifest::Manifest,
query::PartialTimeFilter,
storage::{ObjectStorage, ObjectStorageError},
storage::{ObjectStorage, ObjectStorageError, MANIFEST_FILE},
utils::get_address,
};

use self::{column::Column, snapshot::ManifestItem};
Expand Down Expand Up @@ -105,20 +106,67 @@ pub async fn update_snapshot(
item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
});

// if the mode in I.S. manifest needs to be created but it is not getting created because
// there is already a pos, to index into stream.json

// We update the manifest referenced by this position
// This updates an existing file so there is no need to create a snapshot entry.
if let Some(pos) = pos {
let info = &mut manifests[pos];
let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);
let Some(mut manifest) = storage.get_manifest(&path).await? else {
return Err(ObjectStorageError::UnhandledError(
"Manifest found in snapshot but not in object-storage"
.to_string()
.into(),
));
};
manifest.apply_change(change);
storage.put_manifest(&path, manifest).await?;

let mut ch = false;
for m in manifests.iter() {
let s = get_address();
let p = format!("{}.{}.{}", s.0, s.1, MANIFEST_FILE);
if m.manifest_path.contains(&p) {
ch = true;
}
}
if ch {
let Some(mut manifest) = storage.get_manifest(&path).await? else {
return Err(ObjectStorageError::UnhandledError(
"Manifest found in snapshot but not in object-storage"
.to_string()
.into(),
));
};
manifest.apply_change(change);
storage.put_manifest(&path, manifest).await?;
} else {
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
let upper_bound = lower_bound
.date_naive()
.and_time(
NaiveTime::from_num_seconds_from_midnight_opt(
23 * 3600 + 59 * 60 + 59,
999_999_999,
)
.unwrap(),
)
.and_utc();

let manifest = Manifest {
files: vec![change],
..Manifest::default()
};

let addr = get_address();
let mainfest_file_name = format!("{}.{}.{}", addr.0, addr.1, MANIFEST_FILE);
let path =
partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
storage
.put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
.await?;
let path = storage.absolute_url(&path);
let new_snapshot_entriy = snapshot::ManifestItem {
manifest_path: path.to_string(),
time_lower_bound: lower_bound,
time_upper_bound: upper_bound,
};
manifests.push(new_snapshot_entriy);
storage.put_snapshot(stream_name, meta.snapshot).await?;
}
} else {
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
let upper_bound = lower_bound
Expand All @@ -137,7 +185,9 @@ pub async fn update_snapshot(
..Manifest::default()
};

let path = partition_path(stream_name, lower_bound, upper_bound).join("manifest.json");
let addr = get_address();
let mainfest_file_name = format!("{}.{}.{}", addr.0, addr.1, MANIFEST_FILE);
let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
storage
.put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
.await?;
Expand Down
Loading

0 comments on commit 5c2be32

Please sign in to comment.