Skip to content

Commit b7c8393

Browse files
authored
feat: support credentials vending for file reader and session (#5256)
Allow user to directly create a LanceFileReader or LanceFileSession with storage options provider to refresh vended credentials. We also add a few methods in LanceFileSession for generic object store usage, these are more as workarounds because there is no practical solutions today for an IO library with pluggable credentials provider. The long term plan is to use OpenDAL for related workloads.
1 parent 2c2d8bb commit b7c8393

File tree

9 files changed

+670
-87
lines changed

9 files changed

+670
-87
lines changed

python/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ test:
99
.PHONY: test
1010

1111
integtest:
12-
pytest --run-integration $(PYTEST_ARGS) python/tests/test_s3_ddb.py
12+
pytest --run-integration $(PYTEST_ARGS) python/tests/test_s3_ddb.py python/tests/test_namespace_integration.py
1313
.PHONY: integtest
1414

1515
doctest:

python/python/lance/dataset.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5133,6 +5133,7 @@ def write_dataset(
51335133
namespace: Optional[LanceNamespace] = None,
51345134
table_id: Optional[List[str]] = None,
51355135
ignore_namespace_table_storage_options: bool = False,
5136+
s3_credentials_refresh_offset_seconds: Optional[int] = None,
51365137
) -> LanceDataset:
51375138
"""Write a given data_obj to the given uri
51385139
@@ -5240,6 +5241,13 @@ def write_dataset(
52405241
not be created, so credentials will not be automatically refreshed.
52415242
This is useful when you want to use your own credentials instead of the
52425243
namespace-provided credentials.
5244+
s3_credentials_refresh_offset_seconds : optional, int
5245+
The number of seconds before credential expiration to trigger a refresh.
5246+
Default is 60 seconds. Only applicable when using AWS S3 with temporary
5247+
credentials. For example, if set to 60, credentials will be refreshed
5248+
when they have less than 60 seconds remaining before expiration. This
5249+
should be set shorter than the credential lifetime to avoid using
5250+
expired credentials.
52435251
52445252
Notes
52455253
-----
@@ -5368,6 +5376,12 @@ def write_dataset(
53685376
if storage_options_provider is not None:
53695377
params["storage_options_provider"] = storage_options_provider
53705378

5379+
# Add s3_credentials_refresh_offset_seconds if specified
5380+
if s3_credentials_refresh_offset_seconds is not None:
5381+
params["s3_credentials_refresh_offset_seconds"] = (
5382+
s3_credentials_refresh_offset_seconds
5383+
)
5384+
53715385
if commit_lock:
53725386
if not callable(commit_lock):
53735387
raise TypeError(f"commit_lock must be a function, got {type(commit_lock)}")

python/python/lance/file.py

Lines changed: 70 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import pyarrow as pa
88

9+
from .io import StorageOptionsProvider
910
from .lance import (
1011
LanceBufferDescriptor,
1112
LanceColumnMetadata,
@@ -66,6 +67,8 @@ def __init__(
6667
storage_options: Optional[Dict[str, str]] = None,
6768
columns: Optional[List[str]] = None,
6869
*,
70+
storage_options_provider: Optional[StorageOptionsProvider] = None,
71+
s3_credentials_refresh_offset_seconds: Optional[int] = None,
6972
_inner_reader: Optional[_LanceFileReader] = None,
7073
):
7174
"""
@@ -80,6 +83,12 @@ def __init__(
8083
storage_options : optional, dict
8184
Extra options to be used for a particular storage connection. This is
8285
used to store connection parameters like credentials, endpoint, etc.
86+
storage_options_provider : optional
87+
A provider that can provide storage options dynamically. This is useful
88+
for credentials that need to be refreshed or vended on-demand.
89+
s3_credentials_refresh_offset_seconds : optional, int
90+
How early (in seconds) before expiration to refresh S3 credentials.
91+
Default is 60 seconds. Only applies when using storage_options_provider.
8392
columns: list of str, default None
8493
List of column names to be fetched.
8594
All columns are fetched if None or unspecified.
@@ -90,7 +99,11 @@ def __init__(
9099
if isinstance(path, Path):
91100
path = str(path)
92101
self._reader = _LanceFileReader(
93-
path, storage_options=storage_options, columns=columns
102+
path,
103+
storage_options=storage_options,
104+
storage_options_provider=storage_options_provider,
105+
s3_credentials_refresh_offset_seconds=s3_credentials_refresh_offset_seconds,
106+
columns=columns,
94107
)
95108

96109
def read_all(self, *, batch_size: int = 1024, batch_readahead=16) -> ReaderResults:
@@ -202,7 +215,11 @@ class LanceFileSession:
202215
"""
203216

204217
def __init__(
205-
self, base_path: str, storage_options: Optional[Dict[str, str]] = None
218+
self,
219+
base_path: str,
220+
storage_options: Optional[Dict[str, str]] = None,
221+
storage_options_provider: Optional[StorageOptionsProvider] = None,
222+
s3_credentials_refresh_offset_seconds: Optional[int] = None,
206223
):
207224
"""
208225
Creates a new file session
@@ -216,10 +233,21 @@ def __init__(
216233
storage_options : optional, dict
217234
Extra options to be used for a particular storage connection. This is
218235
used to store connection parameters like credentials, endpoint, etc.
236+
storage_options_provider : optional
237+
A provider that can provide storage options dynamically. This is useful
238+
for credentials that need to be refreshed or vended on-demand.
239+
s3_credentials_refresh_offset_seconds : optional, int
240+
How early (in seconds) before expiration to refresh S3 credentials.
241+
Default is 60 seconds. Only applies when using storage_options_provider.
219242
"""
220243
if isinstance(base_path, Path):
221244
base_path = str(base_path)
222-
self._session = _LanceFileSession(base_path, storage_options=storage_options)
245+
self._session = _LanceFileSession(
246+
base_path,
247+
storage_options=storage_options,
248+
storage_options_provider=storage_options_provider,
249+
s3_credentials_refresh_offset_seconds=s3_credentials_refresh_offset_seconds,
250+
)
223251

224252
def open_reader(
225253
self, path: str, columns: Optional[List[str]] = None
@@ -281,6 +309,39 @@ def open_writer(
281309
_inner_writer=inner,
282310
)
283311

312+
def contains(self, path: str) -> bool:
313+
"""
314+
Check if a file exists at the given path (relative to this session's base path).
315+
316+
Parameters
317+
----------
318+
path : str
319+
Path relative to `base_path` to check for existence.
320+
321+
Returns
322+
-------
323+
bool
324+
True if the file exists, False otherwise.
325+
"""
326+
return self._session.contains(path)
327+
328+
def list(self, path: Optional[str] = None) -> List[str]:
329+
"""
330+
List all files at the given path (relative to this session's base path).
331+
332+
Parameters
333+
----------
334+
path : str, optional
335+
Path relative to `base_path` to list files from. If None, lists files
336+
from the base path.
337+
338+
Returns
339+
-------
340+
List[str]
341+
List of file paths.
342+
"""
343+
return self._session.list(path)
344+
284345

285346
class LanceFileWriter:
286347
"""
@@ -299,7 +360,8 @@ def __init__(
299360
data_cache_bytes: Optional[int] = None,
300361
version: Optional[str] = None,
301362
storage_options: Optional[Dict[str, str]] = None,
302-
storage_options_provider=None,
363+
storage_options_provider: Optional[StorageOptionsProvider] = None,
364+
s3_credentials_refresh_offset_seconds: Optional[int] = None,
303365
max_page_bytes: Optional[int] = None,
304366
_inner_writer: Optional[_LanceFileWriter] = None,
305367
**kwargs,
@@ -330,6 +392,9 @@ def __init__(
330392
A storage options provider that can fetch and refresh storage options
331393
dynamically. This is useful for credentials that expire and need to be
332394
refreshed automatically.
395+
s3_credentials_refresh_offset_seconds : optional, int
396+
How early (in seconds) before expiration to refresh S3 credentials.
397+
Default is 60 seconds. Only applies when using storage_options_provider.
333398
max_page_bytes : optional, int
334399
The maximum size of a page in bytes, if a single array would create a
335400
page larger than this then it will be split into multiple pages. The
@@ -347,6 +412,7 @@ def __init__(
347412
version=version,
348413
storage_options=storage_options,
349414
storage_options_provider=storage_options_provider,
415+
s3_credentials_refresh_offset_seconds=s3_credentials_refresh_offset_seconds,
350416
max_page_bytes=max_page_bytes,
351417
**kwargs,
352418
)

python/python/lance/lance/__init__.pyi

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ class LanceFileWriter:
102102
version: Optional[str],
103103
storage_options: Optional[Dict[str, str]],
104104
storage_options_provider: Optional[StorageOptionsProvider],
105+
s3_credentials_refresh_offset_seconds: Optional[int],
105106
keep_original_array: Optional[bool],
106107
max_page_bytes: Optional[int],
107108
): ...
@@ -112,7 +113,11 @@ class LanceFileWriter:
112113

113114
class LanceFileSession:
114115
def __init__(
115-
self, base_path: str, storage_options: Optional[Dict[str, str]] = None
116+
self,
117+
base_path: str,
118+
storage_options: Optional[Dict[str, str]] = None,
119+
storage_options_provider: Optional[StorageOptionsProvider] = None,
120+
s3_credentials_refresh_offset_seconds: Optional[int] = None,
116121
): ...
117122
def open_reader(
118123
self, path: str, columns: Optional[List[str]] = None
@@ -126,12 +131,16 @@ class LanceFileSession:
126131
keep_original_array: Optional[bool] = None,
127132
max_page_bytes: Optional[int] = None,
128133
) -> LanceFileWriter: ...
134+
def contains(self, path: str) -> bool: ...
135+
def list(self, path: Optional[str] = None) -> List[str]: ...
129136

130137
class LanceFileReader:
131138
def __init__(
132139
self,
133140
path: str,
134141
storage_options: Optional[Dict[str, str]],
142+
storage_options_provider: Optional[StorageOptionsProvider],
143+
s3_credentials_refresh_offset_seconds: Optional[int],
135144
columns: Optional[List[str]] = None,
136145
): ...
137146
def read_all(

python/python/tests/test_file.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,3 +648,112 @@ def write_thread_data(thread_id, writer, num_records):
648648
pc.equal(result_table.column("thread_id"), thread_id)
649649
)
650650
assert thread_rows.num_rows == records_per_thread
651+
652+
653+
def test_session_list_all_files(tmp_path):
654+
"""Test that LanceFileSession.list() returns all files with relative paths"""
655+
session = LanceFileSession(str(tmp_path))
656+
schema = pa.schema([pa.field("x", pa.int64())])
657+
658+
# Write files at different levels
659+
with session.open_writer("file1.lance", schema=schema) as writer:
660+
writer.write_batch(pa.table({"x": [1]}))
661+
662+
with session.open_writer("file2.lance", schema=schema) as writer:
663+
writer.write_batch(pa.table({"x": [2]}))
664+
665+
with session.open_writer("subdir/file3.lance", schema=schema) as writer:
666+
writer.write_batch(pa.table({"x": [3]}))
667+
668+
with session.open_writer("subdir/file4.lance", schema=schema) as writer:
669+
writer.write_batch(pa.table({"x": [4]}))
670+
671+
with session.open_writer("other/file5.lance", schema=schema) as writer:
672+
writer.write_batch(pa.table({"x": [5]}))
673+
674+
# List all files
675+
files = sorted(session.list())
676+
677+
# Verify relative paths (no absolute paths)
678+
assert files == [
679+
"file1.lance",
680+
"file2.lance",
681+
"other/file5.lance",
682+
"subdir/file3.lance",
683+
"subdir/file4.lance",
684+
]
685+
686+
# Verify no absolute paths
687+
for f in files:
688+
assert not f.startswith("/")
689+
assert str(tmp_path) not in f
690+
691+
692+
def test_session_list_with_prefix(tmp_path):
693+
"""Test that LanceFileSession.list() filters by prefix correctly"""
694+
session = LanceFileSession(str(tmp_path))
695+
schema = pa.schema([pa.field("x", pa.int64())])
696+
697+
# Write files in different directories
698+
with session.open_writer("file1.lance", schema=schema) as writer:
699+
writer.write_batch(pa.table({"x": [1]}))
700+
701+
with session.open_writer("subdir/file2.lance", schema=schema) as writer:
702+
writer.write_batch(pa.table({"x": [2]}))
703+
704+
with session.open_writer("subdir/file3.lance", schema=schema) as writer:
705+
writer.write_batch(pa.table({"x": [3]}))
706+
707+
with session.open_writer("other/file4.lance", schema=schema) as writer:
708+
writer.write_batch(pa.table({"x": [4]}))
709+
710+
# List with prefix "subdir"
711+
subdir_files = sorted(session.list("subdir"))
712+
assert subdir_files == ["subdir/file2.lance", "subdir/file3.lance"]
713+
714+
# List with prefix "other"
715+
other_files = sorted(session.list("other"))
716+
assert other_files == ["other/file4.lance"]
717+
718+
# List with non-existent prefix
719+
empty = session.list("nonexistent")
720+
assert empty == []
721+
722+
723+
def test_session_list_with_trailing_slash(tmp_path):
724+
"""Test that LanceFileSession.list() handles trailing slashes correctly"""
725+
session = LanceFileSession(str(tmp_path))
726+
schema = pa.schema([pa.field("x", pa.int64())])
727+
728+
with session.open_writer("dir/file.lance", schema=schema) as writer:
729+
writer.write_batch(pa.table({"x": [1]}))
730+
731+
# Both with and without trailing slash should work
732+
files_no_slash = session.list("dir")
733+
files_with_slash = session.list("dir/")
734+
735+
assert files_no_slash == files_with_slash
736+
assert files_no_slash == ["dir/file.lance"]
737+
738+
739+
def test_session_contains(tmp_path):
740+
"""Test that LanceFileSession.contains() works correctly"""
741+
session = LanceFileSession(str(tmp_path))
742+
schema = pa.schema([pa.field("x", pa.int64())])
743+
744+
# File doesn't exist yet
745+
assert not session.contains("test.lance")
746+
747+
# Write a file
748+
with session.open_writer("test.lance", schema=schema) as writer:
749+
writer.write_batch(pa.table({"x": [1]}))
750+
751+
# File exists now
752+
assert session.contains("test.lance")
753+
754+
# Nested file
755+
with session.open_writer("subdir/nested.lance", schema=schema) as writer:
756+
writer.write_batch(pa.table({"x": [2]}))
757+
758+
assert session.contains("subdir/nested.lance")
759+
assert not session.contains("subdir/nonexistent.lance")

0 commit comments

Comments
 (0)