Skip to content
This repository has been archived by the owner on Mar 11, 2024. It is now read-only.

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgafni committed Jun 30, 2023
1 parent 5418bbf commit 6c6420b
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 6 deletions.
10 changes: 10 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ def dagster_instance(tmp_path_factory: TempPathFactory) -> DagsterInstance:
return DagsterInstance.ephemeral(tempdir=str(tmp_path_factory.mktemp("dagster_home")))


@pytest.fixture
def polars_parquet_io_manager(dagster_instance: DagsterInstance) -> PolarsParquetIOManager:
    """Provide a PolarsParquetIOManager whose base_dir is the test instance's storage directory."""
    storage_dir = dagster_instance.storage_directory()
    return PolarsParquetIOManager(base_dir=storage_dir)


@pytest.fixture
def polars_delta_io_manager(dagster_instance: DagsterInstance) -> PolarsDeltaIOManager:
    """Provide a PolarsDeltaIOManager whose base_dir is the test instance's storage directory."""
    storage_dir = dagster_instance.storage_directory()
    return PolarsDeltaIOManager(base_dir=storage_dir)


@pytest.fixture(scope="session")
def session_scoped_dagster_instance(tmp_path_factory: TempPathFactory) -> DagsterInstance:
    """Create an ephemeral DagsterInstance in a fresh temp directory, once per test session."""
    session_home = tmp_path_factory.mktemp("dagster_home_session")
    return DagsterInstance.ephemeral(tempdir=str(session_home))
Expand Down
67 changes: 67 additions & 0 deletions tests/test_polars_delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,70 @@ def downstream(upstream: pl.LazyFrame) -> pl.DataFrame:
assert isinstance(saved_path, str)
pl_testing.assert_frame_equal(df, pl.read_delta(saved_path))
shutil.rmtree(saved_path) # cleanup manually because of hypothesis


def test_polars_delta_io_manager_append(polars_delta_io_manager: PolarsDeltaIOManager):
    """Check that materializing an asset with ``mode="append"`` twice stores its frame twice."""
    df = pl.DataFrame({"a": [1, 2, 3]})

    @asset(io_manager_def=polars_delta_io_manager, metadata={"mode": "append"})
    def append_asset() -> pl.DataFrame:
        return df

    first_run = materialize([append_asset])

    # The handled-output event of the first run carries the path the table was written to.
    handled_output_events = [
        event for event in first_run.events_for_node("append_asset") if event.is_handled_output
    ]
    saved_path = handled_output_events[0].event_specific_data.metadata["path"].value  # type: ignore[index,union-attr]
    assert isinstance(saved_path, str)

    # Second materialization of the same asset; the table is read back from the same path.
    materialize([append_asset])

    expected = pl.concat([df, df])
    pl_testing.assert_frame_equal(expected, pl.read_delta(saved_path))


def test_polars_delta_io_manager_overwrite_schema(polars_delta_io_manager: PolarsDeltaIOManager):
    """Check that re-materializing with ``overwrite_schema`` replaces the stored table and its schema.

    The asset is first materialized with an integer column ``a``; it is then redefined under
    the same name with a string column ``b`` and materialized again in ``overwrite`` mode.
    The table read back from the originally reported path must equal the second frame.
    """

    @asset(io_manager_def=polars_delta_io_manager)
    def overwrite_schema_asset() -> pl.DataFrame:  # type: ignore
        return pl.DataFrame({"a": [1, 2, 3]})

    first_run = materialize([overwrite_schema_asset])

    # The handled-output event of the first run carries the path the table was written to.
    handled_output_events = [
        event for event in first_run.events_for_node("overwrite_schema_asset") if event.is_handled_output
    ]
    saved_path = handled_output_events[0].event_specific_data.metadata["path"].value  # type: ignore[index,union-attr]
    assert isinstance(saved_path, str)

    # Deliberately reuses the function name so that the second definition refers to the same asset.
    @asset(io_manager_def=polars_delta_io_manager, metadata={"overwrite_schema": True, "mode": "overwrite"})
    def overwrite_schema_asset() -> pl.DataFrame:
        return pl.DataFrame({"b": ["1", "2", "3"]})

    materialize([overwrite_schema_asset])

    expected = pl.DataFrame({"b": ["1", "2", "3"]})
    pl_testing.assert_frame_equal(expected, pl.read_delta(saved_path))
12 changes: 6 additions & 6 deletions tests/test_upath_io_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from dagster_polars import BasePolarsUPathIOManager, PolarsDeltaIOManager, PolarsParquetIOManager


def test_polars_upath_io_manager_stats_metadata(manager_and_df: Tuple[BasePolarsUPathIOManager, pl.DataFrame]):
manager, _ = manager_and_df
def test_polars_upath_io_manager_stats_metadata(io_manager_and_df: Tuple[BasePolarsUPathIOManager, pl.DataFrame]):
manager, _ = io_manager_and_df

df = pl.DataFrame({"a": [0, 1, None], "b": ["a", "b", "c"]})

Expand Down Expand Up @@ -57,8 +57,8 @@ def upstream() -> pl.DataFrame:
)


def test_polars_upath_io_manager_type_annotations(manager_and_df: Tuple[BasePolarsUPathIOManager, pl.DataFrame]):
manager, df = manager_and_df
def test_polars_upath_io_manager_type_annotations(io_manager_and_df: Tuple[BasePolarsUPathIOManager, pl.DataFrame]):
manager, df = io_manager_and_df

@asset(io_manager_def=manager)
def upstream() -> pl.DataFrame:
Expand Down Expand Up @@ -113,8 +113,8 @@ def downstream_multi_partitioned_lazy(upstream_partitioned: Dict[str, pl.LazyFra
)


def test_polars_upath_io_manager_nested_dtypes(manager_and_df: Tuple[BasePolarsUPathIOManager, pl.DataFrame]):
manager, df = manager_and_df
def test_polars_upath_io_manager_nested_dtypes(io_manager_and_df: Tuple[BasePolarsUPathIOManager, pl.DataFrame]):
manager, df = io_manager_and_df

@asset(io_manager_def=manager)
def upstream() -> pl.DataFrame:
Expand Down

0 comments on commit 6c6420b

Please sign in to comment.