Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,262 changes: 1,658 additions & 604 deletions Cargo.lock

Large diffs are not rendered by default.

35 changes: 18 additions & 17 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,33 @@ rust-version = "1.91.0"
crate-type = ["cdylib", "staticlib", "rlib"]

[dependencies]
lance = { version = "4.0.1", features = ["substrait"] }
lance-core = "4.0.1"
lance-file = "4.0.1"
lance-index = "4.0.1"
lance-io = "4.0.1"
lance-linalg = "4.0.1"
arrow = { version = "57.0.0", features = ["prettyprint", "ffi"] }
arrow-array = "57.0.0"
arrow-schema = "57.0.0"
lance = { git = "https://github.com/lance-format/lance.git", rev = "e0e977a6", features = ["substrait"] }
lance-core = { git = "https://github.com/lance-format/lance.git", rev = "e0e977a6" }
lance-file = { git = "https://github.com/lance-format/lance.git", rev = "e0e977a6" }
lance-index = { git = "https://github.com/lance-format/lance.git", rev = "e0e977a6" }
lance-io = { git = "https://github.com/lance-format/lance.git", rev = "e0e977a6" }
lance-linalg = { git = "https://github.com/lance-format/lance.git", rev = "e0e977a6" }
arrow = { version = "58.0.0", features = ["prettyprint", "ffi"] }
arrow-array = "58.0.0"
arrow-schema = "58.0.0"
half = "2"
tokio = { version = "1", features = ["rt-multi-thread", "sync"] }
futures = "0.3"
log = "0.4"
pin-project = "1.0"
snafu = "0.9"
uuid = { version = "1", features = ["v4"] }

[dev-dependencies]
lance = { version = "4.0.1", features = ["substrait"] }
lance-datafusion = { version = "4.0.1", features = ["substrait"] }
lance-datagen = "4.0.1"
lance-file = "4.0.1"
lance-table = "4.0.1"
datafusion = { version = "52.1.0", default-features = false }
lance = { git = "https://github.com/lance-format/lance.git", rev = "e0e977a6", features = ["substrait"] }
lance-datafusion = { git = "https://github.com/lance-format/lance.git", rev = "e0e977a6", features = ["substrait"] }
lance-datagen = { git = "https://github.com/lance-format/lance.git", rev = "e0e977a6" }
lance-file = { git = "https://github.com/lance-format/lance.git", rev = "e0e977a6" }
lance-table = { git = "https://github.com/lance-format/lance.git", rev = "e0e977a6" }
datafusion = { version = "53.1.0", default-features = false }
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
arrow-array = "57.0.0"
arrow-schema = "57.0.0"
arrow-array = "58.0.0"
arrow-schema = "58.0.0"
tempfile = "3"

[profile.release]
Expand Down
55 changes: 55 additions & 0 deletions include/lance/lance.h
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,41 @@ uint64_t lance_dataset_index_count(const LanceDataset* dataset);
*/
const char* lance_dataset_index_list_json(const LanceDataset* dataset);

/* ─── Distributed vector search: index segment enumeration ─── */

/**
* Count the physical segments that make up a logical vector index.
*
* A logical index is a set of physical segments (one per distributed-build
* worker, or one per fragment range). Each segment has a stable UUID.
*
* @param dataset    Dataset handle; must not be NULL.
* @param index_name NUL-terminated name of the logical index; must not be
*                   NULL or empty.
* @return Number of segments. Returns 0 if the index does not exist (also
*         sets `LANCE_ERR_NOT_FOUND`) or on any other error. An index that
*         exists always has at least one segment, so 0 always signals a
*         failure; inspect the last-error code to tell the cases apart.
*/
uint64_t lance_dataset_index_segment_count(
const LanceDataset* dataset,
const char* index_name
);

/**
* Fill `out_uuids` with the UUIDs of the segments that make up a logical index.
* Each UUID is written as 16 raw bytes (RFC 4122 layout); consecutive UUIDs are
* packed back to back, so UUID `i` starts at byte offset `i * 16`.
*
* @param dataset    Dataset handle; must not be NULL.
* @param index_name NUL-terminated name of the logical index; must not be
*                   NULL or empty.
* @param out_uuids Caller-allocated buffer for the UUIDs (byte length >= capacity * 16).
*                  Must not be NULL.
* @param capacity Number of UUIDs the buffer can hold.
* @param out_count Optional (may be NULL). On success, receives the number of
* UUIDs actually written. Not modified on error.
*
* @return 0 on success, -1 on error. If the index has more segments than
* `capacity`, the call fails with LANCE_ERR_INVALID_ARGUMENT as the error code
* without writing anything; the caller can retry with a larger buffer (the
* required size is available from `lance_dataset_index_segment_count()`).
* The call also fails if no index named `index_name` exists.
*/
int32_t lance_dataset_index_segments(
const LanceDataset* dataset,
const char* index_name,
uint8_t* out_uuids,
size_t capacity,
uint64_t* out_count
);

