Skip to content

Commit 2e8583d

Browse files
authored
feat(java): support credential vending at write time (#5309)
1 parent 40dd02c commit 2e8583d

File tree

14 files changed

+2269
-174
lines changed

14 files changed

+2269
-174
lines changed

.github/workflows/java.yml

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -49,22 +49,9 @@ jobs:
4949
matrix:
5050
java-version: [8, 11, 17]
5151
name: Build and Test with Java ${{ matrix.java-version }}
52-
services:
53-
localstack:
54-
image: localstack/localstack:4.0
55-
ports:
56-
- 4566:4566
57-
env:
58-
SERVICES: s3,dynamodb,kms
59-
AWS_ACCESS_KEY_ID: ACCESS_KEY
60-
AWS_SECRET_ACCESS_KEY: SECRET_KEY
61-
options: >-
62-
--health-cmd "curl -s http://localhost:4566/_localstack/health"
63-
--health-interval 5s
64-
--health-timeout 3s
65-
--health-retries 3
66-
--health-start-period 10s
6752
steps:
53+
- name: Checkout repository
54+
uses: actions/checkout@v4
6855
- name: Install dependencies
6956
run: |
7057
sudo apt update
@@ -76,8 +63,6 @@ jobs:
7663
- uses: rui314/setup-mold@v1
7764
- name: Install cargo-llvm-cov
7865
uses: taiki-e/install-action@cargo-llvm-cov
79-
- name: Checkout repository
80-
uses: actions/checkout@v4
8166
- uses: Swatinem/rust-cache@v2
8267
with:
8368
workspaces: java/lance-jni -> ../target/rust-maven-plugin/lance-jni
@@ -91,6 +76,9 @@ jobs:
9176
working-directory: java
9277
run: |
9378
mvn spotless:check
79+
- name: Start localstack
80+
run: |
81+
docker compose -f docker-compose.yml up -d --wait
9482
- name: Running tests with Java ${{ matrix.java-version }}
9583
working-directory: java
9684
env:

java/lance-jni/src/blocking_dataset.rs

Lines changed: 68 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ pub struct BlockingDataset {
5959
}
6060

6161
impl BlockingDataset {
62+
/// Get the storage options provider that was used when opening this dataset
63+
pub fn get_storage_options_provider(&self) -> Option<Arc<dyn StorageOptionsProvider>> {
64+
self.inner.storage_options_provider()
65+
}
66+
6267
pub fn drop(uri: &str, storage_options: HashMap<String, String>) -> Result<()> {
6368
RT.block_on(async move {
6469
let registry = Arc::new(ObjectStoreRegistry::default());
@@ -122,7 +127,7 @@ impl BlockingDataset {
122127
builder = builder.with_version(ver as u64);
123128
}
124129
builder = builder.with_storage_options(storage_options);
125-
if let Some(provider) = storage_options_provider {
130+
if let Some(provider) = storage_options_provider.clone() {
126131
builder = builder.with_storage_options_provider(provider)
127132
}
128133
if let Some(offset_seconds) = s3_credentials_refresh_offset_seconds {
@@ -284,14 +289,11 @@ impl BlockingDataset {
284289
pub fn commit_transaction(
285290
&mut self,
286291
transaction: Transaction,
287-
write_params: HashMap<String, String>,
292+
store_params: ObjectStoreParams,
288293
) -> Result<Self> {
289294
let new_dataset = RT.block_on(
290295
CommitBuilder::new(Arc::new(self.clone().inner))
291-
.with_store_params(ObjectStoreParams {
292-
storage_options: Some(write_params),
293-
..Default::default()
294-
})
296+
.with_store_params(store_params)
295297
.execute(transaction),
296298
)?;
297299
Ok(BlockingDataset { inner: new_dataset })
@@ -334,6 +336,7 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_createWithFfiSchema<'local
334336
enable_stable_row_ids: JObject, // Optional<Boolean>
335337
data_storage_version: JObject, // Optional<String>
336338
storage_options_obj: JObject, // Map<String, String>
339+
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
337340
) -> JObject<'local> {
338341
ok_or_throw!(
339342
env,
@@ -347,7 +350,8 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_createWithFfiSchema<'local
347350
mode,
348351
enable_stable_row_ids,
349352
data_storage_version,
350-
storage_options_obj
353+
storage_options_obj,
354+
s3_credentials_refresh_offset_seconds_obj
351355
)
352356
)
353357
}
@@ -364,6 +368,7 @@ fn inner_create_with_ffi_schema<'local>(
364368
enable_stable_row_ids: JObject, // Optional<Boolean>
365369
data_storage_version: JObject, // Optional<String>
366370
storage_options_obj: JObject, // Map<String, String>
371+
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
367372
) -> Result<JObject<'local>> {
368373
let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema;
369374
let c_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema_ptr) };
@@ -380,6 +385,8 @@ fn inner_create_with_ffi_schema<'local>(
380385
enable_stable_row_ids,
381386
data_storage_version,
382387
storage_options_obj,
388+
JObject::null(), // No provider for schema-only creation
389+
s3_credentials_refresh_offset_seconds_obj,
383390
reader,
384391
)
385392
}
@@ -411,6 +418,7 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_createWithFfiStream<'local
411418
enable_stable_row_ids: JObject, // Optional<Boolean>
412419
data_storage_version: JObject, // Optional<String>
413420
storage_options_obj: JObject, // Map<String, String>
421+
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
414422
) -> JObject<'local> {
415423
ok_or_throw!(
416424
env,
@@ -424,7 +432,44 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_createWithFfiStream<'local
424432
mode,
425433
enable_stable_row_ids,
426434
data_storage_version,
427-
storage_options_obj
435+
storage_options_obj,
436+
JObject::null(),
437+
s3_credentials_refresh_offset_seconds_obj
438+
)
439+
)
440+
}
441+
442+
#[no_mangle]
443+
pub extern "system" fn Java_com_lancedb_lance_Dataset_createWithFfiStreamAndProvider<'local>(
444+
mut env: JNIEnv<'local>,
445+
_obj: JObject,
446+
arrow_array_stream_addr: jlong,
447+
path: JString,
448+
max_rows_per_file: JObject, // Optional<Integer>
449+
max_rows_per_group: JObject, // Optional<Integer>
450+
max_bytes_per_file: JObject, // Optional<Long>
451+
mode: JObject, // Optional<String>
452+
enable_stable_row_ids: JObject, // Optional<Boolean>
453+
data_storage_version: JObject, // Optional<String>
454+
storage_options_obj: JObject, // Map<String, String>
455+
storage_options_provider_obj: JObject, // Optional<StorageOptionsProvider>
456+
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
457+
) -> JObject<'local> {
458+
ok_or_throw!(
459+
env,
460+
inner_create_with_ffi_stream(
461+
&mut env,
462+
arrow_array_stream_addr,
463+
path,
464+
max_rows_per_file,
465+
max_rows_per_group,
466+
max_bytes_per_file,
467+
mode,
468+
enable_stable_row_ids,
469+
data_storage_version,
470+
storage_options_obj,
471+
storage_options_provider_obj,
472+
s3_credentials_refresh_offset_seconds_obj
428473
)
429474
)
430475
}
@@ -434,13 +479,15 @@ fn inner_create_with_ffi_stream<'local>(
434479
env: &mut JNIEnv<'local>,
435480
arrow_array_stream_addr: jlong,
436481
path: JString,
437-
max_rows_per_file: JObject, // Optional<Integer>
438-
max_rows_per_group: JObject, // Optional<Integer>
439-
max_bytes_per_file: JObject, // Optional<Long>
440-
mode: JObject, // Optional<String>
441-
enable_stable_row_ids: JObject, // Optional<Boolean>
442-
data_storage_version: JObject, // Optional<String>
443-
storage_options_obj: JObject, // Map<String, String>
482+
max_rows_per_file: JObject, // Optional<Integer>
483+
max_rows_per_group: JObject, // Optional<Integer>
484+
max_bytes_per_file: JObject, // Optional<Long>
485+
mode: JObject, // Optional<String>
486+
enable_stable_row_ids: JObject, // Optional<Boolean>
487+
data_storage_version: JObject, // Optional<String>
488+
storage_options_obj: JObject, // Map<String, String>
489+
storage_options_provider_obj: JObject, // Optional<StorageOptionsProvider>
490+
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
444491
) -> Result<JObject<'local>> {
445492
let stream_ptr = arrow_array_stream_addr as *mut FFI_ArrowArrayStream;
446493
let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?;
@@ -454,6 +501,8 @@ fn inner_create_with_ffi_stream<'local>(
454501
enable_stable_row_ids,
455502
data_storage_version,
456503
storage_options_obj,
504+
storage_options_provider_obj,
505+
s3_credentials_refresh_offset_seconds_obj,
457506
reader,
458507
)
459508
}
@@ -469,6 +518,8 @@ fn create_dataset<'local>(
469518
enable_stable_row_ids: JObject,
470519
data_storage_version: JObject,
471520
storage_options_obj: JObject,
521+
storage_options_provider_obj: JObject, // Optional<StorageOptionsProvider>
522+
s3_credentials_refresh_offset_seconds_obj: JObject,
472523
reader: impl RecordBatchReader + Send + 'static,
473524
) -> Result<JObject<'local>> {
474525
let path_str = path.extract(env)?;
@@ -482,6 +533,8 @@ fn create_dataset<'local>(
482533
&enable_stable_row_ids,
483534
&data_storage_version,
484535
&storage_options_obj,
536+
&storage_options_provider_obj,
537+
&s3_credentials_refresh_offset_seconds_obj,
485538
)?;
486539

487540
let dataset = BlockingDataset::write(reader, &path_str, Some(write_params))?;

0 commit comments

Comments
 (0)