This repository has been archived by the owner on Mar 11, 2024. It is now read-only.

⭐ add PolarsDeltaIOManager (#7)
* ⭐ add `PolarsDeltaIOManager`
* 🗑️ remove legacy `polars_parquet_io_manager`
danielgafni committed Jun 30, 2023
1 parent 53225ee commit 17cdb3a
Showing 14 changed files with 672 additions and 204 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/check.yml
@@ -36,7 +36,9 @@ jobs:
virtualenvs-in-project: false
installer-parallel: true
- name: Install dependencies
- run: poetry install --all-extras --sync && pip install polars~=${{ matrix.polars_version }}
+ run: poetry install --all-extras --sync && pip install --ignore-installed polars~=${{ matrix.polars_version }}
+ - name: Print polars info
+ run: python -c 'import polars; print(polars.show_versions())'
- name: Run tests
run: pytest -v .

@@ -70,6 +72,6 @@ jobs:
virtualenvs-in-project: false
installer-parallel: true
- name: Install dependencies
- run: poetry install --all-extras --sync && pip install polars~=${{ matrix.polars_version }}
+ run: poetry install --all-extras --sync && pip install --ignore-installed polars~=${{ matrix.polars_version }}
- name: Run pre-commit hooks
run: pre-commit run --all-files
12 changes: 6 additions & 6 deletions README.md
@@ -10,8 +10,9 @@
- inherits all the features of the `UPathIOManager` - works with local and remote filesystems (like S3),
supports loading multiple partitions (use `dict[str, pl.DataFrame]` type annotation), ...
- Implemented serialization formats:
- - `PolarsParquetIOManager` - for reading and writing files in Apache Parquet format. Supports reading partitioned Parquet datasets (for example, often produced by Spark).
- - `BigQueryPolarsIOManager` - for reading and writing data from/to [BigQuery](https://cloud.google.com/bigquery). Supports writing partitioned tables (`"partition_expr"` input metadata key must be specified).
+ - `PolarsParquetIOManager` - for reading and writing files in Apache Parquet format. Supports reading partitioned Parquet datasets (for example, often produced by Spark). All read/write options can be set via metadata values.
+ - `PolarsDeltaIOManager` - for reading and writing Delta Lake tables. All read/write options can be set via metadata values. The `"partition_by"` metadata value can be set to use native Delta Lake partitioning (it is passed to the `delta_write_options` of `write_delta`). In this case, all the asset partitions will be stored in the same Delta Table directory. You are responsible for filtering the correct partitions when reading the data in the downstream assets. Extra dependencies can be installed with `pip install 'dagster-polars[deltalake]'`.
+ - `BigQueryPolarsIOManager` - for reading and writing data from/to [BigQuery](https://cloud.google.com/bigquery). Supports writing partitioned tables (`"partition_expr"` input metadata key must be specified). Extra dependencies can be installed with `pip install 'dagster-polars[gcp]'`.

## Quickstart

@@ -67,7 +68,6 @@ poetry run pre-commit install
poetry run pytest
```

- ## TODO
- - [ ] Add `PolarsDeltaIOManager`
- - [ ] Data validation like in [dagster-pandas](https://docs.dagster.io/integrations/pandas#validating-pandas-dataframes-with-dagster-types)
- - [ ] Maybe use `DagsterTypeLoader` ?
+ ## Ideas
+ - Data validation like in [dagster-pandas](https://docs.dagster.io/integrations/pandas#validating-pandas-dataframes-with-dagster-types)
+ - Maybe use `DagsterTypeLoader` ?
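
For orientation, a minimal sketch of how the `PolarsDeltaIOManager` described above might be wired into a Dagster project. The asset name, columns, and `base_dir` value are illustrative, and the `base_dir` config field is assumed to follow the usual `UPathIOManager` convention; `"partition_by"` is the metadata key documented in the README entry.

```python
from dagster import DailyPartitionsDefinition, Definitions, asset
import polars as pl

from dagster_polars import PolarsDeltaIOManager


# Hypothetical partitioned asset: "partition_by" is forwarded to write_delta's
# delta_write_options, so all asset partitions land in one Delta table directory.
@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-06-01"),
    metadata={"partition_by": "date"},
)
def daily_orders(context) -> pl.DataFrame:
    return pl.DataFrame({"date": [context.partition_key], "value": [1.0]})


defs = Definitions(
    assets=[daily_orders],
    # base_dir is an assumption here; any UPath-compatible location should work
    resources={"io_manager": PolarsDeltaIOManager(base_dir="/tmp/dagster")},
)
```
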
5 changes: 3 additions & 2 deletions dagster_polars/__init__.py
@@ -1,5 +1,6 @@
from dagster_polars._version import __version__
from dagster_polars.io_managers.base import BasePolarsUPathIOManager
- from dagster_polars.io_managers.parquet import PolarsParquetIOManager, polars_parquet_io_manager
+ from dagster_polars.io_managers.delta import PolarsDeltaIOManager
+ from dagster_polars.io_managers.parquet import PolarsParquetIOManager

- __all__ = ["PolarsParquetIOManager", "BasePolarsUPathIOManager", "polars_parquet_io_manager", "__version__"]
+ __all__ = ["PolarsParquetIOManager", "PolarsDeltaIOManager", "BasePolarsUPathIOManager", "__version__"]
48 changes: 48 additions & 0 deletions dagster_polars/io_managers/base.py
@@ -11,6 +11,7 @@
InitResourceContext,
InputContext,
MetadataValue,
MultiPartitionKey,
OutputContext,
TableColumn,
TableMetadataValue,
@@ -178,3 +179,50 @@ def load_from_path(self, path: UPath, context: InputContext) -> Union[pl.DataFra

def get_metadata(self, context: OutputContext, obj: pl.DataFrame) -> Dict[str, MetadataValue]:
return get_polars_metadata(context, obj)

@staticmethod
def get_storage_options(path: UPath) -> dict:
storage_options = {}

try:
storage_options.update(path._kwargs.copy())
except AttributeError:
pass

return storage_options

def get_path_for_partition(self, context: Union[InputContext, OutputContext], path: UPath, partition: str) -> UPath:
"""
Override this method if you want to use a different partitioning scheme
(for example, if the saving function handles partitioning instead).
The extension will be added later.
:param context: input or output context of the asset
:param path: asset path before partitioning
:param partition: formatted partition key
:return: the path to use for this partition (the extension is appended afterwards)
"""
return path / partition

def _get_paths_for_partitions(self, context: Union[InputContext, OutputContext]) -> Dict[str, "UPath"]:
"""Returns a dict of partition_keys into I/O paths for a given context."""
if not context.has_asset_partitions:
raise TypeError(
f"Detected {context.dagster_type.typing_type} input type " "but the asset is not partitioned"
)

def _formatted_multipartitioned_path(partition_key: MultiPartitionKey) -> str:
ordered_dimension_keys = [
key[1] for key in sorted(partition_key.keys_by_dimension.items(), key=lambda x: x[0])
]
return "/".join(ordered_dimension_keys)

formatted_partition_keys = [
_formatted_multipartitioned_path(pk) if isinstance(pk, MultiPartitionKey) else pk
for pk in context.asset_partition_keys
]

asset_path = self._get_path_without_extension(context)
return {
partition: self._with_extension(self.get_path_for_partition(context, asset_path, partition))
for partition in formatted_partition_keys
}
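
A quick illustration of the multi-partition handling added above: dimension keys are sorted by dimension name and joined with `/`, so a `MultiPartitionKey` turns into a nested directory under the asset path (the dimension names and values below are made up).

```python
from dagster import MultiPartitionKey

# Two hypothetical dimensions; sorting by dimension name puts "date" before "region".
pk = MultiPartitionKey({"region": "eu", "date": "2023-06-30"})

ordered_dimension_keys = [value for _, value in sorted(pk.keys_by_dimension.items())]
print("/".join(ordered_dimension_keys))  # -> 2023-06-30/eu
# _get_paths_for_partitions appends this segment (plus the extension) to the
# asset path, e.g. <base_dir>/my_asset/2023-06-30/eu.parquet
```
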
75 changes: 75 additions & 0 deletions dagster_polars/io_managers/delta.py
@@ -0,0 +1,75 @@
from pprint import pformat
from typing import Union

import polars as pl
from dagster import InputContext, OutputContext
from deltalake import DeltaTable
from upath import UPath

from dagster_polars.io_managers.base import BasePolarsUPathIOManager


class PolarsDeltaIOManager(BasePolarsUPathIOManager):
extension: str = ".delta"

assert BasePolarsUPathIOManager.__doc__ is not None
__doc__ = (
BasePolarsUPathIOManager.__doc__
+ """\nWorks with Delta files.
All read/write arguments can be passed via corresponding metadata values."""
)

def get_path_for_partition(self, context: Union[InputContext, OutputContext], path: UPath, partition: str) -> UPath:
if isinstance(context, InputContext):
if (
context.upstream_output is not None
and context.upstream_output.metadata is not None
and context.upstream_output.metadata.get("partition_by") is not None
):
# upstream asset has "partition_by" metadata set, so partitioning for it is handled by DeltaLake itself
return path

if isinstance(context, OutputContext):
if context.metadata is not None and context.metadata.get("partition_by") is not None:
# this asset has "partition_by" metadata set, so partitioning for it is handled by DeltaLake itself
return path

return path / partition # partitioning is handled by the IOManager

def dump_df_to_path(self, context: OutputContext, df: pl.DataFrame, path: UPath):
assert context.metadata is not None

delta_write_options = context.metadata.get("delta_write_options")

if context.has_asset_partitions:
delta_write_options = delta_write_options or {}
partition_by = context.metadata.get("partition_by")

if partition_by is not None:
delta_write_options["partition_by"] = partition_by

if delta_write_options is not None:
context.log.debug(f"Writing with delta_write_options: {pformat(delta_write_options)}")

storage_options = self.get_storage_options(path)

df.write_delta(
str(path),
mode=context.metadata.get("mode", "overwrite"), # type: ignore
overwrite_schema=context.metadata.get("overwrite_schema", False),
storage_options=storage_options,
delta_write_options=delta_write_options,
)
table = DeltaTable(str(path), storage_options=storage_options)
context.add_output_metadata({"version": table.version()})

def scan_df_from_path(self, path: UPath, context: InputContext) -> pl.LazyFrame:
assert context.metadata is not None

return pl.scan_delta(
str(path),
version=context.metadata.get("version"),
delta_table_options=context.metadata.get("delta_table_options"),
pyarrow_options=context.metadata.get("pyarrow_options"),
storage_options=self.get_storage_options(path),
)
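
A sketch of how the metadata keys read by `PolarsDeltaIOManager` above could be attached to assets. The asset names are hypothetical, and it is assumed that `AssetIn` metadata reaches the `InputContext` consumed by `scan_df_from_path`.

```python
from dagster import AssetIn, asset
import polars as pl


# Hypothetical upstream asset: append to the existing Delta table instead of
# overwriting it ("mode" is passed through to pl.DataFrame.write_delta).
@asset(metadata={"mode": "append"})
def events() -> pl.DataFrame:
    return pl.DataFrame({"event": ["click"], "ts": ["2023-06-30T00:00:00"]})


# Hypothetical downstream asset: pin a Delta table version (time travel) when
# reading; "version" is passed through to pl.scan_delta.
@asset(ins={"events": AssetIn(metadata={"version": 0})})
def first_events_snapshot(events: pl.DataFrame) -> pl.DataFrame:
    return events
```
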
45 changes: 32 additions & 13 deletions dagster_polars/io_managers/parquet.py
@@ -3,7 +3,7 @@
import fsspec
import polars as pl
import pyarrow.dataset as ds
- from dagster import InitResourceContext, InputContext, OutputContext, io_manager
+ from dagster import InputContext, OutputContext
from upath import UPath

from dagster_polars.io_managers.base import BasePolarsUPathIOManager
@@ -12,27 +12,46 @@
class PolarsParquetIOManager(BasePolarsUPathIOManager):
extension: str = ".parquet"

- __doc__ = BasePolarsUPathIOManager.__doc__ + """\nWorks with Parquet files""" # type: ignore
+ assert BasePolarsUPathIOManager.__doc__ is not None
+ __doc__ = (
+ BasePolarsUPathIOManager.__doc__
+ + """\nWorks with Parquet files.
+ All read/write arguments can be passed via corresponding metadata values."""
+ )

def dump_df_to_path(self, context: OutputContext, df: pl.DataFrame, path: UPath):
+ assert context.metadata is not None

with path.open("wb") as file:
- df.write_parquet(file)
+ df.write_parquet(
+ file,
+ compression=context.metadata.get("compression", "zstd"),
+ compression_level=context.metadata.get("compression_level"),
+ statistics=context.metadata.get("statistics", False),
+ row_group_size=context.metadata.get("row_group_size"),
+ use_pyarrow=context.metadata.get("use_pyarrow", False),
+ pyarrow_options=context.metadata.get("pyarrow_options"),
+ )

def scan_df_from_path(self, path: UPath, context: InputContext) -> pl.LazyFrame:
+ assert context.metadata is not None

fs: Union[fsspec.AbstractFileSystem, None] = None

try:
fs = path._accessor._fs
except AttributeError:
pass

- return pl.scan_pyarrow_dataset(ds.dataset(str(path), filesystem=fs))
-
-
- # old non-pythonic IOManager, you are encouraged to use the `PolarsParquetIOManager` instead
- @io_manager(
- config_schema=PolarsParquetIOManager.to_config_schema(),
- description=PolarsParquetIOManager.__doc__,
- )
- def polars_parquet_io_manager(context: InitResourceContext):
- return PolarsParquetIOManager.from_resource_context(context)
+ return pl.scan_pyarrow_dataset(
+ ds.dataset(
+ str(path),
+ filesystem=fs,
+ format=context.metadata.get("format", "parquet"),
+ partitioning=context.metadata.get("partitioning"),
+ partition_base_dir=context.metadata.get("partition_base_dir"),
+ exclude_invalid_files=context.metadata.get("exclude_invalid_files", True),
+ ignore_prefixes=context.metadata.get("ignore_prefixes", [".", "_"]),
+ ),
+ allow_pyarrow_filter=context.metadata.get("allow_pyarrow_filter", True),
+ )
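
The new Parquet read/write arguments are plain metadata values as well; a small sketch (the asset and its columns are made up, and read options such as `"partitioning"` would be attached to `AssetIn(metadata=...)` on a downstream asset in the same way).

```python
from dagster import asset
import polars as pl


# Hypothetical asset: tune the Parquet output via metadata; "compression",
# "row_group_size" and "statistics" are forwarded to pl.DataFrame.write_parquet
# (the IO manager defaults to zstd compression, as seen above).
@asset(metadata={"compression": "snappy", "row_group_size": 100_000, "statistics": True})
def big_table() -> pl.DataFrame:
    return pl.DataFrame({"id": range(500_000), "value": [0.5] * 500_000})
```
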
71 changes: 70 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

4 changes: 3 additions & 1 deletion pyproject.toml
@@ -30,12 +30,13 @@ python = "^3.8"
dagster = "^1.3.5"
polars = ">=0.17.0"
pyarrow = ">=8.0.0"
deltalake = "^0.10.0"
dagster-gcp = "^0.19.5"


[tool.poetry.extras]
gcp = ["dagster-gcp"]

deltalake = ["deltalake"]


[tool.poetry.group.dev.dependencies]
@@ -50,6 +51,7 @@ tox-gh = "^1.0.0"
pre-commit = "^3.3.2"
dagit = "^1.3.9"
black = "^23.3.0"
pytest-cases = "^3.6.14"

[build-system]
requires = ["poetry-core"]
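
Since Delta Lake support is published as an optional extra here, code that depends on it might guard the import; a tiny sketch under that assumption (the error message is illustrative, not what the library raises).

```python
try:
    from deltalake import DeltaTable  # required by PolarsDeltaIOManager
except ImportError as err:
    raise ImportError(
        "Delta Lake support is an optional extra: pip install 'dagster-polars[deltalake]'"
    ) from err
```
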