45 changes: 45 additions & 0 deletions python/python/lance/dataset.py
@@ -690,6 +690,7 @@ def scanner(
strict_batch_size: Optional[bool] = None,
order_by: Optional[List[Union[ColumnOrdering, str]]] = None,
disable_scoring_autoprojection: Optional[bool] = None,
late_materialize_selectivity_threshold: Optional[float] = None,
) -> LanceScanner:
"""Return a Scanner that can support various pushdowns.

@@ -870,6 +871,7 @@ def setopt(opt, val):
setopt(builder.strict_batch_size, strict_batch_size)
setopt(builder.order_by, order_by)
setopt(builder.disable_scoring_autoprojection, disable_scoring_autoprojection)
setopt(builder.late_materialize_selectivity_threshold, late_materialize_selectivity_threshold)
# columns=None has a special meaning. we can't treat it as "user didn't specify"
if self._default_scan_options is None:
# No defaults, use user-provided, if any
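As a quick illustration of how the new keyword argument is passed through Dataset.scanner(), here is a minimal sketch; the dataset URI, column names, and query vector are made up for this example and are not part of the PR:

import lance
import numpy as np

# Illustrative dataset: assumes a 768-dimensional "vector" column and a
# scalar "category" column.
ds = lance.dataset("s3://bucket/table.lance")
scanner = ds.scanner(
    filter="category = 'shoes'",
    prefilter=True,
    nearest={"column": "vector", "q": np.random.rand(768), "k": 10},
    # New in this PR: selectivity cutoff for switching to late materialization.
    late_materialize_selectivity_threshold=0.005,
)
table = scanner.to_table()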
@@ -4207,6 +4209,7 @@ def __init__(self, ds: LanceDataset):
self._strict_batch_size = False
self._orderings = None
self._disable_scoring_autoprojection = False
self._late_materialize_selectivity_threshold: Optional[float] = None

def apply_defaults(self, default_opts: Dict[str, Any]) -> ScannerBuilder:
for key, value in default_opts.items():
@@ -4588,6 +4591,47 @@ def disable_scoring_autoprojection(self, disable: bool = True) -> ScannerBuilder
self._disable_scoring_autoprojection = disable
return self

def late_materialize_selectivity_threshold(
self, threshold: float
) -> ScannerBuilder:
"""
Set the selectivity threshold for late materialization in filtered KNN searches.

When a filter is present in a KNN search, Lance first executes the filter to measure its selectivity.
If the filter selects fewer than this fraction of rows, Lance uses late materialization
(scan the scalar columns first, then take vectors for only the filtered rows). If the filter selects
this fraction of rows or more, Lance does a single scan with both filter and vector columns
to avoid the random-access overhead of the take operation.

The optimal value depends on your storage medium:
- **Object storage (S3, GCS, Azure)**: Use a low threshold like 0.005 (0.5%) since
random access is very expensive
- **Local SSD**: Can use a higher threshold like 0.05 (5%) since random access is cheaper
- **NVMe**: Can use even higher thresholds like 0.1 (10%)

The default is 0.005 (0.5%), which is conservative for object storage.

Parameters
----------
threshold : float
The selectivity threshold as a fraction (e.g., 0.005 for 0.5%)

Returns
-------
ScannerBuilder
Returns self for method chaining
"""
if not isinstance(threshold, (int, float)):
raise TypeError(
f"late_materialize_selectivity_threshold must be a number, got {type(threshold)}"
)
if not (0.0 <= threshold <= 1.0):
raise ValueError(
f"late_materialize_selectivity_threshold must be between 0.0 and 1.0 (inclusive), got {threshold}"
)
self._late_materialize_selectivity_threshold = float(threshold)
return self
Comment on lines +4594 to +4633

Member: Can we make this a property of the ObjectStore? We already have fields like io_parallelism, max_iop_size, and block_size which describe how an object store should be interacted with. I think this could be added to the list. Then the defaults will work for most users. Users with a custom storage scenario will need to create a custom object store.

This way, we hide the parameter from most users, and get a better default.

Contributor Author:
that seems like a great idea, thanks.
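
To make the builder path above concrete, a hedged usage sketch (ds is assumed to be an already-opened LanceDataset; 0.05 follows the docstring's local-SSD guidance rather than the 0.005 object-storage default):

from lance.dataset import ScannerBuilder

# Chain the new option into the builder and materialize the scanner.
builder = ScannerBuilder(ds).late_materialize_selectivity_threshold(0.05)
scanner = builder.to_scanner()

# Values outside [0.0, 1.0] are rejected by the validation above, e.g.:
# ScannerBuilder(ds).late_materialize_selectivity_threshold(1.5)  # raises ValueError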


def to_scanner(self) -> LanceScanner:
scanner = self.ds._ds.scanner(
self._columns,
Expand Down Expand Up @@ -4616,6 +4660,7 @@ def to_scanner(self) -> LanceScanner:
self._strict_batch_size,
self._orderings,
self._disable_scoring_autoprojection,
self._late_materialize_selectivity_threshold,
)
return LanceScanner(scanner, self.ds)

2 changes: 1 addition & 1 deletion python/python/tests/test_scalar_index.py
@@ -412,7 +412,7 @@ def make_fts_search(ds):
assert "ScalarIndexQuery" in plan
assert "MaterializeIndex" not in plan
assert "KNNVectorDistance" in plan
assert "LanceScan" in plan
assert "LanceRead" in plan
assert make_vec_search(ds).to_table().num_rows == 12

plan = make_fts_search(ds).explain_plan()
9 changes: 8 additions & 1 deletion python/src/dataset.rs
@@ -742,7 +742,7 @@ impl Dataset {
}

#[allow(clippy::too_many_arguments)]
#[pyo3(signature=(columns=None, columns_with_transform=None, filter=None, prefilter=None, limit=None, offset=None, nearest=None, batch_size=None, io_buffer_size=None, batch_readahead=None, fragment_readahead=None, scan_in_order=None, fragments=None, with_row_id=None, with_row_address=None, use_stats=None, substrait_filter=None, fast_search=None, full_text_query=None, late_materialization=None, use_scalar_index=None, include_deleted_rows=None, scan_stats_callback=None, strict_batch_size=None, order_by=None, disable_scoring_autoprojection=None))]
#[pyo3(signature=(columns=None, columns_with_transform=None, filter=None, prefilter=None, limit=None, offset=None, nearest=None, batch_size=None, io_buffer_size=None, batch_readahead=None, fragment_readahead=None, scan_in_order=None, fragments=None, with_row_id=None, with_row_address=None, use_stats=None, substrait_filter=None, fast_search=None, full_text_query=None, late_materialization=None, use_scalar_index=None, include_deleted_rows=None, scan_stats_callback=None, strict_batch_size=None, order_by=None, disable_scoring_autoprojection=None, late_materialize_selectivity_threshold=None))]
fn scanner(
self_: PyRef<'_, Self>,
columns: Option<Vec<String>>,
@@ -771,6 +771,7 @@ impl Dataset {
strict_batch_size: Option<bool>,
order_by: Option<Vec<PyLance<ColumnOrdering>>>,
disable_scoring_autoprojection: Option<bool>,
late_materialize_selectivity_threshold: Option<f64>,
) -> PyResult<Scanner> {
let mut scanner: LanceScanner = self_.ds.scan();

Expand Down Expand Up @@ -958,6 +959,12 @@ impl Dataset {
scanner.strict_batch_size(strict_batch_size);
}

if let Some(threshold) = late_materialize_selectivity_threshold {
scanner
.late_materialize_selectivity_threshold(threshold)
.map_err(|err| PyValueError::new_err(err.to_string()))?;
}

if let Some(nearest) = nearest {
let column = nearest
.get_item("column")?
15 changes: 15 additions & 0 deletions rust/lance-datafusion/src/exec.rs
@@ -411,6 +411,21 @@ pub struct ExecutionSummaryCounts {
pub all_counts: HashMap<String, usize>,
}

impl ExecutionSummaryCounts {
/// Create a new ExecutionSummaryCounts with all values initialized to zero
pub fn new() -> Self {
Self::default()
}

/// Create a new ExecutionSummaryCounts with only custom counts
pub fn with_counts(counts: impl IntoIterator<Item = (impl Into<String>, usize)>) -> Self {
Self {
all_counts: counts.into_iter().map(|(k, v)| (k.into(), v)).collect(),
..Default::default()
}
}
}

fn visit_node(node: &dyn ExecutionPlan, counts: &mut ExecutionSummaryCounts) {
if let Some(metrics) = node.metrics() {
for (metric_name, count) in metrics.iter_counts() {
2 changes: 1 addition & 1 deletion rust/lance-tools/src/meta.rs
@@ -47,7 +47,7 @@ impl LanceToolFileMetadata {
.open_file(&path, &CachedFileSize::unknown())
.await?;
let file_metadata = FileReader::read_all_metadata(&file_scheduler).await?;
let lance_tool_file_metadata = LanceToolFileMetadata { file_metadata };
let lance_tool_file_metadata = Self { file_metadata };
Ok(lance_tool_file_metadata)
}
}
8 changes: 1 addition & 7 deletions rust/lance/src/dataset.rs
@@ -8873,12 +8873,7 @@ mod tests {
}

fn make_tx(read_version: u64) -> Transaction {
Transaction::new(
read_version,
Operation::Append { fragments: vec![] },
None,
None,
)
Transaction::new(read_version, Operation::Append { fragments: vec![] }, None)
}

async fn delete_external_tx_file(ds: &Dataset) {
@@ -8939,7 +8934,6 @@ mod tests {
ds.load_indices().await.unwrap().as_ref().clone(),
&tx_file,
&ManifestWriteConfig::default(),
None,
)
.unwrap();
let location = write_manifest_file(
Expand Down