Support Azure FileSystem with Pyarrow FileIO #2112

@NikitaMatskevich

Feature Request / Improvement

PyArrow is the default IO for Pyiceberg catalogs. In Azure environments it handles a wider spectrum of auth strategies than Fsspec, including, for instance, Managed Identities. Also, prior to PR #1663 (not merged yet), there was no support for wasb(s) with Fsspec.

Direct impact on Pyiceberg users:

  • Pyiceberg becomes usable in services that authenticate with the Managed Identities auth strategy.
  • Pyiceberg becomes usable with wasb(s) schemes in Azure.

Right now, the only way to use Managed Identities with Pyiceberg is extremely hacky:

from typing import Any

from fsspec import AbstractFileSystem
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.io.fsspec import FsspecFileIO

ADLS_ANON = "adls.anon"
ADLS_CONNECTION_STRING = "adls.connection-string"
ADLS_ACCOUNT_NAME = "adls.account-name"
ADLS_ACCOUNT_KEY = "adls.account-key"
ADLS_SAS_TOKEN = "adls.sas-token"
ADLS_TENANT_ID = "adls.tenant-id"
ADLS_CLIENT_ID = "adls.client-id"
ADLS_CLIENT_SECRET = "adls.client-secret"
ADLS_ACCOUNT_HOST = "adls.account-host"

Properties = dict[str, Any]

def my_adls(properties: Properties) -> AbstractFileSystem:
    from adlfs import AzureBlobFileSystem

    for key, sas_token in {
        key.replace(f"{ADLS_SAS_TOKEN}.", ""): value for key, value in properties.items() if key.startswith(ADLS_SAS_TOKEN)
    }.items():
        if ADLS_ACCOUNT_NAME not in properties:
            properties[ADLS_ACCOUNT_NAME] = key.split(".")[0]
        if ADLS_SAS_TOKEN not in properties:
            properties[ADLS_SAS_TOKEN] = sas_token

    return AzureBlobFileSystem(
        connection_string=properties.get(ADLS_CONNECTION_STRING),
        anon=properties.get(ADLS_ANON),
        account_name=properties.get(ADLS_ACCOUNT_NAME),
        account_key=properties.get(ADLS_ACCOUNT_KEY),
        sas_token=properties.get(ADLS_SAS_TOKEN),
        tenant_id=properties.get(ADLS_TENANT_ID),
        client_id=properties.get(ADLS_CLIENT_ID),
        client_secret=properties.get(ADLS_CLIENT_SECRET),
        account_host=properties.get(ADLS_ACCOUNT_HOST),
    )

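# Build a FileIO by hand with "anon" set to False so that anonymous access is not attempted,
# then override get_fs so that every scheme resolves to the filesystem built by my_adls.
injected_file_io = FsspecFileIO(properties={ADLS_ANON: False, ADLS_ACCOUNT_NAME: "my-account"})
injected_file_io.get_fs = lambda scheme: my_adls(injected_file_io.properties)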

catalog = RestCatalog(
    name="test_catalog",
    uri="https://my-url/internal/catalog",
    properties={
        "io-impl": "pyiceberg.io.fsspec.FsspecFileIO",
    }
)
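# Replace the FileIO on the catalog.
catalog.file_io = injected_file_io

table = catalog.load_table("test.my_test_table")
# The loaded table holds its own FileIO, so it has to be replaced as well.
table.io = injected_file_io
table.scan(limit=100).to_pandas()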

As you can see, at least the "anon" flag must be passed to AzureBlobFileSystem, which Pyiceberg does not currently do. The FileIO must also be injected by hand into both the catalog and the table. With this PR, all of this reduces to the normal workflow:

from pyiceberg.catalog import load_catalog

catalog = load_catalog(uri="https://my-url/internal/catalog")
table = catalog.load_table("test.my_test_table")
table.scan(limit=100).to_pandas()
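
For readers unfamiliar with the SAS-token loop inside my_adls in the workaround, here is a minimal standalone sketch of what it appears to do: an account-scoped key such as "adls.sas-token.<account>.<host>" is folded into the generic "adls.account-name" and "adls.sas-token" properties when those are not already set. The helper name normalize_sas_properties and the sample account and token values are hypothetical. Unlike the workaround, this sketch returns a copy instead of mutating its input and only matches keys that carry the dotted suffix.

```python
from typing import Any

ADLS_ACCOUNT_NAME = "adls.account-name"
ADLS_SAS_TOKEN = "adls.sas-token"


def normalize_sas_properties(properties: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper: fold account-scoped SAS keys into the generic keys."""
    result = dict(properties)  # work on a copy; the workaround mutates in place
    prefix = f"{ADLS_SAS_TOKEN}."
    for key, value in properties.items():
        if not key.startswith(prefix):
            continue
        scoped = key[len(prefix):]  # e.g. "myaccount.dfs.core.windows.net"
        # Only fill in values the caller has not set explicitly.
        result.setdefault(ADLS_ACCOUNT_NAME, scoped.split(".")[0])
        result.setdefault(ADLS_SAS_TOKEN, value)
    return result


# Sample input with placeholder values.
example = normalize_sas_properties(
    {"adls.sas-token.myaccount.dfs.core.windows.net": "sv=placeholder"}
)
print(example[ADLS_ACCOUNT_NAME])
print(example[ADLS_SAS_TOKEN])
```

Explicitly configured "adls.account-name" and "adls.sas-token" values take precedence over anything derived from a scoped key, which matches the "not in properties" checks in the workaround.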
