fix: Query Optimization (#696)
Modified the function that checks whether a query's start time falls before the first manifest's lower-bound time, so that the server uses the manifest for count(*) when the start time is greater than the manifest creation date (a sketch of the revised check follows the list below).

fix for ingestion with time partition
fix for query with time partition
fix for query optimization
fix for get_first_event call with time partition
changed the random-number generation logic used when creating the parquet file name
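A rough sketch of the revised overlap check in server/src/query/stream_schema_provider.rs, assuming the std::ops::Bound import and the ManifestItem and PartialTimeFilter types shown in the diff further down; legacy listing is used only when some start-time bound reaches back before the first manifest:

// Sketch: a query overlaps the legacy (listed) data only if a start-time bound
// falls before the lower bound of the first manifest; otherwise the manifests
// and their statistics can serve it, including count(*).
fn is_overlapping_query(
    manifest_list: &[ManifestItem],
    time_filters: &[PartialTimeFilter],
) -> bool {
    // Older table formats carry no manifests; fall back to listing.
    let Some(first_entry_lower_bound) =
        manifest_list.iter().map(|file| file.time_lower_bound).min()
    else {
        return true;
    };

    time_filters.iter().any(|filter| match filter {
        PartialTimeFilter::Low(Bound::Included(time))
        | PartialTimeFilter::Low(Bound::Excluded(time)) => {
            time < &first_entry_lower_bound.naive_utc()
        }
        _ => false,
    })
}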
nikhilsinhaparseable authored Mar 12, 2024
1 parent 94c95fc commit a04dd4f
Showing 7 changed files with 123 additions and 60 deletions.
14 changes: 10 additions & 4 deletions server/src/catalog.rs
@@ -210,7 +210,7 @@ pub async fn get_first_event(
// get current snapshot
let mut meta = storage.get_object_store_format(stream_name).await?;
let manifests = &mut meta.snapshot.manifest_list;

let time_partition = meta.time_partition;
if manifests.is_empty() {
log::info!("No manifest found for stream {stream_name}");
return Err(ObjectStorageError::Custom("No manifest found".to_string()));
@@ -232,9 +232,15 @@
};

if let Some(first_event) = manifest.files.first() {
let (lower_bound, _) = get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
return Ok(Some(first_event_at));
if let Some(time_partition) = time_partition {
let (lower_bound, _) = get_file_bounds(first_event, time_partition);
let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
return Ok(Some(first_event_at));
} else {
let (lower_bound, _) = get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
return Ok(Some(first_event_at));
}
}
Ok(None)
}
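The two branches above differ only in which column is passed to get_file_bounds; an equivalent, more compact form (a sketch under the same types, not the committed code) would pick the column first:

// Sketch: choose the bound column once, then derive the first-event timestamp.
let column = time_partition.unwrap_or_else(|| DEFAULT_TIMESTAMP_KEY.to_string());
let (lower_bound, _) = get_file_bounds(first_event, column);
return Ok(Some(lower_bound.with_timezone(&Local).to_rfc3339()));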
7 changes: 7 additions & 0 deletions server/src/metadata.rs
@@ -92,6 +92,13 @@ impl StreamInfo {
.map(|metadata| metadata.cache_enabled)
}

pub fn get_time_partition(&self, stream_name: &str) -> Result<Option<String>, MetadataError> {
let map = self.read().expect(LOCK_EXPECT);
map.get(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
.map(|metadata| metadata.time_partition.clone())
}

pub fn set_stream_cache(&self, stream_name: &str, enable: bool) -> Result<(), MetadataError> {
let mut map = self.write().expect(LOCK_EXPECT);
let stream = map
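A usage sketch of the new accessor, mirroring the call added in server/src/storage/object_storage.rs further down; the error handling and the stream_name variable are illustrative:

// Hypothetical caller: resolve which column orders the stream's data in time.
let time_partition = STREAM_INFO.get_time_partition(stream_name)?;
let time_column = time_partition.as_deref().unwrap_or(DEFAULT_TIMESTAMP_KEY);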
14 changes: 11 additions & 3 deletions server/src/query.rs
@@ -130,6 +130,7 @@ impl Query {
// this can be eliminated in a later version of datafusion but with a slight caveat
// transform cannot modify stringified plans by itself
// since we know this plan is not in the optimization process, we choose to overwrite the stringified plan

match self.raw_logical_plan.clone() {
LogicalPlan::Explain(plan) => {
let transformed = transform(
@@ -221,7 +222,7 @@
.clone();

let mut new_filters = vec![];
if !table_contains_any_time_filters(&table) {
if !table_contains_any_time_filters(&table, &time_partition) {
let mut _start_time_filter: Expr;
let mut _end_time_filter: Expr;
match time_partition {
@@ -276,7 +277,10 @@
.expect("transform only transforms the tablescan")
}

fn table_contains_any_time_filters(table: &datafusion::logical_expr::TableScan) -> bool {
fn table_contains_any_time_filters(
table: &datafusion::logical_expr::TableScan,
time_partition: &Option<String>,
) -> bool {
table
.filters
.iter()
@@ -287,7 +291,11 @@ fn table_contains_any_time_filters(table: &datafusion::logical_expr::TableScan)
None
}
})
.any(|expr| matches!(&*expr.left, Expr::Column(Column { name, .. }) if (name == event::DEFAULT_TIMESTAMP_KEY)))
.any(|expr| {
matches!(&*expr.left, Expr::Column(Column { name, .. })
if ((time_partition.is_some() && name == time_partition.as_ref().unwrap()) ||
(!time_partition.is_some() && name == event::DEFAULT_TIMESTAMP_KEY)))
})
}

#[allow(dead_code)]
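The updated table_contains_any_time_filters matches filter columns against the stream's time partition when one is set, falling back to the default timestamp key; that column choice can be sketched with a hypothetical helper (time_filter_column is not in the codebase):

// Sketch: resolve the column a time filter must reference, given an optional time partition.
fn time_filter_column(time_partition: &Option<String>) -> &str {
    time_partition.as_deref().unwrap_or(event::DEFAULT_TIMESTAMP_KEY)
}
// The closure then checks:
// matches!(&*expr.left, Expr::Column(Column { name, .. }) if name == time_filter_column(&time_partition))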
12 changes: 9 additions & 3 deletions server/src/query/listing_table_builder.rs
@@ -25,7 +25,7 @@ use datafusion::{
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
},
error::DataFusionError,
logical_expr::col,
logical_expr::{col, Expr},
};
use futures_util::{future, stream::FuturesUnordered, Future, TryStreamExt};
use itertools::Itertools;
@@ -183,13 +183,19 @@ impl ListingTableBuilder {
self,
schema: Arc<Schema>,
map: impl Fn(Vec<String>) -> Vec<ListingTableUrl>,
time_partition: Option<String>,
) -> Result<Option<Arc<ListingTable>>, DataFusionError> {
if self.listing.is_empty() {
return Ok(None);
}

let file_sort_order: Vec<Vec<Expr>>;
let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
let file_sort_order = vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]];
if let Some(time_partition) = time_partition {
file_sort_order = vec![vec![col(time_partition).sort(true, false)]];
} else {
file_sort_order = vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]];
}

let listing_options = ListingOptions::new(Arc::new(file_format))
.with_file_extension(".parquet")
.with_file_sort_order(file_sort_order)
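The deferred initialization of file_sort_order above is valid Rust; the same choice can also be written as a single expression (a sketch relying on col accepting an owned column name, as both call sites above suggest):

// Sketch: sort each file group by the stream's time column, ascending with nulls last.
let sort_column = time_partition.unwrap_or_else(|| DEFAULT_TIMESTAMP_KEY.to_string());
let file_sort_order = vec![vec![col(sort_column).sort(true, false)]];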
67 changes: 37 additions & 30 deletions server/src/query/stream_schema_provider.rs
@@ -16,8 +16,6 @@
*
*/

use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc};

use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef, SortOptions};
use bytes::Bytes;
@@ -46,6 +44,7 @@ use datafusion::{
use futures_util::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use object_store::{path::Path, ObjectStore};
use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc};
use url::Url;

use crate::{
@@ -114,6 +113,7 @@ async fn create_parquet_physical_plan(
filters: &[Expr],
limit: Option<usize>,
state: &SessionState,
time_partition: Option<String>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let filters = if let Some(expr) = conjunction(filters.to_vec()) {
let table_df_schema = schema.as_ref().clone().to_dfschema()?;
@@ -125,14 +125,18 @@
};

let sort_expr = PhysicalSortExpr {
expr: physical_plan::expressions::col(DEFAULT_TIMESTAMP_KEY, &schema)?,
expr: if let Some(time_partition) = time_partition {
physical_plan::expressions::col(&time_partition, &schema)?
} else {
physical_plan::expressions::col(DEFAULT_TIMESTAMP_KEY, &schema)?
},
options: SortOptions {
descending: true,
nulls_first: true,
},
};

let file_format = ParquetFormat::default().with_enable_pruning(Some(true));

// create the execution plan
let plan = file_format
.create_physical_plan(
@@ -151,7 +155,6 @@
filters.as_ref(),
)
.await?;

Ok(plan)
}

@@ -209,7 +212,6 @@ fn partitioned_files(
let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
let mut column_statistics = HashMap::<String, Option<catalog::column::TypedStatistics>>::new();
let mut count = 0;

for (index, file) in manifest_files
.into_iter()
.enumerate()
@@ -221,7 +223,6 @@
columns,
..
} = file;

partitioned_files[index].push(PartitionedFile::new(file_path, file.file_size));
columns.into_iter().for_each(|col| {
column_statistics
@@ -235,7 +236,6 @@
});
count += num_rows;
}

let statistics = table_schema
.fields()
.iter()
@@ -304,7 +304,7 @@ impl TableProvider for StandardTableProvider {
return Err(DataFusionError::Plan("potentially unbounded query on time range. Table scanning requires at least one time bound".to_string()));
}

if include_now(filters, time_partition) {
if include_now(filters, time_partition.clone()) {
if let Some(records) =
event::STREAM_WRITERS.recordbatches_cloned(&self.stream, &self.schema)
{
@@ -333,6 +333,7 @@
projection,
filters,
limit,
time_partition.clone(),
)
.await;
}
@@ -375,6 +376,7 @@
filters,
limit,
state,
time_partition.clone(),
)
.await?;

Expand All @@ -400,6 +402,7 @@ impl TableProvider for StandardTableProvider {
filters,
limit,
state,
time_partition.clone(),
)
.await?;

@@ -437,11 +440,16 @@ async fn legacy_listing_table(
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
time_partition: Option<String>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let remote_table = ListingTableBuilder::new(stream)
.populate_via_listing(glob_storage.clone(), object_store, time_filters)
.and_then(|builder| async {
let table = builder.build(schema.clone(), |x| glob_storage.query_prefixes(x))?;
let table = builder.build(
schema.clone(),
|x| glob_storage.query_prefixes(x),
time_partition,
)?;
let res = match table {
Some(table) => Some(table.scan(state, projection, filters, limit).await?),
_ => None,
@@ -459,6 +467,7 @@ fn final_plan(
schema: Arc<Schema>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let mut execution_plans = execution_plans.into_iter().flatten().collect_vec();

let exec: Arc<dyn ExecutionPlan> = if execution_plans.is_empty() {
let schema = match projection {
Some(projection) => Arc::new(schema.project(projection)?),
@@ -470,7 +479,6 @@
} else {
Arc::new(UnionExec::new(execution_plans))
};

Ok(exec)
}

@@ -557,34 +565,33 @@ impl PartialTimeFilter {
))))),
))
}

fn is_greater_than(&self, other: &NaiveDateTime) -> bool {
match self {
PartialTimeFilter::Low(Bound::Excluded(time)) => time >= other,
PartialTimeFilter::Low(Bound::Included(time))
| PartialTimeFilter::High(Bound::Excluded(time))
| PartialTimeFilter::High(Bound::Included(time)) => time > other,
PartialTimeFilter::Eq(time) => time > other,
_ => unimplemented!(),
}
}
}

fn is_overlapping_query(
manifest_list: &[ManifestItem],
time_filters: &[PartialTimeFilter],
) -> bool {
// This is for backward compatibility. Older table format relies on listing.
// if the time is lower than upper bound of first file then we consider it overlapping
let Some(first_entry_upper_bound) =
manifest_list.iter().map(|file| file.time_upper_bound).min()
// if the start time is lower than the lower bound of the first file, we consider it overlapping
let Some(first_entry_lower_bound) =
manifest_list.iter().map(|file| file.time_lower_bound).min()
else {
return true;
};

!time_filters
.iter()
.all(|filter| filter.is_greater_than(&first_entry_upper_bound.naive_utc()))
for filter in time_filters {
match filter {
PartialTimeFilter::Low(Bound::Excluded(time))
| PartialTimeFilter::Low(Bound::Included(time)) => {
if time < &first_entry_lower_bound.naive_utc() {
return true;
}
}
_ => {}
}
}

false
}

fn include_now(filters: &[Expr], time_partition: Option<String>) -> bool {
@@ -862,7 +869,7 @@ mod tests {
let res = is_overlapping_query(
&manifest_items(),
&[PartialTimeFilter::Low(std::ops::Bound::Included(
datetime_min(2023, 12, 15).naive_utc(),
datetime_min(2023, 12, 14).naive_utc(),
))],
);

@@ -874,7 +881,7 @@
let res = is_overlapping_query(
&manifest_items(),
&[PartialTimeFilter::Low(std::ops::Bound::Included(
datetime_min(2023, 12, 15)
datetime_min(2023, 12, 14)
.naive_utc()
.add(Duration::hours(3)),
))],
5 changes: 4 additions & 1 deletion server/src/storage/object_storage.rs
@@ -335,8 +335,11 @@ pub trait ObjectStorage: Sync + 'static {
let cache_enabled = STREAM_INFO
.cache_enabled(stream)
.map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
let time_partition = STREAM_INFO
.get_time_partition(stream)
.map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
let dir = StorageDir::new(stream);
let schema = convert_disk_files_to_parquet(stream, &dir)
let schema = convert_disk_files_to_parquet(stream, &dir, time_partition)
.map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;

if let Some(schema) = schema {