Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ dev = [
"pre-commit>=3.3.3",
"pytest>=7.4.0",
"pytest-cov>=4.1.0",
"pytest-asyncio",
"pydoclint",
# for integration tests.
"pandas[parquet]",
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pydantic-core==2.18.4
pydoclint==0.5.3
pyproject-hooks==1.1.0
pytest==8.2.2
pytest-asyncio==0.23.7
pytest-cov==5.0.0
python-dateutil==2.9.0.post0
pytz==2024.1
Expand Down
3 changes: 3 additions & 0 deletions src/lakefs_spec/asyn/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .spec import AsyncLakeFSFileSystem

__all__ = ["AsyncLakeFSFileSystem"]
69 changes: 69 additions & 0 deletions src/lakefs_spec/asyn/spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from typing import Any

from fsspec.asyn import AsyncFileSystem

from lakefs_spec import LakeFSFileSystem, LakeFSTransaction
from lakefs_spec.util import async_wrapper


class AsyncLakeFSFileSystem(AsyncFileSystem):
    """Asynchronous wrapper around a LakeFSFileSystem.

    Every coroutine method delegates to the same-named synchronous method of
    an internal ``LakeFSFileSystem`` instance through ``async_wrapper`` and
    awaits the outcome, so that awaiting e.g. ``_cat_file`` yields the file
    contents rather than a pending coroutine object.
    """

    protocol = "lakefs"
    transaction_type = LakeFSTransaction

    def __init__(
        self,
        host: str | None = None,
        username: str | None = None,
        password: str | None = None,
        api_key: str | None = None,
        api_key_prefix: str | None = None,
        access_token: str | None = None,
        verify_ssl: bool = True,
        ssl_ca_cert: str | None = None,
        proxy: str | None = None,
        create_branch_ok: bool = True,
        source_branch: str = "main",
        **storage_options: Any,
    ):
        """Create the async file system and its underlying synchronous one.

        All named parameters are forwarded unchanged, in declaration order, to
        ``LakeFSFileSystem``; refer to that class for their meaning.
        ``storage_options`` are passed both to ``AsyncFileSystem.__init__`` and
        to ``LakeFSFileSystem``.
        """
        super().__init__(**storage_options)

        # The synchronous file system that performs the actual work.
        self._sync_fs = LakeFSFileSystem(
            host,
            username,
            password,
            api_key,
            api_key_prefix,
            access_token,
            verify_ssl,
            ssl_ca_cert,
            proxy,
            create_branch_ok,
            source_branch,
            **storage_options,
        )

    async def _rm_file(self, path, **kwargs):
        """Delegate to the synchronous ``rm_file`` and return its result."""
        # NOTE: ``kwargs`` are accepted but not forwarded to ``rm_file``.
        return await async_wrapper(self._sync_fs.rm_file)(path)

    async def _cp_file(self, path1, path2, **kwargs):
        """Delegate to the synchronous ``cp_file`` and return its result."""
        return await async_wrapper(self._sync_fs.cp_file)(path1, path2, **kwargs)

    async def _pipe_file(self, path, value, **kwargs):
        """Delegate to the synchronous ``pipe_file`` and return its result."""
        return await async_wrapper(self._sync_fs.pipe_file)(path, value, **kwargs)

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        """Delegate to the synchronous ``cat_file`` and return its result."""
        return await async_wrapper(self._sync_fs.cat_file)(path, start, end, **kwargs)

    async def _put_file(self, lpath, rpath, **kwargs):
        """Delegate to the synchronous ``put_file`` and return its result."""
        return await async_wrapper(self._sync_fs.put_file)(lpath, rpath, **kwargs)

    async def _get_file(self, rpath, lpath, **kwargs):
        """Delegate to the synchronous ``get_file`` and return its result."""
        return await async_wrapper(self._sync_fs.get_file)(rpath, lpath, **kwargs)

    async def _info(self, path, **kwargs):
        """Delegate to the synchronous ``info`` and return its result."""
        return await async_wrapper(self._sync_fs.info)(path, **kwargs)

    async def _ls(self, path, detail=True, **kwargs):
        """Delegate to the synchronous ``ls`` and return its result."""
        return await async_wrapper(self._sync_fs.ls)(path, detail, **kwargs)
20 changes: 19 additions & 1 deletion src/lakefs_spec/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@

from __future__ import annotations

import asyncio
import functools
import hashlib
import os
import re
from typing import Any, Callable, Generator, Protocol
from typing import Any, Callable, Coroutine, Generator, Protocol, TypeVar

from lakefs_sdk import Pagination
from lakefs_sdk import __version__ as __lakefs_sdk_version__
from typing_extensions import ParamSpec

# Parse the dotted lakeFS SDK version string into a tuple of integers.
lakefs_sdk_version = tuple(int(v) for v in __lakefs_sdk_version__.split("."))
# Remove the imported alias from the module namespace now that it is parsed.
del __lakefs_sdk_version__
Expand Down Expand Up @@ -108,3 +111,18 @@ def parse(path: str) -> tuple[str, str, str]:

repo, ref, resource = results.groups()
return repo, ref, resource


# Generic placeholders used by ``async_wrapper``: ``P`` captures the wrapped
# callable's parameters and ``T`` its return type.
P = ParamSpec("P")
T = TypeVar("T")


def async_wrapper(fn: Callable[P, T]) -> Callable[P, Coroutine[None, None, T]]:
    """Turn a synchronous callable into a coroutine function.

    The returned coroutine function takes the same arguments as ``fn``. When
    awaited, it submits the call to the running event loop's default executor
    and returns whatever ``fn`` returns.
    """

    @functools.wraps(fn)
    async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
        # ``run_in_executor`` only forwards positional arguments, so bind the
        # complete call (including keyword arguments) into one callable first.
        bound_call = functools.partial(fn, *args, **kwargs)
        return await asyncio.get_running_loop().run_in_executor(None, bound_call)

    return _wrapper
13 changes: 13 additions & 0 deletions tests/test_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import pytest

from lakefs_spec.util import async_wrapper


@pytest.mark.asyncio
async def test_async_wrapper():
    """Awaiting the wrapped function gives the same result as a direct call."""

    def sync_add(n: int) -> int:
        return n + 42

    expected = sync_add(42)
    actual = await async_wrapper(sync_add)(42)

    assert actual == expected