/* ─── Vector search (Phase 2) ─── */

/**
Expand Down Expand Up @@ -736,6 +771,26 @@ int32_t lance_scanner_set_metric(LanceScanner* scanner, LanceMetricType metric);
int32_t lance_scanner_set_use_index(LanceScanner* scanner, bool enable);
int32_t lance_scanner_set_prefilter(LanceScanner* scanner, bool enable);

/**
* Restrict the next k-NN query to a specific subset of vector index segments.
*
* Used by distributed query engines (e.g. Velox) to fan a single k-NN query
* out across workers, each handling a slice of segments. The coordinator gets
* the segment list via `lance_dataset_index_segments()`, which produces UUIDs
* in the same packed 16-bytes-per-UUID layout that this function accepts.
*
* @param scanner       Scanner to configure.
* @param segment_uuids Pointer to `len` 16-byte UUIDs concatenated end-to-end
*                      (total byte length = `len * 16`). Each UUID identifies
*                      one physical segment of a logical index.
* @param len           Number of UUIDs (not bytes). Pass 0 (and segment_uuids
*                      may be NULL) to clear any previously-set segment
*                      restriction.
* @return 0 on success, -1 on error.
*/
int32_t lance_scanner_set_index_segments(
LanceScanner* scanner,
const uint8_t* segment_uuids,
size_t len
);

/* ─── Full-text search (Phase 2) ─── */

