This repository has been archived by the owner on Mar 11, 2024. It is now read-only.

add native Delta Lake partitioning
danielgafni committed Jun 30, 2023
1 parent 6c6420b commit 3da7243
Showing 6 changed files with 129 additions and 14 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -11,7 +11,7 @@
supports loading multiple partitions (use `dict[str, pl.DataFrame]` type annotation), ...
- Implemented serialization formats:
- `PolarsParquetIOManager` - for reading and writing files in Apache Parquet format. Supports reading partitioned Parquet datasets (for example, often produced by Spark). All read/write options can be set via metadata values.
- `PolarsDeltaIOManager` - for reading and writing Delta Lake. All read/write options can be set via metadata values.
- `PolarsDeltaIOManager` - for reading and writing Delta Lake. All read/write options can be set via metadata values. The `"partition_by"` metadata value can be set to use native Delta Lake partitioning (it is passed to the `delta_write_options` of `write_delta`). In this case, all asset partitions are stored in the same Delta Table directory, and you are responsible for filtering the correct partitions when reading the data in downstream assets (a minimal usage sketch follows this list).
- `BigQueryPolarsIOManager` - for reading and writing data from/to [BigQuery](https://cloud.google.com/bigquery). Supports writing partitioned tables (`"partition_expr"` input metadata key must be specified). Extra dependencies can be installed with `pip install 'dagster-polars[gcp]'`.
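
A minimal, hedged sketch of the native Delta Lake partitioning described above. The `base_dir`, asset names, and the `partition` column are illustrative, and the top-level import path of `PolarsDeltaIOManager` is assumed:

```python
from typing import Dict

import polars as pl
from dagster import OpExecutionContext, StaticPartitionsDefinition, asset

from dagster_polars import PolarsDeltaIOManager

io_manager = PolarsDeltaIOManager(base_dir="/tmp/dagster")  # illustrative base_dir


@asset(
    io_manager_def=io_manager,
    partitions_def=StaticPartitionsDefinition(["a", "b"]),
    metadata={"partition_by": "partition"},  # forwarded to delta_write_options of write_delta
)
def upstream(context: OpExecutionContext) -> pl.DataFrame:
    # every asset partition is written into the same Delta Table directory,
    # partitioned by the "partition" column
    return pl.DataFrame({"value": [1, 2, 3]}).with_columns(
        pl.lit(context.partition_key).alias("partition")
    )


@asset(io_manager_def=io_manager)
def downstream(upstream: Dict[str, pl.LazyFrame]) -> pl.DataFrame:
    # each value is a lazy scan of the whole Delta Table;
    # filtering the relevant partitions is up to the downstream asset
    return upstream["a"].filter(pl.col("partition") == "a").collect()
```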

## Quickstart
37 changes: 37 additions & 0 deletions dagster_polars/io_managers/base.py
@@ -11,6 +11,7 @@
InitResourceContext,
InputContext,
MetadataValue,
MultiPartitionKey,
OutputContext,
TableColumn,
TableMetadataValue,
@@ -189,3 +190,39 @@ def get_storage_options(path: UPath) -> dict:
pass

return storage_options

def get_path_for_partition(self, context: Union[InputContext, OutputContext], path: UPath, partition: str) -> UPath:
"""
Override this method if you want to use a different partitioning scheme
(for example, if the saving function handles partitioning instead).
The extension will be added later.
:param context:
:param path: asset path before partitioning
:param partition: formatted partition key
:return:
"""
return path / partition

def _get_paths_for_partitions(self, context: Union[InputContext, OutputContext]) -> Dict[str, "UPath"]:
"""Returns a dict of partition_keys into I/O paths for a given context."""
if not context.has_asset_partitions:
raise TypeError(
f"Detected {context.dagster_type.typing_type} input type " "but the asset is not partitioned"
)

def _formatted_multipartitioned_path(partition_key: MultiPartitionKey) -> str:
ordered_dimension_keys = [
key[1] for key in sorted(partition_key.keys_by_dimension.items(), key=lambda x: x[0])
]
return "/".join(ordered_dimension_keys)

formatted_partition_keys = [
_formatted_multipartitioned_path(pk) if isinstance(pk, MultiPartitionKey) else pk
for pk in context.asset_partition_keys
]

asset_path = self._get_path_without_extension(context)
return {
partition: self._with_extension(self.get_path_for_partition(context, asset_path, partition))
for partition in formatted_partition_keys
}
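
For illustration, a hedged sketch of the path layout these helpers produce and of overriding `get_path_for_partition`. The asset name, base directory, and the subclass are hypothetical:

```python
# Default layout produced by get_path_for_partition / _get_paths_for_partitions:
#   <base_dir>/<asset_key>/<partition>.<extension>
# e.g. the "2023-01-01" partition of an asset "events" stored as Parquet:
#   /base_dir/events/2023-01-01.parquet
#
# For a MultiPartitionKey, the dimension keys are sorted by dimension name and
# joined with "/": {"date": "2023-01-01", "region": "us"} -> "2023-01-01/us", so
#   /base_dir/events/2023-01-01/us.parquet
#
# A subclass can override get_path_for_partition, e.g. to let the storage layer
# handle partitioning itself (PolarsDeltaIOManager below does exactly this when
# the "partition_by" metadata value is set):
class EnginePartitionedIOManager(BasePolarsUPathIOManager):  # hypothetical; other abstract methods omitted
    def get_path_for_partition(self, context, path, partition):
        return path  # all partitions share one directory; the writer partitions the data
```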
42 changes: 34 additions & 8 deletions dagster_polars/io_managers/delta.py
@@ -1,3 +1,6 @@
from pprint import pformat
from typing import Union

import polars as pl
from dagster import InputContext, OutputContext
from deltalake import DeltaTable
@@ -16,30 +19,53 @@ class PolarsDeltaIOManager(BasePolarsUPathIOManager):
All read/write arguments can be passed via corresponding metadata values."""
)

def get_path_for_partition(self, context: Union[InputContext, OutputContext], path: UPath, partition: str) -> UPath:
if isinstance(context, InputContext):
if (
context.upstream_output is not None
and context.upstream_output.metadata is not None
and context.upstream_output.metadata.get("partition_by") is not None
):
# upstream asset has "partition_by" metadata set, so partitioning for it is handled by DeltaLake itself
return path

if isinstance(context, OutputContext):
if context.metadata is not None and context.metadata.get("partition_by") is not None:
# this asset has "partition_by" metadata set, so partitioning for it is handled by DeltaLake itself
return path

return path / partition # partitioning is handled by the IOManager

def dump_df_to_path(self, context: OutputContext, df: pl.DataFrame, path: UPath):
assert context.metadata is not None

delta_write_options = context.metadata.get("delta_write_options")

if context.has_asset_partitions:
delta_write_options = delta_write_options or {}
partition_by = context.metadata.get("partition_by")

if partition_by is not None:
delta_write_options["partition_by"] = partition_by

if delta_write_options is not None:
context.log.debug(f"Writing with delta_write_options: {pformat(delta_write_options)}")

storage_options = self.get_storage_options(path)

df.write_delta(
str(path),
mode=context.metadata.get("mode", "overwrite"), # type: ignore
overwrite_schema=context.metadata.get("overwrite_schema", False),
storage_options=storage_options,
delta_write_options=context.metadata.get("delta_write_options"),
delta_write_options=delta_write_options,
)
table = DeltaTable(str(path), storage_options=storage_options)
context.add_output_metadata({"version": table.version()})

def scan_df_from_path(self, path: UPath, context: InputContext) -> pl.LazyFrame:
assert context.metadata is not None

storage_options = {}

try:
storage_options.update(path._kwargs.copy())
except AttributeError:
pass

return pl.scan_delta(
str(path),
version=context.metadata.get("version"),
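
For illustration, a hedged sketch of the metadata keys `dump_df_to_path` reads. The asset, its columns, and the extra writer option are assumptions, not part of this commit:

```python
import polars as pl
from dagster import asset


@asset(
    metadata={
        "mode": "append",  # passed to write_delta; defaults to "overwrite"
        "overwrite_schema": False,  # passed to write_delta
        "delta_write_options": {"max_rows_per_group": 100_000},  # forwarded as-is (option name illustrative)
    },
)
def events() -> pl.DataFrame:
    return pl.DataFrame({"date": ["2023-01-01"], "value": [1]})


# With this metadata, dump_df_to_path ends up calling roughly:
#   df.write_delta(
#       str(path),
#       mode="append",
#       overwrite_schema=False,
#       storage_options=...,  # derived from the UPath
#       delta_write_options={"max_rows_per_group": 100_000},
#   )
# For partitioned assets, a "partition_by" metadata value is additionally merged
# into delta_write_options, and scan_df_from_path reads a "version" metadata
# value to pin the Delta Table version when loading.
```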
16 changes: 13 additions & 3 deletions tests/conftest.py
@@ -45,7 +45,7 @@ def session_polars_delta_io_manager(session_scoped_dagster_instance: DagsterInst
return PolarsDeltaIOManager(base_dir=session_scoped_dagster_instance.storage_directory()) # to use with hypothesis


df_for_parquet = pl.DataFrame(
_df_for_parquet = pl.DataFrame(
{
"1": [0, 1, None],
"2": [0.0, 1.0, None],
@@ -59,9 +59,19 @@ def session_polars_delta_io_manager(session_scoped_dagster_instance: DagsterInst
)


@pytest_cases.fixture(scope="session")
def df_for_parquet() -> pl.DataFrame:
return _df_for_parquet


@pytest_cases.fixture(scope="session")
def df_for_delta() -> pl.DataFrame:
return _df_for_delta


# delta doesn't support Duration
# TODO: add timedeltas when supported
df_for_delta = pl.DataFrame(
_df_for_delta = pl.DataFrame(
{
"1": [0, 1, None],
"2": [0.0, 1.0, None],
@@ -76,7 +86,7 @@ def session_polars_delta_io_manager(session_scoped_dagster_instance: DagsterInst

@pytest_cases.fixture
@pytest_cases.parametrize(
"class_and_df", [(PolarsParquetIOManager, df_for_parquet), (PolarsDeltaIOManager, df_for_delta)]
"class_and_df", [(PolarsParquetIOManager, _df_for_parquet), (PolarsDeltaIOManager, _df_for_delta)]
)
def io_manager_and_df( # to use without hypothesis
class_and_df: Tuple[Type[BasePolarsUPathIOManager], pl.DataFrame], dagster_instance: DagsterInstance
42 changes: 41 additions & 1 deletion tests/test_polars_delta.py
@@ -1,8 +1,10 @@
import shutil
from typing import Dict

import polars as pl
import polars.testing as pl_testing
from dagster import asset, materialize
from dagster import OpExecutionContext, StaticPartitionsDefinition, asset, materialize
from deltalake import DeltaTable
from hypothesis import given, settings
from polars.testing.parametric import dataframes

@@ -118,3 +120,41 @@ def overwrite_schema_asset() -> pl.DataFrame:
),
pl.read_delta(saved_path),
)


def test_polars_delta_native_partitioning(polars_delta_io_manager: PolarsDeltaIOManager, df_for_delta: pl.DataFrame):
manager = polars_delta_io_manager
df = df_for_delta

partitions_def = StaticPartitionsDefinition(["a", "b"])

@asset(io_manager_def=manager, partitions_def=partitions_def, metadata={"partition_by": "partition"})
def upstream_partitioned(context: OpExecutionContext) -> pl.DataFrame:
return df.with_columns(pl.lit(context.partition_key).alias("partition"))

@asset(io_manager_def=manager)
def downstream_load_multiple_partitions(upstream_partitioned: Dict[str, pl.LazyFrame]) -> None:
for _df in upstream_partitioned.values():
assert isinstance(_df, pl.LazyFrame), type(_df)
assert set(upstream_partitioned.keys()) == {"a", "b"}, upstream_partitioned.keys()

for partition_key in ["a", "b"]:
result = materialize(
[upstream_partitioned],
partition_key=partition_key,
)

handled_output_events = list(
filter(lambda evt: evt.is_handled_output, result.events_for_node("upstream_partitioned"))
)
saved_path = handled_output_events[0].event_specific_data.metadata["path"].value # type: ignore
assert isinstance(saved_path, str)
assert saved_path.endswith("upstream_partitioned.delta"), saved_path # DeltaLake should handle partitioning!
assert DeltaTable(saved_path).metadata().partition_columns == ["partition"]

materialize(
[
upstream_partitioned.to_source_asset(),
downstream_load_multiple_partitions,
],
)
4 changes: 3 additions & 1 deletion tests/test_polars_parquet.py
@@ -13,7 +13,9 @@
# list(pl.TEMPORAL_DTYPES | pl.FLOAT_DTYPES | pl.INTEGER_DTYPES) + [pl.Boolean, pl.Utf8]]
@given(df=dataframes(excluded_dtypes=[pl.Categorical], min_size=5))
@settings(max_examples=100, deadline=None)
def test_polars_parquet_io_manager(session_polars_parquet_io_manager: PolarsParquetIOManager, df: pl.DataFrame):
def test_polars_parquet_io_manager_read_write(
session_polars_parquet_io_manager: PolarsParquetIOManager, df: pl.DataFrame
):
@asset(io_manager_def=session_polars_parquet_io_manager)
def upstream() -> pl.DataFrame:
return df
