diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index cb67d62d13..9109ac7e4f 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -690,6 +690,7 @@ def scanner( strict_batch_size: Optional[bool] = None, order_by: Optional[List[Union[ColumnOrdering, str]]] = None, disable_scoring_autoprojection: Optional[bool] = None, + late_materialize_selectivity_threshold: Optional[float] = None, ) -> LanceScanner: """Return a Scanner that can support various pushdowns. @@ -870,6 +871,7 @@ def setopt(opt, val): setopt(builder.strict_batch_size, strict_batch_size) setopt(builder.order_by, order_by) setopt(builder.disable_scoring_autoprojection, disable_scoring_autoprojection) + setopt(builder.late_materialize_selectivity_threshold, late_materialize_selectivity_threshold) # columns=None has a special meaning. we can't treat it as "user didn't specify" if self._default_scan_options is None: # No defaults, use user-provided, if any @@ -4207,6 +4209,7 @@ def __init__(self, ds: LanceDataset): self._strict_batch_size = False self._orderings = None self._disable_scoring_autoprojection = False + self._late_materialize_selectivity_threshold: Optional[float] = None def apply_defaults(self, default_opts: Dict[str, Any]) -> ScannerBuilder: for key, value in default_opts.items(): @@ -4588,6 +4591,47 @@ def disable_scoring_autoprojection(self, disable: bool = True) -> ScannerBuilder self._disable_scoring_autoprojection = disable return self + def late_materialize_selectivity_threshold( + self, threshold: float + ) -> ScannerBuilder: + """ + Set the selectivity threshold for late materialization in filtered KNN searches. + + When a filter is present in a KNN search, Lance first executes it to measure selectivity. + If the filter selects fewer than this percentage of rows, Lance uses late materialization + (scan scalars first, then take vectors for filtered rows only). 
If the filter selects + this percentage or more rows, Lance does a single scan with both filter and vector columns + to avoid the random access overhead of the take operation. + + The optimal value depends on your storage medium: + - **Object storage (S3, GCS, Azure)**: Use a low threshold like 0.005 (0.5%) since + random access is very expensive + - **Local SSD**: Can use a higher threshold like 0.05 (5%) since random access is cheaper + - **NVMe**: Can use even higher thresholds like 0.1 (10%) + + The default is 0.005 (0.5%), which is conservative for object storage. + + Parameters + ---------- + threshold : float + The selectivity threshold as a fraction (e.g., 0.005 for 0.5%) + + Returns + ------- + ScannerBuilder + Returns self for method chaining + """ + if not isinstance(threshold, (int, float)): + raise TypeError( + f"late_materialize_selectivity_threshold must be a number, got {type(threshold)}" + ) + if not (0.0 <= threshold <= 1.0): + raise ValueError( + f"late_materialize_selectivity_threshold must be between 0.0 and 1.0 (inclusive), got {threshold}" + ) + self._late_materialize_selectivity_threshold = float(threshold) + return self + def to_scanner(self) -> LanceScanner: scanner = self.ds._ds.scanner( self._columns, @@ -4616,6 +4660,7 @@ def to_scanner(self) -> LanceScanner: self._strict_batch_size, self._orderings, self._disable_scoring_autoprojection, + self._late_materialize_selectivity_threshold, ) return LanceScanner(scanner, self.ds) diff --git a/python/python/tests/test_scalar_index.py b/python/python/tests/test_scalar_index.py index 32c787ad9f..033ff2f72d 100644 --- a/python/python/tests/test_scalar_index.py +++ b/python/python/tests/test_scalar_index.py @@ -412,7 +412,7 @@ def make_fts_search(ds): assert "ScalarIndexQuery" in plan assert "MaterializeIndex" not in plan assert "KNNVectorDistance" in plan - assert "LanceScan" in plan + assert "LanceRead" in plan assert make_vec_search(ds).to_table().num_rows == 12 plan = 
make_fts_search(ds).explain_plan() diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 06cd596589..bc8a87dc4e 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -742,7 +742,7 @@ impl Dataset { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature=(columns=None, columns_with_transform=None, filter=None, prefilter=None, limit=None, offset=None, nearest=None, batch_size=None, io_buffer_size=None, batch_readahead=None, fragment_readahead=None, scan_in_order=None, fragments=None, with_row_id=None, with_row_address=None, use_stats=None, substrait_filter=None, fast_search=None, full_text_query=None, late_materialization=None, use_scalar_index=None, include_deleted_rows=None, scan_stats_callback=None, strict_batch_size=None, order_by=None, disable_scoring_autoprojection=None))] + #[pyo3(signature=(columns=None, columns_with_transform=None, filter=None, prefilter=None, limit=None, offset=None, nearest=None, batch_size=None, io_buffer_size=None, batch_readahead=None, fragment_readahead=None, scan_in_order=None, fragments=None, with_row_id=None, with_row_address=None, use_stats=None, substrait_filter=None, fast_search=None, full_text_query=None, late_materialization=None, use_scalar_index=None, include_deleted_rows=None, scan_stats_callback=None, strict_batch_size=None, order_by=None, disable_scoring_autoprojection=None, late_materialize_selectivity_threshold=None))] fn scanner( self_: PyRef<'_, Self>, columns: Option>, @@ -771,6 +771,7 @@ impl Dataset { strict_batch_size: Option, order_by: Option>>, disable_scoring_autoprojection: Option, + late_materialize_selectivity_threshold: Option, ) -> PyResult { let mut scanner: LanceScanner = self_.ds.scan(); @@ -958,6 +959,12 @@ impl Dataset { scanner.strict_batch_size(strict_batch_size); } + if let Some(threshold) = late_materialize_selectivity_threshold { + scanner + .late_materialize_selectivity_threshold(threshold) + .map_err(|err| PyValueError::new_err(err.to_string()))?; + } + if let 
Some(nearest) = nearest { let column = nearest .get_item("column")? diff --git a/rust/lance-datafusion/src/exec.rs b/rust/lance-datafusion/src/exec.rs index 50cdbcd2aa..f4ca16c231 100644 --- a/rust/lance-datafusion/src/exec.rs +++ b/rust/lance-datafusion/src/exec.rs @@ -411,6 +411,21 @@ pub struct ExecutionSummaryCounts { pub all_counts: HashMap, } +impl ExecutionSummaryCounts { + /// Create a new ExecutionSummaryCounts with all values initialized to zero + pub fn new() -> Self { + Self::default() + } + + /// Create a new ExecutionSummaryCounts with only custom counts + pub fn with_counts(counts: impl IntoIterator, usize)>) -> Self { + Self { + all_counts: counts.into_iter().map(|(k, v)| (k.into(), v)).collect(), + ..Default::default() + } + } +} + fn visit_node(node: &dyn ExecutionPlan, counts: &mut ExecutionSummaryCounts) { if let Some(metrics) = node.metrics() { for (metric_name, count) in metrics.iter_counts() { diff --git a/rust/lance-tools/src/meta.rs b/rust/lance-tools/src/meta.rs index d32fa8987f..f45b17dd8f 100644 --- a/rust/lance-tools/src/meta.rs +++ b/rust/lance-tools/src/meta.rs @@ -47,7 +47,7 @@ impl LanceToolFileMetadata { .open_file(&path, &CachedFileSize::unknown()) .await?; let file_metadata = FileReader::read_all_metadata(&file_scheduler).await?; - let lance_tool_file_metadata = LanceToolFileMetadata { file_metadata }; + let lance_tool_file_metadata = Self { file_metadata }; Ok(lance_tool_file_metadata) } } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index ec0ac823f4..d3fd1bc0a8 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -8873,12 +8873,7 @@ mod tests { } fn make_tx(read_version: u64) -> Transaction { - Transaction::new( - read_version, - Operation::Append { fragments: vec![] }, - None, - None, - ) + Transaction::new(read_version, Operation::Append { fragments: vec![] }, None) } async fn delete_external_tx_file(ds: &Dataset) { @@ -8939,7 +8934,6 @@ mod tests { 
ds.load_indices().await.unwrap().as_ref().clone(), &tx_file, &ManifestWriteConfig::default(), - None, ) .unwrap(); let location = write_manifest_file( diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index b1f2d07540..b18c7ebff8 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use std::collections::BTreeSet; use std::ops::Range; use std::pin::Pin; use std::sync::{Arc, LazyLock}; @@ -50,7 +51,11 @@ use lance_core::error::LanceOptionExt; use lance_core::utils::address::RowAddress; use lance_core::utils::mask::{RowIdMask, RowIdTreeMap}; use lance_core::utils::tokio::get_num_compute_intensive_cpus; -use lance_core::{ROW_ADDR, ROW_ID, ROW_OFFSET}; +use lance_core::{ + ROW_ADDR, ROW_CREATED_AT_VERSION, ROW_ID, ROW_LAST_UPDATED_AT_VERSION, ROW_OFFSET, +}; +#[cfg(feature = "substrait")] +use lance_datafusion::exec::get_session_context; use lance_datafusion::exec::{ analyze_plan, execute_plan, LanceExecutionOptions, OneShotExec, StrictBatchSizeExec, }; @@ -78,7 +83,7 @@ use crate::dataset::row_offsets_to_row_addresses; use crate::dataset::utils::SchemaAdapter; use crate::index::vector::utils::{get_vector_dim, get_vector_type}; use crate::index::DatasetIndexInternalExt; -use crate::io::exec::filtered_read::{FilteredReadExec, FilteredReadOptions}; +use crate::io::exec::filtered_read::{AdaptiveColumnConfig, FilteredReadExec, FilteredReadOptions}; use crate::io::exec::fts::{BoostQueryExec, FlatMatchQueryExec, MatchQueryExec, PhraseQueryExec}; use crate::io::exec::knn::MultivectorScoringExec; use crate::io::exec::scalar_index::{MaterializeIndexExec, ScalarIndexExec}; @@ -128,6 +133,13 @@ pub static DEFAULT_IO_BUFFER_SIZE: LazyLock = LazyLock::new(|| { .unwrap_or(2 * 1024 * 1024 * 1024) }); +/// Selectivity threshold for late materialization in filtered KNN searches. 
+/// If the filter selects fewer than this fraction of the table's rows, we use late +/// materialization to avoid fetching vector data for rows that will be filtered out. +/// If the filter selects this fraction of rows or more, we do a single scan with vectors +/// to avoid the random access overhead of the take operation. +pub const LATE_MATERIALIZE_SELECTIVITY_THRESHOLD: f64 = 0.005; // 0.5% + /// Defines an ordering for a single column /// /// Floats are sorted using the IEEE 754 total ordering @@ -227,6 +239,21 @@ struct PlannedFilteredScan { filter_pushed_down: bool, } +/// Empty index info provider for cases where scalar indices are not used +struct EmptyIndexInfo; + +impl lance_index::scalar::expression::IndexInformationProvider for EmptyIndexInfo { + fn get_index( + &self, + _col: &str, + ) -> Option<( + &arrow_schema::DataType, + &dyn lance_index::scalar::expression::ScalarQueryParser, + )> { + None + } +} + /// Filter for filtering rows #[derive(Debug, Clone)] pub enum LanceFilter { @@ -275,8 +302,6 @@ impl LanceFilter { } #[cfg(feature = "substrait")] Self::Substrait(expr) => { - use lance_datafusion::exec::{get_session_context, LanceExecutionOptions}; - let ctx = get_session_context(&LanceExecutionOptions::default()); let state = ctx.state(); let schema = Arc::new(ArrowSchema::from(dataset_schema)); @@ -407,6 +432,9 @@ pub struct Scanner { /// File reader options to use when reading data files. file_reader_options: Option, + /// Selectivity threshold for late materialization in filtered KNN searches. 
+ late_materialize_selectivity_threshold: Option, + // Legacy fields to help migrate some old projection behavior to new behavior // // There are two behaviors we are moving away from: @@ -623,6 +651,7 @@ impl Scanner { scan_stats_callback: None, strict_batch_size: false, file_reader_options, + late_materialize_selectivity_threshold: None, legacy_with_row_addr: false, legacy_with_row_id: false, explicit_projection: false, @@ -856,6 +885,59 @@ impl Scanner { self } + /// Set the selectivity threshold for late materialization in filtered KNN searches. + /// + /// When a filter is present in a KNN search, we execute it first to measure selectivity. + /// If the filter selects fewer than this fraction of rows, we use late materialization + /// (scan scalars first, then take vectors for filtered rows only). If the filter selects + /// this fraction of rows or more, we do a single scan with both filter and vector columns to avoid the + /// random access overhead of the take operation. + /// + /// The optimal value depends on your storage medium: + /// - **Object storage (S3, GCS, Azure)**: Use a low threshold like 0.005 (0.5%) since + /// random access is very expensive + /// - **Local SSD**: Can use a higher threshold like 0.05 (5%) since random access is cheaper + /// - **NVMe**: Can use even higher thresholds like 0.1 (10%) + /// + /// The default is 0.005 (0.5%), which is conservative for object storage. + /// + /// # Arguments + /// + /// * `threshold` - The selectivity threshold as a fraction (e.g., 0.005 for 0.5%) + /// + /// # Example + /// + /// ```ignore + /// // For local SSD, use a higher threshold + /// scanner.late_materialize_selectivity_threshold(0.05)?; + /// ``` + /// + /// # Errors + /// + /// Returns an error if the threshold is not finite or is outside the range [0.0, 1.0]. 
+ pub fn late_materialize_selectivity_threshold(&mut self, threshold: f64) -> Result<&mut Self> { + if !threshold.is_finite() { + return Err(Error::invalid_input( + format!( + "late_materialize_selectivity_threshold must be a finite value, got {}", + threshold + ), + location!(), + )); + } + if !(0.0..=1.0).contains(&threshold) { + return Err(Error::invalid_input( + format!( + "late_materialize_selectivity_threshold must be between 0.0 and 1.0 (inclusive), got {}", + threshold + ), + location!(), + )); + } + self.late_materialize_selectivity_threshold = Some(threshold); + Ok(self) + } + /// Set the prefetch size. /// Ignored in v2 and newer format pub fn batch_readahead(&mut self, nbatches: usize) -> &mut Self { @@ -1921,8 +2003,6 @@ impl Scanner { // Check if a filter plan references version columns fn filter_references_version_columns(&self, filter_plan: &FilterPlan) -> bool { - use lance_core::{ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION}; - if let Some(refine_expr) = &filter_plan.refine_expr { let column_names = Planner::column_names_in_expr(refine_expr); for col_name in column_names { @@ -2832,96 +2912,163 @@ impl Scanner { )); }; - // Sanity check let (vector_type, _) = get_vector_type(self.dataset.schema(), &q.column)?; - let column_id = self.dataset.schema().field_id(q.column.as_str())?; + let use_index = self.nearest.as_ref().map(|q| q.use_index).unwrap_or(false); let indices = if use_index { self.dataset.load_indices().await? } else { Arc::new(vec![]) }; - if let Some(index) = indices.iter().find(|i| i.fields.contains(&column_id)) { - log::trace!("index found for vector search"); - // There is an index built for the column. - // We will use the index. 
- if matches!(q.refine_factor, Some(0)) { - return Err(Error::invalid_input( - "Refine factor cannot be zero".to_string(), - location!(), - )); + + match indices.iter().find(|i| i.fields.contains(&column_id)) { + Some(index) => { + log::trace!("index found for vector search"); + self.vector_search_with_vector_index(q, index, vector_type, filter_plan) + .await } + None => self.vector_search_flat(q, filter_plan).await, + } + } - // Find all deltas with the same index name. - let deltas = self.dataset.load_indices_by_name(&index.name).await?; - let ann_node = match vector_type { - DataType::FixedSizeList(_, _) => self.ann(q, &deltas, filter_plan).await?, - DataType::List(_) => self.multivec_ann(q, &deltas, filter_plan).await?, - _ => unreachable!(), - }; + /// Index-based vector search (ANN) + /// + /// Uses a vector index to perform approximate nearest neighbor search. + async fn vector_search_with_vector_index( + &self, + q: &Query, + index: &IndexMetadata, + vector_type: DataType, + filter_plan: &FilterPlan, + ) -> Result> { + if matches!(q.refine_factor, Some(0)) { + return Err(Error::invalid_input( + "Refine factor cannot be zero".to_string(), + location!(), + )); + } - let mut knn_node = if q.refine_factor.is_some() { - let vector_projection = self - .dataset - .empty_projection() - .union_column(&q.column, OnMissing::Error) - .unwrap(); - let knn_node_with_vector = self.take(ann_node, vector_projection)?; - // TODO: now we just open an index to get its metric type. - let idx = self - .dataset - .open_vector_index( - q.column.as_str(), - &index.uuid.to_string(), - &NoOpMetricsCollector, - ) - .await?; - let mut q = q.clone(); - q.metric_type = idx.metric_type(); - self.flat_knn(knn_node_with_vector, &q)? 
- } else { - ann_node - }; // vector, _distance, _rowid + // Find all deltas with the same index name + let deltas = self.dataset.load_indices_by_name(&index.name).await?; + let ann_plan = match vector_type { + DataType::FixedSizeList(_, _) => self.ann(q, &deltas, filter_plan).await?, + DataType::List(_) => self.multivec_ann(q, &deltas, filter_plan).await?, + _ => unreachable!(), + }; - if !self.fast_search { - knn_node = self.knn_combined(q, index, knn_node, filter_plan).await?; - } + let mut plan = if q.refine_factor.is_some() { + self.apply_vector_refinement(ann_plan, q, index).await? + } else { + ann_plan + }; + + if !self.fast_search { + plan = self.knn_combined(q, index, plan, filter_plan).await?; + } - Ok(knn_node) + Ok(plan) + } + + /// Flat (non-indexed) vector search + /// + /// Performs brute-force KNN search by scanning all data and computing distances. + async fn vector_search_flat( + &self, + q: &Query, + filter_plan: &FilterPlan, + ) -> Result> { + let has_filter = filter_plan.full_expr.is_some() || filter_plan.refine_expr.is_some(); + let has_scalar_index = filter_plan.index_query.is_some(); + + let plan = if has_filter && !has_scalar_index { + // Use adaptive late materialization for filters without scalar indices + self.adaptive_column_scan(filter_plan, None, &q.column, false) + .await? } else { - // No index found. use flat search. - let mut columns = vec![q.column.clone()]; - if let Some(refine_expr) = filter_plan.refine_expr.as_ref() { - columns.extend(Planner::column_names_in_expr(refine_expr)); - } - let mut vector_scan_projection = self - .dataset - .empty_projection() - .with_row_id() - .union_columns(&columns, OnMissing::Error)?; + // Direct scan when no filter or when scalar index is present + self.build_direct_vector_scan(q, filter_plan, None, /*include_deleted_rows=*/ true) + .await? 
+ }; - vector_scan_projection.with_row_addr = - self.projection_plan.physical_projection.with_row_addr; + self.flat_knn(plan, q) + } - let PlannedFilteredScan { mut plan, .. } = self - .filtered_read( - filter_plan, - vector_scan_projection, - /*include_deleted_rows=*/ true, - None, - None, - /*is_prefilter= */ true, - ) - .await?; + /// Build a direct scan plan for vector search + /// + /// Performs a direct filtered read when no filter is present or when scalar indices are used. + /// Scalar indices use MaterializeIndex which already optimizes I/O. + async fn build_direct_vector_scan( + &self, + q: &Query, + filter_plan: &FilterPlan, + fragments: Option>>, + include_deleted_rows: bool, + ) -> Result> { + let mut scan_projection = self + .dataset + .empty_projection() + .with_row_id() + .union_column(&q.column, OnMissing::Error)?; - if let Some(refine_expr) = &filter_plan.refine_expr { - plan = Arc::new(LanceFilterExec::try_new(refine_expr.clone(), plan)?); - } - Ok(self.flat_knn(plan, q)?) + // Include columns referenced by refine expression + if let Some(ref refine_expr) = filter_plan.refine_expr { + let refine_cols = Planner::column_names_in_expr(refine_expr); + scan_projection = scan_projection.union_columns(refine_cols, OnMissing::Error)?; + } + + scan_projection.with_row_addr = self.projection_plan.physical_projection.with_row_addr; + + let PlannedFilteredScan { plan, .. 
} = self + .filtered_read( + filter_plan, + scan_projection, + include_deleted_rows, + fragments, + None, + /*is_prefilter=*/ true, + ) + .await?; + + // Apply refine filter if present + if let Some(ref refine_expr) = filter_plan.refine_expr { + Ok(Arc::new(LanceFilterExec::try_new( + refine_expr.clone(), + plan, + )?)) + } else { + Ok(plan) } } + /// Apply refinement to ANN results by fetching vectors and recomputing distances + async fn apply_vector_refinement( + &self, + ann_node: Arc, + q: &Query, + index: &IndexMetadata, + ) -> Result> { + let vector_projection = self + .dataset + .empty_projection() + .union_column(&q.column, OnMissing::Error)?; + let knn_node_with_vector = self.take(ann_node, vector_projection)?; + + // Open index to get its metric type + let idx = self + .dataset + .open_vector_index( + q.column.as_str(), + &index.uuid.to_string(), + &NoOpMetricsCollector, + ) + .await?; + + let mut q = q.clone(); + q.metric_type = idx.metric_type(); + self.flat_knn(knn_node_with_vector, &q) + } + /// Combine ANN results with KNN results for data appended after index creation async fn knn_combined( &self, @@ -2957,36 +3104,41 @@ impl Scanner { knn_node = self.take(knn_node, vector_projection)?; } - let mut columns = vec![q.column.clone()]; - if let Some(expr) = filter_plan.full_expr.as_ref() { - let filter_columns = Planner::column_names_in_expr(expr); - columns.extend(filter_columns); - } - let vector_scan_projection = Arc::new(self.dataset.schema().project(&columns).unwrap()); - // Note: we could try and use the scalar indices here to reduce the scope of this scan but the - // most common case is that fragments that are newer than the vector index are going to be newer - // than the scalar indices anyways - let mut scan_node = self.scan_fragments( - true, - false, - false, - false, - false, - vector_scan_projection, - Arc::new(unindexed_fragments), - // Can't pushdown limit/offset in an ANN search - None, - // We are re-ordering anyways, so no need to get 
data in data - // in a deterministic order. - false, - ); + // Create a filter plan without scalar indices since they don't cover this fragment subset. + let unindexed_filter_plan = if let Some(expr) = filter_plan.full_expr.clone() { + let filter_schema = self.filterable_schema()?; + let planner = Planner::new(Arc::new(filter_schema.as_ref().into())); - if let Some(expr) = filter_plan.full_expr.as_ref() { - // If there is a prefilter we need to manually apply it to the new data - scan_node = Arc::new(LanceFilterExec::try_new(expr.clone(), scan_node)?); - } - // first we do flat search on just the new data - let topk_appended = self.flat_knn(scan_node, &q)?; + planner.create_filter_plan(expr, &EmptyIndexInfo, false)? + } else { + FilterPlan::empty() + }; + + let has_filter = unindexed_filter_plan.full_expr.is_some() + || unindexed_filter_plan.refine_expr.is_some(); + let has_scalar_index = unindexed_filter_plan.index_query.is_some(); + + let plan = if has_filter && !has_scalar_index { + // Adaptive late materialization when there's a filter without scalar index + self.adaptive_column_scan( + &unindexed_filter_plan, + Some(Arc::new(unindexed_fragments.clone())), + &q.column, + unindexed_filter_plan.skip_recheck, + ) + .await? + } else { + // Direct scan when no filter or scalar index is present + self.build_direct_vector_scan( + &q, + &unindexed_filter_plan, + Some(Arc::new(unindexed_fragments.clone())), + /*include_deleted_rows=*/ false, + ) + .await? + }; + + let topk_appended = self.flat_knn(plan, &q)?; // To do a union, we need to make the schemas match. 
Right now // knn_node: _distance, _rowid, vector @@ -3618,6 +3770,158 @@ impl Scanner { } } + /// Helper: performs a full scan with filter and vector columns together + async fn full_scan_with_filter( + &self, + filter_plan: &FilterPlan, + frags: Option>>, + expensive_col: &str, + filter_cols: Vec, + skip_recheck: bool, + ) -> Result> { + // Deduplicate columns to avoid issues if vector_column is in filter_columns + let mut cols: BTreeSet = filter_cols.into_iter().collect(); + cols.insert(expensive_col.to_string()); + let cols: Vec = cols.into_iter().collect(); + + let mut projection = self + .dataset + .empty_projection() + .with_row_id() + .union_columns(&cols, OnMissing::Error)?; + + projection.with_row_addr = self.projection_plan.physical_projection.with_row_addr; + + let PlannedFilteredScan { plan, .. } = self + .filtered_read( + filter_plan, + projection, + /*include_deleted_rows=*/ false, + frags, + None, + /*is_prefilter= */ true, + ) + .await?; + + if !skip_recheck { + if let Some(refine_expr) = &filter_plan.refine_expr { + return Ok(Arc::new(LanceFilterExec::try_new( + refine_expr.clone(), + plan, + )?)); + } + } + + Ok(plan) + } + + /// Get the total row count from the given fragments, or fall back to scanner/dataset fragments. + fn get_total_row_count(&self, fragments: Option<&Arc>>) -> usize { + if let Some(frags) = fragments { + frags.iter().filter_map(|f| f.num_rows()).sum() + } else { + let frags = if let Some(scanner_frags) = self.fragments.as_ref() { + scanner_frags + } else { + self.dataset.fragments().as_ref() + }; + frags.iter().filter_map(|f| f.num_rows()).sum() + } + } + + /// Adaptive late materialization for filtered vector scans. + /// + /// When a filter is present, this method: + /// 1. Scans with scalar columns first to check selectivity + /// 2. If selective (< threshold): uses late materialization (collect row IDs, then take vectors) + /// 3. 
If not selective (>= threshold): does a full scan with both filter and vector columns + /// + /// This avoids expensive random access for non-selective filters while benefiting + /// from late materialization for selective ones. + async fn adaptive_column_scan( + &self, + filter_plan: &FilterPlan, + frags: Option>>, + take_column: &str, + skip_recheck: bool, + ) -> Result> { + // FilteredRead doesn't support v1/legacy files, so fall back for legacy datasets + if self.dataset.is_legacy_storage() { + let mut filter_cols: BTreeSet = BTreeSet::new(); + if let Some(refine_expr) = filter_plan.refine_expr.as_ref() { + filter_cols.extend(Planner::column_names_in_expr(refine_expr)); + } + if let Some(full_expr) = filter_plan.full_expr.as_ref() { + filter_cols.extend(Planner::column_names_in_expr(full_expr)); + } + let mut filter_cols: Vec = filter_cols.into_iter().collect(); + filter_cols.sort(); + return self + .full_scan_with_filter(filter_plan, frags, take_column, filter_cols, skip_recheck) + .await; + } + + // Build full projection (filter columns + vector column) + let mut filter_cols: BTreeSet = BTreeSet::new(); + if let Some(refine_expr) = filter_plan.refine_expr.as_ref() { + filter_cols.extend(Planner::column_names_in_expr(refine_expr)); + } + if let Some(full_expr) = filter_plan.full_expr.as_ref() { + filter_cols.extend(Planner::column_names_in_expr(full_expr)); + } + let mut filter_cols: Vec = filter_cols.into_iter().collect(); + filter_cols.sort(); + let mut full_projection = self + .dataset + .empty_projection() + .with_row_id() + .union_columns(&filter_cols, OnMissing::Error)? 
+ .union_column(take_column, OnMissing::Error)?; + full_projection.with_row_addr = self.projection_plan.physical_projection.with_row_addr; + + // Use new_filtered_read but add adaptive config + let threshold = self + .late_materialize_selectivity_threshold + .unwrap_or(LATE_MATERIALIZE_SELECTIVITY_THRESHOLD); + + let total_count = self.get_total_row_count(frags.as_ref()); + + let mut plan = self + .new_filtered_read( + filter_plan, + full_projection, + /*make_deletions_null=*/ false, + frags.clone(), + /*scan_range=*/ None, + ) + .await?; + + // Unwrap FilteredReadExec to add adaptive config + if let Some(filtered_exec) = plan.as_any().downcast_ref::() { + let mut opts = filtered_exec.options().clone(); + opts.adaptive_expensive_column = Some(AdaptiveColumnConfig { + expensive_column: take_column.to_string(), + threshold, + total_row_count: total_count, + }); + + plan = Arc::new(FilteredReadExec::try_new( + filtered_exec.dataset().clone(), + opts, + filtered_exec.index_input().cloned(), + )?); + } + + // Apply refine filter if needed + if !skip_recheck { + if let Some(refine_expr) = &filter_plan.refine_expr { + plan = Arc::new(LanceFilterExec::try_new(refine_expr.clone(), plan)?); + } + } + + Ok(plan) + } + /// Global offset-limit of the result of the input plan fn limit_node(&self, plan: Arc) -> Arc { Arc::new(GlobalLimitExec::new( @@ -3877,7 +4181,6 @@ pub mod test_dataset { #[cfg(test)] mod test { - use std::collections::BTreeSet; use std::time::{Duration, Instant}; use std::vec; @@ -4053,6 +4356,166 @@ mod test { } } + #[tokio::test] + async fn test_late_materialize_selectivity_threshold() { + use super::test_dataset::TestVectorDataset; + + // Create a test dataset with vectors and a filter column (NO INDICES) + let test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false) + .await + .unwrap(); + + let dataset = &test_ds.dataset; + + // Use a very selective filter "i < 2" which matches only 2/400 rows (0.5% selectivity) + let q = 
Float32Array::from_iter_values((0..32).map(|v| v as f32)); + + // Test 1: With default threshold (0.005 = 0.5%) + // Adaptive late materialization is enabled internally in FilteredReadExec + let plan_str_default = dataset + .scan() + .nearest("vec", &q, 5) + .unwrap() + .filter("i < 2") + .unwrap() + .prefilter(true) + .explain_plan(false) + .await + .unwrap(); + + // Test 2: With very high threshold (0.99 = 99%) + let plan_str_high = dataset + .scan() + .late_materialize_selectivity_threshold(0.99) + .unwrap() + .nearest("vec", &q, 5) + .unwrap() + .filter("i < 2") + .unwrap() + .prefilter(true) + .explain_plan(false) + .await + .unwrap(); + + // Test 3: With zero threshold (0.0) + let plan_str_zero = dataset + .scan() + .late_materialize_selectivity_threshold(0.0) + .unwrap() + .nearest("vec", &q, 5) + .unwrap() + .filter("i < 2") + .unwrap() + .prefilter(true) + .explain_plan(false) + .await + .unwrap(); + + // All three should successfully build plans with LanceRead + // The adaptive behavior is internal to FilteredReadExec, not visible in the plan + assert!( + plan_str_default.contains("LanceRead"), + "Default threshold should build plan with LanceRead, but plan was:\n{}", + plan_str_default + ); + + assert!( + plan_str_high.contains("LanceRead"), + "High threshold should build plan with LanceRead, but plan was:\n{}", + plan_str_high + ); + + assert!( + plan_str_zero.contains("LanceRead"), + "Zero threshold should build plan with LanceRead, but plan was:\n{}", + plan_str_zero + ); + } + + /// Verifies that filters referencing the vector column skip late materialization + #[tokio::test] + async fn test_late_mat_skipped_when_filter_uses_vector() { + use super::test_dataset::TestVectorDataset; + + let test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false) + .await + .unwrap(); + + let dataset = &test_ds.dataset; + let q = Float32Array::from_iter_values((0..32).map(|v| v as f32)); + + // Filter references vector column - should skip late materialization 
+ let plan_vector_filter = dataset + .scan() + .late_materialize_selectivity_threshold(0.99) + .unwrap() // High threshold + .nearest("vec", &q, 10) + .unwrap() + .filter("vec[0] > 0.0") // Filter uses vector column + .unwrap() + .prefilter(true) + .explain_plan(false) + .await + .unwrap(); + + // Should NOT use late materialization even with high threshold + assert!( + !plan_vector_filter.contains("AdaptiveColumnScan"), + "Should skip late materialization when filter references vector column" + ); + } + + /// Verifies threshold configuration: both plans are LanceRead scans that report an adaptive column config with different thresholds + #[tokio::test] + async fn test_late_mat_threshold_short_circuit() { + use super::test_dataset::TestVectorDataset; + + let test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false) + .await + .unwrap(); + + let dataset = &test_ds.dataset; + let q = Float32Array::from_iter_values((0..32).map(|v| v as f32)); + + // 1% threshold with 50% selectivity → FilteredReadExec will choose full scan at runtime + let plan_low_threshold = dataset + .scan() + .late_materialize_selectivity_threshold(0.01) + .unwrap() + .nearest("vec", &q, 10) + .unwrap() + .filter("i < 200") + .unwrap() + .prefilter(true) + .explain_plan(false) + .await + .unwrap(); + + // 99% threshold with 50% selectivity → FilteredReadExec will choose late materialization at runtime + let plan_high_threshold = dataset + .scan() + .late_materialize_selectivity_threshold(0.99) + .unwrap() + .nearest("vec", &q, 10) + .unwrap() + .filter("i < 200") + .unwrap() + .prefilter(true) + .explain_plan(false) + .await + .unwrap(); + + // Both should successfully build plans with LanceRead + assert!(plan_low_threshold.contains("LanceRead")); + assert!(plan_high_threshold.contains("LanceRead")); + + // The plans should show adaptive configuration + assert!(plan_low_threshold.contains("adaptive_column=vec")); + assert!(plan_high_threshold.contains("adaptive_column=vec")); + 
assert!(plan_low_threshold.contains("threshold=0.01")); + assert!(plan_high_threshold.contains("threshold=0.99")); + } + #[cfg(not(windows))] #[tokio::test] async fn test_local_object_store() { @@ -6878,13 +7341,13 @@ mod test { ) .await?; - // use_index = False -> same plan as KNN + // use_index = False -> same plan as KNN without late materialization (no filter) log::info!("Test case: ANN with index disabled"); let expected = if data_storage_version == LanceFileVersion::Legacy { "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@0 as vec, _distance@2 as _distance] Take: columns=\"vec, _rowid, _distance, (i), (s)\" CoalesceBatchesExec: target_batch_size=8192 - FilterExec: _distance@... IS NOT NULL + FilterExec: _distance@2 IS NOT NULL SortExec: TopK(fetch=13), expr=... KNNVectorDistance: metric=l2 LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false, range=None" @@ -6892,7 +7355,7 @@ mod test { "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@0 as vec, _distance@2 as _distance] Take: columns=\"vec, _rowid, _distance, (i), (s)\" CoalesceBatchesExec: target_batch_size=8192 - FilterExec: _distance@... IS NOT NULL + FilterExec: _distance@2 IS NOT NULL SortExec: TopK(fetch=13), expr=... KNNVectorDistance: metric=l2 LanceRead: uri=..., projection=[vec], num_fragments=2, range_before=None, range_after=None, \ @@ -6963,24 +7426,45 @@ mod test { dataset.append_new_data().await?; log::info!("Test case: Combined KNN/ANN"); - let expected = "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] + let expected = if data_storage_version == LanceFileVersion::Legacy { + "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] Take: columns=\"_rowid, vec, _distance, (i), (s)\" CoalesceBatchesExec: target_batch_size=8192 FilterExec: _distance@... IS NOT NULL - SortExec: TopK(fetch=6), expr=... + SortExec: TopK(fetch=6), expr=..., preserve_partitioning=... 
KNNVectorDistance: metric=l2 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 UnionExec ProjectionExec: expr=[_distance@2 as _distance, _rowid@1 as _rowid, vec@0 as vec] FilterExec: _distance@... IS NOT NULL - SortExec: TopK(fetch=6), expr=... + SortExec: TopK(fetch=6), expr=..., preserve_partitioning=... KNNVectorDistance: metric=l2 LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false, range=None Take: columns=\"_distance, _rowid, (vec)\" CoalesceBatchesExec: target_batch_size=8192 - SortExec: TopK(fetch=6), expr=... + SortExec: TopK(fetch=6), expr=..., preserve_partitioning=... + ANNSubIndex: name=..., k=6, deltas=1 + ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1" + } else { + "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] + Take: columns=\"_rowid, vec, _distance, (i), (s)\" + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=6), expr=..., preserve_partitioning=... + KNNVectorDistance: metric=l2 + RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 + UnionExec + ProjectionExec: expr=[_distance@2 as _distance, _rowid@1 as _rowid, vec@0 as vec] + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=6), expr=..., preserve_partitioning=... + KNNVectorDistance: metric=l2 + LanceRead: uri=..., projection=[vec], num_fragments=..., range_before=None, range_after=None, row_id=true, row_addr=false, full_filter=--, refine_filter=-- + Take: columns=\"_distance, _rowid, (vec)\" + CoalesceBatchesExec: target_batch_size=8192 + SortExec: TopK(fetch=6), expr=..., preserve_partitioning=... 
ANNSubIndex: name=..., k=6, deltas=1 - ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1"; + ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1" + }; assert_plan_equals( &dataset.dataset, |scan| scan.nearest("vec", &q, 6), @@ -6990,9 +7474,10 @@ mod test { ) .await?; - // new data and with filter + // new data and with filter (postfilter - no late materialization) log::info!("Test case: Combined KNN/ANN with postfilter"); - let expected = "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] + let expected = if data_storage_version == LanceFileVersion::Legacy { + "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] Take: columns=\"_rowid, vec, _distance, i, (s)\" CoalesceBatchesExec: target_batch_size=8192 FilterExec: i@3 > 10 @@ -7012,7 +7497,30 @@ mod test { CoalesceBatchesExec: target_batch_size=8192 SortExec: TopK(fetch=15), expr=... ANNSubIndex: name=..., k=15, deltas=1 - ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1"; + ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1" + } else { + "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] + Take: columns=\"_rowid, vec, _distance, i, (s)\" + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: i@3 > 10 + Take: columns=\"_rowid, vec, _distance, (i)\" + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=15), expr=... + KNNVectorDistance: metric=l2 + RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 + UnionExec + ProjectionExec: expr=[_distance@2 as _distance, _rowid@1 as _rowid, vec@0 as vec] + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=15), expr=... 
+ KNNVectorDistance: metric=l2 + LanceRead: uri=..., projection=[vec], num_fragments=..., range_before=None, range_after=None, row_id=true, row_addr=false, full_filter=--, refine_filter=-- + Take: columns=\"_distance, _rowid, (vec)\" + CoalesceBatchesExec: target_batch_size=8192 + SortExec: TopK(fetch=15), expr=... + ANNSubIndex: name=..., k=15, deltas=1 + ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1" + }; assert_plan_equals( &dataset.dataset, |scan| scan.nearest("vec", &q, 15)?.filter("i > 10"), @@ -7020,7 +7528,7 @@ mod test { ) .await?; - // new data and with prefilter + // new data and with prefilter (filter has 97% selectivity, so no late materialization) log::info!("Test case: Combined KNN/ANN with prefilter"); let expected = if data_storage_version == LanceFileVersion::Legacy { "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] @@ -7031,12 +7539,12 @@ mod test { KNNVectorDistance: metric=l2 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 UnionExec - ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec] + ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@1 as vec] FilterExec: _distance@... IS NOT NULL SortExec: TopK(fetch=5), expr=... KNNVectorDistance: metric=l2 - FilterExec: i@1 > 10 - LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false, range=None + FilterExec: i@0 > 10 + LanceScan: uri=..., projection=[i, vec], row_id=true, row_addr=false, ordered=false, range=None Take: columns=\"_distance, _rowid, (vec)\" CoalesceBatchesExec: target_batch_size=8192 SortExec: TopK(fetch=5), expr=... 
@@ -7053,18 +7561,18 @@ mod test { KNNVectorDistance: metric=l2 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 UnionExec - ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec] + ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@1 as vec] FilterExec: _distance@... IS NOT NULL SortExec: TopK(fetch=5), expr=... KNNVectorDistance: metric=l2 - FilterExec: i@1 > 10 - LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false, range=None + LanceRead: uri=..., projection=[i, vec], num_fragments=..., range_before=None, range_after=None, \ + row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10), adaptive_column=vec, threshold=... Take: columns=\"_distance, _rowid, (vec)\" CoalesceBatchesExec: target_batch_size=8192 SortExec: TopK(fetch=5), expr=... ANNSubIndex: name=..., k=5, deltas=1 ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1 - LanceRead: uri=..., projection=[], num_fragments=2, range_before=None, range_after=None, \ + LanceRead: uri=..., projection=[], num_fragments=..., range_before=None, range_after=None, \ row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10)" }; assert_plan_equals( @@ -7143,8 +7651,9 @@ mod test { dataset.append_new_data().await?; - log::info!("Test case: Combined KNN/ANN with scalar index"); - let expected = "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] + log::info!("Test case: Combined KNN/ANN with scalar index (filter has 97% selectivity, so no late materialization)"); + let expected = if data_storage_version == LanceFileVersion::Legacy { + "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] Take: columns=\"_rowid, vec, _distance, (i), (s)\" CoalesceBatchesExec: target_batch_size=8192 FilterExec: _distance@... 
IS NOT NULL @@ -7152,18 +7661,40 @@ mod test { KNNVectorDistance: metric=l2 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 UnionExec - ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec] + ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@1 as vec] FilterExec: _distance@... IS NOT NULL SortExec: TopK(fetch=8), expr=... KNNVectorDistance: metric=l2 - FilterExec: i@1 > 10 - LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false, range=None + FilterExec: i@0 > 10 + LanceScan: uri=..., projection=[i, vec], row_id=true, row_addr=false, ordered=false, range=None Take: columns=\"_distance, _rowid, (vec)\" CoalesceBatchesExec: target_batch_size=8192 SortExec: TopK(fetch=8), expr=... ANNSubIndex: name=..., k=8, deltas=1 ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1 - ScalarIndexQuery: query=[i > 10]@i_idx"; + ScalarIndexQuery: query=[i > 10]@i_idx" + } else { + "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] + Take: columns=\"_rowid, vec, _distance, (i), (s)\" + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=8), expr=... + KNNVectorDistance: metric=l2 + RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 + UnionExec + ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@1 as vec] + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=8), expr=... + KNNVectorDistance: metric=l2 + LanceRead: uri=..., projection=[i, vec], num_fragments=..., range_before=None, range_after=None, \ + row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10), adaptive_column=vec, threshold=... + Take: columns=\"_distance, _rowid, (vec)\" + CoalesceBatchesExec: target_batch_size=8192 + SortExec: TopK(fetch=8), expr=... 
+ ANNSubIndex: name=..., k=8, deltas=1 + ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1 + ScalarIndexQuery: query=[i > 10]@i_idx" + }; assert_plan_equals( &dataset.dataset, |scan| { @@ -7178,9 +7709,10 @@ mod test { // Update scalar index but not vector index log::info!( - "Test case: Combined KNN/ANN with updated scalar index and outdated vector index" + "Test case: Combined KNN/ANN with updated scalar index and outdated vector index (filter has 97% selectivity, so no late materialization)" ); - let expected = "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] + let expected = if data_storage_version == LanceFileVersion::Legacy { + "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] Take: columns=\"_rowid, vec, _distance, (i), (s)\" CoalesceBatchesExec: target_batch_size=8192 FilterExec: _distance@... IS NOT NULL @@ -7188,18 +7720,40 @@ mod test { KNNVectorDistance: metric=l2 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 UnionExec - ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec] + ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@1 as vec] FilterExec: _distance@... IS NOT NULL SortExec: TopK(fetch=11), expr=... KNNVectorDistance: metric=l2 - FilterExec: i@1 > 10 - LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false, range=None + FilterExec: i@0 > 10 + LanceScan: uri=..., projection=[i, vec], row_id=true, row_addr=false, ordered=false, range=None Take: columns=\"_distance, _rowid, (vec)\" CoalesceBatchesExec: target_batch_size=8192 SortExec: TopK(fetch=11), expr=... 
ANNSubIndex: name=..., k=11, deltas=1 ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1 - ScalarIndexQuery: query=[i > 10]@i_idx"; + ScalarIndexQuery: query=[i > 10]@i_idx" + } else { + "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] + Take: columns=\"_rowid, vec, _distance, (i), (s)\" + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=11), expr=... + KNNVectorDistance: metric=l2 + RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 + UnionExec + ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@1 as vec] + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=11), expr=... + KNNVectorDistance: metric=l2 + LanceRead: uri=..., projection=[i, vec], num_fragments=..., range_before=None, range_after=None, \ + row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10), adaptive_column=vec, threshold=... + Take: columns=\"_distance, _rowid, (vec)\" + CoalesceBatchesExec: target_batch_size=8192 + SortExec: TopK(fetch=11), expr=... + ANNSubIndex: name=..., k=11, deltas=1 + ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1 + ScalarIndexQuery: query=[i > 10]@i_idx" + }; dataset.make_scalar_index().await?; assert_plan_equals( &dataset.dataset, @@ -7576,7 +8130,8 @@ mod test { FilterExec: _distance@2 IS NOT NULL SortExec: TopK(fetch=34), expr=[_distance@2 ASC NULLS LAST, _rowid@1 ASC NULLS LAST]... KNNVectorDistance: metric=l2 - LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false, range=None + LanceRead: uri=..., projection=[vec], num_fragments=..., range_before=None, range_after=None, \ + row_id=true, row_addr=false, full_filter=--, refine_filter=-- Take: columns=\"_distance, _rowid, (vec)\" CoalesceBatchesExec: target_batch_size=8192 SortExec: TopK(fetch=34), expr=[_distance@0 ASC NULLS LAST, _rowid@1 ASC NULLS LAST]... 
@@ -8399,6 +8954,93 @@ mod test { } } + /// Test vector search with filter on nested struct field to ensure column name handling is robust + #[tokio::test] + async fn test_vector_search_with_nested_struct_filter() { + use lance_arrow::FixedSizeListArrayExt; + + // Create minimal dataset with nested struct + vector + let category = StringArray::from(vec!["urgent", "normal", "urgent", "low"]); + let priority = Int32Array::from(vec![10, 50, 20, 80]); + let metadata = StructArray::from(vec![ + ( + Arc::new(ArrowField::new("category", DataType::Utf8, false)), + Arc::new(category) as ArrayRef, + ), + ( + Arc::new(ArrowField::new("priority", DataType::Int32, false)), + Arc::new(priority) as ArrayRef, + ), + ]); + + let vectors = FixedSizeListArray::try_new_from_values( + Float32Array::from_iter_values((0..32).map(|v| v as f32)), + 8, + ) + .unwrap(); + + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("metadata", metadata.data_type().clone(), false), + ArrowField::new("vector", vectors.data_type().clone(), false), + ])); + + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(metadata), Arc::new(vectors)]) + .unwrap(); + + let tmp_dir = TempStrDir::default(); + let reader = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema.clone()); + let dataset = Dataset::write(reader, &tmp_dir, None).await.unwrap(); + + let query_vec = Float32Array::from_iter_values((0..8).map(|v| v as f32)); + + // Test 1: Filter on nested string field + let results = dataset + .scan() + .nearest("vector", &query_vec, 10) + .unwrap() + .filter("metadata.category = 'urgent'") + .unwrap() + .prefilter(true) + .try_into_batch() + .await + .unwrap(); + + // Should return 2 rows (indices 0 and 2 have category='urgent') + assert_eq!(results.num_rows(), 2); + let metadata_col = results.column_by_name("metadata").unwrap().as_struct(); + let category_col = metadata_col + .column_by_name("category") + .unwrap() + .as_string::(); + for i in 0..results.num_rows() { + 
assert_eq!(category_col.value(i), "urgent"); + } + + // Test 2: Filter on nested int field + let results2 = dataset + .scan() + .nearest("vector", &query_vec, 10) + .unwrap() + .filter("metadata.priority >= 50") + .unwrap() + .prefilter(true) + .try_into_batch() + .await + .unwrap(); + + // Should return 2 rows (indices 1 and 3 have priority >= 50) + assert_eq!(results2.num_rows(), 2); + let metadata_col2 = results2.column_by_name("metadata").unwrap().as_struct(); + let priority_col = metadata_col2 + .column_by_name("priority") + .unwrap() + .as_primitive::(); + for i in 0..results2.num_rows() { + assert!(priority_col.value(i) >= 50); + } + } + #[test_log::test(test)] fn test_scan_finishes_all_tasks() { // Need to use multi-threaded runtime otherwise tasks don't run unless someone is polling somewhere diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index e1dd87195a..d99dc7e613 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -34,7 +34,7 @@ use lance_core::utils::deletion::DeletionVector; use lance_core::utils::futures::FinallyStreamExt; use lance_core::utils::mask::RowIdMask; use lance_core::utils::tokio::get_num_compute_intensive_cpus; -use lance_core::{datatypes::Projection, Error, Result}; +use lance_core::{datatypes::Projection, Error, Result, ROW_ID}; use lance_datafusion::planner::Planner; use lance_datafusion::utils::{ ExecutionPlanMetricsSetExt, FRAGMENTS_SCANNED_METRIC, RANGES_SCANNED_METRIC, @@ -55,6 +55,7 @@ use crate::dataset::rowids::load_row_id_sequence; use crate::dataset::scanner::{ get_default_batch_size, BATCH_SIZE_FALLBACK, DEFAULT_FRAGMENT_READAHEAD, }; +use crate::io::exec::TakeExec; use crate::Dataset; use super::utils::IoMetrics; @@ -199,6 +200,10 @@ pub struct FilteredReadGlobalMetrics { ranges_scanned: Count, rows_scanned: Count, io_metrics: IoMetrics, + // Adaptive late materialization metrics + adaptive_used_take: Count, + 
adaptive_used_full_scan: Count, + adaptive_pass1_rows: Count, } impl FilteredReadGlobalMetrics { @@ -208,6 +213,9 @@ impl FilteredReadGlobalMetrics { ranges_scanned: metrics.new_count(RANGES_SCANNED_METRIC, 0), rows_scanned: metrics.new_count(ROWS_SCANNED_METRIC, 0), io_metrics: IoMetrics::new(metrics, 0), + adaptive_used_take: metrics.new_count("adaptive_used_take", 0), + adaptive_used_full_scan: metrics.new_count("adaptive_used_full_scan", 0), + adaptive_pass1_rows: metrics.new_count("adaptive_pass1_rows", 0), } } } @@ -1185,6 +1193,17 @@ impl FilteredReadStream { } } +/// Configuration for adaptive expensive column handling +#[derive(Debug, Clone)] +pub struct AdaptiveColumnConfig { + /// Column to handle adaptively (will be excluded from initial scan) + pub expensive_column: String, + /// Selectivity threshold (0.0 to 1.0) + pub threshold: f64, + /// Total row count for selectivity calculation + pub total_row_count: usize, +} + /// Options for a filtered read. #[derive(Debug, Clone)] pub struct FilteredReadOptions { @@ -1213,6 +1232,8 @@ pub struct FilteredReadOptions { pub threading_mode: FilteredReadThreadingMode, /// The size of the I/O buffer to use for the scan pub io_buffer_size_bytes: Option, + /// Enable adaptive expensive column handling + pub adaptive_expensive_column: Option, } impl FilteredReadOptions { @@ -1243,6 +1264,7 @@ impl FilteredReadOptions { threading_mode: FilteredReadThreadingMode::OnePartitionMultipleThreads( get_num_compute_intensive_cpus(), ), + adaptive_expensive_column: None, } } @@ -1390,6 +1412,12 @@ impl FilteredReadOptions { self.io_buffer_size_bytes = Some(io_buffer_size); self } + + /// Enable adaptive expensive column handling + pub fn with_adaptive_column(mut self, config: AdaptiveColumnConfig) -> Self { + self.adaptive_expensive_column = Some(config); + self + } } /// A plan node that reads a dataset, applying an optional filter and projection. 
@@ -1494,6 +1522,11 @@ impl FilteredReadExec { partition: usize, context: Arc, ) -> SendableRecordBatchStream { + // Check if adaptive mode is enabled + if self.options.adaptive_expensive_column.is_some() { + return self.obtain_adaptive_stream(partition, context); + } + // There are two subtleties here: // // First, we need to defer execution until first polled (hence the once/flatten) @@ -1538,6 +1571,288 @@ impl FilteredReadExec { Box::pin(RecordBatchStreamAdapter::new(self.schema(), stream)) } + /// Adaptive stream that decides at runtime whether to use take or full scan + fn obtain_adaptive_stream( + &self, + partition: usize, + context: Arc, + ) -> SendableRecordBatchStream { + use arrow_array::UInt64Array; + use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; + use futures::StreamExt; + use lance_datafusion::exec::OneShotExec; + + let dataset = self.dataset.clone(); + let options = self.options.clone(); + let metrics = self.metrics.clone(); + let index_input = self.index_input.clone(); + let output_schema = self.schema(); + let output_schema_for_adapter = output_schema.clone(); + + let stream = futures::stream::once(async move { + let config = options.adaptive_expensive_column.as_ref().unwrap().clone(); + + // Check if adaptive materialization is applicable + // If not applicable, fall back to normal stream + + // 1. 
Check if filter uses the expensive column + // If it does, we'd need to load it anyway for filtering, so no benefit + let filter_columns: Vec = { + let mut cols = Vec::new(); + if let Some(ref full_filter) = options.full_filter { + cols.extend(lance_datafusion::planner::Planner::column_names_in_expr( + full_filter, + )); + } + if let Some(ref refine_filter) = options.refine_filter { + cols.extend(lance_datafusion::planner::Planner::column_names_in_expr( + refine_filter, + )); + } + cols + }; + + if filter_columns.contains(&config.expensive_column) { + // Fall back to normal stream - filter needs the expensive column + let normal_stream = + FilteredReadStream::try_new(dataset.clone(), options, &metrics, None).await?; + return DataFusionResult::Ok(normal_stream.get_stream(&metrics, partition)); + } + + // 2. Check if total_row_count is valid + if config.total_row_count == 0 { + let normal_stream = + FilteredReadStream::try_new(dataset.clone(), options, &metrics, None).await?; + return DataFusionResult::Ok(normal_stream.get_stream(&metrics, partition)); + } + + // PASS 1: Scan cheap columns (exclude expensive column) + // Build projection without the expensive column + let projection_schema = options.projection.to_bare_schema(); + let cheap_columns: Vec = projection_schema + .fields + .iter() + .map(|f| f.name.clone()) + .filter(|name| name != &config.expensive_column) + .collect(); + + let cheap_column_refs: Vec<&str> = cheap_columns.iter().map(|s| s.as_str()).collect(); + let cheap_projection = dataset + .empty_projection() + .union_columns(cheap_column_refs, OnMissing::Error)? 
+ .with_row_id(); + + let mut cheap_options = options.clone(); + cheap_options.projection = cheap_projection; + cheap_options.adaptive_expensive_column = None; // Disable recursion + + // Evaluate index if present + let mut evaluated_index = None; + if let Some(index_input) = index_input { + let mut index_search = index_input + .execute(partition, context.clone()) + .map_err(Error::from)?; + let index_search_result = index_search + .next() + .await + .ok_or_else(|| Error::Internal { + message: "Index search did not yield any results".to_string(), + location: location!(), + })? + .map_err(Error::from)?; + evaluated_index = Some(Arc::new(EvaluatedIndex::try_from_arrow( + &index_search_result, + )?)); + } + + let cheap_stream = FilteredReadStream::try_new( + dataset.clone(), + cheap_options, + &metrics, + evaluated_index.clone(), + ) + .await?; + + let mut cheap_stream_boxed = cheap_stream.get_stream(&metrics, partition); + + // Collect row IDs with EARLY TERMINATION + let threshold_count = + (config.total_row_count as f64 * config.threshold).ceil() as usize; + let mut row_ids = Vec::new(); + let mut cheap_batches = Vec::new(); + let mut filtered_count = 0; + + while let Some(batch_result) = cheap_stream_boxed.next().await { + let batch = batch_result.map_err(Error::from)?; + + // EARLY STOP CHECK + if filtered_count + batch.num_rows() >= threshold_count { + // NOT SELECTIVE - switch to full scan + drop(cheap_stream_boxed); // Drop the cheap stream + + // Record metrics: full scan path taken + let global_metrics = FilteredReadGlobalMetrics::new(&metrics); + global_metrics.adaptive_used_full_scan.add(1); + global_metrics.adaptive_pass1_rows.add(filtered_count); + + let full_stream = FilteredReadStream::try_new( + dataset.clone(), + options, + &metrics, + evaluated_index, + ) + .await?; + + return DataFusionResult::Ok(full_stream.get_stream(&metrics, partition)); + } + + filtered_count += batch.num_rows(); + + // Extract row IDs + let rowid_col = batch + 
.column_by_name(ROW_ID) + .ok_or_else(|| Error::Internal { + message: format!("Expected {} column in batch", ROW_ID), + location: location!(), + })?; + let rowid_array = rowid_col + .as_any() + .downcast_ref::() + .ok_or_else(|| Error::Internal { + message: format!("{} column is not UInt64Array", ROW_ID), + location: location!(), + })?; + + row_ids.extend(rowid_array.values().iter().copied()); + cheap_batches.push(batch); + } + + // SELECTIVE - do Pass 2 (take expensive column) + + // Record metrics: take path taken + let global_metrics = FilteredReadGlobalMetrics::new(&metrics); + global_metrics.adaptive_used_take.add(1); + global_metrics.adaptive_pass1_rows.add(filtered_count); + + // Handle empty result case + if cheap_batches.is_empty() || row_ids.is_empty() { + // No rows matched the filter, return empty stream + let empty_batch = RecordBatch::new_empty(output_schema.clone()); + let stream = futures::stream::iter(vec![Ok(empty_batch)]); + let stream_adapter = RecordBatchStreamAdapter::new(output_schema.clone(), stream); + return Result::::Ok( + Box::pin(stream_adapter) as SendableRecordBatchStream + ); + } + + let expensive_projection = dataset + .empty_projection() + .union_column(&config.expensive_column, OnMissing::Error)?; + + // Create row ID batch for TakeExec + let row_id_array = Arc::new(UInt64Array::from(row_ids.clone())); + let row_id_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + ROW_ID, + DataType::UInt64, + false, + )])); + let row_id_batch = RecordBatch::try_new(row_id_schema, vec![row_id_array])?; + let row_id_plan = Arc::new(OneShotExec::from_batch(row_id_batch)); + + let take_exec = TakeExec::try_new(dataset.clone(), row_id_plan, expensive_projection)?; + + let take_exec = take_exec.ok_or_else(|| Error::Internal { + message: "TakeExec returned None unexpectedly".to_string(), + location: location!(), + })?; + + let mut expensive_stream = take_exec + .execute(partition, context) + .map_err(Error::from)?; + + // Concatenate all cheap 
batches into one + use arrow_select::concat::concat_batches; + let cheap_schema = cheap_batches[0].schema(); + let cheap_batch_combined = + concat_batches(&cheap_schema, &cheap_batches).map_err(|e| Error::Arrow { + message: format!("Failed to concatenate cheap batches: {}", e), + location: location!(), + })?; + + // Collect all expensive batches + let mut expensive_batches = Vec::new(); + while let Some(batch_result) = expensive_stream.next().await { + let batch = batch_result.map_err(Error::from)?; + expensive_batches.push(batch); + } + + // Concatenate all expensive batches into one + if expensive_batches.is_empty() { + return Err(Error::Internal { + message: "TakeExec returned no batches".to_string(), + location: location!(), + }); + } + let expensive_schema = expensive_batches[0].schema(); + let expensive_batch_combined = concat_batches(&expensive_schema, &expensive_batches) + .map_err(|e| Error::Arrow { + message: format!("Failed to concatenate expensive batches: {}", e), + location: location!(), + })?; + + // Merge the two combined batches - build columns in the order of output_schema + let expensive_col_idx = expensive_batch_combined + .schema() + .index_of(&config.expensive_column) + .map_err(|e| Error::Internal { + message: format!( + "Expected {} column in expensive batch: {}", + config.expensive_column, e + ), + location: location!(), + })?; + + let mut columns = Vec::new(); + for field in output_schema.fields() { + let col_name = field.name(); + if col_name == &config.expensive_column { + // Take from expensive batch + columns.push(expensive_batch_combined.column(expensive_col_idx).clone()); + } else { + // Take from cheap batch + let cheap_col_idx = + cheap_batch_combined + .schema() + .index_of(col_name) + .map_err(|e| Error::Internal { + message: format!( + "Expected {} column in cheap batch: {}", + col_name, e + ), + location: location!(), + })?; + columns.push(cheap_batch_combined.column(cheap_col_idx).clone()); + } + } + + let combined_batch: 
RecordBatch = RecordBatch::try_new(output_schema.clone(), columns) + .map_err(|e: arrow_schema::ArrowError| Error::from(e))?; + + // Return stream with single combined batch + let stream = futures::stream::iter(vec![Ok(combined_batch)]); + let stream_adapter = RecordBatchStreamAdapter::new(output_schema.clone(), stream); + Result::::Ok( + Box::pin(stream_adapter) as SendableRecordBatchStream + ) + }) + .try_flatten(); + + Box::pin(RecordBatchStreamAdapter::new( + output_schema_for_adapter, + stream, + )) + } + pub fn dataset(&self) -> &Arc { &self.dataset } @@ -1562,11 +1877,21 @@ impl DisplayAs for FilteredReadExec { .map(|f| f.name.as_str()) .collect::>() .join(", "); + + let adaptive_info = if let Some(ref config) = self.options.adaptive_expensive_column { + format!( + ", adaptive_column={}, threshold={}", + config.expensive_column, config.threshold + ) + } else { + String::new() + }; + match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { write!( f, - "LanceRead: uri={}, projection=[{}], num_fragments={}, range_before={:?}, range_after={:?}, row_id={}, row_addr={}, full_filter={}, refine_filter={}", + "LanceRead: uri={}, projection=[{}], num_fragments={}, range_before={:?}, range_after={:?}, row_id={}, row_addr={}, full_filter={}, refine_filter={}{}", self.dataset.data_dir(), columns, self.options.fragments.as_ref().map(|f| f.len()).unwrap_or(self.dataset.fragments().len()), @@ -1576,10 +1901,11 @@ impl DisplayAs for FilteredReadExec { self.options.projection.with_row_addr, self.options.full_filter.as_ref().map(|i| i.to_string()).unwrap_or("--".to_string()), self.options.refine_filter.as_ref().map(|i| i.to_string()).unwrap_or("--".to_string()), + adaptive_info, ) } DisplayFormatType::TreeRender => { - write!(f, "LanceRead\nuri={}\nprojection=[{}]\nnum_fragments={}\nrange_before={:?}\nrange_after={:?}\nrow_id={}\nrow_addr={}\nfull_filter={}\nrefine_filter={}", + write!(f, 
"LanceRead\nuri={}\nprojection=[{}]\nnum_fragments={}\nrange_before={:?}\nrange_after={:?}\nrow_id={}\nrow_addr={}\nfull_filter={}\nrefine_filter={}{}", self.dataset.data_dir(), columns, self.options.fragments.as_ref().map(|f| f.len()).unwrap_or(self.dataset.fragments().len()), @@ -1589,6 +1915,7 @@ impl DisplayAs for FilteredReadExec { self.options.projection.with_row_addr, self.options.full_filter.as_ref().map(|i| i.to_string()).unwrap_or("true".to_string()), self.options.refine_filter.as_ref().map(|i| i.to_string()).unwrap_or("true".to_string()), + adaptive_info, ) } } @@ -3359,4 +3686,97 @@ mod tests { .unwrap_or(0); assert!(iops > 0, "Should have recorded IO operations"); } + + #[tokio::test] + async fn test_adaptive_metrics() { + // Test that adaptive late materialization metrics are recorded correctly + use super::AdaptiveColumnConfig; + + let fixture = TestFixture::new().await; + + // Test SELECTIVE case - should use take path + // Filter for fully_indexed < 10 matches only 10 rows out of 300 + let mut options = FilteredReadOptions::basic_full_read(&fixture.dataset); + options.adaptive_expensive_column = Some(AdaptiveColumnConfig { + expensive_column: "vector".to_string(), + threshold: 0.99, // Very high threshold, so it will use take + total_row_count: 300, + }); + + // Add a filter that matches only a few rows (10 out of 300 = 3.3%) + options.full_filter = + Some(datafusion_expr::col("fully_indexed").lt(datafusion_expr::lit(10u32))); + + let filtered_read = + Arc::new(FilteredReadExec::try_new(fixture.dataset.clone(), options, None).unwrap()); + + let _batches = filtered_read + .execute(0, Arc::new(TaskContext::default())) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + let metrics = filtered_read.metrics().unwrap(); + let used_take = metrics + .sum_by_name("adaptive_used_take") + .map(|v| v.as_usize()) + .unwrap_or(0); + let used_full_scan = metrics + .sum_by_name("adaptive_used_full_scan") + .map(|v| v.as_usize()) + .unwrap_or(0); + let 
pass1_rows = metrics + .sum_by_name("adaptive_pass1_rows") + .map(|v| v.as_usize()) + .unwrap_or(0); + + assert_eq!( + used_take, 1, + "Should have used take path for selective filter" + ); + assert_eq!(used_full_scan, 0, "Should not have used full scan path"); + assert!( + pass1_rows > 0 && pass1_rows <= 10, + "Should have scanned ~10 rows in pass 1" + ); + + // Test NON-SELECTIVE case - should use full scan path + // Filter for fully_indexed < 290 matches 290 rows out of 300 + let mut options2 = FilteredReadOptions::basic_full_read(&fixture.dataset); + options2.adaptive_expensive_column = Some(AdaptiveColumnConfig { + expensive_column: "vector".to_string(), + threshold: 0.01, // Very low threshold (1%), so it will use full scan + total_row_count: 300, + }); + + // Add a filter that matches many rows (290 out of 300 = 96.7%) + options2.full_filter = + Some(datafusion_expr::col("fully_indexed").lt(datafusion_expr::lit(290u32))); + + let filtered_read2 = + Arc::new(FilteredReadExec::try_new(fixture.dataset.clone(), options2, None).unwrap()); + + let _batches2 = filtered_read2 + .execute(0, Arc::new(TaskContext::default())) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + let metrics2 = filtered_read2.metrics().unwrap(); + let used_take2 = metrics2 + .sum_by_name("adaptive_used_take") + .map(|v| v.as_usize()) + .unwrap_or(0); + let used_full_scan2 = metrics2 + .sum_by_name("adaptive_used_full_scan") + .map(|v| v.as_usize()) + .unwrap_or(0); + assert_eq!( + used_take2, 0, + "Should not have used take path for non-selective filter" + ); + assert_eq!(used_full_scan2, 1, "Should have used full scan path"); + } }