/**
Expand Down
41 changes: 41 additions & 0 deletions include/lance/lance.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "lance/lance.h"

#include <array>
#include <cstdint>
#include <memory>
#include <optional>
Expand Down Expand Up @@ -533,6 +534,31 @@ class Dataset {
return out;
}

/// Returns how many physical segments make up the logical vector index
/// named `index_name`.
/// Throws lance::Error with code NotFound if the index does not exist.
uint64_t index_segment_count(const std::string& index_name) const {
    const uint64_t segment_total =
        lance_dataset_index_segment_count(handle_.get(), index_name.c_str());
    // A non-zero count is always a valid answer; only a zero needs the
    // last-error code to be consulted before returning.
    if (segment_total != 0) {
        return segment_total;
    }
    if (lance_last_error_code() != LANCE_OK) {
        check_error();
    }
    return segment_total;
}

/// UUIDs of the physical segments that make up a logical vector index.
/// Each UUID is a 16-byte array (RFC 4122 layout). Used by distributed
/// query engines to fan k-NN out across workers — see
/// `Scanner::index_segments`.
///
/// The segment count is queried first and the UUIDs are fetched in a second
/// call; failures of either call are reported through check_error().
std::vector<std::array<uint8_t, 16>> index_segments(const std::string& index_name) const {
    // The C API writes 16-byte UUIDs back to back into a flat byte buffer.
    // Handing it the vector's storage is only correct when the element type
    // has no padding, so make that layout assumption a compile-time check.
    static_assert(sizeof(std::array<uint8_t, 16>) == 16,
                  "std::array<uint8_t, 16> must be exactly 16 bytes");

    uint64_t count = index_segment_count(index_name);
    std::vector<std::array<uint8_t, 16>> out(count);
    if (count == 0) return out;

    // If the index gained segments since the count was taken, the C call
    // rejects the undersized buffer and the error surfaces via check_error().
    uint64_t written = 0;
    if (lance_dataset_index_segments(handle_.get(), index_name.c_str(),
                                     reinterpret_cast<uint8_t*>(out.data()),
                                     static_cast<size_t>(count), &written) != 0)
        check_error();

    // Trim to what was actually written (fewer than `count` is possible).
    out.resize(static_cast<size_t>(written));
    return out;
}

/// Access the underlying C handle (does not transfer ownership).
/// The returned pointer is whatever `handle_` currently holds.
const LanceDataset* c_handle() const { return handle_.get(); }

Expand Down Expand Up @@ -601,6 +627,21 @@ class Scanner {
return substrait_filter(bytes.data(), bytes.size());
}

/// Limit the next k-NN query to the given vector index segments.
/// `uuids` points at `len` 16-byte UUIDs laid out back to back in one byte
/// buffer (total bytes = `len * 16`). Pass len=0 (and any pointer) to clear.
/// Returns `*this` so calls can be chained.
Scanner& index_segments(const uint8_t* uuids, size_t len) {
    const int32_t status = lance_scanner_set_index_segments(handle_.get(), uuids, len);
    if (status != 0) {
        check_error();
    }
    return *this;
}

/// Restrict the next k-NN query to a subset of vector index segments
/// (typed vector overload). An empty vector clears the restriction, exactly
/// as len=0 does for the pointer overload.
Scanner& index_segments(const std::vector<std::array<uint8_t, 16>>& uuids) {
    // The pointer overload expects UUIDs packed 16 bytes apart; viewing the
    // vector's storage as raw bytes is only valid if the array type has no
    // padding, so verify that at compile time.
    static_assert(sizeof(std::array<uint8_t, 16>) == 16,
                  "std::array<uint8_t, 16> must be exactly 16 bytes");
    return index_segments(reinterpret_cast<const uint8_t*>(uuids.data()), uuids.size());
}

/// Materialize the scan as an ArrowArrayStream (blocking).
void to_arrow_stream(ArrowArrayStream* out) {
if (lance_scanner_to_arrow_stream(handle_.get(), out) != 0)
Expand Down
143 changes: 142 additions & 1 deletion src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@

use std::ffi::{CString, c_char};

use lance::index::DatasetIndexExt;
use lance_core::Result;
use lance_index::IndexType;
use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
use lance_index::{DatasetIndexExt, IndexType};

use crate::dataset::LanceDataset;
use crate::error::{LanceErrorCode, ffi_try, set_last_error};
Expand Down Expand Up @@ -153,6 +154,146 @@ pub unsafe extern "C" fn lance_dataset_index_count(dataset: *const LanceDataset)
}
}

/// Count the physical segments that make up a logical index.
///
/// A logical index is a set of physical segments (one per distributed-build worker
/// or one per fragment range). Each segment has a stable UUID. Returns 0 if the
/// index does not exist (also sets `LANCE_ERR_NOT_FOUND`), and also returns 0
/// after recording an error for NULL arguments, an empty name, or a failure to
/// load the index metadata. On success the last error is cleared.
///
/// # Safety
///
/// `dataset` is dereferenced and `index_name` is parsed as a C string, so each
/// must be either NULL or a valid pointer of the corresponding kind.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn lance_dataset_index_segment_count(
    dataset: *const LanceDataset,
    index_name: *const c_char,
) -> u64 {
    if dataset.is_null() || index_name.is_null() {
        set_last_error(
            LanceErrorCode::InvalidArgument,
            "dataset and index_name must not be NULL",
        );
        return 0;
    }
    let ds = unsafe { &*dataset };

    let parsed = unsafe { helpers::parse_c_string(index_name) };
    let name = match parsed {
        Err(err) => {
            crate::error::set_lance_error(&err);
            return 0;
        }
        Ok(None) => {
            set_last_error(
                LanceErrorCode::InvalidArgument,
                "index_name must not be empty",
            );
            return 0;
        }
        Ok(Some(s)) => s,
    };

    let snap = ds.snapshot();
    let indices = match block_on(snap.load_indices()) {
        Ok(loaded) => loaded,
        Err(err) => {
            crate::error::set_lance_error(&err);
            return 0;
        }
    };

    // Every non-system entry carrying the requested name is one segment.
    let matching = indices
        .iter()
        .filter(|idx| !lance_index::is_system_index(idx) && idx.name == name)
        .count();
    if matching == 0 {
        set_last_error(
            LanceErrorCode::NotFound,
            format!("index '{}' not found", name),
        );
        return 0;
    }

    crate::error::clear_last_error();
    matching as u64
}

/// Fill `out_uuids` with the UUIDs of the segments that make up a logical index.
///
/// Each UUID is written as 16 raw bytes (RFC 4122 layout); UUID `i` is placed
/// at byte offset `i * 16` of `out_uuids`.
///
/// - `dataset`, `index_name`, `out_uuids`: must not be NULL.
/// - `capacity`: number of UUIDs the caller allocated space for in `out_uuids`
///   (byte length must be at least `capacity * 16`).
/// - `out_count`: if non-NULL, receives the number of UUIDs actually written.
///   It is only written on success.
///
/// Returns 0 on success, -1 on error. If the index has more segments than
/// `capacity`, the call fails with `LANCE_ERR_INVALID_ARGUMENT` as the error
/// code without writing anything. Callers can retry with a larger buffer.
/// The call also fails when no index with the given name exists.
///
/// # Safety
///
/// Non-NULL pointers must be valid: `dataset` is dereferenced, `index_name` is
/// parsed as a C string, `out_uuids` must be writable for `capacity * 16`
/// bytes, and `out_count` (when non-NULL) must be writable as a `u64`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn lance_dataset_index_segments(
dataset: *const LanceDataset,
index_name: *const c_char,
out_uuids: *mut u8,
capacity: usize,
out_count: *mut u64,
) -> i32 {
// All validation and copying lives in the fallible helper; `ffi_try!`
// converts its `Result` into the C return value.
ffi_try!(
unsafe {
dataset_index_segments_inner(dataset, index_name, out_uuids, capacity, out_count)
},
neg
)
}

/// Fallible body of `lance_dataset_index_segments`.
///
/// Validates the arguments, collects every non-system index entry whose name
/// equals `index_name`, and copies each entry's UUID (16 bytes) into
/// `out_uuids` in iteration order. Nothing is written unless all checks pass.
///
/// # Errors
///
/// - `InvalidInput` if `dataset`, `index_name`, or `out_uuids` is NULL, if the
///   name is empty, or if `capacity` is smaller than the number of segments.
/// - `IndexNotFound` if no entry carries the requested name.
/// - Any error produced while parsing the name or loading the indices.
///
/// # Safety
///
/// Non-NULL pointers must be valid; `out_uuids` must be writable for at least
/// `capacity * 16` bytes and `out_count`, when non-NULL, writable as a `u64`.
unsafe fn dataset_index_segments_inner(
    dataset: *const LanceDataset,
    index_name: *const c_char,
    out_uuids: *mut u8,
    capacity: usize,
    out_count: *mut u64,
) -> Result<i32> {
    // Width in bytes of one UUID in the output buffer.
    const UUID_BYTES: usize = 16;

    let missing_required_pointer =
        dataset.is_null() || index_name.is_null() || out_uuids.is_null();
    if missing_required_pointer {
        return Err(lance_core::Error::InvalidInput {
            source: "dataset, index_name, and out_uuids must not be NULL".into(),
            location: snafu::location!(),
        });
    }
    let ds = unsafe { &*dataset };

    let parsed = unsafe { helpers::parse_c_string(index_name)? };
    let Some(name) = parsed else {
        return Err(lance_core::Error::InvalidInput {
            source: "index_name must not be empty".into(),
            location: snafu::location!(),
        });
    };

    let snap = ds.snapshot();
    let indices = block_on(snap.load_indices())?;
    let matching: Vec<_> = indices
        .iter()
        .filter(|idx| !lance_index::is_system_index(idx) && idx.name == name)
        .collect();

    let found = matching.len();
    if found == 0 {
        return Err(lance_core::Error::IndexNotFound {
            identity: format!("name='{}'", name),
            location: snafu::location!(),
        });
    }
    if found > capacity {
        return Err(lance_core::Error::InvalidInput {
            source: format!(
                "out_uuids capacity ({}) too small for {} segments",
                capacity, found
            )
            .into(),
            location: snafu::location!(),
        });
    }

    // Raw-pointer copies are used on purpose: the caller's buffer may be
    // uninitialised, so no `&mut [u8]` is ever formed over it.
    for (slot, segment) in matching.iter().enumerate() {
        let src = segment.uuid.as_bytes().as_ptr();
        // SAFETY: caller guarantees out_uuids has at least `capacity * 16` bytes,
        // and `found <= capacity` was verified above, so slot * 16 + 16 is in bounds.
        unsafe {
            std::ptr::copy_nonoverlapping(src, out_uuids.add(slot * UUID_BYTES), UUID_BYTES);
        }
    }

    if !out_count.is_null() {
        // SAFETY: non-NULL `out_count` is required by the caller contract to be
        // a valid, writable `u64`.
        unsafe { *out_count = found as u64 };
    }
    Ok(0)
}

/// Drop an index by name.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn lance_dataset_drop_index(
Expand Down
Loading
Loading