diff --git a/.github/workflows/java.yml b/.github/workflows/java.yml index 27254d70fe..5bce3470b8 100644 --- a/.github/workflows/java.yml +++ b/.github/workflows/java.yml @@ -49,22 +49,9 @@ jobs: matrix: java-version: [8, 11, 17] name: Build and Test with Java ${{ matrix.java-version }} - services: - localstack: - image: localstack/localstack:4.0 - ports: - - 4566:4566 - env: - SERVICES: s3,dynamodb,kms - AWS_ACCESS_KEY_ID: ACCESS_KEY - AWS_SECRET_ACCESS_KEY: SECRET_KEY - options: >- - --health-cmd "curl -s http://localhost:4566/_localstack/health" - --health-interval 5s - --health-timeout 3s - --health-retries 3 - --health-start-period 10s steps: + - name: Checkout repository + uses: actions/checkout@v4 - name: Install dependencies run: | sudo apt update @@ -76,8 +63,6 @@ jobs: - uses: rui314/setup-mold@v1 - name: Install cargo-llvm-cov uses: taiki-e/install-action@cargo-llvm-cov - - name: Checkout repository - uses: actions/checkout@v4 - uses: Swatinem/rust-cache@v2 with: workspaces: java/lance-jni -> ../target/rust-maven-plugin/lance-jni @@ -91,6 +76,9 @@ jobs: working-directory: java run: | mvn spotless:check + - name: Start localstack + run: | + docker compose -f docker-compose.yml up -d --wait - name: Running tests with Java ${{ matrix.java-version }} working-directory: java env: diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index 7aeb0ba72f..c1896820f1 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -59,6 +59,11 @@ pub struct BlockingDataset { } impl BlockingDataset { + /// Get the storage options provider that was used when opening this dataset + pub fn get_storage_options_provider(&self) -> Option> { + self.inner.storage_options_provider() + } + pub fn drop(uri: &str, storage_options: HashMap) -> Result<()> { RT.block_on(async move { let registry = Arc::new(ObjectStoreRegistry::default()); @@ -122,7 +127,7 @@ impl BlockingDataset { builder = builder.with_version(ver as u64); } builder = builder.with_storage_options(storage_options); - if let Some(provider) = storage_options_provider { + if let Some(provider) = storage_options_provider.clone() { builder = builder.with_storage_options_provider(provider) } if let Some(offset_seconds) = s3_credentials_refresh_offset_seconds { @@ -284,14 +289,11 @@ impl BlockingDataset { pub fn commit_transaction( &mut self, transaction: Transaction, - write_params: HashMap, + store_params: ObjectStoreParams, ) -> Result { let new_dataset = RT.block_on( CommitBuilder::new(Arc::new(self.clone().inner)) - .with_store_params(ObjectStoreParams { - storage_options: Some(write_params), - ..Default::default() - }) + .with_store_params(store_params) .execute(transaction), )?; Ok(BlockingDataset { inner: new_dataset }) @@ -334,6 +336,7 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_createWithFfiSchema<'local enable_stable_row_ids: JObject, // Optional data_storage_version: JObject, // Optional storage_options_obj: JObject, // Map + s3_credentials_refresh_offset_seconds_obj: JObject, // Optional ) -> JObject<'local> { ok_or_throw!( env, @@ -347,7 +350,8 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_createWithFfiSchema<'local mode, enable_stable_row_ids, data_storage_version, - storage_options_obj + storage_options_obj, + s3_credentials_refresh_offset_seconds_obj ) ) } @@ -364,6 +368,7 @@ fn inner_create_with_ffi_schema<'local>( enable_stable_row_ids: JObject, // Optional data_storage_version: JObject, // Optional storage_options_obj: JObject, 
// Map + s3_credentials_refresh_offset_seconds_obj: JObject, // Optional ) -> Result> { let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema; let c_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema_ptr) }; @@ -380,6 +385,8 @@ fn inner_create_with_ffi_schema<'local>( enable_stable_row_ids, data_storage_version, storage_options_obj, + JObject::null(), // No provider for schema-only creation + s3_credentials_refresh_offset_seconds_obj, reader, ) } @@ -411,6 +418,7 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_createWithFfiStream<'local enable_stable_row_ids: JObject, // Optional data_storage_version: JObject, // Optional storage_options_obj: JObject, // Map + s3_credentials_refresh_offset_seconds_obj: JObject, // Optional ) -> JObject<'local> { ok_or_throw!( env, @@ -424,7 +432,44 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_createWithFfiStream<'local mode, enable_stable_row_ids, data_storage_version, - storage_options_obj + storage_options_obj, + JObject::null(), + s3_credentials_refresh_offset_seconds_obj + ) + ) +} + +#[no_mangle] +pub extern "system" fn Java_com_lancedb_lance_Dataset_createWithFfiStreamAndProvider<'local>( + mut env: JNIEnv<'local>, + _obj: JObject, + arrow_array_stream_addr: jlong, + path: JString, + max_rows_per_file: JObject, // Optional + max_rows_per_group: JObject, // Optional + max_bytes_per_file: JObject, // Optional + mode: JObject, // Optional + enable_stable_row_ids: JObject, // Optional + data_storage_version: JObject, // Optional + storage_options_obj: JObject, // Map + storage_options_provider_obj: JObject, // Optional + s3_credentials_refresh_offset_seconds_obj: JObject, // Optional +) -> JObject<'local> { + ok_or_throw!( + env, + inner_create_with_ffi_stream( + &mut env, + arrow_array_stream_addr, + path, + max_rows_per_file, + max_rows_per_group, + max_bytes_per_file, + mode, + enable_stable_row_ids, + data_storage_version, + storage_options_obj, + storage_options_provider_obj, + s3_credentials_refresh_offset_seconds_obj ) ) } @@ -434,13 +479,15 @@ fn inner_create_with_ffi_stream<'local>( env: &mut JNIEnv<'local>, arrow_array_stream_addr: jlong, path: JString, - max_rows_per_file: JObject, // Optional - max_rows_per_group: JObject, // Optional - max_bytes_per_file: JObject, // Optional - mode: JObject, // Optional - enable_stable_row_ids: JObject, // Optional - data_storage_version: JObject, // Optional - storage_options_obj: JObject, // Map + max_rows_per_file: JObject, // Optional + max_rows_per_group: JObject, // Optional + max_bytes_per_file: JObject, // Optional + mode: JObject, // Optional + enable_stable_row_ids: JObject, // Optional + data_storage_version: JObject, // Optional + storage_options_obj: JObject, // Map + storage_options_provider_obj: JObject, // Optional + s3_credentials_refresh_offset_seconds_obj: JObject, // Optional ) -> Result> { let stream_ptr = arrow_array_stream_addr as *mut FFI_ArrowArrayStream; let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?; @@ -454,6 +501,8 @@ fn inner_create_with_ffi_stream<'local>( enable_stable_row_ids, data_storage_version, storage_options_obj, + storage_options_provider_obj, + s3_credentials_refresh_offset_seconds_obj, reader, ) } @@ -469,6 +518,8 @@ fn create_dataset<'local>( enable_stable_row_ids: JObject, data_storage_version: JObject, storage_options_obj: JObject, + storage_options_provider_obj: JObject, // Optional + s3_credentials_refresh_offset_seconds_obj: JObject, reader: impl RecordBatchReader + Send + 'static, ) -> Result> { let 
path_str = path.extract(env)?; @@ -482,6 +533,8 @@ fn create_dataset<'local>( &enable_stable_row_ids, &data_storage_version, &storage_options_obj, + &storage_options_provider_obj, + &s3_credentials_refresh_offset_seconds_obj, )?; let dataset = BlockingDataset::write(reader, &path_str, Some(write_params))?; diff --git a/java/lance-jni/src/fragment.rs b/java/lance-jni/src/fragment.rs index 98be13a3e4..9cee53a110 100644 --- a/java/lance-jni/src/fragment.rs +++ b/java/lance-jni/src/fragment.rs @@ -20,12 +20,13 @@ use lance::dataset::fragment::FileFragment; use lance_datafusion::utils::StreamingWriteSource; use crate::error::{Error, Result}; +use crate::ffi::JNIEnvExt; use crate::traits::{export_vec, import_vec, FromJObjectWithEnv, IntoJava, JLance}; use crate::{ blocking_dataset::{BlockingDataset, NATIVE_DATASET}, traits::FromJString, utils::extract_write_params, - JNIEnvExt, RT, + RT, }; #[derive(Debug, Clone)] @@ -82,13 +83,15 @@ pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiArray<'local dataset_uri: JString, arrow_array_addr: jlong, arrow_schema_addr: jlong, - max_rows_per_file: JObject, // Optional - max_rows_per_group: JObject, // Optional - max_bytes_per_file: JObject, // Optional - mode: JObject, // Optional - enable_stable_row_ids: JObject, // Optional - data_storage_version: JObject, // Optional - storage_options_obj: JObject, // Map + max_rows_per_file: JObject, // Optional + max_rows_per_group: JObject, // Optional + max_bytes_per_file: JObject, // Optional + mode: JObject, // Optional + enable_stable_row_ids: JObject, // Optional + data_storage_version: JObject, // Optional + storage_options_obj: JObject, // Map + storage_options_provider_obj: JObject, // Optional + s3_credentials_refresh_offset_seconds_obj: JObject, // Optional ) -> JObject<'local> { ok_or_throw_with_return!( env, @@ -103,7 +106,9 @@ pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiArray<'local mode, enable_stable_row_ids, data_storage_version, - storage_options_obj + storage_options_obj, + storage_options_provider_obj, + s3_credentials_refresh_offset_seconds_obj ), JObject::default() ) @@ -115,13 +120,15 @@ fn inner_create_with_ffi_array<'local>( dataset_uri: JString, arrow_array_addr: jlong, arrow_schema_addr: jlong, - max_rows_per_file: JObject, // Optional - max_rows_per_group: JObject, // Optional - max_bytes_per_file: JObject, // Optional - mode: JObject, // Optional - enable_stable_row_ids: JObject, // Optional - data_storage_version: JObject, // Optional - storage_options_obj: JObject, // Map + max_rows_per_file: JObject, // Optional + max_rows_per_group: JObject, // Optional + max_bytes_per_file: JObject, // Optional + mode: JObject, // Optional + enable_stable_row_ids: JObject, // Optional + data_storage_version: JObject, // Optional + storage_options_obj: JObject, // Map + storage_options_provider_obj: JObject, // Optional + s3_credentials_refresh_offset_seconds_obj: JObject, // Optional ) -> Result> { let c_array_ptr = arrow_array_addr as *mut FFI_ArrowArray; let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema; @@ -146,6 +153,8 @@ fn inner_create_with_ffi_array<'local>( enable_stable_row_ids, data_storage_version, storage_options_obj, + storage_options_provider_obj, + s3_credentials_refresh_offset_seconds_obj, reader, ) } @@ -156,13 +165,15 @@ pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiStream<'a>( _obj: JObject, dataset_uri: JString, arrow_array_stream_addr: jlong, - max_rows_per_file: JObject, // Optional - max_rows_per_group: 
JObject, // Optional - max_bytes_per_file: JObject, // Optional - mode: JObject, // Optional - enable_stable_row_ids: JObject, // Optional - data_storage_version: JObject, // Optional - storage_options_obj: JObject, // Map + max_rows_per_file: JObject, // Optional + max_rows_per_group: JObject, // Optional + max_bytes_per_file: JObject, // Optional + mode: JObject, // Optional + enable_stable_row_ids: JObject, // Optional + data_storage_version: JObject, // Optional + storage_options_obj: JObject, // Map + storage_options_provider_obj: JObject, // Optional + s3_credentials_refresh_offset_seconds_obj: JObject, // Optional ) -> JObject<'a> { ok_or_throw_with_return!( env, @@ -176,7 +187,9 @@ pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiStream<'a>( mode, enable_stable_row_ids, data_storage_version, - storage_options_obj + storage_options_obj, + storage_options_provider_obj, + s3_credentials_refresh_offset_seconds_obj ), JObject::null() ) @@ -187,13 +200,15 @@ fn inner_create_with_ffi_stream<'local>( env: &mut JNIEnv<'local>, dataset_uri: JString, arrow_array_stream_addr: jlong, - max_rows_per_file: JObject, // Optional - max_rows_per_group: JObject, // Optional - max_bytes_per_file: JObject, // Optional - mode: JObject, // Optional - enable_stable_row_ids: JObject, // Optional - data_storage_version: JObject, // Optional - storage_options_obj: JObject, // Map + max_rows_per_file: JObject, // Optional + max_rows_per_group: JObject, // Optional + max_bytes_per_file: JObject, // Optional + mode: JObject, // Optional + enable_stable_row_ids: JObject, // Optional + data_storage_version: JObject, // Optional + storage_options_obj: JObject, // Map + storage_options_provider_obj: JObject, // Optional + s3_credentials_refresh_offset_seconds_obj: JObject, // Optional ) -> Result> { let stream_ptr = arrow_array_stream_addr as *mut FFI_ArrowArrayStream; let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?; @@ -208,6 +223,8 @@ fn inner_create_with_ffi_stream<'local>( enable_stable_row_ids, data_storage_version, storage_options_obj, + storage_options_provider_obj, + s3_credentials_refresh_offset_seconds_obj, reader, ) } @@ -216,13 +233,15 @@ fn inner_create_with_ffi_stream<'local>( fn create_fragment<'a>( env: &mut JNIEnv<'a>, dataset_uri: JString, - max_rows_per_file: JObject, // Optional - max_rows_per_group: JObject, // Optional - max_bytes_per_file: JObject, // Optional - mode: JObject, // Optional - enable_stable_row_ids: JObject, // Optional - data_storage_version: JObject, // Optional - storage_options_obj: JObject, // Map + max_rows_per_file: JObject, // Optional + max_rows_per_group: JObject, // Optional + max_bytes_per_file: JObject, // Optional + mode: JObject, // Optional + enable_stable_row_ids: JObject, // Optional + data_storage_version: JObject, // Optional + storage_options_obj: JObject, // Map + storage_options_provider_obj: JObject, // Optional + s3_credentials_refresh_offset_seconds_obj: JObject, // Optional source: impl StreamingWriteSource, ) -> Result> { let path_str = dataset_uri.extract(env)?; @@ -236,7 +255,10 @@ fn create_fragment<'a>( &enable_stable_row_ids, &data_storage_version, &storage_options_obj, + &storage_options_provider_obj, + &s3_credentials_refresh_offset_seconds_obj, )?; + let fragments = RT.block_on(FileFragment::create_fragments( &path_str, source, diff --git a/java/lance-jni/src/transaction.rs b/java/lance-jni/src/transaction.rs index 87ddc6fc7c..4f58274e31 100644 --- a/java/lance-jni/src/transaction.rs +++ 
b/java/lance-jni/src/transaction.rs @@ -17,6 +17,7 @@ use lance::dataset::transaction::{ DataReplacementGroup, Operation, RewriteGroup, RewrittenIndex, Transaction, TransactionBuilder, UpdateMap, UpdateMapEntry, UpdateMode, }; +use lance::io::ObjectStoreParams; use lance::table::format::{Fragment, IndexMetadata}; use lance_core::datatypes::Schema as LanceSchema; use prost::Message; @@ -678,12 +679,35 @@ fn inner_commit_transaction<'local>( .call_method(&java_transaction, "writeParams", "()Ljava/util/Map;", &[])? .l()?; let write_param_jmap = JMap::from_env(env, &write_param_jobj)?; - let write_param = to_rust_map(env, &write_param_jmap)?; + let mut write_param = to_rust_map(env, &write_param_jmap)?; + + // Extract s3_credentials_refresh_offset_seconds from write_param + let s3_credentials_refresh_offset = write_param + .remove("s3_credentials_refresh_offset_seconds") + .and_then(|v| v.parse::().ok()) + .map(std::time::Duration::from_secs) + .unwrap_or_else(|| std::time::Duration::from_secs(10)); + + // Get the Dataset's storage_options_provider + let storage_options_provider = { + let dataset_guard = + unsafe { env.get_rust_field::<_, _, BlockingDataset>(&java_dataset, NATIVE_DATASET) }?; + dataset_guard.get_storage_options_provider() + }; + + // Build ObjectStoreParams using write_param for storage_options and provider from Dataset + let store_params = ObjectStoreParams { + storage_options: Some(write_param), + storage_options_provider, + s3_credentials_refresh_offset, + ..Default::default() + }; + let transaction = convert_to_rust_transaction(env, java_transaction, Some(&java_dataset))?; let new_blocking_ds = { let mut dataset_guard = - unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?; - dataset_guard.commit_transaction(transaction, write_param)? + unsafe { env.get_rust_field::<_, _, BlockingDataset>(&java_dataset, NATIVE_DATASET) }?; + dataset_guard.commit_transaction(transaction, store_params)? }; new_blocking_ds.into_java(env) } diff --git a/java/lance-jni/src/utils.rs b/java/lance-jni/src/utils.rs index 495ab229f4..5a1a8b9508 100644 --- a/java/lance-jni/src/utils.rs +++ b/java/lance-jni/src/utils.rs @@ -21,8 +21,10 @@ use lance_linalg::distance::DistanceType; use crate::error::{Error, Result}; use crate::ffi::JNIEnvExt; +use crate::storage_options::JavaStorageOptionsProvider; use lance_index::vector::Query; +use lance_io::object_store::StorageOptionsProvider; use std::collections::HashMap; use std::str::FromStr; @@ -45,6 +47,8 @@ pub fn extract_write_params( enable_stable_row_ids: &JObject, data_storage_version: &JObject, storage_options_obj: &JObject, + storage_options_provider_obj: &JObject, // Optional + s3_credentials_refresh_offset_seconds_obj: &JObject, // Optional ) -> Result { let mut write_params = WriteParams::default(); @@ -71,8 +75,27 @@ pub fn extract_write_params( let storage_options: HashMap = extract_storage_options(env, storage_options_obj)?; + // Extract storage options provider if present + let storage_options_provider = env.get_optional(storage_options_provider_obj, |env, obj| { + let provider_obj = env + .call_method(obj, "get", "()Ljava/lang/Object;", &[])? + .l()?; + JavaStorageOptionsProvider::new(env, provider_obj) + })?; + + let storage_options_provider_arc: Option> = + storage_options_provider.map(|v| Arc::new(v) as Arc); + + // Extract s3_credentials_refresh_offset_seconds if present + let s3_credentials_refresh_offset = env + .get_long_opt(s3_credentials_refresh_offset_seconds_obj)? 
+ .map(|v| std::time::Duration::from_secs(v as u64)) + .unwrap_or_else(|| std::time::Duration::from_secs(10)); + write_params.store_params = Some(ObjectStoreParams { storage_options: Some(storage_options), + storage_options_provider: storage_options_provider_arc, + s3_credentials_refresh_offset, ..Default::default() }); Ok(write_params) diff --git a/java/src/main/java/com/lancedb/lance/Dataset.java b/java/src/main/java/com/lancedb/lance/Dataset.java index 92badee32c..9a3aa5f1fd 100644 --- a/java/src/main/java/com/lancedb/lance/Dataset.java +++ b/java/src/main/java/com/lancedb/lance/Dataset.java @@ -75,6 +75,39 @@ public class Dataset implements Closeable { private Dataset() {} + /** + * Creates a builder for writing a dataset. + * + *
<p>
This builder supports writing datasets either directly to a URI or through a LanceNamespace. + * Data can be provided via reader() or stream() methods. + * + *
<p>
Example usage with URI and reader: + * + *
<pre>{@code
+   * Dataset dataset = Dataset.write()
+   *     .reader(myReader)
+   *     .uri("s3://bucket/table.lance")
+   *     .mode(WriteMode.CREATE)
+   *     .execute();
+   * }</pre>
+ * + *
<p>
Example usage with namespace and empty table: + * + *
<pre>{@code
+   * Dataset dataset = Dataset.write()
+   *     .schema(mySchema)
+   *     .namespace(myNamespace)
+   *     .tableId(Arrays.asList("my_table"))
+   *     .mode(WriteMode.CREATE)
+   *     .execute();
+   * }
+ * + * @return A new WriteDatasetBuilder instance + */ + public static WriteDatasetBuilder write() { + return new WriteDatasetBuilder(); + } + /** * Creates an empty dataset. * @@ -83,7 +116,11 @@ private Dataset() {} * @param schema dataset schema * @param params write params * @return Dataset + * @deprecated Use {@link #write()} builder instead. For example: {@code + * Dataset.write().allocator(allocator).schema(schema).uri(path) + * .mode(WriteMode.CREATE).execute()} */ + @Deprecated public static Dataset create( BufferAllocator allocator, String path, Schema schema, WriteParams params) { Preconditions.checkNotNull(allocator); @@ -102,7 +139,8 @@ public static Dataset create( params.getMode(), params.getEnableStableRowIds(), params.getDataStorageVersion(), - params.getStorageOptions()); + params.getStorageOptions(), + params.getS3CredentialsRefreshOffsetSeconds()); dataset.allocator = allocator; return dataset; } @@ -116,15 +154,41 @@ public static Dataset create( * @param path dataset uri * @param params write parameters * @return Dataset + * @deprecated Use {@link #write()} builder instead. For example: {@code + * Dataset.write().allocator(allocator).stream(stream).uri(path) + * .mode(WriteMode.CREATE).execute()} */ + @Deprecated public static Dataset create( BufferAllocator allocator, ArrowArrayStream stream, String path, WriteParams params) { + return create(allocator, stream, path, params, null); + } + + /** + * Create a dataset with given stream and storage options provider. + * + *
<p>
This method supports credential vending through the StorageOptionsProvider interface, which + * allows for dynamic credential refresh during long-running write operations. + * + * @param allocator buffer allocator + * @param stream arrow stream + * @param path dataset uri + * @param params write parameters + * @param storageOptionsProvider optional provider for dynamic storage options/credentials + * @return Dataset + */ + static Dataset create( + BufferAllocator allocator, + ArrowArrayStream stream, + String path, + WriteParams params, + StorageOptionsProvider storageOptionsProvider) { Preconditions.checkNotNull(allocator); Preconditions.checkNotNull(stream); Preconditions.checkNotNull(path); Preconditions.checkNotNull(params); Dataset dataset = - createWithFfiStream( + createWithFfiStreamAndProvider( stream.memoryAddress(), path, params.getMaxRowsPerFile(), @@ -133,7 +197,9 @@ public static Dataset create( params.getMode(), params.getEnableStableRowIds(), params.getDataStorageVersion(), - params.getStorageOptions()); + params.getStorageOptions(), + Optional.ofNullable(storageOptionsProvider), + params.getS3CredentialsRefreshOffsetSeconds()); dataset.allocator = allocator; return dataset; } @@ -147,7 +213,8 @@ private static native Dataset createWithFfiSchema( Optional mode, Optional enableStableRowIds, Optional dataStorageVersion, - Map storageOptions); + Map storageOptions, + Optional s3CredentialsRefreshOffsetSeconds); private static native Dataset createWithFfiStream( long arrowStreamMemoryAddress, @@ -158,14 +225,30 @@ private static native Dataset createWithFfiStream( Optional mode, Optional enableStableRowIds, Optional dataStorageVersion, - Map storageOptions); + Map storageOptions, + Optional s3CredentialsRefreshOffsetSeconds); + + private static native Dataset createWithFfiStreamAndProvider( + long arrowStreamMemoryAddress, + String path, + Optional maxRowsPerFile, + Optional maxRowsPerGroup, + Optional maxBytesPerFile, + Optional mode, + Optional enableStableRowIds, + Optional dataStorageVersion, + Map storageOptions, + Optional storageOptionsProvider, + Optional s3CredentialsRefreshOffsetSeconds); /** * Open a dataset from the specified path. 
* * @param path file path * @return Dataset + * @deprecated Use {@link #open()} builder instead: {@code Dataset.open().uri(path).build()} */ + @Deprecated public static Dataset open(String path) { return open(new RootAllocator(Long.MAX_VALUE), true, path, new ReadOptions.Builder().build()); } @@ -176,7 +259,10 @@ public static Dataset open(String path) { * @param path file path * @param options the open options * @return Dataset + * @deprecated Use {@link #open()} builder instead: {@code + * Dataset.open().uri(path).readOptions(options).build()} */ + @Deprecated public static Dataset open(String path, ReadOptions options) { return open(new RootAllocator(Long.MAX_VALUE), true, path, options); } @@ -187,7 +273,10 @@ public static Dataset open(String path, ReadOptions options) { * @param path file path * @param allocator Arrow buffer allocator * @return Dataset + * @deprecated Use {@link #open()} builder instead: {@code + * Dataset.open().allocator(allocator).uri(path).build()} */ + @Deprecated public static Dataset open(String path, BufferAllocator allocator) { return open(allocator, path, new ReadOptions.Builder().build()); } @@ -199,7 +288,10 @@ public static Dataset open(String path, BufferAllocator allocator) { * @param path file path * @param options the open options * @return Dataset + * @deprecated Use {@link #open()} builder instead: {@code + * Dataset.open().allocator(allocator).uri(path).readOptions(options).build()} */ + @Deprecated public static Dataset open(BufferAllocator allocator, String path, ReadOptions options) { return open(allocator, false, path, options); } diff --git a/java/src/main/java/com/lancedb/lance/Fragment.java b/java/src/main/java/com/lancedb/lance/Fragment.java index 7a0bfee67c..6077c68a32 100644 --- a/java/src/main/java/com/lancedb/lance/Fragment.java +++ b/java/src/main/java/com/lancedb/lance/Fragment.java @@ -15,6 +15,7 @@ import com.lancedb.lance.fragment.FragmentMergeResult; import com.lancedb.lance.fragment.FragmentUpdateResult; +import com.lancedb.lance.io.StorageOptionsProvider; import com.lancedb.lance.ipc.LanceScanner; import com.lancedb.lance.ipc.ScanOptions; @@ -197,6 +198,27 @@ private native FragmentUpdateResult nativeUpdateColumns( String leftOn, String rightOn); + /** + * Create a new fragment writer builder. + * + *
<p>
Example usage: + * + *
<pre>{@code
+   * List<FragmentMetadata> fragments = Fragment.write()
+   *     .datasetUri("s3://bucket/dataset.lance")
+   *     .allocator(allocator)
+   *     .data(vectorSchemaRoot)
+   *     .storageOptions(storageOptions)
+   *     .s3CredentialsRefreshOffsetSeconds(10)
+   *     .execute();
+   * }</pre>
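// --- Illustrative sketch, not part of the patch: how the deprecated positional
// overload maps onto the new builder. Assumes datasetUri, allocator, root,
// params, and provider are already in scope, with the usual Lance/Arrow imports.
List<FragmentMetadata> viaDeprecated =
    Fragment.create(datasetUri, allocator, root, params, provider);
List<FragmentMetadata> viaBuilder =
    Fragment.write()
        .datasetUri(datasetUri)
        .allocator(allocator)
        .data(root)
        .writeParams(params)
        .storageOptionsProvider(provider)
        .execute();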
+ * + * @return a new fragment writer builder + */ + public static WriteFragmentBuilder write() { + return new WriteFragmentBuilder(); + } + /** * Create a fragment from the given data. * @@ -205,9 +227,36 @@ private native FragmentUpdateResult nativeUpdateColumns( * @param root the vector schema root * @param params the write params * @return the fragment metadata + * @deprecated Use {@link #write()} builder instead. For example: {@code Fragment.write() + * .datasetUri(uri).allocator(allocator).data(root).writeParams(params).execute()} */ + @Deprecated public static List create( String datasetUri, BufferAllocator allocator, VectorSchemaRoot root, WriteParams params) { + return create(datasetUri, allocator, root, params, null); + } + + /** + * Create a fragment from the given data with optional storage options provider. + * + * @param datasetUri the dataset uri + * @param allocator the buffer allocator + * @param root the vector schema root + * @param params the write params + * @param storageOptionsProvider optional provider for dynamic storage options with automatic + * credential refresh + * @return the fragment metadata + * @deprecated Use {@link #write()} builder instead. For example: {@code Fragment.write() + * .datasetUri(uri).allocator(allocator).data(root).writeParams(params) + * .storageOptionsProvider(provider).execute()} + */ + @Deprecated + public static List create( + String datasetUri, + BufferAllocator allocator, + VectorSchemaRoot root, + WriteParams params, + StorageOptionsProvider storageOptionsProvider) { Preconditions.checkNotNull(datasetUri); Preconditions.checkNotNull(allocator); Preconditions.checkNotNull(root); @@ -225,7 +274,9 @@ public static List create( params.getMode(), params.getEnableStableRowIds(), params.getDataStorageVersion(), - params.getStorageOptions()); + params.getStorageOptions(), + Optional.ofNullable(storageOptionsProvider), + params.getS3CredentialsRefreshOffsetSeconds()); } } @@ -236,9 +287,34 @@ public static List create( * @param stream the arrow stream * @param params the write params * @return the fragment metadata + * @deprecated Use {@link #write()} builder instead. For example: {@code Fragment.write() + * .datasetUri(uri).data(stream).writeParams(params).execute()} */ + @Deprecated public static List create( String datasetUri, ArrowArrayStream stream, WriteParams params) { + return create(datasetUri, stream, params, null); + } + + /** + * Create a fragment from the given arrow stream with optional storage options provider. + * + * @param datasetUri the dataset uri + * @param stream the arrow stream + * @param params the write params + * @param storageOptionsProvider optional provider for dynamic storage options with automatic + * credential refresh + * @return the fragment metadata + * @deprecated Use {@link #write()} builder instead. 
For example: {@code + * Fragment.write().datasetUri(uri).data(stream).writeParams(params) + * .storageOptionsProvider(provider).execute()} + */ + @Deprecated + public static List create( + String datasetUri, + ArrowArrayStream stream, + WriteParams params, + StorageOptionsProvider storageOptionsProvider) { Preconditions.checkNotNull(datasetUri); Preconditions.checkNotNull(stream); Preconditions.checkNotNull(params); @@ -251,7 +327,9 @@ public static List create( params.getMode(), params.getEnableStableRowIds(), params.getDataStorageVersion(), - params.getStorageOptions()); + params.getStorageOptions(), + Optional.ofNullable(storageOptionsProvider), + params.getS3CredentialsRefreshOffsetSeconds()); } /** @@ -269,7 +347,9 @@ private static native List createWithFfiArray( Optional mode, Optional enableStableRowIds, Optional dataStorageVersion, - Map storageOptions); + Map storageOptions, + Optional storageOptionsProvider, + Optional s3CredentialsRefreshOffsetSeconds); /** * Create a fragment from the given arrow stream. @@ -285,5 +365,7 @@ private static native List createWithFfiStream( Optional mode, Optional enableStableRowIds, Optional dataStorageVersion, - Map storageOptions); + Map storageOptions, + Optional storageOptionsProvider, + Optional s3CredentialsRefreshOffsetSeconds); } diff --git a/java/src/main/java/com/lancedb/lance/OpenDatasetBuilder.java b/java/src/main/java/com/lancedb/lance/OpenDatasetBuilder.java index 1114bdcf87..268f274a2f 100644 --- a/java/src/main/java/com/lancedb/lance/OpenDatasetBuilder.java +++ b/java/src/main/java/com/lancedb/lance/OpenDatasetBuilder.java @@ -129,10 +129,10 @@ public OpenDatasetBuilder readOptions(ReadOptions options) { } /** - * Sets whether to ignore storage options from the namespace's describe_table(). + * Sets whether to ignore storage options from the namespace's describeTable(). * * @param ignoreNamespaceTableStorageOptions If true, storage options returned from - * describe_table() will be ignored (treated as null) + * describeTable() will be ignored (treated as null) * @return this builder instance */ public OpenDatasetBuilder ignoreNamespaceTableStorageOptions( @@ -145,7 +145,7 @@ public OpenDatasetBuilder ignoreNamespaceTableStorageOptions( * Opens the dataset with the configured parameters. * *
<p>
If a namespace is configured, this automatically fetches the table location and storage - * options from the namespace via describe_table(). + * options from the namespace via describeTable(). * * @return Dataset * @throws IllegalArgumentException if required parameters are missing or invalid diff --git a/java/src/main/java/com/lancedb/lance/Transaction.java b/java/src/main/java/com/lancedb/lance/Transaction.java index 057ca5466c..f62b7018a8 100644 --- a/java/src/main/java/com/lancedb/lance/Transaction.java +++ b/java/src/main/java/com/lancedb/lance/Transaction.java @@ -118,6 +118,7 @@ public static class Builder { private Operation operation; private Map writeParams; private Map transactionProperties; + private Optional s3CredentialsRefreshOffsetSeconds = Optional.empty(); public Builder(Dataset dataset) { this.dataset = dataset; @@ -139,6 +140,21 @@ public Builder writeParams(Map writeParams) { return this; } + /** + * Sets the S3 credentials refresh offset in seconds. + * + *
<p>
This parameter controls how long before credential expiration to refresh them. For + * example, if credentials expire at T+60s and this is set to 10, credentials will be refreshed + * at T+50s. + * + * @param s3CredentialsRefreshOffsetSeconds Refresh offset in seconds + * @return this builder instance + */ + public Builder s3CredentialsRefreshOffsetSeconds(long s3CredentialsRefreshOffsetSeconds) { + this.s3CredentialsRefreshOffsetSeconds = Optional.of(s3CredentialsRefreshOffsetSeconds); + return this; + } + public Builder operation(Operation operation) { validateState(); this.operation = operation; @@ -154,8 +170,16 @@ private void validateState() { public Transaction build() { Preconditions.checkState(operation != null, "TransactionBuilder has no operations"); + + // Merge s3_credentials_refresh_offset_seconds into writeParams if present + Map finalWriteParams = + writeParams != null ? new HashMap<>(writeParams) : new HashMap<>(); + s3CredentialsRefreshOffsetSeconds.ifPresent( + value -> + finalWriteParams.put("s3_credentials_refresh_offset_seconds", String.valueOf(value))); + return new Transaction( - dataset, readVersion, uuid, operation, writeParams, transactionProperties); + dataset, readVersion, uuid, operation, finalWriteParams, transactionProperties); } } } diff --git a/java/src/main/java/com/lancedb/lance/WriteDatasetBuilder.java b/java/src/main/java/com/lancedb/lance/WriteDatasetBuilder.java new file mode 100644 index 0000000000..e9e7729d03 --- /dev/null +++ b/java/src/main/java/com/lancedb/lance/WriteDatasetBuilder.java @@ -0,0 +1,452 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lancedb.lance; + +import com.lancedb.lance.io.StorageOptionsProvider; +import com.lancedb.lance.namespace.LanceNamespace; +import com.lancedb.lance.namespace.LanceNamespaceStorageOptionsProvider; +import com.lancedb.lance.namespace.model.CreateEmptyTableRequest; +import com.lancedb.lance.namespace.model.CreateEmptyTableResponse; +import com.lancedb.lance.namespace.model.DescribeTableRequest; +import com.lancedb.lance.namespace.model.DescribeTableResponse; + +import org.apache.arrow.c.ArrowArrayStream; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.types.pojo.Schema; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * Builder for writing datasets. + * + *
<p>
This builder provides a fluent API for creating or writing to datasets either directly to a + * URI or through a LanceNamespace. When using a namespace, the table location and storage options + * are automatically managed with credential vending support. + * + *
<p>
Example usage with URI and reader: + * + *
<pre>{@code
+ * Dataset dataset = Dataset.write()
+ *     .allocator(allocator)
+ *     .reader(myReader)
+ *     .uri("s3://bucket/table.lance")
+ *     .mode(WriteMode.CREATE)
+ *     .execute();
+ * }</pre>
+ * + *
<p>
Example usage with namespace: + * + *
<pre>{@code
+ * Dataset dataset = Dataset.write()
+ *     .allocator(allocator)
+ *     .reader(myReader)
+ *     .namespace(myNamespace)
+ *     .tableId(Arrays.asList("my_table"))
+ *     .mode(WriteMode.CREATE)
+ *     .execute();
+ * }</pre>
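// --- Illustrative sketch, not part of the patch: execute() enforces exactly one
// write target, so combining uri() with namespace()+tableId() fails fast.
// Assumes stream and myNamespace are already in scope.
try {
  Dataset.write()
      .stream(stream)
      .uri("s3://bucket/table.lance")
      .namespace(myNamespace)
      .tableId(Arrays.asList("my_table"))
      .execute();
} catch (IllegalArgumentException e) {
  // "Cannot specify both uri() and namespace()+tableId(). Use one or the other."
}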
+ */ +public class WriteDatasetBuilder { + private BufferAllocator allocator; + private ArrowReader reader; + private ArrowArrayStream stream; + private String uri; + private LanceNamespace namespace; + private List tableId; + private WriteParams.WriteMode mode = WriteParams.WriteMode.CREATE; + private Schema schema; + private Map storageOptions = new HashMap<>(); + private boolean ignoreNamespaceStorageOptions = false; + private Optional maxRowsPerFile = Optional.empty(); + private Optional maxRowsPerGroup = Optional.empty(); + private Optional maxBytesPerFile = Optional.empty(); + private Optional enableStableRowIds = Optional.empty(); + private Optional dataStorageVersion = Optional.empty(); + private Optional s3CredentialsRefreshOffsetSeconds = Optional.empty(); + + /** Creates a new builder instance. Package-private, use Dataset.write() instead. */ + WriteDatasetBuilder() { + // allocator is optional and can be set via allocator() method + } + + /** + * Sets the buffer allocator to use for Arrow operations. + * + *
<p>
If not provided, a default RootAllocator will be created automatically. + * + * @param allocator The buffer allocator + * @return this builder instance + */ + public WriteDatasetBuilder allocator(BufferAllocator allocator) { + Preconditions.checkNotNull(allocator, "allocator must not be null"); + this.allocator = allocator; + return this; + } + + /** + * Sets the ArrowReader containing the data to write. + * + *
<p>
Either reader() or stream() or schema() (for empty tables) must be provided. + * + * @param reader ArrowReader containing the data + * @return this builder instance + */ + public WriteDatasetBuilder reader(ArrowReader reader) { + Preconditions.checkNotNull(reader); + this.reader = reader; + return this; + } + + /** + * Sets the ArrowArrayStream containing the data to write. + * + *
<p>
Either reader() or stream() or schema() (for empty tables) must be provided. + * + * @param stream ArrowArrayStream containing the data + * @return this builder instance + */ + public WriteDatasetBuilder stream(ArrowArrayStream stream) { + Preconditions.checkNotNull(stream); + this.stream = stream; + return this; + } + + /** + * Sets the dataset URI. + * + *
<p>
Either uri() or namespace()+tableId() must be specified, but not both. + * + * @param uri The dataset URI (e.g., "s3://bucket/table.lance" or "file:///path/to/table.lance") + * @return this builder instance + */ + public WriteDatasetBuilder uri(String uri) { + this.uri = uri; + return this; + } + + /** + * Sets the namespace. + * + *
<p>
Must be used together with tableId(). Either uri() or namespace()+tableId() must be + * specified, but not both. + * + * @param namespace The namespace implementation to use for table operations + * @return this builder instance + */ + public WriteDatasetBuilder namespace(LanceNamespace namespace) { + this.namespace = namespace; + return this; + } + + /** + * Sets the table identifier. + * + *
<p>
Must be used together with namespace(). Either uri() or namespace()+tableId() must be + * specified, but not both. + * + * @param tableId The table identifier (e.g., Arrays.asList("my_table")) + * @return this builder instance + */ + public WriteDatasetBuilder tableId(List tableId) { + this.tableId = tableId; + return this; + } + + /** + * Sets the write mode. + * + * @param mode The write mode (CREATE, APPEND, or OVERWRITE) + * @return this builder instance + */ + public WriteDatasetBuilder mode(WriteParams.WriteMode mode) { + Preconditions.checkNotNull(mode); + this.mode = mode; + return this; + } + + /** + * Sets the schema for the dataset. + * + *
<p>
If the reader and stream not provided, this is used to create an empty dataset + * + * @param schema The dataset schema + * @return this builder instance + */ + public WriteDatasetBuilder schema(Schema schema) { + this.schema = schema; + return this; + } + + /** + * Sets storage options for the dataset. + * + * @param storageOptions Storage configuration options + * @return this builder instance + */ + public WriteDatasetBuilder storageOptions(Map storageOptions) { + this.storageOptions = new HashMap<>(storageOptions); + return this; + } + + /** + * Sets whether to ignore storage options from the namespace's describeTable() or + * createEmptyTable(). + * + * @param ignoreNamespaceStorageOptions If true, storage options returned from namespace will be + * ignored + * @return this builder instance + */ + public WriteDatasetBuilder ignoreNamespaceStorageOptions(boolean ignoreNamespaceStorageOptions) { + this.ignoreNamespaceStorageOptions = ignoreNamespaceStorageOptions; + return this; + } + + /** + * Sets the maximum number of rows per file. + * + * @param maxRowsPerFile Maximum rows per file + * @return this builder instance + */ + public WriteDatasetBuilder maxRowsPerFile(int maxRowsPerFile) { + this.maxRowsPerFile = Optional.of(maxRowsPerFile); + return this; + } + + /** + * Sets the maximum number of rows per group. + * + * @param maxRowsPerGroup Maximum rows per group + * @return this builder instance + */ + public WriteDatasetBuilder maxRowsPerGroup(int maxRowsPerGroup) { + this.maxRowsPerGroup = Optional.of(maxRowsPerGroup); + return this; + } + + /** + * Sets the maximum number of bytes per file. + * + * @param maxBytesPerFile Maximum bytes per file + * @return this builder instance + */ + public WriteDatasetBuilder maxBytesPerFile(long maxBytesPerFile) { + this.maxBytesPerFile = Optional.of(maxBytesPerFile); + return this; + } + + /** + * Sets whether to enable stable row IDs. + * + * @param enableStableRowIds Whether to enable stable row IDs + * @return this builder instance + */ + public WriteDatasetBuilder enableStableRowIds(boolean enableStableRowIds) { + this.enableStableRowIds = Optional.of(enableStableRowIds); + return this; + } + + /** + * Sets the data storage version. + * + * @param dataStorageVersion The Lance file version to use + * @return this builder instance + */ + public WriteDatasetBuilder dataStorageVersion(WriteParams.LanceFileVersion dataStorageVersion) { + this.dataStorageVersion = Optional.of(dataStorageVersion); + return this; + } + + /** + * Sets the S3 credentials refresh offset in seconds. + * + *
<p>
This parameter controls how long before credential expiration to refresh them. For example, + * if credentials expire at T+60s and this is set to 10, credentials will be refreshed at T+50s. + * + * @param s3CredentialsRefreshOffsetSeconds Refresh offset in seconds + * @return this builder instance + */ + public WriteDatasetBuilder s3CredentialsRefreshOffsetSeconds( + long s3CredentialsRefreshOffsetSeconds) { + this.s3CredentialsRefreshOffsetSeconds = Optional.of(s3CredentialsRefreshOffsetSeconds); + return this; + } + + /** + * Executes the write operation and returns the created dataset. + * + *
<p>
If a namespace is configured via namespace()+tableId(), this automatically handles table + * creation or retrieval through the namespace API with credential vending support. + * + * @return Dataset + * @throws IllegalArgumentException if required parameters are missing or invalid + */ + public Dataset execute() { + // Auto-create allocator if not provided + if (allocator == null) { + allocator = new RootAllocator(Long.MAX_VALUE); + } + + // Validate that exactly one of uri or namespace is provided + boolean hasUri = uri != null; + boolean hasNamespace = namespace != null && tableId != null; + + if (hasUri && hasNamespace) { + throw new IllegalArgumentException( + "Cannot specify both uri() and namespace()+tableId(). Use one or the other."); + } + if (!hasUri && !hasNamespace) { + if (namespace != null) { + throw new IllegalArgumentException( + "namespace() is set but tableId() is missing. Both must be provided together."); + } else if (tableId != null) { + throw new IllegalArgumentException( + "tableId() is set but namespace() is missing. Both must be provided together."); + } else { + throw new IllegalArgumentException("Either uri() or namespace()+tableId() must be called."); + } + } + + // Validate data source - exactly one of reader, stream, or schema must be provided + int dataSourceCount = 0; + if (reader != null) dataSourceCount++; + if (stream != null) dataSourceCount++; + if (schema != null && reader == null && stream == null) dataSourceCount++; + + if (dataSourceCount == 0) { + throw new IllegalArgumentException( + "Must provide data via reader(), stream(), or schema() (for empty tables)."); + } + if (dataSourceCount > 1) { + throw new IllegalArgumentException( + "Cannot specify multiple data sources. " + + "Use only one of: reader(), stream(), or schema()."); + } + + // Handle namespace-based writing + if (hasNamespace) { + return executeWithNamespace(); + } + + // Handle URI-based writing + return executeWithUri(); + } + + private Dataset executeWithNamespace() { + String tableUri; + Map namespaceStorageOptions = null; + + // Mode-specific namespace operations + if (mode == WriteParams.WriteMode.CREATE) { + // Call namespace.createEmptyTable() to create new table + CreateEmptyTableRequest request = new CreateEmptyTableRequest(); + request.setId(tableId); + + CreateEmptyTableResponse response = namespace.createEmptyTable(request); + + tableUri = response.getLocation(); + if (tableUri == null || tableUri.isEmpty()) { + throw new IllegalArgumentException("Namespace did not return a table location"); + } + + namespaceStorageOptions = ignoreNamespaceStorageOptions ? null : response.getStorageOptions(); + } else { + // For APPEND/OVERWRITE modes, call namespace.describeTable() + DescribeTableRequest request = new DescribeTableRequest(); + request.setId(tableId); + + DescribeTableResponse response = namespace.describeTable(request); + + tableUri = response.getLocation(); + if (tableUri == null || tableUri.isEmpty()) { + throw new IllegalArgumentException("Namespace did not return a table location"); + } + + namespaceStorageOptions = ignoreNamespaceStorageOptions ? 
null : response.getStorageOptions(); + } + + // Merge storage options (namespace options + user options, with namespace taking precedence) + Map mergedStorageOptions = new HashMap<>(storageOptions); + if (namespaceStorageOptions != null && !namespaceStorageOptions.isEmpty()) { + mergedStorageOptions.putAll(namespaceStorageOptions); + } + + // Build WriteParams with merged storage options + WriteParams.Builder paramsBuilder = + new WriteParams.Builder().withMode(mode).withStorageOptions(mergedStorageOptions); + + maxRowsPerFile.ifPresent(paramsBuilder::withMaxRowsPerFile); + maxRowsPerGroup.ifPresent(paramsBuilder::withMaxRowsPerGroup); + maxBytesPerFile.ifPresent(paramsBuilder::withMaxBytesPerFile); + enableStableRowIds.ifPresent(paramsBuilder::withEnableStableRowIds); + dataStorageVersion.ifPresent(paramsBuilder::withDataStorageVersion); + s3CredentialsRefreshOffsetSeconds.ifPresent( + paramsBuilder::withS3CredentialsRefreshOffsetSeconds); + + WriteParams params = paramsBuilder.build(); + + // Create storage options provider for credential refresh during long-running writes + StorageOptionsProvider storageOptionsProvider = + ignoreNamespaceStorageOptions + ? null + : new LanceNamespaceStorageOptionsProvider(namespace, tableId); + + // Use Dataset.create() which handles CREATE/APPEND/OVERWRITE modes + return createDatasetWithStream(tableUri, params, storageOptionsProvider); + } + + private Dataset executeWithUri() { + WriteParams.Builder paramsBuilder = + new WriteParams.Builder().withMode(mode).withStorageOptions(storageOptions); + + maxRowsPerFile.ifPresent(paramsBuilder::withMaxRowsPerFile); + maxRowsPerGroup.ifPresent(paramsBuilder::withMaxRowsPerGroup); + maxBytesPerFile.ifPresent(paramsBuilder::withMaxBytesPerFile); + enableStableRowIds.ifPresent(paramsBuilder::withEnableStableRowIds); + dataStorageVersion.ifPresent(paramsBuilder::withDataStorageVersion); + s3CredentialsRefreshOffsetSeconds.ifPresent( + paramsBuilder::withS3CredentialsRefreshOffsetSeconds); + + WriteParams params = paramsBuilder.build(); + + return createDatasetWithStream(uri, params, null); + } + + private Dataset createDatasetWithStream( + String path, WriteParams params, StorageOptionsProvider storageOptionsProvider) { + // If stream is directly provided, use it + if (stream != null) { + return Dataset.create(allocator, stream, path, params, storageOptionsProvider); + } + + // If reader is provided, convert to stream + if (reader != null) { + try (ArrowArrayStream tempStream = ArrowArrayStream.allocateNew(allocator)) { + Data.exportArrayStream(allocator, reader, tempStream); + return Dataset.create(allocator, tempStream, path, params, storageOptionsProvider); + } + } + + // If only schema is provided (empty table), use Dataset.create with schema + if (schema != null) { + return Dataset.create(allocator, path, schema, params); + } + + throw new IllegalStateException("No data source provided"); + } +} diff --git a/java/src/main/java/com/lancedb/lance/WriteFragmentBuilder.java b/java/src/main/java/com/lancedb/lance/WriteFragmentBuilder.java new file mode 100644 index 0000000000..10b4dd6735 --- /dev/null +++ b/java/src/main/java/com/lancedb/lance/WriteFragmentBuilder.java @@ -0,0 +1,275 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lancedb.lance; + +import com.lancedb.lance.io.StorageOptionsProvider; + +import org.apache.arrow.c.ArrowArrayStream; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.VectorSchemaRoot; + +import java.util.List; +import java.util.Map; + +/** + * Builder for writing fragments. + * + *
<p>
This builder provides a fluent API for creating fragments with various configuration options. + * It supports both VectorSchemaRoot and ArrowArrayStream as data sources. + * + *
<p>
Example usage: + * + *
<pre>{@code
+ * List<FragmentMetadata> fragments = Fragment.write()
+ *     .datasetUri("s3://bucket/dataset.lance")
+ *     .allocator(allocator)
+ *     .data(vectorSchemaRoot)
+ *     .storageOptions(storageOptions)
+ *     .s3CredentialsRefreshOffsetSeconds(10)
+ *     .execute();
+ * }</pre>
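// --- Illustrative sketch, not part of the patch: write parameters can come from
// a prebuilt WriteParams or from the builder's own setters; validate() rejects
// mixing the two styles. Assumes uri, allocator, and root are already in scope.
WriteParams params = new WriteParams.Builder()
    .withMaxRowsPerFile(1_000_000)
    .withS3CredentialsRefreshOffsetSeconds(10)
    .build();
List<FragmentMetadata> viaParams =
    Fragment.write().datasetUri(uri).allocator(allocator).data(root)
        .writeParams(params)
        .execute();
List<FragmentMetadata> viaSetters =
    Fragment.write().datasetUri(uri).allocator(allocator).data(root)
        .maxRowsPerFile(1_000_000)
        .s3CredentialsRefreshOffsetSeconds(10)
        .execute();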
+ */ +public class WriteFragmentBuilder { + private String datasetUri; + private BufferAllocator allocator; + private VectorSchemaRoot vectorSchemaRoot; + private ArrowArrayStream arrowArrayStream; + private WriteParams writeParams; + private WriteParams.Builder writeParamsBuilder; + private StorageOptionsProvider storageOptionsProvider; + + WriteFragmentBuilder() {} + + /** + * Set the dataset URI where fragments will be written. + * + * @param datasetUri the dataset URI + * @return this builder + */ + public WriteFragmentBuilder datasetUri(String datasetUri) { + this.datasetUri = datasetUri; + return this; + } + + /** + * Set the buffer allocator for Arrow operations. + * + * @param allocator the buffer allocator + * @return this builder + */ + public WriteFragmentBuilder allocator(BufferAllocator allocator) { + this.allocator = allocator; + return this; + } + + /** + * Set the data to write using a VectorSchemaRoot. + * + * @param root the vector schema root containing the data + * @return this builder + */ + public WriteFragmentBuilder data(VectorSchemaRoot root) { + Preconditions.checkState( + this.arrowArrayStream == null, "Cannot set both VectorSchemaRoot and ArrowArrayStream"); + this.vectorSchemaRoot = root; + return this; + } + + /** + * Set the data to write using an ArrowArrayStream. + * + * @param stream the arrow array stream containing the data + * @return this builder + */ + public WriteFragmentBuilder data(ArrowArrayStream stream) { + Preconditions.checkState( + this.vectorSchemaRoot == null, "Cannot set both VectorSchemaRoot and ArrowArrayStream"); + this.arrowArrayStream = stream; + return this; + } + + /** + * Set the write parameters. + * + * @param params the write parameters + * @return this builder + */ + public WriteFragmentBuilder writeParams(WriteParams params) { + this.writeParams = params; + return this; + } + + /** + * Set storage options for object store access. + * + * @param storageOptions the storage options + * @return this builder + */ + public WriteFragmentBuilder storageOptions(Map storageOptions) { + ensureWriteParamsBuilder(); + this.writeParamsBuilder.withStorageOptions(storageOptions); + return this; + } + + /** + * Set the storage options provider for dynamic credential refresh. + * + * @param provider the storage options provider + * @return this builder + */ + public WriteFragmentBuilder storageOptionsProvider(StorageOptionsProvider provider) { + this.storageOptionsProvider = provider; + return this; + } + + /** + * Set the S3 credentials refresh offset in seconds. + * + *
<p>
This parameter controls how long before credential expiration to refresh them. For example, + * if credentials expire at T+60s and this is set to 10, credentials will be refreshed at T+50s. + * + * @param seconds refresh offset in seconds + * @return this builder + */ + public WriteFragmentBuilder s3CredentialsRefreshOffsetSeconds(long seconds) { + ensureWriteParamsBuilder(); + this.writeParamsBuilder.withS3CredentialsRefreshOffsetSeconds(seconds); + return this; + } + + /** + * Set the maximum number of rows per file. + * + * @param maxRowsPerFile maximum rows per file + * @return this builder + */ + public WriteFragmentBuilder maxRowsPerFile(int maxRowsPerFile) { + ensureWriteParamsBuilder(); + this.writeParamsBuilder.withMaxRowsPerFile(maxRowsPerFile); + return this; + } + + /** + * Set the maximum number of rows per group. + * + * @param maxRowsPerGroup maximum rows per group + * @return this builder + */ + public WriteFragmentBuilder maxRowsPerGroup(int maxRowsPerGroup) { + ensureWriteParamsBuilder(); + this.writeParamsBuilder.withMaxRowsPerGroup(maxRowsPerGroup); + return this; + } + + /** + * Set the maximum number of bytes per file. + * + * @param maxBytesPerFile maximum bytes per file + * @return this builder + */ + public WriteFragmentBuilder maxBytesPerFile(long maxBytesPerFile) { + ensureWriteParamsBuilder(); + this.writeParamsBuilder.withMaxBytesPerFile(maxBytesPerFile); + return this; + } + + /** + * Set the write mode. + * + * @param mode the write mode + * @return this builder + */ + public WriteFragmentBuilder mode(WriteParams.WriteMode mode) { + ensureWriteParamsBuilder(); + this.writeParamsBuilder.withMode(mode); + return this; + } + + /** + * Enable or disable stable row IDs. + * + * @param enable whether to enable stable row IDs + * @return this builder + */ + public WriteFragmentBuilder enableStableRowIds(boolean enable) { + ensureWriteParamsBuilder(); + this.writeParamsBuilder.withEnableStableRowIds(enable); + return this; + } + + /** + * Set the data storage version. + * + * @param version the data storage version + * @return this builder + */ + public WriteFragmentBuilder dataStorageVersion(WriteParams.LanceFileVersion version) { + ensureWriteParamsBuilder(); + this.writeParamsBuilder.withDataStorageVersion(version); + return this; + } + + /** + * Execute the fragment write operation. 
+ * + * @return the list of fragment metadata for the created fragments + */ + public List execute() { + validate(); + + // Build the write params if builder was used + WriteParams finalWriteParams = buildWriteParams(); + + if (vectorSchemaRoot != null) { + return Fragment.create( + datasetUri, allocator, vectorSchemaRoot, finalWriteParams, storageOptionsProvider); + } else { + return Fragment.create( + datasetUri, arrowArrayStream, finalWriteParams, storageOptionsProvider); + } + } + + private void ensureWriteParamsBuilder() { + if (this.writeParamsBuilder == null) { + this.writeParamsBuilder = new WriteParams.Builder(); + } + } + + private WriteParams buildWriteParams() { + if (writeParams != null) { + return writeParams; + } else if (writeParamsBuilder != null) { + return writeParamsBuilder.build(); + } else { + return new WriteParams.Builder().build(); + } + } + + private void validate() { + Preconditions.checkNotNull(datasetUri, "datasetUri is required"); + Preconditions.checkState( + vectorSchemaRoot != null || arrowArrayStream != null, + "Either VectorSchemaRoot or ArrowArrayStream must be provided"); + Preconditions.checkState( + vectorSchemaRoot == null || arrowArrayStream == null, + "Cannot set both VectorSchemaRoot and ArrowArrayStream"); + Preconditions.checkState( + vectorSchemaRoot == null || allocator != null, + "allocator is required when using VectorSchemaRoot"); + Preconditions.checkState( + writeParams == null || writeParamsBuilder == null, + "Cannot use both writeParams() and individual parameter methods"); + } +} diff --git a/java/src/main/java/com/lancedb/lance/WriteParams.java b/java/src/main/java/com/lancedb/lance/WriteParams.java index 85884042a0..a49cc57c3a 100644 --- a/java/src/main/java/com/lancedb/lance/WriteParams.java +++ b/java/src/main/java/com/lancedb/lance/WriteParams.java @@ -56,6 +56,7 @@ public String getVersionString() { private final Optional enableStableRowIds; private final Optional dataStorageVersion; private Map storageOptions = new HashMap<>(); + private final Optional s3CredentialsRefreshOffsetSeconds; private WriteParams( Optional maxRowsPerFile, @@ -64,7 +65,8 @@ private WriteParams( Optional mode, Optional enableStableRowIds, Optional dataStorageVersion, - Map storageOptions) { + Map storageOptions, + Optional s3CredentialsRefreshOffsetSeconds) { this.maxRowsPerFile = maxRowsPerFile; this.maxRowsPerGroup = maxRowsPerGroup; this.maxBytesPerFile = maxBytesPerFile; @@ -72,6 +74,7 @@ private WriteParams( this.enableStableRowIds = enableStableRowIds; this.dataStorageVersion = dataStorageVersion; this.storageOptions = storageOptions; + this.s3CredentialsRefreshOffsetSeconds = s3CredentialsRefreshOffsetSeconds; } public Optional getMaxRowsPerFile() { @@ -107,6 +110,10 @@ public Map getStorageOptions() { return storageOptions; } + public Optional getS3CredentialsRefreshOffsetSeconds() { + return s3CredentialsRefreshOffsetSeconds; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -127,6 +134,7 @@ public static class Builder { private Optional enableStableRowIds = Optional.empty(); private Optional dataStorageVersion = Optional.empty(); private Map storageOptions = new HashMap<>(); + private Optional s3CredentialsRefreshOffsetSeconds = Optional.empty(); public Builder withMaxRowsPerFile(int maxRowsPerFile) { this.maxRowsPerFile = Optional.of(maxRowsPerFile); @@ -163,6 +171,11 @@ public Builder withEnableStableRowIds(boolean enableStableRowIds) { return this; } + public Builder 
diff --git a/java/src/main/java/com/lancedb/lance/WriteParams.java b/java/src/main/java/com/lancedb/lance/WriteParams.java
index 85884042a0..a49cc57c3a 100644
--- a/java/src/main/java/com/lancedb/lance/WriteParams.java
+++ b/java/src/main/java/com/lancedb/lance/WriteParams.java
@@ -56,6 +56,7 @@ public String getVersionString() {
   private final Optional<Boolean> enableStableRowIds;
   private final Optional<LanceFileVersion> dataStorageVersion;
   private Map<String, String> storageOptions = new HashMap<>();
+  private final Optional<Long> s3CredentialsRefreshOffsetSeconds;

   private WriteParams(
       Optional<Integer> maxRowsPerFile,
@@ -64,7 +65,8 @@ private WriteParams(
       Optional<WriteMode> mode,
       Optional<Boolean> enableStableRowIds,
       Optional<LanceFileVersion> dataStorageVersion,
-      Map<String, String> storageOptions) {
+      Map<String, String> storageOptions,
+      Optional<Long> s3CredentialsRefreshOffsetSeconds) {
     this.maxRowsPerFile = maxRowsPerFile;
     this.maxRowsPerGroup = maxRowsPerGroup;
     this.maxBytesPerFile = maxBytesPerFile;
@@ -72,6 +74,7 @@ private WriteParams(
     this.enableStableRowIds = enableStableRowIds;
     this.dataStorageVersion = dataStorageVersion;
     this.storageOptions = storageOptions;
+    this.s3CredentialsRefreshOffsetSeconds = s3CredentialsRefreshOffsetSeconds;
   }

   public Optional<Integer> getMaxRowsPerFile() {
@@ -107,6 +110,10 @@ public Map<String, String> getStorageOptions() {
     return storageOptions;
   }

+  public Optional<Long> getS3CredentialsRefreshOffsetSeconds() {
+    return s3CredentialsRefreshOffsetSeconds;
+  }
+
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
@@ -127,6 +134,7 @@ public static class Builder {
     private Optional<Boolean> enableStableRowIds = Optional.empty();
     private Optional<LanceFileVersion> dataStorageVersion = Optional.empty();
     private Map<String, String> storageOptions = new HashMap<>();
+    private Optional<Long> s3CredentialsRefreshOffsetSeconds = Optional.empty();

     public Builder withMaxRowsPerFile(int maxRowsPerFile) {
       this.maxRowsPerFile = Optional.of(maxRowsPerFile);
@@ -163,6 +171,11 @@ public Builder withEnableStableRowIds(boolean enableStableRowIds) {
       return this;
     }

+    public Builder withS3CredentialsRefreshOffsetSeconds(long s3CredentialsRefreshOffsetSeconds) {
+      this.s3CredentialsRefreshOffsetSeconds = Optional.of(s3CredentialsRefreshOffsetSeconds);
+      return this;
+    }
+
     public WriteParams build() {
       return new WriteParams(
           maxRowsPerFile,
@@ -171,7 +184,8 @@ public WriteParams build() {
           mode,
           enableStableRowIds,
           dataStorageVersion,
-          storageOptions);
+          storageOptions,
+          s3CredentialsRefreshOffsetSeconds);
     }
   }
 }
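The corresponding WriteParams surface, sketched with methods that appear in this diff (withStorageOptions is exercised by the tests below):

    WriteParams params =
        new WriteParams.Builder()
            .withMaxRowsPerFile(1_000_000)
            .withStorageOptions(storageOptions)        // Map<String, String> of S3 settings
            .withS3CredentialsRefreshOffsetSeconds(10) // refresh 10s before credential expiry
            .build();
    // The value surfaces through the new getter:
    Optional<Long> offset = params.getS3CredentialsRefreshOffsetSeconds(); // Optional.of(10L)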
diff --git a/java/src/test/java/com/lancedb/lance/NamespaceIntegrationTest.java b/java/src/test/java/com/lancedb/lance/NamespaceIntegrationTest.java
index 039fbbce66..eabdc68ca6 100644
--- a/java/src/test/java/com/lancedb/lance/NamespaceIntegrationTest.java
+++ b/java/src/test/java/com/lancedb/lance/NamespaceIntegrationTest.java
@@ -13,14 +13,20 @@
  */
 package com.lancedb.lance;

+import com.lancedb.lance.namespace.DirectoryNamespace;
 import com.lancedb.lance.namespace.LanceNamespace;
+import com.lancedb.lance.namespace.LanceNamespaceStorageOptionsProvider;
+import com.lancedb.lance.namespace.model.CreateEmptyTableRequest;
+import com.lancedb.lance.namespace.model.CreateEmptyTableResponse;
 import com.lancedb.lance.namespace.model.DescribeTableRequest;
 import com.lancedb.lance.namespace.model.DescribeTableResponse;
+import com.lancedb.lance.operation.Append;

 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
@@ -40,6 +46,7 @@
 import software.amazon.awssdk.services.s3.model.S3Object;

 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -53,7 +60,7 @@
 /**
  * Integration tests for Lance with S3 and credential refresh using StorageOptionsProvider.
  *
- * <p>This test simulates a mock credential provider that returns incrementing credentials and
+ * <p>This test simulates a tracking credential provider that returns incrementing credentials and
  * verifies that the credential refresh mechanism works correctly.
  *
  * <p>These tests require LocalStack to be running. Run with: docker compose up -d
@@ -120,62 +127,100 @@ private static void deleteBucket() {
   }

   /**
-   * Mock LanceNamespace implementation for testing.
+   * Tracking LanceNamespace implementation for testing.
    *
-   * <p>This implementation: - Returns table location and storage options via describeTable() -
-   * Tracks the number of times describeTable has been called - Returns credentials with short
-   * expiration times for testing refresh
+   * <p>This implementation wraps DirectoryNamespace and tracks API calls. It returns incrementing
+   * credentials with expiration timestamps to test the credential refresh mechanism.
    */
-  static class MockLanceNamespace implements LanceNamespace {
-    private final Map<String, String> tableLocations = new HashMap<>();
+  static class TrackingNamespace implements LanceNamespace {
+    private final String bucketName;
     private final Map<String, String> baseStorageOptions;
     private final int credentialExpiresInSeconds;
-    private final AtomicInteger callCount = new AtomicInteger(0);
+    private final AtomicInteger describeCallCount = new AtomicInteger(0);
+    private final AtomicInteger createCallCount = new AtomicInteger(0);
+    private final DirectoryNamespace inner;

-    public MockLanceNamespace(Map<String, String> storageOptions, int credentialExpiresInSeconds) {
+    public TrackingNamespace(
+        String bucketName, Map<String, String> storageOptions, int credentialExpiresInSeconds) {
+      this.bucketName = bucketName;
       this.baseStorageOptions = new HashMap<>(storageOptions);
       this.credentialExpiresInSeconds = credentialExpiresInSeconds;
+
+      // Create underlying DirectoryNamespace with storage options
+      Map<String, String> dirProps = new HashMap<>();
+      for (Map.Entry<String, String> entry : storageOptions.entrySet()) {
+        dirProps.put("storage." + entry.getKey(), entry.getValue());
+      }
+
+      // Set root based on bucket type
+      if (bucketName.startsWith("/") || bucketName.startsWith("file://")) {
+        dirProps.put("root", bucketName + "/namespace_root");
+      } else {
+        dirProps.put("root", "s3://" + bucketName + "/namespace_root");
+      }
+
+      this.inner = new DirectoryNamespace();
+      try (BufferAllocator allocator = new RootAllocator()) {
+        this.inner.initialize(dirProps, allocator);
+      }
     }

-    @Override
-    public void initialize(Map<String, String> configProperties, BufferAllocator allocator) {
-      // Not needed for test
+    public int getDescribeCallCount() {
+      return describeCallCount.get();
     }

-    public void registerTable(String tableName, String location) {
-      tableLocations.put(tableName, location);
+    public int getCreateCallCount() {
+      return createCallCount.get();
     }

-    public int getCallCount() {
-      return callCount.get();
+    @Override
+    public void initialize(Map<String, String> configProperties, BufferAllocator allocator) {
+      // Already initialized in constructor
     }

     @Override
     public String namespaceId() {
-      return "MockLanceNamespace { }";
+      return "TrackingNamespace { inner: " + inner.namespaceId() + " }";
     }

-    @Override
-    public DescribeTableResponse describeTable(DescribeTableRequest request) {
-      int count = callCount.incrementAndGet();
+    /**
+     * Modifies storage options to add incrementing credentials with expiration timestamp.
+     *
+     * @param storageOptions Original storage options
+     * @param count Call count to use for credential generation
+     * @return Modified storage options with new credentials
+     */
+    private Map<String, String> modifyStorageOptions(
+        Map<String, String> storageOptions, int count) {
+      Map<String, String> modified =
+          storageOptions != null ?
new HashMap<>(storageOptions) : new HashMap<>(); - String tableName = String.join("/", request.getId()); - String location = tableLocations.get(tableName); - if (location == null) { - throw new IllegalArgumentException("Table not found: " + tableName); - } + modified.put("aws_access_key_id", "AKID_" + count); + modified.put("aws_secret_access_key", "SECRET_" + count); + modified.put("aws_session_token", "TOKEN_" + count); - // Create storage options with expiration - Map storageOptions = new HashMap<>(baseStorageOptions); long expiresAtMillis = System.currentTimeMillis() + (credentialExpiresInSeconds * 1000L); - storageOptions.put("expires_at_millis", String.valueOf(expiresAtMillis)); + modified.put("expires_at_millis", String.valueOf(expiresAtMillis)); - DescribeTableResponse response = new DescribeTableResponse(); - response.setLocation(location); - response.setStorageOptions(storageOptions); - if (request.getVersion() != null) { - response.setVersion(request.getVersion()); - } + return modified; + } + + @Override + public CreateEmptyTableResponse createEmptyTable(CreateEmptyTableRequest request) { + int count = createCallCount.incrementAndGet(); + + CreateEmptyTableResponse response = inner.createEmptyTable(request); + response.setStorageOptions(modifyStorageOptions(response.getStorageOptions(), count)); + + return response; + } + + @Override + public DescribeTableResponse describeTable(DescribeTableRequest request) { + int count = describeCallCount.incrementAndGet(); + + DescribeTableResponse response = inner.describeTable(request); + response.setStorageOptions(modifyStorageOptions(response.getStorageOptions(), count)); return response; } @@ -184,10 +229,7 @@ public DescribeTableResponse describeTable(DescribeTableRequest request) { @Test void testOpenDatasetWithoutRefresh() throws Exception { try (BufferAllocator allocator = new RootAllocator()) { - // Create test dataset directly on S3 - String tableName = UUID.randomUUID().toString(); - String tableUri = "s3://" + BUCKET_NAME + "/" + tableName + ".lance"; - + // Set up storage options Map storageOptions = new HashMap<>(); storageOptions.put("allow_http", "true"); storageOptions.put("aws_access_key_id", ACCESS_KEY); @@ -195,7 +237,11 @@ void testOpenDatasetWithoutRefresh() throws Exception { storageOptions.put("aws_endpoint", ENDPOINT_URL); storageOptions.put("aws_region", REGION); - // Create schema and write dataset + // Create tracking namespace with 60-second expiration (long enough to not expire during test) + TrackingNamespace namespace = new TrackingNamespace(BUCKET_NAME, storageOptions, 60); + String tableName = UUID.randomUUID().toString(); + + // Create schema and data Schema schema = new Schema( Arrays.asList( @@ -218,26 +264,54 @@ void testOpenDatasetWithoutRefresh() throws Exception { bVector.setValueCount(2); root.setRowCount(2); - WriteParams writeParams = - new WriteParams.Builder().withStorageOptions(storageOptions).build(); - - // Create dataset using Dataset.create - try (Dataset dataset = Dataset.create(allocator, tableUri, schema, writeParams)) { - // Add data via fragments - List fragments = - Fragment.create(tableUri, allocator, root, writeParams); - FragmentOperation.Append appendOp = new FragmentOperation.Append(fragments); - try (Dataset updatedDataset = - Dataset.commit(allocator, tableUri, appendOp, Optional.of(1L), storageOptions)) { - assertEquals(2, updatedDataset.version()); - assertEquals(2, updatedDataset.countRows()); - } + // Create a test reader that returns our VectorSchemaRoot + ArrowReader 
testReader = + new ArrowReader(allocator) { + boolean firstRead = true; + + @Override + public boolean loadNextBatch() { + if (firstRead) { + firstRead = false; + return true; + } + return false; + } + + @Override + public long bytesRead() { + return 0; + } + + @Override + protected void closeReadSource() {} + + @Override + protected Schema readSchema() { + return schema; + } + + @Override + public VectorSchemaRoot getVectorSchemaRoot() { + return root; + } + }; + + // Create dataset through namespace + try (Dataset dataset = + Dataset.write() + .allocator(allocator) + .reader(testReader) + .namespace(namespace) + .tableId(Arrays.asList(tableName)) + .mode(WriteParams.WriteMode.CREATE) + .execute()) { + assertEquals(2, dataset.countRows()); } } - // Create mock namespace with 60-second expiration (long enough to not expire during test) - MockLanceNamespace namespace = new MockLanceNamespace(storageOptions, 60); - namespace.registerTable(tableName, tableUri); + // Verify createEmptyTable was called + assertEquals(1, namespace.getCreateCallCount(), "createEmptyTable should be called once"); // Open dataset through namespace WITH refresh enabled // Use 10-second refresh offset, so credentials effectively expire at T+50s @@ -246,7 +320,7 @@ void testOpenDatasetWithoutRefresh() throws Exception { .setS3CredentialsRefreshOffsetSeconds(10) // Refresh 10s before expiration .build(); - int callCountBeforeOpen = namespace.getCallCount(); + int callCountBeforeOpen = namespace.getDescribeCallCount(); try (Dataset dsFromNamespace = Dataset.open() .allocator(allocator) @@ -256,7 +330,7 @@ void testOpenDatasetWithoutRefresh() throws Exception { .build()) { // With the fix, describeTable should only be called once during open // to get the table location and initial storage options - int callCountAfterOpen = namespace.getCallCount(); + int callCountAfterOpen = namespace.getDescribeCallCount(); assertEquals( 1, callCountAfterOpen - callCountBeforeOpen, @@ -272,10 +346,10 @@ void testOpenDatasetWithoutRefresh() throws Exception { List fragments = dsFromNamespace.getFragments(); assertEquals(1, fragments.size()); List versions = dsFromNamespace.listVersions(); - assertEquals(2, versions.size()); + assertEquals(1, versions.size()); // With the fix, credentials are cached so no additional calls are made - int finalCallCount = namespace.getCallCount(); + int finalCallCount = namespace.getDescribeCallCount(); int totalCalls = finalCallCount - callCountBeforeOpen; assertEquals( 1, @@ -289,10 +363,7 @@ void testOpenDatasetWithoutRefresh() throws Exception { @Test void testStorageOptionsProviderWithRefresh() throws Exception { try (BufferAllocator allocator = new RootAllocator()) { - // Create test dataset - String tableName = UUID.randomUUID().toString(); - String tableUri = "s3://" + BUCKET_NAME + "/" + tableName + ".lance"; - + // Set up storage options Map storageOptions = new HashMap<>(); storageOptions.put("allow_http", "true"); storageOptions.put("aws_access_key_id", ACCESS_KEY); @@ -300,7 +371,11 @@ void testStorageOptionsProviderWithRefresh() throws Exception { storageOptions.put("aws_endpoint", ENDPOINT_URL); storageOptions.put("aws_region", REGION); - // Create schema and write dataset + // Create tracking namespace with 5-second expiration for faster testing + TrackingNamespace namespace = new TrackingNamespace(BUCKET_NAME, storageOptions, 5); + String tableName = UUID.randomUUID().toString(); + + // Create schema and data Schema schema = new Schema( Arrays.asList( @@ -323,25 +398,55 @@ void 
testStorageOptionsProviderWithRefresh() throws Exception { bVector.setValueCount(2); root.setRowCount(2); - WriteParams writeParams = - new WriteParams.Builder().withStorageOptions(storageOptions).build(); - - // Create dataset using Dataset.create - try (Dataset dataset = Dataset.create(allocator, tableUri, schema, writeParams)) { - // Add data via fragments - List fragments = - Fragment.create(tableUri, allocator, root, writeParams); - FragmentOperation.Append appendOp = new FragmentOperation.Append(fragments); - try (Dataset updatedDataset = - Dataset.commit(allocator, tableUri, appendOp, Optional.of(1L), storageOptions)) { - assertEquals(2, updatedDataset.countRows()); - } + // Create a test reader that returns our VectorSchemaRoot + ArrowReader testReader = + new ArrowReader(allocator) { + boolean firstRead = true; + + @Override + public boolean loadNextBatch() { + if (firstRead) { + firstRead = false; + return true; + } + return false; + } + + @Override + public long bytesRead() { + return 0; + } + + @Override + protected void closeReadSource() {} + + @Override + protected Schema readSchema() { + return schema; + } + + @Override + public VectorSchemaRoot getVectorSchemaRoot() { + return root; + } + }; + + // Create dataset through namespace with refresh enabled + try (Dataset dataset = + Dataset.write() + .allocator(allocator) + .reader(testReader) + .namespace(namespace) + .tableId(Arrays.asList(tableName)) + .mode(WriteParams.WriteMode.CREATE) + .s3CredentialsRefreshOffsetSeconds(2) // Refresh 2s before expiration + .execute()) { + assertEquals(2, dataset.countRows()); } } - // Create mock namespace with 5-second expiration for faster testing - MockLanceNamespace namespace = new MockLanceNamespace(storageOptions, 5); - namespace.registerTable(tableName, tableUri); + // Verify createEmptyTable was called + assertEquals(1, namespace.getCreateCallCount(), "createEmptyTable should be called once"); // Open dataset through namespace with refresh enabled // Use 2-second refresh offset so credentials effectively expire at T+3s (5s - 2s) @@ -350,7 +455,7 @@ void testStorageOptionsProviderWithRefresh() throws Exception { .setS3CredentialsRefreshOffsetSeconds(2) // Refresh 2s before expiration .build(); - int callCountBeforeOpen = namespace.getCallCount(); + int callCountBeforeOpen = namespace.getDescribeCallCount(); try (Dataset dsFromNamespace = Dataset.open() .allocator(allocator) @@ -359,7 +464,7 @@ void testStorageOptionsProviderWithRefresh() throws Exception { .readOptions(readOptions) .build()) { // With the fix, describeTable should only be called once during open - int callCountAfterOpen = namespace.getCallCount(); + int callCountAfterOpen = namespace.getDescribeCallCount(); assertEquals( 1, callCountAfterOpen - callCountBeforeOpen, @@ -370,7 +475,7 @@ void testStorageOptionsProviderWithRefresh() throws Exception { assertEquals(2, dsFromNamespace.countRows()); // Record call count after initial reads - int callCountAfterInitialReads = namespace.getCallCount(); + int callCountAfterInitialReads = namespace.getDescribeCallCount(); int callsAfterFirstRead = callCountAfterInitialReads - callCountBeforeOpen; assertEquals( 1, @@ -387,9 +492,9 @@ void testStorageOptionsProviderWithRefresh() throws Exception { List fragments = dsFromNamespace.getFragments(); assertEquals(1, fragments.size()); List versions = dsFromNamespace.listVersions(); - assertEquals(2, versions.size()); + assertEquals(1, versions.size()); - int finalCallCount = namespace.getCallCount(); + int finalCallCount = 
namespace.getDescribeCallCount(); int totalCallsAfterExpiration = finalCallCount - callCountBeforeOpen; assertEquals( 2, @@ -400,4 +505,936 @@ void testStorageOptionsProviderWithRefresh() throws Exception { } } } + + @Test + void testWriteDatasetBuilderWithNamespaceCreate() throws Exception { + try (BufferAllocator allocator = new RootAllocator()) { + // Set up storage options + Map storageOptions = new HashMap<>(); + storageOptions.put("allow_http", "true"); + storageOptions.put("aws_access_key_id", ACCESS_KEY); + storageOptions.put("aws_secret_access_key", SECRET_KEY); + storageOptions.put("aws_endpoint", ENDPOINT_URL); + storageOptions.put("aws_region", REGION); + + // Create tracking namespace + TrackingNamespace namespace = new TrackingNamespace(BUCKET_NAME, storageOptions, 60); + String tableName = UUID.randomUUID().toString(); + + // Create schema and data + Schema schema = + new Schema( + Arrays.asList( + new Field("a", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field("b", FieldType.nullable(new ArrowType.Int(32, true)), null))); + + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + IntVector aVector = (IntVector) root.getVector("a"); + IntVector bVector = (IntVector) root.getVector("b"); + + aVector.allocateNew(2); + bVector.allocateNew(2); + + aVector.set(0, 1); + bVector.set(0, 2); + aVector.set(1, 10); + bVector.set(1, 20); + + aVector.setValueCount(2); + bVector.setValueCount(2); + root.setRowCount(2); + + // Create a test reader that returns our VectorSchemaRoot + ArrowReader testReader = + new ArrowReader(allocator) { + boolean firstRead = true; + + @Override + public boolean loadNextBatch() { + if (firstRead) { + firstRead = false; + return true; + } + return false; + } + + @Override + public long bytesRead() { + return 0; + } + + @Override + protected void closeReadSource() {} + + @Override + protected Schema readSchema() { + return schema; + } + + @Override + public VectorSchemaRoot getVectorSchemaRoot() { + return root; + } + }; + + int callCountBefore = namespace.getCreateCallCount(); + + // Use the write builder to create a dataset through namespace + try (Dataset dataset = + Dataset.write() + .allocator(allocator) + .reader(testReader) + .namespace(namespace) + .tableId(Arrays.asList(tableName)) + .mode(WriteParams.WriteMode.CREATE) + .execute()) { + + // Verify createEmptyTable was called + int callCountAfter = namespace.getCreateCallCount(); + assertEquals( + 1, callCountAfter - callCountBefore, "createEmptyTable should be called once"); + + // Verify dataset was created successfully + assertEquals(2, dataset.countRows()); + assertEquals(schema, dataset.getSchema()); + } + } + } + } + + @Test + void testWriteDatasetBuilderWithNamespaceCreateCallCounts() throws Exception { + try (BufferAllocator allocator = new RootAllocator()) { + // Set up storage options + Map storageOptions = new HashMap<>(); + storageOptions.put("allow_http", "true"); + storageOptions.put("aws_access_key_id", ACCESS_KEY); + storageOptions.put("aws_secret_access_key", SECRET_KEY); + storageOptions.put("aws_endpoint", ENDPOINT_URL); + storageOptions.put("aws_region", REGION); + + // Create tracking namespace with 60-second expiration (long enough that no refresh happens) + // Credentials expire at T+60s. With a 1s refresh offset, refresh would happen at T+59s. + // Since writes complete well under 59 seconds, NO credential refresh should occur. 
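+      // Illustrative timeline, assuming credentials are minted at T+0 when the write starts:
+      //   T+0s   createEmptyTable returns AKID_1/SECRET_1 with expires_at_millis = T+60s
+      //   T+59s  earliest point a refresh could fire (60s lifetime minus the 1s offset)
+      //   ~T+1s  the write is already done, so the provider never needs to refresh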
+ TrackingNamespace namespace = new TrackingNamespace(BUCKET_NAME, storageOptions, 60); + String tableName = UUID.randomUUID().toString(); + + // Verify initial call counts + assertEquals(0, namespace.getCreateCallCount(), "createEmptyTable should not be called yet"); + assertEquals(0, namespace.getDescribeCallCount(), "describeTable should not be called yet"); + + // Create schema and data + Schema schema = + new Schema( + Arrays.asList( + new Field("a", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field("b", FieldType.nullable(new ArrowType.Int(32, true)), null))); + + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + IntVector aVector = (IntVector) root.getVector("a"); + IntVector bVector = (IntVector) root.getVector("b"); + + aVector.allocateNew(2); + bVector.allocateNew(2); + + aVector.set(0, 1); + bVector.set(0, 2); + aVector.set(1, 10); + bVector.set(1, 20); + + aVector.setValueCount(2); + bVector.setValueCount(2); + root.setRowCount(2); + + // Create a test reader that returns our VectorSchemaRoot + ArrowReader testReader = + new ArrowReader(allocator) { + boolean firstRead = true; + + @Override + public boolean loadNextBatch() { + if (firstRead) { + firstRead = false; + return true; + } + return false; + } + + @Override + public long bytesRead() { + return 0; + } + + @Override + protected void closeReadSource() {} + + @Override + protected Schema readSchema() { + return schema; + } + + @Override + public VectorSchemaRoot getVectorSchemaRoot() { + return root; + } + }; + + // Use the write builder to create a dataset through namespace + // Set a 1-second refresh offset. Credentials expire at T+60s, so refresh at T+59s. + // Write completes instantly, so NO describeTable call should happen for refresh. 
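+      // (If the write did outlive T+59s, the provider would call describeTable once and pick up
+      // AKID_2/SECRET_2 from the tracking namespace; the describeTable count asserted below would
+      // then be 1 instead of 0.)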
+ try (Dataset dataset = + Dataset.write() + .allocator(allocator) + .reader(testReader) + .namespace(namespace) + .tableId(Arrays.asList(tableName)) + .mode(WriteParams.WriteMode.CREATE) + .s3CredentialsRefreshOffsetSeconds(1) + .execute()) { + + // Verify createEmptyTable was called exactly ONCE + assertEquals( + 1, namespace.getCreateCallCount(), "createEmptyTable should be called exactly once"); + + // Verify describeTable was NOT called during CREATE + // Initial credentials come from createEmptyTable response, and since credentials + // don't expire during the fast write, NO refresh (describeTable) is needed + assertEquals( + 0, + namespace.getDescribeCallCount(), + "describeTable should NOT be called during CREATE - " + + "initial credentials come from createEmptyTable response and don't expire"); + + // Verify dataset was created successfully + assertEquals(2, dataset.countRows()); + assertEquals(schema, dataset.getSchema()); + } + } + + // Verify counts after dataset is closed + assertEquals( + 1, namespace.getCreateCallCount(), "createEmptyTable should still be 1 after close"); + assertEquals( + 0, + namespace.getDescribeCallCount(), + "describeTable should still be 0 after close (no refresh needed)"); + + // Now open the dataset through namespace with long-lived credentials (60s expiration) + // With 1s refresh offset, credentials are valid for 59s - plenty of time for reads + ReadOptions readOptions = + new ReadOptions.Builder().setS3CredentialsRefreshOffsetSeconds(1).build(); + + try (Dataset dsFromNamespace = + Dataset.open() + .allocator(allocator) + .namespace(namespace) + .tableId(Arrays.asList(tableName)) + .readOptions(readOptions) + .build()) { + + // createEmptyTable should NOT be called during open (only during CREATE) + assertEquals( + 1, + namespace.getCreateCallCount(), + "createEmptyTable should still be 1 (not called during open)"); + + // describeTable is called exactly ONCE during open to get table location + assertEquals( + 1, + namespace.getDescribeCallCount(), + "describeTable should be called exactly once during open"); + + // Verify we can read the data multiple times + assertEquals(2, dsFromNamespace.countRows()); + assertEquals(2, dsFromNamespace.countRows()); + assertEquals(2, dsFromNamespace.countRows()); + + // After multiple reads, no additional describeTable calls should be made + // (credentials are cached and don't expire during this fast test) + assertEquals( + 1, + namespace.getDescribeCallCount(), + "describeTable should still be 1 after reads (credentials cached, no refresh needed)"); + } + + // Final verification + assertEquals(1, namespace.getCreateCallCount(), "Final: createEmptyTable = 1"); + assertEquals(1, namespace.getDescribeCallCount(), "Final: describeTable = 1"); + } + } + + @Test + void testWriteDatasetBuilderWithNamespaceAppend() throws Exception { + try (BufferAllocator allocator = new RootAllocator()) { + // Set up storage options + Map storageOptions = new HashMap<>(); + storageOptions.put("allow_http", "true"); + storageOptions.put("aws_access_key_id", ACCESS_KEY); + storageOptions.put("aws_secret_access_key", SECRET_KEY); + storageOptions.put("aws_endpoint", ENDPOINT_URL); + storageOptions.put("aws_region", REGION); + + // Create tracking namespace + TrackingNamespace namespace = new TrackingNamespace(BUCKET_NAME, storageOptions, 60); + String tableName = UUID.randomUUID().toString(); + + Schema schema = + new Schema( + Arrays.asList( + new Field("a", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field("b", 
FieldType.nullable(new ArrowType.Int(32, true)), null))); + + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + IntVector aVector = (IntVector) root.getVector("a"); + IntVector bVector = (IntVector) root.getVector("b"); + + aVector.allocateNew(2); + bVector.allocateNew(2); + + aVector.set(0, 1); + bVector.set(0, 2); + aVector.set(1, 10); + bVector.set(1, 20); + + aVector.setValueCount(2); + bVector.setValueCount(2); + root.setRowCount(2); + + // Create a test reader that returns our VectorSchemaRoot + ArrowReader testReader = + new ArrowReader(allocator) { + boolean firstRead = true; + + @Override + public boolean loadNextBatch() { + if (firstRead) { + firstRead = false; + return true; + } + return false; + } + + @Override + public long bytesRead() { + return 0; + } + + @Override + protected void closeReadSource() {} + + @Override + protected Schema readSchema() { + return schema; + } + + @Override + public VectorSchemaRoot getVectorSchemaRoot() { + return root; + } + }; + + // Create initial dataset through namespace + try (Dataset dataset = + Dataset.write() + .allocator(allocator) + .reader(testReader) + .namespace(namespace) + .tableId(Arrays.asList(tableName)) + .mode(WriteParams.WriteMode.CREATE) + .execute()) { + assertEquals(2, dataset.countRows()); + } + + assertEquals(1, namespace.getCreateCallCount(), "createEmptyTable should be called once"); + int initialDescribeCount = namespace.getDescribeCallCount(); + + // Now append data using the write builder with namespace + ArrowReader appendReader = + new ArrowReader(allocator) { + boolean firstRead = true; + + @Override + public boolean loadNextBatch() { + if (firstRead) { + firstRead = false; + return true; + } + return false; + } + + @Override + public long bytesRead() { + return 0; + } + + @Override + protected void closeReadSource() {} + + @Override + protected Schema readSchema() { + return schema; + } + + @Override + public VectorSchemaRoot getVectorSchemaRoot() { + return root; + } + }; + + // Use the write builder to append to dataset through namespace + try (Dataset dataset = + Dataset.write() + .allocator(allocator) + .reader(appendReader) + .namespace(namespace) + .tableId(Arrays.asList(tableName)) + .mode(WriteParams.WriteMode.APPEND) + .execute()) { + + // Verify describeTable was called + int callCountAfter = namespace.getDescribeCallCount(); + assertEquals( + 1, + callCountAfter - initialDescribeCount, + "describeTable should be called once for append"); + + // Verify data was appended successfully + assertEquals(4, dataset.countRows()); // Original 2 + appended 2 + } + } + } + } + + @Test + void testWriteDatasetBuilderWithNamespaceOverwrite() throws Exception { + try (BufferAllocator allocator = new RootAllocator()) { + // Set up storage options + Map storageOptions = new HashMap<>(); + storageOptions.put("allow_http", "true"); + storageOptions.put("aws_access_key_id", ACCESS_KEY); + storageOptions.put("aws_secret_access_key", SECRET_KEY); + storageOptions.put("aws_endpoint", ENDPOINT_URL); + storageOptions.put("aws_region", REGION); + + // Create tracking namespace + TrackingNamespace namespace = new TrackingNamespace(BUCKET_NAME, storageOptions, 60); + String tableName = UUID.randomUUID().toString(); + + Schema schema = + new Schema( + Arrays.asList( + new Field("a", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field("b", FieldType.nullable(new ArrowType.Int(32, true)), null))); + + // Create initial dataset with 1 row + try (VectorSchemaRoot root = 
VectorSchemaRoot.create(schema, allocator)) { + IntVector aVector = (IntVector) root.getVector("a"); + IntVector bVector = (IntVector) root.getVector("b"); + + aVector.allocateNew(1); + bVector.allocateNew(1); + + aVector.set(0, 1); + bVector.set(0, 2); + + aVector.setValueCount(1); + bVector.setValueCount(1); + root.setRowCount(1); + + ArrowReader createReader = + new ArrowReader(allocator) { + boolean firstRead = true; + + @Override + public boolean loadNextBatch() { + if (firstRead) { + firstRead = false; + return true; + } + return false; + } + + @Override + public long bytesRead() { + return 0; + } + + @Override + protected void closeReadSource() {} + + @Override + protected Schema readSchema() { + return schema; + } + + @Override + public VectorSchemaRoot getVectorSchemaRoot() { + return root; + } + }; + + try (Dataset dataset = + Dataset.write() + .allocator(allocator) + .reader(createReader) + .namespace(namespace) + .tableId(Arrays.asList(tableName)) + .mode(WriteParams.WriteMode.CREATE) + .execute()) { + assertEquals(1, dataset.countRows()); + } + + assertEquals(1, namespace.getCreateCallCount(), "createEmptyTable should be called once"); + assertEquals(0, namespace.getDescribeCallCount(), "describeTable should not be called yet"); + + // Now overwrite with 2 rows + aVector.allocateNew(2); + bVector.allocateNew(2); + + aVector.set(0, 10); + bVector.set(0, 20); + aVector.set(1, 100); + bVector.set(1, 200); + + aVector.setValueCount(2); + bVector.setValueCount(2); + root.setRowCount(2); + + ArrowReader overwriteReader = + new ArrowReader(allocator) { + boolean firstRead = true; + + @Override + public boolean loadNextBatch() { + if (firstRead) { + firstRead = false; + return true; + } + return false; + } + + @Override + public long bytesRead() { + return 0; + } + + @Override + protected void closeReadSource() {} + + @Override + protected Schema readSchema() { + return schema; + } + + @Override + public VectorSchemaRoot getVectorSchemaRoot() { + return root; + } + }; + + try (Dataset dataset = + Dataset.write() + .allocator(allocator) + .reader(overwriteReader) + .namespace(namespace) + .tableId(Arrays.asList(tableName)) + .mode(WriteParams.WriteMode.OVERWRITE) + .execute()) { + + // Verify describeTable was called for overwrite + assertEquals(1, namespace.getCreateCallCount(), "createEmptyTable should still be 1"); + int describeCountAfterOverwrite = namespace.getDescribeCallCount(); + assertEquals( + 1, describeCountAfterOverwrite, "describeTable should be called once for overwrite"); + + // Verify data was overwritten successfully + assertEquals(2, dataset.countRows()); + assertEquals( + 2, dataset.listVersions().size()); // Version 1 (create) + Version 2 (overwrite) + } + + // Verify we can open and read the dataset through namespace + try (Dataset ds = + Dataset.open() + .allocator(allocator) + .namespace(namespace) + .tableId(Arrays.asList(tableName)) + .build()) { + assertEquals(2, ds.countRows(), "Should have 2 rows after overwrite"); + assertEquals(2, ds.listVersions().size(), "Should have 2 versions"); + } + } + } + } + + @Test + void testDistributedWriteWithNamespace() throws Exception { + try (BufferAllocator allocator = new RootAllocator()) { + // Set up storage options + Map storageOptions = new HashMap<>(); + storageOptions.put("allow_http", "true"); + storageOptions.put("aws_access_key_id", ACCESS_KEY); + storageOptions.put("aws_secret_access_key", SECRET_KEY); + storageOptions.put("aws_endpoint", ENDPOINT_URL); + storageOptions.put("aws_region", REGION); + + // 
Create tracking namespace + TrackingNamespace namespace = new TrackingNamespace(BUCKET_NAME, storageOptions, 60); + String tableName = UUID.randomUUID().toString(); + + Schema schema = + new Schema( + Arrays.asList( + new Field("a", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field("b", FieldType.nullable(new ArrowType.Int(32, true)), null))); + + // Step 1: Create empty table via namespace + CreateEmptyTableRequest request = new CreateEmptyTableRequest(); + request.setId(Arrays.asList(tableName)); + CreateEmptyTableResponse response = namespace.createEmptyTable(request); + + assertEquals(1, namespace.getCreateCallCount(), "createEmptyTable should be called once"); + assertEquals(0, namespace.getDescribeCallCount(), "describeTable should not be called yet"); + + String tableUri = response.getLocation(); + Map namespaceStorageOptions = response.getStorageOptions(); + + // Merge storage options + Map mergedOptions = new HashMap<>(storageOptions); + if (namespaceStorageOptions != null) { + mergedOptions.putAll(namespaceStorageOptions); + } + + // Create storage options provider + LanceNamespaceStorageOptionsProvider storageOptionsProvider = + new LanceNamespaceStorageOptionsProvider(namespace, Arrays.asList(tableName)); + + WriteParams writeParams = new WriteParams.Builder().withStorageOptions(mergedOptions).build(); + + // Step 2: Write multiple fragments in parallel (simulated) + List allFragments = new ArrayList<>(); + + // Fragment 1: 2 rows + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + IntVector aVector = (IntVector) root.getVector("a"); + IntVector bVector = (IntVector) root.getVector("b"); + + aVector.allocateNew(2); + bVector.allocateNew(2); + aVector.set(0, 1); + bVector.set(0, 2); + aVector.set(1, 3); + bVector.set(1, 4); + aVector.setValueCount(2); + bVector.setValueCount(2); + root.setRowCount(2); + + List fragment1 = + Fragment.create(tableUri, allocator, root, writeParams, storageOptionsProvider); + allFragments.addAll(fragment1); + } + + // Fragment 2: 2 rows + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + IntVector aVector = (IntVector) root.getVector("a"); + IntVector bVector = (IntVector) root.getVector("b"); + + aVector.allocateNew(2); + bVector.allocateNew(2); + aVector.set(0, 10); + bVector.set(0, 20); + aVector.set(1, 30); + bVector.set(1, 40); + aVector.setValueCount(2); + bVector.setValueCount(2); + root.setRowCount(2); + + List fragment2 = + Fragment.create(tableUri, allocator, root, writeParams, storageOptionsProvider); + allFragments.addAll(fragment2); + } + + // Fragment 3: 1 row + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + IntVector aVector = (IntVector) root.getVector("a"); + IntVector bVector = (IntVector) root.getVector("b"); + + aVector.allocateNew(1); + bVector.allocateNew(1); + aVector.set(0, 100); + bVector.set(0, 200); + aVector.setValueCount(1); + bVector.setValueCount(1); + root.setRowCount(1); + + List fragment3 = + Fragment.create(tableUri, allocator, root, writeParams, storageOptionsProvider); + allFragments.addAll(fragment3); + } + + // Step 3: Commit all fragments as one operation + FragmentOperation.Overwrite overwriteOp = + new FragmentOperation.Overwrite(allFragments, schema); + + try (Dataset dataset = + Dataset.commit(allocator, tableUri, overwriteOp, Optional.empty(), mergedOptions)) { + assertEquals(5, dataset.countRows(), "Should have 5 total rows from all fragments"); + assertEquals(1, dataset.listVersions().size(), "Should 
have 1 version after commit"); + } + + // Step 4: Open dataset through namespace and verify + try (Dataset dsFromNamespace = + Dataset.open() + .allocator(allocator) + .namespace(namespace) + .tableId(Arrays.asList(tableName)) + .build()) { + assertEquals(5, dsFromNamespace.countRows(), "Should read 5 rows through namespace"); + } + } + } + + @Test + void testFragmentCreateAndCommitWithNamespace() throws Exception { + try (BufferAllocator allocator = new RootAllocator()) { + // Set up storage options + Map storageOptions = new HashMap<>(); + storageOptions.put("allow_http", "true"); + storageOptions.put("aws_access_key_id", ACCESS_KEY); + storageOptions.put("aws_secret_access_key", SECRET_KEY); + storageOptions.put("aws_endpoint", ENDPOINT_URL); + storageOptions.put("aws_region", REGION); + + // Create tracking namespace with 60-second expiration + TrackingNamespace namespace = new TrackingNamespace(BUCKET_NAME, storageOptions, 60); + String tableName = UUID.randomUUID().toString(); + + Schema schema = + new Schema( + Arrays.asList( + new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field("value", FieldType.nullable(new ArrowType.Int(32, true)), null))); + + // Create empty table via namespace + CreateEmptyTableRequest request = new CreateEmptyTableRequest(); + request.setId(Arrays.asList(tableName)); + CreateEmptyTableResponse response = namespace.createEmptyTable(request); + + assertEquals(1, namespace.getCreateCallCount(), "createEmptyTable should be called once"); + + String tableUri = response.getLocation(); + Map namespaceStorageOptions = response.getStorageOptions(); + + // Merge storage options + Map mergedOptions = new HashMap<>(storageOptions); + if (namespaceStorageOptions != null) { + mergedOptions.putAll(namespaceStorageOptions); + } + + // Create storage options provider + LanceNamespaceStorageOptionsProvider provider = + new LanceNamespaceStorageOptionsProvider(namespace, Arrays.asList(tableName)); + + WriteParams writeParams = new WriteParams.Builder().withStorageOptions(mergedOptions).build(); + + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + IntVector idVector = (IntVector) root.getVector("id"); + IntVector valueVector = (IntVector) root.getVector("value"); + + // Write first fragment + idVector.allocateNew(3); + valueVector.allocateNew(3); + + idVector.set(0, 1); + valueVector.set(0, 100); + idVector.set(1, 2); + valueVector.set(1, 200); + idVector.set(2, 3); + valueVector.set(2, 300); + + idVector.setValueCount(3); + valueVector.setValueCount(3); + root.setRowCount(3); + + // Create fragment with StorageOptionsProvider + List fragments1 = + Fragment.create(tableUri, allocator, root, writeParams, provider); + + assertEquals(1, fragments1.size()); + + // Write second fragment with different data + idVector.set(0, 4); + valueVector.set(0, 400); + idVector.set(1, 5); + valueVector.set(1, 500); + idVector.set(2, 6); + valueVector.set(2, 600); + root.setRowCount(3); + + // Create another fragment with the same provider + List fragments2 = + Fragment.create(tableUri, allocator, root, writeParams, provider); + + assertEquals(1, fragments2.size()); + + // Commit first fragment to the dataset using Overwrite (for empty table) + FragmentOperation.Overwrite overwriteOp = + new FragmentOperation.Overwrite(fragments1, schema); + try (Dataset updatedDataset = + Dataset.commit(allocator, tableUri, overwriteOp, Optional.empty(), mergedOptions)) { + assertEquals(1, updatedDataset.version()); + assertEquals(3, 
updatedDataset.countRows()); + + // Append second fragment + FragmentOperation.Append appendOp2 = new FragmentOperation.Append(fragments2); + try (Dataset finalDataset = + Dataset.commit(allocator, tableUri, appendOp2, Optional.of(1L), mergedOptions)) { + assertEquals(2, finalDataset.version()); + assertEquals(6, finalDataset.countRows()); + } + } + } + + // Verify we can open and read the dataset through namespace + try (Dataset ds = + Dataset.open() + .allocator(allocator) + .namespace(namespace) + .tableId(Arrays.asList(tableName)) + .build()) { + assertEquals(6, ds.countRows(), "Should have 6 rows total"); + assertEquals(2, ds.listVersions().size(), "Should have 2 versions"); + } + } + } + + @Test + void testTransactionCommitWithNamespace() throws Exception { + try (BufferAllocator allocator = new RootAllocator()) { + // Set up storage options + Map storageOptions = new HashMap<>(); + storageOptions.put("allow_http", "true"); + storageOptions.put("aws_access_key_id", ACCESS_KEY); + storageOptions.put("aws_secret_access_key", SECRET_KEY); + storageOptions.put("aws_endpoint", ENDPOINT_URL); + storageOptions.put("aws_region", REGION); + + // Create tracking namespace + TrackingNamespace namespace = new TrackingNamespace(BUCKET_NAME, storageOptions, 60); + String tableName = UUID.randomUUID().toString(); + + Schema schema = + new Schema( + Arrays.asList( + new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field("name", FieldType.nullable(new ArrowType.Utf8()), null))); + + // Create empty table via namespace + CreateEmptyTableRequest request = new CreateEmptyTableRequest(); + request.setId(Arrays.asList(tableName)); + CreateEmptyTableResponse response = namespace.createEmptyTable(request); + + String tableUri = response.getLocation(); + Map namespaceStorageOptions = response.getStorageOptions(); + + // Merge storage options + Map mergedOptions = new HashMap<>(storageOptions); + if (namespaceStorageOptions != null) { + mergedOptions.putAll(namespaceStorageOptions); + } + + // Create storage options provider + LanceNamespaceStorageOptionsProvider provider = + new LanceNamespaceStorageOptionsProvider(namespace, Arrays.asList(tableName)); + + // First, write some initial data using Fragment.create and commit + WriteParams writeParams = new WriteParams.Builder().withStorageOptions(mergedOptions).build(); + + List initialFragments; + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + IntVector idVector = (IntVector) root.getVector("id"); + org.apache.arrow.vector.VarCharVector nameVector = + (org.apache.arrow.vector.VarCharVector) root.getVector("name"); + + idVector.allocateNew(2); + nameVector.allocateNew(2); + + idVector.set(0, 1); + nameVector.setSafe(0, "Alice".getBytes()); + idVector.set(1, 2); + nameVector.setSafe(1, "Bob".getBytes()); + + idVector.setValueCount(2); + nameVector.setValueCount(2); + root.setRowCount(2); + + initialFragments = Fragment.create(tableUri, allocator, root, writeParams, provider); + } + + // Commit initial fragments + FragmentOperation.Overwrite overwriteOp = + new FragmentOperation.Overwrite(initialFragments, schema); + try (Dataset dataset = + Dataset.commit(allocator, tableUri, overwriteOp, Optional.empty(), mergedOptions)) { + assertEquals(1, dataset.version()); + assertEquals(2, dataset.countRows()); + } + + // Now test Transaction.commit with provider + // Open dataset with provider using mergedOptions (which has expires_at_millis) + ReadOptions readOptions = + new ReadOptions.Builder() + 
.setStorageOptions(mergedOptions) + .setStorageOptionsProvider(provider) + .build(); + + try (Dataset datasetWithProvider = Dataset.open(allocator, tableUri, readOptions)) { + // Create more fragments to append + List newFragments; + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + IntVector idVector = (IntVector) root.getVector("id"); + org.apache.arrow.vector.VarCharVector nameVector = + (org.apache.arrow.vector.VarCharVector) root.getVector("name"); + + idVector.allocateNew(2); + nameVector.allocateNew(2); + + idVector.set(0, 3); + nameVector.setSafe(0, "Charlie".getBytes()); + idVector.set(1, 4); + nameVector.setSafe(1, "Diana".getBytes()); + + idVector.setValueCount(2); + nameVector.setValueCount(2); + root.setRowCount(2); + + newFragments = Fragment.create(tableUri, allocator, root, writeParams, provider); + } + + // Create and commit transaction + Append appendOp = Append.builder().fragments(newFragments).build(); + Transaction transaction = + new Transaction.Builder(datasetWithProvider) + .readVersion(datasetWithProvider.version()) + .operation(appendOp) + .build(); + + try (Dataset committedDataset = transaction.commit()) { + assertEquals(2, committedDataset.version()); + assertEquals(4, committedDataset.countRows()); + } + } + + // Verify we can open and read the dataset through namespace + try (Dataset ds = + Dataset.open() + .allocator(allocator) + .namespace(namespace) + .tableId(Arrays.asList(tableName)) + .build()) { + assertEquals(4, ds.countRows(), "Should have 4 rows total"); + assertEquals(2, ds.listVersions().size(), "Should have 2 versions"); + } + } + } } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 8f2895537c..992bbd48be 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1581,6 +1581,15 @@ impl Dataset { .and_then(|params| params.storage_options.as_ref()) } + /// Returns the storage options provider used when opening this dataset, if any. + pub fn storage_options_provider( + &self, + ) -> Option> { + self.store_params + .as_ref() + .and_then(|params| params.storage_options_provider.clone()) + } + pub fn data_dir(&self) -> Path { self.base.child(DATA_DIR) }
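Taken together, the tests above exercise one pattern worth calling out: distributed fragment writes where workers share only a table URI and a StorageOptionsProvider, and a single coordinator commits. A condensed sketch of that flow, using only APIs that appear in this diff (given some LanceNamespace namespace; error handling and data wiring omitted, and the tests additionally merge their base S3 settings into the namespace-returned options):

    // Coordinator: create the table through the namespace and build a provider.
    CreateEmptyTableRequest request = new CreateEmptyTableRequest();
    request.setId(Arrays.asList(tableName));
    CreateEmptyTableResponse response = namespace.createEmptyTable(request);
    LanceNamespaceStorageOptionsProvider provider =
        new LanceNamespaceStorageOptionsProvider(namespace, Arrays.asList(tableName));
    WriteParams writeParams =
        new WriteParams.Builder().withStorageOptions(response.getStorageOptions()).build();

    // Workers: each writes fragments independently; the provider refreshes credentials as needed.
    List<FragmentMetadata> fragments =
        Fragment.create(response.getLocation(), allocator, root, writeParams, provider);

    // Coordinator: commit every worker's fragments as one operation.
    FragmentOperation.Overwrite op = new FragmentOperation.Overwrite(fragments, schema);
    try (Dataset dataset =
        Dataset.commit(
            allocator, response.getLocation(), op, Optional.empty(), response.getStorageOptions())) {
      // dataset.countRows() now reflects all committed fragments
    }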