diff --git a/.gitignore b/.gitignore index c3994ec35..bf69b23aa 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ target -data -staging +data* +staging* limitcache examples cert.pem diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index 5599dce53..a8c29c3b6 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -16,6 +16,11 @@ * */ +use crate::Mode; +use crate::{ + catalog::snapshot::{self, Snapshot}, + storage::{ObjectStoreFormat, STREAM_ROOT_DIRECTORY}, +}; use arrow_array::RecordBatch; use arrow_schema::{Schema, SchemaRef, SortOptions}; use bytes::Bytes; @@ -44,13 +49,13 @@ use datafusion::{ use futures_util::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; use object_store::{path::Path, ObjectStore}; +use relative_path::RelativePathBuf; use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc}; use url::Url; use crate::{ catalog::{ self, column::TypedStatistics, manifest::Manifest, snapshot::ManifestItem, ManifestFile, - Snapshot, }, event::{self, DEFAULT_TIMESTAMP_KEY}, localcache::LocalCacheManager, @@ -61,6 +66,7 @@ use crate::{ }; use super::listing_table_builder::ListingTableBuilder; +use crate::catalog::Snapshot as CatalogSnapshot; // schema provider for stream based on global data pub struct GlobalSchemaProvider { @@ -316,12 +322,34 @@ impl TableProvider for StandardTableProvider { ); } }; - - // Fetch snapshot - let snapshot = object_store_format.snapshot; + let mut merged_snapshot: snapshot::Snapshot = Snapshot::default(); + if CONFIG.parseable.mode == Mode::Query { + let path = RelativePathBuf::from_iter([&self.stream, STREAM_ROOT_DIRECTORY]); + let obs = glob_storage + .get_objects( + Some(&path), + Box::new(|file_name| file_name.starts_with(".ingester")), + ) + .await; + + if let Ok(obs) = obs { + for ob in obs { + if let Ok(object_store_format) = + serde_json::from_slice::(&ob) + { + let snapshot = object_store_format.snapshot; + for manifest in snapshot.manifest_list { + merged_snapshot.manifest_list.push(manifest); + } + } + } + } + } else { + merged_snapshot = object_store_format.snapshot; + } // Is query timerange is overlapping with older data. - if is_overlapping_query(&snapshot.manifest_list, &time_filters) { + if is_overlapping_query(&merged_snapshot.manifest_list, &time_filters) { return legacy_listing_table( self.stream.clone(), memory_exec, @@ -338,8 +366,14 @@ impl TableProvider for StandardTableProvider { .await; } - let mut manifest_files = - collect_from_snapshot(&snapshot, &time_filters, object_store, filters, limit).await?; + let mut manifest_files = collect_from_snapshot( + &merged_snapshot, + &time_filters, + object_store, + filters, + limit, + ) + .await?; if manifest_files.is_empty() { return final_plan(vec![memory_exec], projection, self.schema.clone());