diff --git a/docs/source/changelog.md b/docs/source/changelog.md
index 60498281..ad57a296 100644
--- a/docs/source/changelog.md
+++ b/docs/source/changelog.md
@@ -1,6 +1,11 @@
 # Changelog

-## 0.6.11 (2024-XX-XX)
+
+## 0.7.0 (2024-XX-XX)
+- Rework `TableReference` support:
+  * Rename `TableReference` to `ExternalTableReference`
+  * Add support for `ExternalTableReference` to point to tables in external (i.e. not managed by `pipedag`) schemas.
+  * Remove support for `ExternalTableReference` pointing to a table in the schema of the current stage, i.e. an `ExternalTableReference` can only point to tables in external schemas.
 - Fix polars import in `pyproject.toml` when using OS X with rosetta2

 ## 0.6.10 (2024-02-29)
diff --git a/docs/source/reference/api.rst b/docs/source/reference/api.rst
index 248686ef..d4a6c186 100644
--- a/docs/source/reference/api.rst
+++ b/docs/source/reference/api.rst
@@ -96,4 +96,4 @@ Orchestration Engine
 Special Table Types
 -------------------

-.. autoclass:: pydiverse.pipedag.backend.table.sql.TableReference
+.. autoclass:: pydiverse.pipedag.backend.table.sql.ExternalTableReference
diff --git a/src/pydiverse/pipedag/backend/table/sql/__init__.py b/src/pydiverse/pipedag/backend/table/sql/__init__.py
index f9d8157e..c74d4faf 100644
--- a/src/pydiverse/pipedag/backend/table/sql/__init__.py
+++ b/src/pydiverse/pipedag/backend/table/sql/__init__.py
@@ -1,8 +1,8 @@
 from __future__ import annotations

-from .sql import SQLTableStore, TableReference
+from .sql import ExternalTableReference, SQLTableStore

 __all__ = [
     "SQLTableStore",
-    "TableReference",
+    "ExternalTableReference",
 ]
diff --git a/src/pydiverse/pipedag/backend/table/sql/ddl.py b/src/pydiverse/pipedag/backend/table/sql/ddl.py
index cc393f51..a94a64c8 100644
--- a/src/pydiverse/pipedag/backend/table/sql/ddl.py
+++ b/src/pydiverse/pipedag/backend/table/sql/ddl.py
@@ -706,6 +706,7 @@ def visit_create_table_as_select_ibm_db_sa(create: CreateTableAsSelect, compiler
     src_tables = [
         f"{preparer.format_schema(tbl['schema'])}.{preparer.quote(tbl['name'])}"
         for tbl in create.source_tables
+        if tbl["shared_lock_allowed"]
     ]
     lock_statements += [f"LOCK TABLE {ref} IN SHARE MODE" for ref in src_tables]
@@ -826,7 +827,11 @@ def visit_copy_table(copy_table: CopyTable, compiler, **kw):
         query,
         early_not_null=copy_table.early_not_null,
         source_tables=[
-            dict(name=copy_table.from_name, schema=copy_table.from_schema.get())
+            dict(
+                name=copy_table.from_name,
+                schema=copy_table.from_schema.get(),
+                shared_lock_allowed=True,
+            )
         ],
         suffix=copy_table.suffix,
     )
diff --git a/src/pydiverse/pipedag/backend/table/sql/dialects/ibm_db2.py b/src/pydiverse/pipedag/backend/table/sql/dialects/ibm_db2.py
index a1d76632..ea5a4f22 100644
--- a/src/pydiverse/pipedag/backend/table/sql/dialects/ibm_db2.py
+++ b/src/pydiverse/pipedag/backend/table/sql/dialects/ibm_db2.py
@@ -127,8 +127,12 @@ def add_indexes(
         )
         self.execute(query)

-    def resolve_alias(self, table, schema):
-        return PipedagDB2Reflection.resolve_alias(self.engine, table, schema)
+    def resolve_alias(self, table: Table, stage_name: str) -> tuple[str, str]:
+        # The base implementation already takes care of converting Table objects
+        # backed by ExternalTableReference objects into a table name and schema
+        # string. For normal Table objects, it needs the stage schema name.
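+        # Illustrative example (names assumed): for a Table backed by
+        # ExternalTableReference("nick1", "ext_schema"), the base class returns
+        # ("nick1", "ext_schema"); the DB2 reflection below then resolves
+        # "nick1" to the underlying table if it is an alias or nickname.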
+        table_name, schema = super().resolve_alias(table, stage_name)
+        return PipedagDB2Reflection.resolve_alias(self.engine, table_name, schema)

     def check_materialization_details_supported(self, label: str | None) -> None:
         _ = label
diff --git a/src/pydiverse/pipedag/backend/table/sql/dialects/mssql.py b/src/pydiverse/pipedag/backend/table/sql/dialects/mssql.py
index c983132d..775f3ff1 100644
--- a/src/pydiverse/pipedag/backend/table/sql/dialects/mssql.py
+++ b/src/pydiverse/pipedag/backend/table/sql/dialects/mssql.py
@@ -215,8 +215,12 @@ def _copy_object_to_transaction(
         definition = _mssql_update_definition(conn, name, src_schema, dest_schema)
         self.execute(definition, conn=conn)

-    def resolve_alias(self, table: str, schema: str):
-        return PipedagMSSqlReflection.resolve_alias(self.engine, table, schema)
+    def resolve_alias(self, table: Table, stage_name: str):
+        # The base implementation already takes care of converting Table objects
+        # backed by ExternalTableReference objects into a table name and schema
+        # string. For normal Table objects, it needs the stage schema name.
+        table_name, schema = super().resolve_alias(table, stage_name)
+        return PipedagMSSqlReflection.resolve_alias(self.engine, table_name, schema)


 @MSSqlTableStore.register_table(pd)
diff --git a/src/pydiverse/pipedag/backend/table/sql/hooks.py b/src/pydiverse/pipedag/backend/table/sql/hooks.py
index 7a1dec37..66de2967 100644
--- a/src/pydiverse/pipedag/backend/table/sql/hooks.py
+++ b/src/pydiverse/pipedag/backend/table/sql/hooks.py
@@ -18,7 +18,10 @@
     CreateTableAsSelect,
     Schema,
 )
-from pydiverse.pipedag.backend.table.sql.sql import SQLTableStore, TableReference
+from pydiverse.pipedag.backend.table.sql.sql import (
+    ExternalTableReference,
+    SQLTableStore,
+)
 from pydiverse.pipedag.backend.table.util import (
     DType,
     PandasDTypeBackend,
@@ -57,7 +60,10 @@ def materialize(
         source_tables = [
             dict(
                 name=tbl.name,
-                schema=store.get_schema(tbl.stage.current_name).get(),
+                schema=store.get_schema(tbl.stage.current_name).get()
+                if tbl.external_schema is None
+                else tbl.external_schema,
+                shared_lock_allowed=tbl.shared_lock_allowed,
             )
             for tbl in TaskContext.get().input_tables
         ]
@@ -87,10 +93,9 @@ def materialize(
     @classmethod
     def retrieve(
-        cls, store, table, stage_name, as_type: type[sa.Table]
+        cls, store, table: Table, stage_name: str, as_type: type[sa.Table]
     ) -> sa.sql.expression.Selectable:
-        schema = store.get_schema(stage_name).get()
-        table_name, schema = store.resolve_alias(table.name, schema)
+        table_name, schema = store.resolve_alias(table, stage_name)
         alias_name = TaskContext.get().name_disambiguator.get_name(table_name)

         for retry_iteration in range(4):
@@ -126,30 +131,40 @@ def lazy_query_str(cls, store, obj) -> str:


 @SQLTableStore.register_table()
-class TableReferenceHook(TableHook[SQLTableStore]):
+class ExternalTableReferenceHook(TableHook[SQLTableStore]):
     @classmethod
     def can_materialize(cls, type_) -> bool:
-        return issubclass(type_, TableReference)
+        return issubclass(type_, ExternalTableReference)

     @classmethod
     def can_retrieve(cls, type_) -> bool:
         return False

     @classmethod
-    def materialize(cls, store: SQLTableStore, table: Table, stage_name):
-        # For a table reference, we don't need to materialize anything.
+    def materialize(cls, store: SQLTableStore, table: Table, stage_name: str):
+        # For an external table reference, we don't need to materialize anything.
-        # This is any table referenced by a table reference should already exist
-        # in the schema.
+        # This is because any table referenced by an external table reference
+        # should already exist in its schema.
         # Instead, we check that the table actually exists.
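+        # Illustrative example (schema naming assumed): for a stage "stage_1"
+        # whose transaction schema is "stage_1__tmp", both
+        # ExternalTableReference("t", "stage_1__tmp") and
+        # ExternalTableReference("t", "stage_1") are rejected by the two guards
+        # below.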
-        schema = store.get_schema(stage_name).get()
+        stage_schema = store.get_schema(stage_name).get()
+        if table.external_schema.upper() == stage_schema.upper():
+            raise ValueError(
+                f"ExternalTableReference '{table.name}' is not allowed to reference "
+                f"tables in the transaction schema '{stage_schema}' of the current "
+                "stage."
+            )
+        if stage_schema.upper().startswith(table.external_schema.upper() + "__"):
+            raise ValueError(
+                f"ExternalTableReference '{table.name}' is not allowed to reference "
+                f"tables in the schema '{table.external_schema}' of the current stage."
+            )

-        inspector = sa.inspect(store.engine)
-        has_table = inspector.has_table(table.name, schema)
+        has_table = store.has_table_or_view(table.name, table.external_schema)
         if not has_table:
             raise ValueError(
-                f"Not table with name '{table.name}' found in schema '{schema}' "
-                "(reference by TableReference)."
+                f"No table with name '{table.name}' found in schema "
+                f"'{table.external_schema}' (referenced by ExternalTableReference)."
             )
         return
@@ -193,7 +208,9 @@ def auto_table(cls, obj: pd.DataFrame):
         return super().auto_table(obj)

     @classmethod
-    def materialize(cls, store: SQLTableStore, table: Table[pd.DataFrame], stage_name):
+    def materialize(
+        cls, store: SQLTableStore, table: Table[pd.DataFrame], stage_name: str
+    ):
         df = table.obj.copy(deep=False)
         schema = store.get_schema(stage_name)
@@ -303,8 +320,7 @@ def _build_retrieve_query(
         backend: PandasDTypeBackend,
     ) -> tuple[Any, dict[str, DType]]:
         engine = store.engine
-        schema = store.get_schema(stage_name).get()
-        table_name, schema = store.resolve_alias(table.name, schema)
+        table_name, schema = store.resolve_alias(table, stage_name)

         sql_table = sa.Table(
             table_name,
@@ -441,7 +457,7 @@ def can_retrieve(cls, type_) -> bool:
         return type_ == polars.DataFrame

     @classmethod
-    def materialize(cls, store, table: Table[polars.DataFrame], stage_name):
+    def materialize(cls, store, table: Table[polars.DataFrame], stage_name: str):
         # Materialization for polars happens by first converting the dataframe to
         # a pyarrow backed pandas dataframe, and then calling the PandasTableHook
         # for materialization.
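+        # Sketch of that path (polars API, names of intermediates assumed):
+        #   pd_df = table.obj.to_pandas(use_pyarrow_extension_array=True)
+        #   followed by a call to the registered PandasTableHook.materialize().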
@@ -486,8 +502,7 @@ def auto_table(cls, obj: polars.DataFrame):

     @classmethod
     def _read_db_query(cls, store: SQLTableStore, table: Table, stage_name: str):
-        schema = store.get_schema(stage_name).get()
-        table_name, schema = store.resolve_alias(table.name, schema)
+        table_name, schema = store.resolve_alias(table, stage_name)

         t = sa.table(table_name, schema=schema)
         q = sa.select("*").select_from(t)
@@ -767,8 +782,7 @@ def retrieve(
         as_type: type[ibis.api.Table],
     ) -> ibis.api.Table:
         conn = cls.conn(store)
-        schema = store.get_schema(stage_name).get()
-        table_name, schema = store.resolve_alias(table.name, schema)
+        table_name, schema = store.resolve_alias(table, stage_name)

         for retry_iteration in range(4):
             # retry operation since it might have been terminated as a deadlock victim
             try:
diff --git a/src/pydiverse/pipedag/backend/table/sql/sql.py b/src/pydiverse/pipedag/backend/table/sql/sql.py
index b90be6ef..55089b58 100644
--- a/src/pydiverse/pipedag/backend/table/sql/sql.py
+++ b/src/pydiverse/pipedag/backend/table/sql/sql.py
@@ -99,7 +99,7 @@ class SQLTableStore(BaseTableStore):
       | ``pdt.lazy.SQLTableImpl``
     * - pydiverse.pipedag
-      - | :py:class:`~.TableReference`
+      - | :py:class:`~.ExternalTableReference`
       -
@@ -367,7 +367,7 @@ def engine_connect(self) -> sa.Connection:
         yield conn
         conn.commit()

-    def get_schema(self, name):
+    def get_schema(self, name: str) -> Schema:
         return Schema(name, self.schema_prefix, self.schema_suffix)

     def _execute(self, query, conn: sa.engine.Connection):
@@ -882,12 +882,15 @@ def _deferred_copy_table(
             self.logger.exception(msg)
             raise CacheError(msg) from _e

-    def has_table_or_view(self, name, schema: Schema):
+    def has_table_or_view(self, name, schema: Schema | str):
+        if isinstance(schema, Schema):
+            schema = schema.get()
         inspector = sa.inspect(self.engine)
-        has_table = inspector.has_table(name, schema=schema.get())
+        has_table = inspector.has_table(name, schema=schema)
         # workaround for sqlalchemy backends that fail to find views with has_table
+        # in particular DuckDB https://github.com/duckdb/duckdb/issues/10322
         if not has_table:
-            has_table = name in inspector.get_view_names(schema=schema.get())
+            has_table = name in inspector.get_view_names(schema=schema)
         return has_table

     def _swap_alias_with_table_copy(self, table: Table, table_copy: Table):
@@ -1231,8 +1234,27 @@ def retrieve_raw_sql_metadata(
             task_hash=result.task_hash,
         )

-    def resolve_alias(self, table: str, schema: str) -> tuple[str, str]:
-        return table, schema
+    def resolve_alias(self, table: Table, stage_name: str) -> tuple[str, str]:
+        """
+        Convert a Table object to a string table name and schema.
+
+        Table objects backed by an ExternalTableReference carry their own schema
+        in external_schema. For normal Table objects, the stage schema name is
+        needed; it will be prefixed/suffixed based on config. Dialect specific
+        implementations also try to resolve table aliases or synonyms since
+        those may cause trouble with SQLAlchemy.
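+
+        Illustrative example (table and schema names assumed)::
+
+            # tbl is a Table backed by ExternalTableReference("t", "ext_schema")
+            store.resolve_alias(tbl, "stage_1")  # returns ("t", "ext_schema")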
+
+        :param table: Table object
+        :param stage_name: stage name from which the schema name is derived
+            (the currently processed transaction stage uses a different naming
+            pattern)
+        :return: table name and schema name as strings
+        """
+        schema = (
+            self.get_schema(stage_name).get()
+            if table.external_schema is None
+            else table.external_schema
+        )
+        return table.name, schema

     def get_objects_in_stage(self, stage: Stage):
         schema = self.get_schema(stage.transaction_name)
@@ -1274,52 +1296,65 @@ def get_lock_schema(self) -> Schema:
         return self.get_schema(self.LOCK_SCHEMA)


-class TableReference:
+class ExternalTableReference:
     """Reference to a user-created table.

-    By returning a `TableReference` wrapped in a :py:class:`~.Table` from a task,
-    you can tell pipedag that you yourself created a new table inside
-    the correct schema of the table store.
-    This may be useful if you need to perform a complex load operation to create
-    a table (e.g. load a table from an Oracle database into Postgres
-    using `oracle_fdw`).
+    By returning an `ExternalTableReference` wrapped in a :py:class:`~.Table`
+    from a task, you can tell pipedag about a table, a view, or a DB2 nickname
+    in an external `schema`. The schema may be a multi-part identifier like
+    "[db_name].[schema_name]" if the database supports this. It is passed to
+    SQLAlchemy as-is.

     Only supported by :py:class:`~.SQLTableStore`.

     Warning
     -------
-    Using table references is not recommended unless you know what you are doing.
-    It may lead unexpected behaviour.
-    It also requires accessing non-public parts of the pipedag API which can
-    change without notice.
+    When using an `ExternalTableReference`, pipedag has no way of knowing the
+    cache validity of the external object. Hence, the user should provide a
+    cache function for the `Task`.
+    It is not allowed to point an `ExternalTableReference` at a table in a
+    schema of the current stage.

     Example
     -------
-    Making sure that the table is created in the correct schema is not trivial,
-    because the schema names usually are different from the stage names.
-    To get the correct schema name, you must use undocumented and non-public
-    parts of the pipedag API. To help you get started, we'll provide you with
-    this example, however, be warned that this might break without notice::
+    You can use an `ExternalTableReference` to tell pipedag about a table that
+    exists in an external schema::

         @materialize(version="1.0")
         def task():
-            from pydiverse.pipedag.context import ConfigContext, TaskContext
+            return Table(ExternalTableReference("name_of_table", "schema"))

-            table_store = ConfigContext.get().store.table_store
-            task = TaskContext.get().task
+    By using a cache function, you can establish the cache (in-)validity of the
+    external table::

-            # Name of the schema in which you must create the table
-            schema_name = table_store.get_schema(task.stage.transaction_name).get()
+        from datetime import date

-            # Use the table store's engine to create a table in the correct schema
-            with table_store.engine.begin() as conn:
-                conn.execute(...)
+        # The external table becomes cache invalid every day at midnight
+        def my_cache_fun():
+            return date.today().strftime("%Y/%m/%d")

-            # Return a reference to the newly created table
-            return Table(TableReference(), "name_of_table")
+        @materialize(cache=my_cache_fun)
+        def task():
+            return Table(ExternalTableReference("name_of_table", "schema"))
     """

-    pass
+    def __init__(self, name: str, schema: str, shared_lock_allowed: bool = False):
+        """
+        :param name: The name of the table, view, or nickname/alias
+        :param schema: The external schema of the object. A multi-part schema
+            is allowed with '.' separator as also supported by the SQLAlchemy
+            Table schema argument.
+        :param shared_lock_allowed: Whether pipedag may acquire a shared lock
+            on the object when using it in a SQL query. If set to `False`, no
+            lock is acquired. This is useful when the user is not allowed to
+            lock the table. If pipedag does not lock source tables for this
+            dialect, this argument has no effect. The default is `False`.
+        """
+        self.name = name
+        self.schema = schema
+        self.shared_lock_allowed = shared_lock_allowed
+
+    def __repr__(self):
+        return f"<ExternalTableReference: {self.schema}.{self.name}>"


 # Load SQLTableStore Hooks
diff --git a/src/pydiverse/pipedag/materialize/container.py b/src/pydiverse/pipedag/materialize/container.py
index 7461ed29..6b984ab9 100644
--- a/src/pydiverse/pipedag/materialize/container.py
+++ b/src/pydiverse/pipedag/materialize/container.py
@@ -56,6 +56,8 @@ def __init__(
     ):
         self._name = None
         self.stage: Stage | None = None
+        self.external_schema: str | None = None
+        self.shared_lock_allowed: bool = True

         self.obj = obj
         self.name = name
@@ -79,6 +81,19 @@ def __init__(
             if not isinstance(col, str):
                 raise indexes_type_error

+        from pydiverse.pipedag.backend.table.sql import ExternalTableReference
+
+        # ExternalTableReference can reference a table from an external schema
+        if isinstance(self.obj, ExternalTableReference):
+            self.external_schema = self.obj.schema
+            if self.name is not None:
+                raise ValueError(
+                    "When using an ExternalTableReference, the name of the Table must "
+                    "be set via the ExternalTableReference."
+                )
+            self.name = self.obj.name
+            self.shared_lock_allowed = self.obj.shared_lock_allowed
+
         # cache_key will be overridden shortly before handing over to downstream tasks
         # that use it to compute their input_hash for cache_invalidation due to input
         # change
diff --git a/src/pydiverse/pipedag/materialize/store.py b/src/pydiverse/pipedag/materialize/store.py
index 29ab35d9..c0dd1dea 100644
--- a/src/pydiverse/pipedag/materialize/store.py
+++ b/src/pydiverse/pipedag/materialize/store.py
@@ -459,7 +459,9 @@ def copy_cached_output_to_transaction_stage(

         def visitor(x):
             if isinstance(x, Table):
-                tables.append(x)
+                # Tables in external schemas should not get copied
+                if x.external_schema is None:
+                    tables.append(x)
             elif isinstance(x, RawSql):
                 raw_sqls.append(x)
             elif isinstance(x, Blob):
diff --git a/src/pydiverse/pipedag/util/json.py b/src/pydiverse/pipedag/util/json.py
index 4a5667a1..33928b6d 100644
--- a/src/pydiverse/pipedag/util/json.py
+++ b/src/pydiverse/pipedag/util/json.py
@@ -49,6 +49,8 @@ def json_default(o):
             "indexes": o.indexes,
             "cache_key": o.cache_key,
             "materialization_details": o.materialization_details,
+            "external_schema": o.external_schema,
+            "shared_lock_allowed": o.shared_lock_allowed,
         }
     if isinstance(o, RawSql):
         return {
@@ -114,6 +116,8 @@ def get_stage(name: str):
         )
         tbl.stage = get_stage(d["stage"])
         tbl.cache_key = d["cache_key"]
+        tbl.external_schema = d.get("external_schema")
+        tbl.shared_lock_allowed = d.get("shared_lock_allowed", True)
         return tbl
     if type_ == Type.RAW_SQL:
         raw_sql = RawSql(name=d["name"])
diff --git a/tests/test_sql_dialect/test_ibm_db2.py b/tests/test_sql_dialect/test_ibm_db2.py
index 5d567c6e..e67ab193 100644
--- a/tests/test_sql_dialect/test_ibm_db2.py
+++ b/tests/test_sql_dialect/test_ibm_db2.py
@@ -6,10 +6,19 @@
 import pytest
 import sqlalchemy as sa

-from pydiverse.pipedag import Flow, RawSql, Stage, Table, materialize
+from pydiverse.pipedag import ConfigContext, Flow, RawSql, Stage, Table, materialize
+from pydiverse.pipedag.backend.table.sql import ExternalTableReference
+from pydiverse.pipedag.backend.table.sql.ddl import (
+    CreateSchema,
+    CreateTableAsSelect,
+    DropTable,
+    Schema,
+)
 from tests.fixtures.instances import with_instances
+from tests.util.sql import sql_table_expr
 from tests.util.tasks_library import (
     assert_table_equal,
+    noop_sql,
     simple_dataframe,
     simple_lazy_table,
 )
@@ -30,11 +39,49 @@ def create_nicknames(table: sa.Table):

         return RawSql(simple_nicknames, "create_nicknames", separator="|")

+    @materialize(nout=2)
+    def create_external_nicknames():
+        table_store = ConfigContext.get().store.table_store
+        schema = Schema("user_controlled_schema", prefix="", suffix="")
+        table_name = "external_table_for_nickname"
+        table_store.execute(CreateSchema(schema, if_not_exists=True))
+        table_store.execute(DropTable(table_name, schema, if_exists=True))
+        query = sql_table_expr({"col": [0, 1, 2, 3]})
+        table_store.execute(
+            CreateTableAsSelect(
+                table_name,
+                schema,
+                query,
+            )
+        )
+        script_path = Path(__file__).parent / "scripts" / "simple_nicknames.sql"
+        simple_nicknames = Path(script_path).read_text()
+        simple_nicknames = simple_nicknames.replace("{{out_schema}}", schema.get())
+        simple_nicknames = simple_nicknames.replace("{{out_table}}", table_name)
+        table_store.execute_raw_sql(
+            RawSql(simple_nicknames, "create_external_nicknames", separator="|")
+        )
+
+        return Table(
+            ExternalTableReference(
+                "nick1", schema=schema.get(), shared_lock_allowed=False
+            )
+        ), Table(
+            ExternalTableReference(
+                "nick2", schema=schema.get(), shared_lock_allowed=True
+            )
+        )
+
     with Flow("f") as f:
         with Stage("stage"):
             x = simple_dataframe()

             nicknames = create_nicknames(x)
             _ = nicknames

+            nick_1_ref, nick_2_ref = create_external_nicknames()
+            nick_1_ref_noop = noop_sql(nick_1_ref)
+            nick_2_ref_noop = noop_sql(nick_2_ref)
+            assert_table_equal(nick_1_ref, nick_1_ref_noop)
+            assert_table_equal(nick_2_ref, nick_2_ref_noop)
+
     # We run three times to ensure that the nicknames created in the first run
     # have to be dropped, since the same schema is reused.
diff --git a/tests/test_table_hooks/test_sql_table_reference.py b/tests/test_table_hooks/test_sql_table_reference.py
index 35d7ee2a..1f2dced8 100644
--- a/tests/test_table_hooks/test_sql_table_reference.py
+++ b/tests/test_table_hooks/test_sql_table_reference.py
@@ -1,40 +1,72 @@
 from __future__ import annotations

 import pandas as pd
+import pytest
+import sqlalchemy as sa
+from sqlalchemy.exc import ProgrammingError

+import tests.util.tasks_library as m
 from pydiverse.pipedag import *
-from pydiverse.pipedag.backend.table.sql import TableReference
-from pydiverse.pipedag.backend.table.sql.ddl import CreateTableAsSelect
-from pydiverse.pipedag.context import TaskContext
+from pydiverse.pipedag.backend.table.sql import ExternalTableReference
+from pydiverse.pipedag.backend.table.sql.ddl import (
+    CreateSchema,
+    CreateTableAsSelect,
+    CreateViewAsSelect,
+    DropTable,
+    DropView,
+    Schema,
+)

 # Parameterize all tests in this file with several instance_id configurations
 from tests.fixtures.instances import DATABASE_INSTANCES, with_instances
 from tests.util import swallowing_raises
 from tests.util.sql import sql_table_expr
-from tests.util.tasks_library import assert_table_equal

 pytestmark = [with_instances(DATABASE_INSTANCES)]


+@pytest.mark.polars
 def test_table_store():
     @materialize(version="1.0")
     def in_table():
-        task = TaskContext.get().task
         table_store = ConfigContext.get().store.table_store
-        schema = table_store.get_schema(task.stage.transaction_name)
-
-        # Manually materialize the table
+        schema = Schema("user_controlled_schema", prefix="", suffix="")
+        table_name = "external_table"
+        table_store.execute(CreateSchema(schema, if_not_exists=True))
+        try:
+            table_store.execute(DropView("external_view", schema))
+        except ProgrammingError:
+            pass
+        table_store.execute(DropTable(table_name, schema, if_exists=True))
         query = sql_table_expr({"col": [0, 1, 2, 3]})
         table_store.execute(
             CreateTableAsSelect(
-                "table_reference",
+                table_name,
                 schema,
                 query,
             )
         )
+        return Table(ExternalTableReference(table_name, schema=schema.get()))

-        # Return a table reference
-        return Table(TableReference(), "table_reference")
+    @materialize(version="1.0", input_type=sa.Table)
+    def in_view(tbl: sa.Table):
+        table_store = ConfigContext.get().store.table_store
+        schema = Schema("user_controlled_schema", prefix="", suffix="")
+        view_name = "external_view"
+        try:
+            # We cannot use if_exists=True here because DB2 does not support it
+            table_store.execute(DropView(view_name, schema))
+        except ProgrammingError:
+            pass
+        query = sa.select(tbl.c.col).where(tbl.c.col > 1)
+        table_store.execute(
+            CreateViewAsSelect(
+                view_name,
+                schema,
+                query,
+            )
+        )
+        return Table(ExternalTableReference(view_name, schema=schema.get()))

     @materialize()
     def expected_out_table():
@@ -46,11 +78,38 @@ def expected_out_table():
             )
         )

+    @materialize()
+    def expected_out_view():
+        return Table(
+            pd.DataFrame(
+                {
+                    "col": [2, 3],
+                }
+            )
+        )
+
     with Flow() as f:
         with Stage("sql_table_reference"):
-            table = in_table()
-            expected = expected_out_table()
-            _ = assert_table_equal(table, expected, check_dtype=False)
+            external_table = in_table()
+            expected_external_table = expected_out_table()
+            _ = m.assert_table_equal(
+                external_table, expected_external_table, check_dtype=False
+            )
+
+        with Stage("sql_view_reference"):
+            external_view = in_view(external_table)
+            expected_external_view = expected_out_view()
+            _ = m.assert_table_equal(
+                external_view, expected_external_view, check_dtype=False
+            )
+            external_view_polars = m.noop_polars(external_view)
+            external_view_lazy_polars = m.noop_lazy_polars(external_view)
+            _ = m.assert_table_equal(
+                external_view_polars, expected_external_view, check_dtype=False
+            )
+            _ = m.assert_table_equal(
+                external_view_lazy_polars, expected_external_view, check_dtype=False
+            )

     assert f.run().successful
     assert f.run().successful
@@ -59,7 +118,11 @@ def expected_out_table():
 def test_bad_table_reference():
     @materialize()
     def bad_table_reference():
-        return Table(TableReference(), "this_table_does_not_exist")
+        return Table(
+            ExternalTableReference(
+                name="this_table_does_not_exist", schema="ext_schema"
+            ),
+        )

     with Flow() as f:
         with Stage("sql_table_reference"):
diff --git a/tests/util/tasks_library.py b/tests/util/tasks_library.py
index ddd1fcd7..cab7f011 100644
--- a/tests/util/tasks_library.py
+++ b/tests/util/tasks_library.py
@@ -2,11 +2,15 @@
 import pandas as pd
 import sqlalchemy as sa
-import sqlalchemy.dialects

 from pydiverse.pipedag import Blob, RawSql, Table, materialize
 from pydiverse.pipedag.debug import materialize_table

+try:
+    import polars as pl
+except ImportError:
+    pl = None
+

 @materialize(input_type=pd.DataFrame, version="1.0")
 def noop(x):
@@ -28,6 +32,16 @@ def noop_lazy(x):
     return x


+@materialize(input_type=pl.DataFrame if pl else 0)
+def noop_polars(x):
+    return Table(x)
+
+
+@materialize(input_type=pl.LazyFrame if pl else 0)
+def noop_lazy_polars(x):
+    return Table(x)
+
+
 @materialize(nout=2, version="1.0", input_type=pd.DataFrame)
 def create_tuple(x, y):
     return x, y
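
A minimal usage sketch of the reworked API (illustrative only; the names
"src_table", "ext_schema", "stage_1", and the task names are assumptions, not
part of this change)::

    import sqlalchemy as sa

    from pydiverse.pipedag import Flow, Stage, Table, materialize
    from pydiverse.pipedag.backend.table.sql import ExternalTableReference


    @materialize(version="1.0")
    def external_input():
        # pipedag only checks that "ext_schema"."src_table" exists; it neither
        # copies the table nor tracks its cache validity (consider passing a
        # cache function to @materialize, as in the ExternalTableReference
        # docstring example above).
        return Table(ExternalTableReference("src_table", schema="ext_schema"))


    @materialize(lazy=True, input_type=sa.Table)
    def consume(tbl: sa.Table):
        # The external object arrives as a regular SQLAlchemy table.
        return Table(sa.select(tbl), "consumed")


    with Flow("example") as f:
        with Stage("stage_1"):
            out = consume(external_input())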