This repository was archived by the owner on Mar 11, 2024, and is now read-only.

Commit

🐛 properly overwrite DeltaLake partitions (#50)
Previously, writing a single partition overwrote the whole table.
danielgafni authored Jan 17, 2024
1 parent aebcc34 commit a902ebd
Showing 2 changed files with 18 additions and 4 deletions.
dagster_polars/io_managers/delta.py (3 additions & 0 deletions)
@@ -126,7 +126,10 @@ def dump_df_to_path(
         partition_by = context.metadata.get("partition_by")
 
         if partition_by is not None:
+            assert context.partition_key is not None, 'can\'t set "partition_by" for an asset without partitions'
+
             delta_write_options["partition_by"] = partition_by
+            delta_write_options["partition_filters"] = [(partition_by, "=", context.partition_key)]
 
         if delta_write_options is not None:
             context.log.debug(f"Writing with delta_write_options: {pformat(delta_write_options)}")
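
Not part of the commit, but for context, here is a minimal standalone sketch of the deltalake behavior the fix relies on. Polars forwards delta_write_options to deltalake's write_deltalake; with mode="overwrite" alone the whole table is replaced, while adding partition_filters scopes the overwrite to the matching partition. The table path and data below are made up, and the sketch assumes a deltalake version from this era that still accepts partition_filters (newer releases replace it with a predicate argument).

import polars as pl

table_path = "/tmp/example.delta"  # made-up path for illustration

# Initial write creates a Delta table partitioned on the "partition" column.
pl.DataFrame({"value": [1, 2], "partition": ["a", "a"]}).write_delta(
    table_path,
    mode="overwrite",
    delta_write_options={"partition_by": "partition"},
)

# Overwrite scoped to partition "b"; the files of partition "a" are untouched.
pl.DataFrame({"value": [3, 4], "partition": ["b", "b"]}).write_delta(
    table_path,
    mode="overwrite",
    delta_write_options={
        "partition_by": "partition",
        "partition_filters": [("partition", "=", "b")],
    },
)

# Both partitions survive; before this fix, the second write dropped partition "a".
assert set(pl.read_delta(table_path)["partition"].unique()) == {"a", "b"}

This mirrors what dump_df_to_path now builds: the filter comes from the asset's context.partition_key, so each partitioned materialization replaces only its own slice of the table.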
tests/test_polars_delta.py (15 additions & 4 deletions)
@@ -199,24 +199,35 @@ def test_polars_delta_native_partitioning(polars_delta_io_manager: PolarsDeltaIO
     def upstream_partitioned(context: OpExecutionContext) -> pl.DataFrame:
         return df.with_columns(pl.lit(context.partition_key).alias("partition"))
 
+    lenghts = {}
+
     @asset(io_manager_def=manager)
     def downstream_load_multiple_partitions(upstream_partitioned: Dict[str, pl.LazyFrame]) -> None:
-        for partition, _df in upstream_partitioned.items():
-            assert isinstance(_df, pl.LazyFrame), type(_df)
-            assert (_df.collect().select(pl.col("partition").eq(partition).alias("eq")))["eq"].all()
+        for partition, _ldf in upstream_partitioned.items():
+            assert isinstance(_ldf, pl.LazyFrame), type(_ldf)
+            _df = _ldf.collect()
+            assert (_df.select(pl.col("partition").eq(partition).alias("eq")))["eq"].all()
+            lenghts[partition] = len(_df)
 
         assert set(upstream_partitioned.keys()) == {"a", "b"}, upstream_partitioned.keys()
 
+    saved_path = None
+
     for partition_key in ["a", "b"]:
         result = materialize(
             [upstream_partitioned],
             partition_key=partition_key,
         )
 
         saved_path = get_saved_path(result, "upstream_partitioned")
         assert saved_path.endswith("upstream_partitioned.delta"), saved_path  # DeltaLake should handle partitioning!
         assert DeltaTable(saved_path).metadata().partition_columns == ["partition"]
 
+    assert saved_path is not None
+    written_df = pl.read_delta(saved_path)
+
+    assert len(written_df) == len(df) * 2
+    assert set(written_df["partition"].unique()) == {"a", "b"}
+
     materialize(
         [
             upstream_partitioned.to_source_asset(),
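
A side note on the test (again, not part of the commit): downstream_load_multiple_partitions receives one pl.LazyFrame per partition key. A rough, hypothetical illustration of loading a single partition lazily from a partitioned Delta table, reusing the made-up table from the sketch above:

import polars as pl

# Scan lazily, restrict to one partition, and only then materialize.
ldf = pl.scan_delta("/tmp/example.delta").filter(pl.col("partition") == "a")
df_a = ldf.collect()
assert (df_a["partition"] == "a").all()

Filtering on the partition column before collect() can let the scan skip the other partitions' files entirely.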
