Skip to content

Commit 679d96a

Browse files
committed
feat: Add naive async filesystem implementation
The implementation simply wraps the synchronous file system already present in the code in asyncio coroutines.
1 parent 361dbfc commit 679d96a

File tree

6 files changed

+105
-1
lines changed

6 files changed

+105
-1
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ dev = [
5151
"pre-commit>=3.3.3",
5252
"pytest>=7.4.0",
5353
"pytest-cov>=4.1.0",
54+
"pytest-asyncio",
5455
"pydoclint",
5556
# for integration tests.
5657
"pandas[parquet]",

requirements-dev.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ pydantic-core==2.18.4
2929
pydoclint==0.5.3
3030
pyproject-hooks==1.1.0
3131
pytest==8.2.2
32+
pytest-asyncio==0.23.7
3233
pytest-cov==5.0.0
3334
python-dateutil==2.9.0.post0
3435
pytz==2024.1

src/lakefs_spec/asyn/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .spec import AsyncLakeFSFileSystem
2+
3+
__all__ = ["AsyncLakeFSFileSystem"]

src/lakefs_spec/asyn/spec.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
from typing import Any
2+
3+
from fsspec.asyn import AsyncFileSystem
4+
5+
from lakefs_spec import LakeFSFileSystem, LakeFSTransaction
6+
from lakefs_spec.util import async_wrapper
7+
8+
9+
class AsyncLakeFSFileSystem(AsyncFileSystem):
    """Asynchronous wrapper around a LakeFSFileSystem.

    The constructor builds a synchronous ``LakeFSFileSystem`` and stores it as
    ``self._sync_fs``. Each coroutine defined here forwards to the method of
    the same name (without the leading underscore) on that instance via
    ``async_wrapper`` and awaits the outcome, so awaiting one of these
    coroutines yields the result of the synchronous call itself rather than
    a pending coroutine object.
    """

    protocol = "lakefs"
    transaction_type = LakeFSTransaction

    def __init__(
        self,
        host: str | None = None,
        username: str | None = None,
        password: str | None = None,
        api_key: str | None = None,
        api_key_prefix: str | None = None,
        access_token: str | None = None,
        verify_ssl: bool = True,
        ssl_ca_cert: str | None = None,
        proxy: str | None = None,
        create_branch_ok: bool = True,
        source_branch: str = "main",
        **storage_options: Any,
    ):
        """Create the async file system and the synchronous one it wraps.

        All named parameters are handed, positionally and in declaration
        order, to ``LakeFSFileSystem``; their meaning is defined there.
        ``storage_options`` is passed both to ``AsyncFileSystem.__init__``
        and to ``LakeFSFileSystem``.
        """
        super().__init__(**storage_options)

        # NOTE(review): arguments are forwarded positionally, so this relies
        # on LakeFSFileSystem declaring its parameters in the same order.
        self._sync_fs = LakeFSFileSystem(
            host,
            username,
            password,
            api_key,
            api_key_prefix,
            access_token,
            verify_ssl,
            ssl_ca_cert,
            proxy,
            create_branch_ok,
            source_branch,
            **storage_options,
        )

    async def _rm_file(self, path, **kwargs):
        """Await the synchronous ``rm_file(path)`` and return its result."""
        # NOTE(review): ``kwargs`` is accepted but not forwarded, presumably
        # because the synchronous ``rm_file`` takes only a path; confirm.
        return await async_wrapper(self._sync_fs.rm_file)(path)

    async def _cp_file(self, path1, path2, **kwargs):
        """Await the synchronous ``cp_file(path1, path2, **kwargs)``."""
        return await async_wrapper(self._sync_fs.cp_file)(path1, path2, **kwargs)

    async def _pipe_file(self, path, value, **kwargs):
        """Await the synchronous ``pipe_file(path, value, **kwargs)``."""
        return await async_wrapper(self._sync_fs.pipe_file)(path, value, **kwargs)

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        """Await the synchronous ``cat_file(path, start, end, **kwargs)``."""
        return await async_wrapper(self._sync_fs.cat_file)(path, start, end, **kwargs)

    async def _put_file(self, lpath, rpath, **kwargs):
        """Await the synchronous ``put_file(lpath, rpath, **kwargs)``."""
        return await async_wrapper(self._sync_fs.put_file)(lpath, rpath, **kwargs)

    async def _get_file(self, rpath, lpath, **kwargs):
        """Await the synchronous ``get_file(rpath, lpath, **kwargs)``."""
        return await async_wrapper(self._sync_fs.get_file)(rpath, lpath, **kwargs)

    async def _info(self, path, **kwargs):
        """Await the synchronous ``info(path, **kwargs)`` and return its result."""
        return await async_wrapper(self._sync_fs.info)(path, **kwargs)

    async def _ls(self, path, detail=True, **kwargs):
        """Await the synchronous ``ls(path, detail, **kwargs)`` and return its result."""
        return await async_wrapper(self._sync_fs.ls)(path, detail, **kwargs)

src/lakefs_spec/util.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44

55
from __future__ import annotations
66

7+
import asyncio
8+
import functools
79
import hashlib
810
import os
911
import re
10-
from typing import Any, Callable, Generator, Protocol
12+
from typing import Any, Callable, Coroutine, Generator, ParamSpec, Protocol, TypeVar
1113

1214
from lakefs_sdk import Pagination
1315
from lakefs_sdk import __version__ as __lakefs_sdk_version__
@@ -108,3 +110,18 @@ def parse(path: str) -> tuple[str, str, str]:
108110

109111
repo, ref, resource = results.groups()
110112
return repo, ref, resource
113+
114+
115+
P = ParamSpec("P")
116+
T = TypeVar("T")
117+
118+
119+
def async_wrapper(fn: Callable[P, T]) -> Callable[P, Coroutine[None, None, T]]:
120+
"""Wrap a synchronous function in an asyncio coroutine."""
121+
122+
@functools.wraps(fn)
123+
async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
124+
loop = asyncio.get_running_loop()
125+
return await loop.run_in_executor(None, lambda: fn(*args, **kwargs))
126+
127+
return _wrapper

tests/test_util.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import pytest
2+
3+
from lakefs_spec.util import async_wrapper
4+
5+
6+
@pytest.mark.asyncio
async def test_async_wrapper():
    """The wrapped coroutine must produce the same value as the plain function."""

    def sync_add(n: int) -> int:
        return 42 + n

    expected = sync_add(42)
    wrapped_add = async_wrapper(sync_add)

    actual = await wrapped_add(42)
    assert actual == expected

0 commit comments

Comments
 (0)