Skip to content

Commit

Permalink
fix for querier to use manifest list from all ingesters
Browse files Browse the repository at this point in the history
  • Loading branch information
nikhilsinhaparseable committed Apr 8, 2024
1 parent eddd332 commit 4b32c2d
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 9 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
target
data
staging
data*
staging*
limitcache
examples
cert.pem
Expand Down
48 changes: 41 additions & 7 deletions server/src/query/stream_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
*
*/

use crate::Mode;
use crate::{
catalog::snapshot::{self, Snapshot},
storage::{ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
};
use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef, SortOptions};
use bytes::Bytes;
Expand Down Expand Up @@ -44,13 +49,13 @@ use datafusion::{
use futures_util::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use object_store::{path::Path, ObjectStore};
use relative_path::RelativePathBuf;
use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc};
use url::Url;

use crate::{
catalog::{
self, column::TypedStatistics, manifest::Manifest, snapshot::ManifestItem, ManifestFile,
Snapshot,
},
event::{self, DEFAULT_TIMESTAMP_KEY},
localcache::LocalCacheManager,
Expand All @@ -61,6 +66,7 @@ use crate::{
};

use super::listing_table_builder::ListingTableBuilder;
use crate::catalog::Snapshot as CatalogSnapshot;

// schema provider for stream based on global data
pub struct GlobalSchemaProvider {
Expand Down Expand Up @@ -316,12 +322,34 @@ impl TableProvider for StandardTableProvider {
);
}
};

// Fetch snapshot
let snapshot = object_store_format.snapshot;
let mut merged_snapshot: snapshot::Snapshot = Snapshot::default();
if CONFIG.parseable.mode == Mode::Query {
let path = RelativePathBuf::from_iter([&self.stream, STREAM_ROOT_DIRECTORY]);
let obs = glob_storage
.get_objects(
Some(&path),
Box::new(|file_name| file_name.starts_with(".ingester")),
)
.await;

if let Ok(obs) = obs {
for ob in obs {
if let Ok(object_store_format) =
serde_json::from_slice::<ObjectStoreFormat>(&ob)
{
let snapshot = object_store_format.snapshot;
for manifest in snapshot.manifest_list {
merged_snapshot.manifest_list.push(manifest);
}
}
}
}
} else {
merged_snapshot = object_store_format.snapshot;
}

// Is query timerange is overlapping with older data.
if is_overlapping_query(&snapshot.manifest_list, &time_filters) {
if is_overlapping_query(&merged_snapshot.manifest_list, &time_filters) {
return legacy_listing_table(
self.stream.clone(),
memory_exec,
Expand All @@ -338,8 +366,14 @@ impl TableProvider for StandardTableProvider {
.await;
}

let mut manifest_files =
collect_from_snapshot(&snapshot, &time_filters, object_store, filters, limit).await?;
let mut manifest_files = collect_from_snapshot(
&merged_snapshot,
&time_filters,
object_store,
filters,
limit,
)
.await?;

if manifest_files.is_empty() {
return final_plan(vec![memory_exec], projection, self.schema.clone());
Expand Down

0 comments on commit 4b32c2d

Please sign in to comment.