From d42ac3c81e3c891dcf9bd5c6133d4a7eb802d5b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20M=C3=BCller?= Date: Sat, 24 Feb 2024 18:40:59 +0100 Subject: [PATCH 01/20] Enable the creation of TableReferences that reference tables in external schemas --- .../pipedag/backend/table/sql/hooks.py | 28 ++++++------- .../pipedag/backend/table/sql/sql.py | 37 +++++++++++++++--- .../pipedag/materialize/container.py | 1 + src/pydiverse/pipedag/materialize/store.py | 8 +++- src/pydiverse/pipedag/util/json.py | 2 + .../test_sql_table_reference.py | 39 ++++++++++++++++++- 6 files changed, 94 insertions(+), 21 deletions(-) diff --git a/src/pydiverse/pipedag/backend/table/sql/hooks.py b/src/pydiverse/pipedag/backend/table/sql/hooks.py index 7a1dec37..9b963587 100644 --- a/src/pydiverse/pipedag/backend/table/sql/hooks.py +++ b/src/pydiverse/pipedag/backend/table/sql/hooks.py @@ -57,11 +57,11 @@ def materialize( source_tables = [ dict( name=tbl.name, - schema=store.get_schema(tbl.stage.current_name).get(), + schema=store.get_schema(tbl.stage.current_name, tbl).get(), ) for tbl in TaskContext.get().input_tables ] - schema = store.get_schema(stage_name) + schema = store.get_schema(stage_name, table) store.check_materialization_details_supported( resolve_materialization_details_label(table) @@ -87,9 +87,9 @@ def materialize( @classmethod def retrieve( - cls, store, table, stage_name, as_type: type[sa.Table] + cls, store, table: Table, stage_name: str, as_type: type[sa.Table] ) -> sa.sql.expression.Selectable: - schema = store.get_schema(stage_name).get() + schema = store.get_schema(stage_name, table).get() table_name, schema = store.resolve_alias(table.name, schema) alias_name = TaskContext.get().name_disambiguator.get_name(table_name) @@ -136,12 +136,12 @@ def can_retrieve(cls, type_) -> bool: return False @classmethod - def materialize(cls, store: SQLTableStore, table: Table, stage_name): + def materialize(cls, store: SQLTableStore, table: Table, stage_name: str): # For a table reference, we don't need to materialize anything. # This is any table referenced by a table reference should already exist # in the schema. # Instead, we check that the table actually exists. 
- schema = store.get_schema(stage_name).get() + schema = store.get_schema(stage_name, table).get() inspector = sa.inspect(store.engine) has_table = inspector.has_table(table.name, schema) @@ -193,9 +193,11 @@ def auto_table(cls, obj: pd.DataFrame): return super().auto_table(obj) @classmethod - def materialize(cls, store: SQLTableStore, table: Table[pd.DataFrame], stage_name): + def materialize( + cls, store: SQLTableStore, table: Table[pd.DataFrame], stage_name: str + ): df = table.obj.copy(deep=False) - schema = store.get_schema(stage_name) + schema = store.get_schema(stage_name, table) if store.print_materialize: store.logger.info( @@ -303,7 +305,7 @@ def _build_retrieve_query( backend: PandasDTypeBackend, ) -> tuple[Any, dict[str, DType]]: engine = store.engine - schema = store.get_schema(stage_name).get() + schema = store.get_schema(stage_name, table).get() table_name, schema = store.resolve_alias(table.name, schema) sql_table = sa.Table( @@ -441,13 +443,13 @@ def can_retrieve(cls, type_) -> bool: return type_ == polars.DataFrame @classmethod - def materialize(cls, store, table: Table[polars.DataFrame], stage_name): + def materialize(cls, store, table: Table[polars.DataFrame], stage_name: str): # Materialization for polars happens by first converting the dataframe to # a pyarrow backed pandas dataframe, and then calling the PandasTableHook # for materialization. df = table.obj - schema = store.get_schema(stage_name) + schema = store.get_schema(stage_name, table) if store.print_materialize: store.logger.info( @@ -486,7 +488,7 @@ def auto_table(cls, obj: polars.DataFrame): @classmethod def _read_db_query(cls, store: SQLTableStore, table: Table, stage_name: str): - schema = store.get_schema(stage_name).get() + schema = store.get_schema(stage_name, table).get() table_name, schema = store.resolve_alias(table.name, schema) t = sa.table(table_name, schema=schema) @@ -767,7 +769,7 @@ def retrieve( as_type: type[ibis.api.Table], ) -> ibis.api.Table: conn = cls.conn(store) - schema = store.get_schema(stage_name).get() + schema = store.get_schema(stage_name, table).get() table_name, schema = store.resolve_alias(table.name, schema) for retry_iteration in range(4): # retry operation since it might have been terminated as a deadlock victim diff --git a/src/pydiverse/pipedag/backend/table/sql/sql.py b/src/pydiverse/pipedag/backend/table/sql/sql.py index b90be6ef..42fe0685 100644 --- a/src/pydiverse/pipedag/backend/table/sql/sql.py +++ b/src/pydiverse/pipedag/backend/table/sql/sql.py @@ -367,7 +367,9 @@ def engine_connect(self) -> sa.Connection: yield conn conn.commit() - def get_schema(self, name): + def get_schema(self, name: str, table: Table | None = None): + if table is not None and table.external_schema is not None: + return Schema(table.external_schema, "", "") return Schema(name, self.schema_prefix, self.schema_suffix) def _execute(self, query, conn: sa.engine.Connection): @@ -1278,11 +1280,12 @@ class TableReference: """Reference to a user-created table. By returning a `TableReference` wrapped in a :py:class:`~.Table` from a task, - you can tell pipedag that you yourself created a new table inside - the correct schema of the table store. + you can tell pipedag that you yourself created a new table + either inside the correct schema of the current stage or in an `external_schema`. This may be useful if you need to perform a complex load operation to create a table (e.g. load a table from an Oracle database into Postgres - using `oracle_fdw`). 
+ using `oracle_fdw`) or to tell pipedag about the existence of a table in an external + schema. Only supported by :py:class:`~.SQLTableStore`. @@ -1290,11 +1293,32 @@ class TableReference: ------- Using table references is not recommended unless you know what you are doing. It may lead unexpected behaviour. - It also requires accessing non-public parts of the pipedag API which can + In case of creating a reference to a table in the schema of the current stage, + it also requires accessing non-public parts of the pipedag API which can change without notice. Example ------- + You can use a `TableReference` to tell pipedag about a table that exists + in an external schema:: + + @materialize(version="1.0") + def task(): + return Table(TableReference("external_schema"), "name_of_table") + + By using a cache function, you can establish the cache (in-)validity of the + external table:: + + from datetime import date + + # The external table becomes cache invalid every day at midnight + def my_cache_fun(): + return date.today().strftime("%Y/%m/%d") + + @materialize(cache=my_cache_fun) + def task(): + return Table(TableReference("external_schema"), "name_of_table") + Making sure that the table is created in the correct schema is not trivial, because the schema names usually are different from the stage names. To get the correct schema name, you must use undocumented and non-public @@ -1319,7 +1343,8 @@ def task(): return Table(TableReference(), "name_of_table") """ - pass + def __init__(self, external_schema: str | None = None): + self.external_schema = external_schema # Load SQLTableStore Hooks diff --git a/src/pydiverse/pipedag/materialize/container.py b/src/pydiverse/pipedag/materialize/container.py index 7461ed29..99b5ec80 100644 --- a/src/pydiverse/pipedag/materialize/container.py +++ b/src/pydiverse/pipedag/materialize/container.py @@ -56,6 +56,7 @@ def __init__( ): self._name = None self.stage: Stage | None = None + self.external_schema: str | None = None self.obj = obj self.name = name diff --git a/src/pydiverse/pipedag/materialize/store.py b/src/pydiverse/pipedag/materialize/store.py index 29ab35d9..d4bc27d5 100644 --- a/src/pydiverse/pipedag/materialize/store.py +++ b/src/pydiverse/pipedag/materialize/store.py @@ -8,6 +8,7 @@ from pydiverse.pipedag import Blob, Stage, Table, backend from pydiverse.pipedag._typing import Materializable, T +from pydiverse.pipedag.backend.table.sql import TableReference from pydiverse.pipedag.context import ConfigContext, RunContext, TaskContext from pydiverse.pipedag.context.run_context import StageState from pydiverse.pipedag.core.config import PipedagConfig @@ -313,6 +314,9 @@ def preparation_mutator(x): if isinstance(x, Table): if x.obj is None: raise TypeError("Underlying table object can't be None") + # TableReference can reference a table from an external schema + if isinstance(x.obj, TableReference): + x.external_schema = x.obj.external_schema tables.append(x) elif isinstance(x, RawSql): if x.sql is None: @@ -459,7 +463,9 @@ def copy_cached_output_to_transaction_stage( def visitor(x): if isinstance(x, Table): - tables.append(x) + # Tables in external schemas should not get copied + if x.external_schema is None: + tables.append(x) elif isinstance(x, RawSql): raw_sqls.append(x) elif isinstance(x, Blob): diff --git a/src/pydiverse/pipedag/util/json.py b/src/pydiverse/pipedag/util/json.py index 4a5667a1..c07df289 100644 --- a/src/pydiverse/pipedag/util/json.py +++ b/src/pydiverse/pipedag/util/json.py @@ -49,6 +49,7 @@ def json_default(o): "indexes": 
o.indexes, "cache_key": o.cache_key, "materialization_details": o.materialization_details, + "external_schema": o.external_schema, } if isinstance(o, RawSql): return { @@ -114,6 +115,7 @@ def get_stage(name: str): ) tbl.stage = get_stage(d["stage"]) tbl.cache_key = d["cache_key"] + tbl.external_schema = d.get("external_schema") return tbl if type_ == Type.RAW_SQL: raw_sql = RawSql(name=d["name"]) diff --git a/tests/test_table_hooks/test_sql_table_reference.py b/tests/test_table_hooks/test_sql_table_reference.py index 35d7ee2a..8ab9dab8 100644 --- a/tests/test_table_hooks/test_sql_table_reference.py +++ b/tests/test_table_hooks/test_sql_table_reference.py @@ -4,7 +4,12 @@ from pydiverse.pipedag import * from pydiverse.pipedag.backend.table.sql import TableReference -from pydiverse.pipedag.backend.table.sql.ddl import CreateTableAsSelect +from pydiverse.pipedag.backend.table.sql.ddl import ( + CreateSchema, + CreateTableAsSelect, + DropSchema, + Schema, +) from pydiverse.pipedag.context import TaskContext # Parameterize all tests in this file with several instance_id configurations @@ -36,6 +41,22 @@ def in_table(): # Return a table reference return Table(TableReference(), "table_reference") + @materialize(version="1.0") + def in_table_external_schema(): + table_store = ConfigContext.get().store.table_store + schema = Schema("user_controlled_schema", prefix="", suffix="") + table_store.execute(DropSchema(schema, if_exists=True, cascade=True)) + table_store.execute(CreateSchema(schema)) + query = sql_table_expr({"col": [4, 5, 6, 7]}) + table_store.execute( + CreateTableAsSelect( + "external_table", + schema, + query, + ) + ) + return Table(TableReference(external_schema=schema.get()), "external_table") + @materialize() def expected_out_table(): return Table( @@ -46,11 +67,27 @@ def expected_out_table(): ) ) + @materialize() + def expected_out_table_external_schema(): + return Table( + pd.DataFrame( + { + "col": [4, 5, 6, 7], + } + ) + ) + with Flow() as f: with Stage("sql_table_reference"): table = in_table() expected = expected_out_table() _ = assert_table_equal(table, expected, check_dtype=False) + with Stage("sql_table_reference_external_schema"): + external_table = in_table_external_schema() + expected_external_table = expected_out_table_external_schema() + _ = assert_table_equal( + external_table, expected_external_table, check_dtype=False + ) assert f.run().successful assert f.run().successful From 45adc0bae873b575d450d548f290cbe46ca5e3c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20M=C3=BCller?= Date: Sat, 24 Feb 2024 19:00:50 +0100 Subject: [PATCH 02/20] Fix tests on DB2 and MSSQL --- tests/test_table_hooks/test_sql_table_reference.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_table_hooks/test_sql_table_reference.py b/tests/test_table_hooks/test_sql_table_reference.py index 8ab9dab8..999e124c 100644 --- a/tests/test_table_hooks/test_sql_table_reference.py +++ b/tests/test_table_hooks/test_sql_table_reference.py @@ -7,7 +7,7 @@ from pydiverse.pipedag.backend.table.sql.ddl import ( CreateSchema, CreateTableAsSelect, - DropSchema, + DropTable, Schema, ) from pydiverse.pipedag.context import TaskContext @@ -45,8 +45,8 @@ def in_table(): def in_table_external_schema(): table_store = ConfigContext.get().store.table_store schema = Schema("user_controlled_schema", prefix="", suffix="") - table_store.execute(DropSchema(schema, if_exists=True, cascade=True)) - table_store.execute(CreateSchema(schema)) + table_store.execute(CreateSchema(schema, 
if_not_exists=True)) + table_store.execute(DropTable("external_table", schema, if_exists=True)) query = sql_table_expr({"col": [4, 5, 6, 7]}) table_store.execute( CreateTableAsSelect( From 6d77863232c309255a0bb241cb02fa60be3404ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20M=C3=BCller?= Date: Sat, 24 Feb 2024 19:02:35 +0100 Subject: [PATCH 03/20] Improve readability --- tests/test_table_hooks/test_sql_table_reference.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/test_table_hooks/test_sql_table_reference.py b/tests/test_table_hooks/test_sql_table_reference.py index 999e124c..55d030da 100644 --- a/tests/test_table_hooks/test_sql_table_reference.py +++ b/tests/test_table_hooks/test_sql_table_reference.py @@ -45,17 +45,18 @@ def in_table(): def in_table_external_schema(): table_store = ConfigContext.get().store.table_store schema = Schema("user_controlled_schema", prefix="", suffix="") + table_name = "external_table" table_store.execute(CreateSchema(schema, if_not_exists=True)) - table_store.execute(DropTable("external_table", schema, if_exists=True)) + table_store.execute(DropTable(table_name, schema, if_exists=True)) query = sql_table_expr({"col": [4, 5, 6, 7]}) table_store.execute( CreateTableAsSelect( - "external_table", + table_name, schema, query, ) ) - return Table(TableReference(external_schema=schema.get()), "external_table") + return Table(TableReference(external_schema=schema.get()), table_name) @materialize() def expected_out_table(): From 8c23cd2ee9a1f9f88089e887bf300a9fa90a5968 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20M=C3=BCller?= Date: Sat, 24 Feb 2024 21:42:25 +0100 Subject: [PATCH 04/20] Add `repr` output for `TableReference` --- src/pydiverse/pipedag/backend/table/sql/sql.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/pydiverse/pipedag/backend/table/sql/sql.py b/src/pydiverse/pipedag/backend/table/sql/sql.py index 42fe0685..a0b0e85c 100644 --- a/src/pydiverse/pipedag/backend/table/sql/sql.py +++ b/src/pydiverse/pipedag/backend/table/sql/sql.py @@ -1346,6 +1346,12 @@ def task(): def __init__(self, external_schema: str | None = None): self.external_schema = external_schema + def __repr__(self): + return ( + f"" + ) + # Load SQLTableStore Hooks import pydiverse.pipedag.backend.table.sql.hooks # noqa From 02ce6a06978c6a766af067e84cc7276e885d8fbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20M=C3=BCller?= Date: Sat, 24 Feb 2024 22:02:04 +0100 Subject: [PATCH 05/20] Add changelog entry --- docs/source/changelog.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/source/changelog.md b/docs/source/changelog.md index 1b4bb03b..a0b0fbbc 100644 --- a/docs/source/changelog.md +++ b/docs/source/changelog.md @@ -1,5 +1,8 @@ # Changelog +## 0.6.10 (2024-XX-XX) +- Add support for `TableReference` to point to tables in external (i.e. 
not managed by `pipedag`) schemas + ## 0.6.9 (2024-01-24) - Update dependencies and remove some upper boundaries - Polars dependency moved to >= 0.18.12 due to incompatible interface change From a7f504d6f349e2615d520eba1116117d4f539654 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20M=C3=BCller?= Date: Mon, 26 Feb 2024 14:05:22 +0100 Subject: [PATCH 06/20] Disable TableReference to schema of current stage --- .../pipedag/backend/table/sql/hooks.py | 6 +++ .../pipedag/backend/table/sql/sql.py | 42 +++--------------- .../test_sql_table_reference.py | 44 +++---------------- 3 files changed, 19 insertions(+), 73 deletions(-) diff --git a/src/pydiverse/pipedag/backend/table/sql/hooks.py b/src/pydiverse/pipedag/backend/table/sql/hooks.py index 9b963587..dbae2721 100644 --- a/src/pydiverse/pipedag/backend/table/sql/hooks.py +++ b/src/pydiverse/pipedag/backend/table/sql/hooks.py @@ -142,6 +142,12 @@ def materialize(cls, store: SQLTableStore, table: Table, stage_name: str): # in the schema. # Instead, we check that the table actually exists. schema = store.get_schema(stage_name, table).get() + stage_schema = store.get_schema(stage_name).get() + if schema == stage_schema: + raise ValueError( + f"TableReference '{table.name}' is not allowed to reference tables " + "in the same schema as the current stage." + ) inspector = sa.inspect(store.engine) has_table = inspector.has_table(table.name, schema) diff --git a/src/pydiverse/pipedag/backend/table/sql/sql.py b/src/pydiverse/pipedag/backend/table/sql/sql.py index a0b0e85c..3c60cc71 100644 --- a/src/pydiverse/pipedag/backend/table/sql/sql.py +++ b/src/pydiverse/pipedag/backend/table/sql/sql.py @@ -1280,22 +1280,17 @@ class TableReference: """Reference to a user-created table. By returning a `TableReference` wrapped in a :py:class:`~.Table` from a task, - you can tell pipedag that you yourself created a new table - either inside the correct schema of the current stage or in an `external_schema`. - This may be useful if you need to perform a complex load operation to create - a table (e.g. load a table from an Oracle database into Postgres - using `oracle_fdw`) or to tell pipedag about the existence of a table in an external - schema. + you can tell pipedag about a table in an `external_schema`. Only supported by :py:class:`~.SQLTableStore`. Warning ------- - Using table references is not recommended unless you know what you are doing. - It may lead unexpected behaviour. - In case of creating a reference to a table in the schema of the current stage, - it also requires accessing non-public parts of the pipedag API which can - change without notice. + When using a `TableReference`, pipedag has no way of knowing the cache validity + of the table. Hence, the user should provide a cache function for the `Task` + or version the `Task`. + It is now allowed to specify a `TableReference` to a table in schema of the + current stage. Example ------- @@ -1318,32 +1313,9 @@ def my_cache_fun(): @materialize(cache=my_cache_fun) def task(): return Table(TableReference("external_schema"), "name_of_table") - - Making sure that the table is created in the correct schema is not trivial, - because the schema names usually are different from the stage names. - To get the correct schema name, you must use undocumented and non-public - parts of the pipedag API. 
To help you get started, we'll provide you with - this example, however, be warned that this might break without notice:: - - @materialize(version="1.0") - def task(): - from pydiverse.pipedag.context import ConfigContext, TaskContext - - table_store = ConfigContext.get().store.table_store - task = TaskContext.get().task - - # Name of the schema in which you must create the table - schema_name = table_store.get_schema(task.stage.transaction_name).get() - - # Use the table store's engine to create a table in the correct schema - with table_store.engine.begin() as conn: - conn.execute(...) - - # Return a reference to the newly created table - return Table(TableReference(), "name_of_table") """ - def __init__(self, external_schema: str | None = None): + def __init__(self, external_schema: str): self.external_schema = external_schema def __repr__(self): diff --git a/tests/test_table_hooks/test_sql_table_reference.py b/tests/test_table_hooks/test_sql_table_reference.py index 55d030da..bccee9d7 100644 --- a/tests/test_table_hooks/test_sql_table_reference.py +++ b/tests/test_table_hooks/test_sql_table_reference.py @@ -10,7 +10,6 @@ DropTable, Schema, ) -from pydiverse.pipedag.context import TaskContext # Parameterize all tests in this file with several instance_id configurations from tests.fixtures.instances import DATABASE_INSTANCES, with_instances @@ -24,31 +23,12 @@ def test_table_store(): @materialize(version="1.0") def in_table(): - task = TaskContext.get().task - table_store = ConfigContext.get().store.table_store - schema = table_store.get_schema(task.stage.transaction_name) - - # Manually materialize the table - query = sql_table_expr({"col": [0, 1, 2, 3]}) - table_store.execute( - CreateTableAsSelect( - "table_reference", - schema, - query, - ) - ) - - # Return a table reference - return Table(TableReference(), "table_reference") - - @materialize(version="1.0") - def in_table_external_schema(): table_store = ConfigContext.get().store.table_store schema = Schema("user_controlled_schema", prefix="", suffix="") table_name = "external_table" table_store.execute(CreateSchema(schema, if_not_exists=True)) table_store.execute(DropTable(table_name, schema, if_exists=True)) - query = sql_table_expr({"col": [4, 5, 6, 7]}) + query = sql_table_expr({"col": [0, 1, 2, 3]}) table_store.execute( CreateTableAsSelect( table_name, @@ -68,24 +48,10 @@ def expected_out_table(): ) ) - @materialize() - def expected_out_table_external_schema(): - return Table( - pd.DataFrame( - { - "col": [4, 5, 6, 7], - } - ) - ) - with Flow() as f: with Stage("sql_table_reference"): - table = in_table() - expected = expected_out_table() - _ = assert_table_equal(table, expected, check_dtype=False) - with Stage("sql_table_reference_external_schema"): - external_table = in_table_external_schema() - expected_external_table = expected_out_table_external_schema() + external_table = in_table() + expected_external_table = expected_out_table() _ = assert_table_equal( external_table, expected_external_table, check_dtype=False ) @@ -97,7 +63,9 @@ def expected_out_table_external_schema(): def test_bad_table_reference(): @materialize() def bad_table_reference(): - return Table(TableReference(), "this_table_does_not_exist") + return Table( + TableReference(external_schema="ext_schema"), "this_table_does_not_exist" + ) with Flow() as f: with Stage("sql_table_reference"): From b51a3b784a42fa7eae0bc5b9cab880ebf29c5f58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20M=C3=BCller?= Date: Tue, 27 Feb 2024 00:12:42 +0100 Subject: [PATCH 07/20] 
Add external view to test --- .../pipedag/backend/table/sql/sql.py | 6 ++-- .../test_sql_table_reference.py | 36 +++++++++++++++++++ 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/src/pydiverse/pipedag/backend/table/sql/sql.py b/src/pydiverse/pipedag/backend/table/sql/sql.py index 3c60cc71..2f1af540 100644 --- a/src/pydiverse/pipedag/backend/table/sql/sql.py +++ b/src/pydiverse/pipedag/backend/table/sql/sql.py @@ -1280,15 +1280,15 @@ class TableReference: """Reference to a user-created table. By returning a `TableReference` wrapped in a :py:class:`~.Table` from a task, - you can tell pipedag about a table in an `external_schema`. + you can tell pipedag about a table or a view in an `external_schema`. Only supported by :py:class:`~.SQLTableStore`. Warning ------- When using a `TableReference`, pipedag has no way of knowing the cache validity - of the table. Hence, the user should provide a cache function for the `Task` - or version the `Task`. + of the external object. Hence, the user should provide a cache function for the + `Task` or version the `Task`. It is now allowed to specify a `TableReference` to a table in schema of the current stage. diff --git a/tests/test_table_hooks/test_sql_table_reference.py b/tests/test_table_hooks/test_sql_table_reference.py index bccee9d7..939acf25 100644 --- a/tests/test_table_hooks/test_sql_table_reference.py +++ b/tests/test_table_hooks/test_sql_table_reference.py @@ -1,13 +1,16 @@ from __future__ import annotations import pandas as pd +import sqlalchemy as sa from pydiverse.pipedag import * from pydiverse.pipedag.backend.table.sql import TableReference from pydiverse.pipedag.backend.table.sql.ddl import ( CreateSchema, CreateTableAsSelect, + CreateViewAsSelect, DropTable, + DropView, Schema, ) @@ -27,6 +30,7 @@ def in_table(): schema = Schema("user_controlled_schema", prefix="", suffix="") table_name = "external_table" table_store.execute(CreateSchema(schema, if_not_exists=True)) + table_store.execute(DropView("external_view", schema, if_exists=True)) table_store.execute(DropTable(table_name, schema, if_exists=True)) query = sql_table_expr({"col": [0, 1, 2, 3]}) table_store.execute( @@ -38,6 +42,22 @@ def in_table(): ) return Table(TableReference(external_schema=schema.get()), table_name) + @materialize(version="1.0", input_type=sa.Table) + def in_view(tbl: sa.Table): + table_store = ConfigContext.get().store.table_store + schema = Schema("user_controlled_schema", prefix="", suffix="") + view_name = "external_view" + table_store.execute(DropView(view_name, schema, if_exists=True)) + query = sa.select(tbl.c.col).where(tbl.c.col > 1).order_by(tbl.c.col) + table_store.execute( + CreateViewAsSelect( + view_name, + schema, + query, + ) + ) + return Table(TableReference(external_schema=schema.get()), view_name) + @materialize() def expected_out_table(): return Table( @@ -48,6 +68,16 @@ def expected_out_table(): ) ) + @materialize() + def expected_out_view(): + return Table( + pd.DataFrame( + { + "col": [2, 3], + } + ) + ) + with Flow() as f: with Stage("sql_table_reference"): external_table = in_table() @@ -55,6 +85,12 @@ def expected_out_table(): _ = assert_table_equal( external_table, expected_external_table, check_dtype=False ) + with Stage("sql_view_reference"): + external_view = in_view(external_table) + expected_external_view = expected_out_view() + _ = assert_table_equal( + external_view, expected_external_view, check_dtype=False + ) assert f.run().successful assert f.run().successful From 8cc63d06b9b191feccf6ae7b271bacbd282fe496 Mon 
Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20M=C3=BCller?= Date: Tue, 27 Feb 2024 00:26:40 +0100 Subject: [PATCH 08/20] Add test for polars hook --- .../test_table_hooks/test_sql_table_reference.py | 16 +++++++++++++--- tests/util/tasks_library.py | 16 +++++++++++++++- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/tests/test_table_hooks/test_sql_table_reference.py b/tests/test_table_hooks/test_sql_table_reference.py index 939acf25..2e8d475c 100644 --- a/tests/test_table_hooks/test_sql_table_reference.py +++ b/tests/test_table_hooks/test_sql_table_reference.py @@ -1,8 +1,10 @@ from __future__ import annotations import pandas as pd +import pytest import sqlalchemy as sa +import tests.util.tasks_library as m from pydiverse.pipedag import * from pydiverse.pipedag.backend.table.sql import TableReference from pydiverse.pipedag.backend.table.sql.ddl import ( @@ -18,11 +20,11 @@ from tests.fixtures.instances import DATABASE_INSTANCES, with_instances from tests.util import swallowing_raises from tests.util.sql import sql_table_expr -from tests.util.tasks_library import assert_table_equal pytestmark = [with_instances(DATABASE_INSTANCES)] +@pytest.mark.polars def test_table_store(): @materialize(version="1.0") def in_table(): @@ -82,15 +84,23 @@ def expected_out_view(): with Stage("sql_table_reference"): external_table = in_table() expected_external_table = expected_out_table() - _ = assert_table_equal( + _ = m.assert_table_equal( external_table, expected_external_table, check_dtype=False ) with Stage("sql_view_reference"): external_view = in_view(external_table) expected_external_view = expected_out_view() - _ = assert_table_equal( + _ = m.assert_table_equal( external_view, expected_external_view, check_dtype=False ) + external_view_polars = m.noop_polars(external_view) + external_view_lazy_polars = m.noop_lazy_polars(external_view) + _ = m.assert_table_equal( + external_view_polars, expected_external_view, check_dtype=False + ) + _ = m.assert_table_equal( + external_view_lazy_polars, expected_external_view, check_dtype=False + ) assert f.run().successful assert f.run().successful diff --git a/tests/util/tasks_library.py b/tests/util/tasks_library.py index 85d673de..95ce40cb 100644 --- a/tests/util/tasks_library.py +++ b/tests/util/tasks_library.py @@ -2,11 +2,15 @@ import pandas as pd import sqlalchemy as sa -import sqlalchemy.dialects from pydiverse.pipedag import Blob, RawSql, Table, materialize from pydiverse.pipedag.debug import materialize_table +try: + import polars as pl +except ImportError: + pl = None + @materialize(input_type=pd.DataFrame, version="1.0") def noop(x): @@ -28,6 +32,16 @@ def noop_lazy(x): return x +@materialize(input_type=pl.DataFrame if pl else 0) +def noop_polars(x): + return Table(x) + + +@materialize(input_type=pl.LazyFrame if pl else 0) +def noop_lazy_polars(x): + return Table(x) + + @materialize(nout=2, version="1.0", input_type=pd.DataFrame) def create_tuple(x, y): return x, y From 2384753cc578caaf12ea5b1758be1f0f8c91bcb9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20M=C3=BCller?= Date: Tue, 27 Feb 2024 00:45:57 +0100 Subject: [PATCH 09/20] Fix tests --- .../test_sql_table_reference.py | 52 ++++++++++++------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/tests/test_table_hooks/test_sql_table_reference.py b/tests/test_table_hooks/test_sql_table_reference.py index 2e8d475c..3ef61e1d 100644 --- a/tests/test_table_hooks/test_sql_table_reference.py +++ b/tests/test_table_hooks/test_sql_table_reference.py @@ -3,6 +3,7 @@ import 
pandas as pd import pytest import sqlalchemy as sa +from sqlalchemy.exc import ProgrammingError import tests.util.tasks_library as m from pydiverse.pipedag import * @@ -15,6 +16,7 @@ DropView, Schema, ) +from pydiverse.pipedag.backend.table.sql.dialects import DuckDBTableStore # Parameterize all tests in this file with several instance_id configurations from tests.fixtures.instances import DATABASE_INSTANCES, with_instances @@ -26,13 +28,16 @@ @pytest.mark.polars def test_table_store(): - @materialize(version="1.0") + @materialize(version="1.1") def in_table(): table_store = ConfigContext.get().store.table_store schema = Schema("user_controlled_schema", prefix="", suffix="") table_name = "external_table" table_store.execute(CreateSchema(schema, if_not_exists=True)) - table_store.execute(DropView("external_view", schema, if_exists=True)) + try: + table_store.execute(DropView("external_view", schema)) + except ProgrammingError: + pass table_store.execute(DropTable(table_name, schema, if_exists=True)) query = sql_table_expr({"col": [0, 1, 2, 3]}) table_store.execute( @@ -44,13 +49,17 @@ def in_table(): ) return Table(TableReference(external_schema=schema.get()), table_name) - @materialize(version="1.0", input_type=sa.Table) + @materialize(version="1.1", input_type=sa.Table) def in_view(tbl: sa.Table): table_store = ConfigContext.get().store.table_store schema = Schema("user_controlled_schema", prefix="", suffix="") view_name = "external_view" - table_store.execute(DropView(view_name, schema, if_exists=True)) - query = sa.select(tbl.c.col).where(tbl.c.col > 1).order_by(tbl.c.col) + try: + # We cannot use if_exists=True here because DB2 does not support it + table_store.execute(DropView(view_name, schema)) + except ProgrammingError: + pass + query = sa.select(tbl.c.col).where(tbl.c.col > 1) table_store.execute( CreateViewAsSelect( view_name, @@ -87,20 +96,25 @@ def expected_out_view(): _ = m.assert_table_equal( external_table, expected_external_table, check_dtype=False ) - with Stage("sql_view_reference"): - external_view = in_view(external_table) - expected_external_view = expected_out_view() - _ = m.assert_table_equal( - external_view, expected_external_view, check_dtype=False - ) - external_view_polars = m.noop_polars(external_view) - external_view_lazy_polars = m.noop_lazy_polars(external_view) - _ = m.assert_table_equal( - external_view_polars, expected_external_view, check_dtype=False - ) - _ = m.assert_table_equal( - external_view_lazy_polars, expected_external_view, check_dtype=False - ) + config = ConfigContext.get() + store = config.store.table_store + # External views in DuckDB are not supported until the following issue is + # resolved: https://github.com/duckdb/duckdb/issues/10322 + if not isinstance(store, DuckDBTableStore): + with Stage("sql_view_reference"): + external_view = in_view(external_table) + expected_external_view = expected_out_view() + _ = m.assert_table_equal( + external_view, expected_external_view, check_dtype=False + ) + external_view_polars = m.noop_polars(external_view) + external_view_lazy_polars = m.noop_lazy_polars(external_view) + _ = m.assert_table_equal( + external_view_polars, expected_external_view, check_dtype=False + ) + _ = m.assert_table_equal( + external_view_lazy_polars, expected_external_view, check_dtype=False + ) assert f.run().successful assert f.run().successful From db9d8ce14c0048a53ee22d0b40811e67d77762f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20M=C3=BCller?= Date: Tue, 27 Feb 2024 00:58:13 +0100 Subject: [PATCH 10/20] Update 
changelog --- docs/source/changelog.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/source/changelog.md b/docs/source/changelog.md index a0b0fbbc..7231240e 100644 --- a/docs/source/changelog.md +++ b/docs/source/changelog.md @@ -1,7 +1,9 @@ # Changelog -## 0.6.10 (2024-XX-XX) -- Add support for `TableReference` to point to tables in external (i.e. not managed by `pipedag`) schemas +## 0.7.0 (2024-XX-XX) +- Rework `TableReference` support: + * Add support for `TableReference` to point to tables in external (i.e. not managed by `pipedag`) schemas. + * Remove support for `TableReference` that points to table in schema of current stage. I.e. `TableReference` can only point to tables in external schemas. ## 0.6.9 (2024-01-24) - Update dependencies and remove some upper boundaries From 0f55519a5f245737c37e123a14033aab313b04b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20M=C3=BCller?= Date: Thu, 29 Feb 2024 11:26:54 +0100 Subject: [PATCH 11/20] Feedback from @windiana42 --- docs/source/changelog.md | 5 +- docs/source/reference/api.rst | 2 +- .../pipedag/backend/table/sql/__init__.py | 4 +- .../pipedag/backend/table/sql/ddl.py | 1 + .../backend/table/sql/dialects/ibm_db2.py | 5 +- .../backend/table/sql/dialects/mssql.py | 5 +- .../pipedag/backend/table/sql/hooks.py | 54 ++++++++++-------- .../pipedag/backend/table/sql/sql.py | 55 ++++++++++--------- .../pipedag/materialize/container.py | 14 +++++ src/pydiverse/pipedag/materialize/store.py | 4 -- src/pydiverse/pipedag/util/json.py | 2 + .../test_sql_table_reference.py | 14 +++-- 12 files changed, 97 insertions(+), 68 deletions(-) diff --git a/docs/source/changelog.md b/docs/source/changelog.md index 7231240e..4aeba360 100644 --- a/docs/source/changelog.md +++ b/docs/source/changelog.md @@ -2,8 +2,9 @@ ## 0.7.0 (2024-XX-XX) - Rework `TableReference` support: - * Add support for `TableReference` to point to tables in external (i.e. not managed by `pipedag`) schemas. - * Remove support for `TableReference` that points to table in schema of current stage. I.e. `TableReference` can only point to tables in external schemas. + * Rename `TableReference` to `ExternalTableReference` + * Add support for `ExternalTableReference` to point to tables in external (i.e. not managed by `pipedag`) schemas. + * Remove support for `ExternalTableReference` that points to table in schema of current stage. I.e. `ExternalTableReference` can only point to tables in external schemas. ## 0.6.9 (2024-01-24) - Update dependencies and remove some upper boundaries diff --git a/docs/source/reference/api.rst b/docs/source/reference/api.rst index 248686ef..d4a6c186 100644 --- a/docs/source/reference/api.rst +++ b/docs/source/reference/api.rst @@ -96,4 +96,4 @@ Orchestration Engine Special Table Types ------------------- -.. autoclass:: pydiverse.pipedag.backend.table.sql.TableReference +.. 
autoclass:: pydiverse.pipedag.backend.table.sql.ExternalTableReference diff --git a/src/pydiverse/pipedag/backend/table/sql/__init__.py b/src/pydiverse/pipedag/backend/table/sql/__init__.py index f9d8157e..c74d4faf 100644 --- a/src/pydiverse/pipedag/backend/table/sql/__init__.py +++ b/src/pydiverse/pipedag/backend/table/sql/__init__.py @@ -1,8 +1,8 @@ from __future__ import annotations -from .sql import SQLTableStore, TableReference +from .sql import ExternalTableReference, SQLTableStore __all__ = [ "SQLTableStore", - "TableReference", + "ExternalTableReference", ] diff --git a/src/pydiverse/pipedag/backend/table/sql/ddl.py b/src/pydiverse/pipedag/backend/table/sql/ddl.py index cc393f51..8b096ef8 100644 --- a/src/pydiverse/pipedag/backend/table/sql/ddl.py +++ b/src/pydiverse/pipedag/backend/table/sql/ddl.py @@ -706,6 +706,7 @@ def visit_create_table_as_select_ibm_db_sa(create: CreateTableAsSelect, compiler src_tables = [ f"{preparer.format_schema(tbl['schema'])}.{preparer.quote(tbl['name'])}" for tbl in create.source_tables + if tbl["db2_shared_lock_allowed"] ] lock_statements += [f"LOCK TABLE {ref} IN SHARE MODE" for ref in src_tables] diff --git a/src/pydiverse/pipedag/backend/table/sql/dialects/ibm_db2.py b/src/pydiverse/pipedag/backend/table/sql/dialects/ibm_db2.py index a1d76632..50ef7e20 100644 --- a/src/pydiverse/pipedag/backend/table/sql/dialects/ibm_db2.py +++ b/src/pydiverse/pipedag/backend/table/sql/dialects/ibm_db2.py @@ -127,8 +127,9 @@ def add_indexes( ) self.execute(query) - def resolve_alias(self, table, schema): - return PipedagDB2Reflection.resolve_alias(self.engine, table, schema) + def resolve_alias(self, table: Table, stage_name: str) -> tuple[str, str]: + table_name, schema = super().resolve_alias(table, stage_name) + return PipedagDB2Reflection.resolve_alias(self.engine, table_name, schema) def check_materialization_details_supported(self, label: str | None) -> None: _ = label diff --git a/src/pydiverse/pipedag/backend/table/sql/dialects/mssql.py b/src/pydiverse/pipedag/backend/table/sql/dialects/mssql.py index c983132d..e9a037fd 100644 --- a/src/pydiverse/pipedag/backend/table/sql/dialects/mssql.py +++ b/src/pydiverse/pipedag/backend/table/sql/dialects/mssql.py @@ -215,8 +215,9 @@ def _copy_object_to_transaction( definition = _mssql_update_definition(conn, name, src_schema, dest_schema) self.execute(definition, conn=conn) - def resolve_alias(self, table: str, schema: str): - return PipedagMSSqlReflection.resolve_alias(self.engine, table, schema) + def resolve_alias(self, table: Table, stage_name: str): + table_name, schema = super().resolve_alias(table, stage_name) + return PipedagMSSqlReflection.resolve_alias(self.engine, table_name, schema) @MSSqlTableStore.register_table(pd) diff --git a/src/pydiverse/pipedag/backend/table/sql/hooks.py b/src/pydiverse/pipedag/backend/table/sql/hooks.py index dbae2721..1cc9f18d 100644 --- a/src/pydiverse/pipedag/backend/table/sql/hooks.py +++ b/src/pydiverse/pipedag/backend/table/sql/hooks.py @@ -18,7 +18,10 @@ CreateTableAsSelect, Schema, ) -from pydiverse.pipedag.backend.table.sql.sql import SQLTableStore, TableReference +from pydiverse.pipedag.backend.table.sql.sql import ( + ExternalTableReference, + SQLTableStore, +) from pydiverse.pipedag.backend.table.util import ( DType, PandasDTypeBackend, @@ -57,11 +60,14 @@ def materialize( source_tables = [ dict( name=tbl.name, - schema=store.get_schema(tbl.stage.current_name, tbl).get(), + schema=store.get_schema(tbl.stage.current_name).get() + if tbl.external_schema is None + else 
tbl.external_schema, + db2_shared_lock_allowed=tbl.db2_shared_lock_allowed, ) for tbl in TaskContext.get().input_tables ] - schema = store.get_schema(stage_name, table) + schema = store.get_schema(stage_name) store.check_materialization_details_supported( resolve_materialization_details_label(table) @@ -89,8 +95,7 @@ def materialize( def retrieve( cls, store, table: Table, stage_name: str, as_type: type[sa.Table] ) -> sa.sql.expression.Selectable: - schema = store.get_schema(stage_name, table).get() - table_name, schema = store.resolve_alias(table.name, schema) + table_name, schema = store.resolve_alias(table, stage_name) alias_name = TaskContext.get().name_disambiguator.get_name(table_name) for retry_iteration in range(4): @@ -126,10 +131,10 @@ def lazy_query_str(cls, store, obj) -> str: @SQLTableStore.register_table() -class TableReferenceHook(TableHook[SQLTableStore]): +class ExternalTableReferenceHook(TableHook[SQLTableStore]): @classmethod def can_materialize(cls, type_) -> bool: - return issubclass(type_, TableReference) + return issubclass(type_, ExternalTableReference) @classmethod def can_retrieve(cls, type_) -> bool: @@ -137,25 +142,29 @@ def can_retrieve(cls, type_) -> bool: @classmethod def materialize(cls, store: SQLTableStore, table: Table, stage_name: str): - # For a table reference, we don't need to materialize anything. + # For an external table reference, we don't need to materialize anything. # This is any table referenced by a table reference should already exist # in the schema. # Instead, we check that the table actually exists. - schema = store.get_schema(stage_name, table).get() stage_schema = store.get_schema(stage_name).get() - if schema == stage_schema: + if table.external_schema.upper() == stage_schema.upper(): + raise ValueError( + f"ExternalTableReference '{table.name}' is not allowed to reference " + f"tables in the transaction schema '{stage_schema}' of the current " + "stage." + ) + if stage_schema.upper().startswith(table.external_schema.upper() + "__"): raise ValueError( - f"TableReference '{table.name}' is not allowed to reference tables " - "in the same schema as the current stage." + f"ExternalTableReference '{table.name}' is not allowed to reference " + f"tables in the schema '{table.external_schema}' of the current stage." ) - inspector = sa.inspect(store.engine) - has_table = inspector.has_table(table.name, schema) + has_table = store.has_table_or_view(table.name, table.external_schema) if not has_table: raise ValueError( - f"Not table with name '{table.name}' found in schema '{schema}' " - "(reference by TableReference)." + f"Not table with name '{table.name}' found in schema " + f"'{table.external_schema}' (reference by ExternalTableReference)." ) return @@ -203,7 +212,7 @@ def materialize( cls, store: SQLTableStore, table: Table[pd.DataFrame], stage_name: str ): df = table.obj.copy(deep=False) - schema = store.get_schema(stage_name, table) + schema = store.get_schema(stage_name) if store.print_materialize: store.logger.info( @@ -311,8 +320,7 @@ def _build_retrieve_query( backend: PandasDTypeBackend, ) -> tuple[Any, dict[str, DType]]: engine = store.engine - schema = store.get_schema(stage_name, table).get() - table_name, schema = store.resolve_alias(table.name, schema) + table_name, schema = store.resolve_alias(table, stage_name) sql_table = sa.Table( table_name, @@ -455,7 +463,7 @@ def materialize(cls, store, table: Table[polars.DataFrame], stage_name: str): # for materialization. 
df = table.obj - schema = store.get_schema(stage_name, table) + schema = store.get_schema(stage_name) if store.print_materialize: store.logger.info( @@ -494,8 +502,7 @@ def auto_table(cls, obj: polars.DataFrame): @classmethod def _read_db_query(cls, store: SQLTableStore, table: Table, stage_name: str): - schema = store.get_schema(stage_name, table).get() - table_name, schema = store.resolve_alias(table.name, schema) + table_name, schema = store.resolve_alias(table, stage_name) t = sa.table(table_name, schema=schema) q = sa.select("*").select_from(t) @@ -775,8 +782,7 @@ def retrieve( as_type: type[ibis.api.Table], ) -> ibis.api.Table: conn = cls.conn(store) - schema = store.get_schema(stage_name, table).get() - table_name, schema = store.resolve_alias(table.name, schema) + table_name, schema = store.resolve_alias(table, stage_name) for retry_iteration in range(4): # retry operation since it might have been terminated as a deadlock victim try: diff --git a/src/pydiverse/pipedag/backend/table/sql/sql.py b/src/pydiverse/pipedag/backend/table/sql/sql.py index 2f1af540..b262eef8 100644 --- a/src/pydiverse/pipedag/backend/table/sql/sql.py +++ b/src/pydiverse/pipedag/backend/table/sql/sql.py @@ -99,7 +99,7 @@ class SQLTableStore(BaseTableStore): | ``pdt.lazy.SQLTableImpl`` * - pydiverse.pipedag - - | :py:class:`~.TableReference` + - | :py:class:`~.ExternalTableReference` - @@ -367,9 +367,7 @@ def engine_connect(self) -> sa.Connection: yield conn conn.commit() - def get_schema(self, name: str, table: Table | None = None): - if table is not None and table.external_schema is not None: - return Schema(table.external_schema, "", "") + def get_schema(self, name: str) -> Schema: return Schema(name, self.schema_prefix, self.schema_suffix) def _execute(self, query, conn: sa.engine.Connection): @@ -884,12 +882,15 @@ def _deferred_copy_table( self.logger.exception(msg) raise CacheError(msg) from _e - def has_table_or_view(self, name, schema: Schema): + def has_table_or_view(self, name, schema: Schema | str): + if isinstance(schema, Schema): + schema = schema.get() inspector = sa.inspect(self.engine) - has_table = inspector.has_table(name, schema=schema.get()) + has_table = inspector.has_table(name, schema=schema) # workaround for sqlalchemy backends that fail to find views with has_table + # in particular DuckDB https://github.com/duckdb/duckdb/issues/10322 if not has_table: - has_table = name in inspector.get_view_names(schema=schema.get()) + has_table = name in inspector.get_view_names(schema=schema) return has_table def _swap_alias_with_table_copy(self, table: Table, table_copy: Table): @@ -1233,8 +1234,13 @@ def retrieve_raw_sql_metadata( task_hash=result.task_hash, ) - def resolve_alias(self, table: str, schema: str) -> tuple[str, str]: - return table, schema + def resolve_alias(self, table: Table, stage_name: str) -> tuple[str, str]: + schema = ( + self.get_schema(stage_name).get() + if table.external_schema is None + else table.external_schema + ) + return table.name, schema def get_objects_in_stage(self, stage: Stage): schema = self.get_schema(stage.transaction_name) @@ -1276,30 +1282,30 @@ def get_lock_schema(self) -> Schema: return self.get_schema(self.LOCK_SCHEMA) -class TableReference: +class ExternalTableReference: """Reference to a user-created table. - By returning a `TableReference` wrapped in a :py:class:`~.Table` from a task, - you can tell pipedag about a table or a view in an `external_schema`. 
+ By returning an `ExternalTableReference` wrapped in a :py:class:`~.Table` from
+ a task, you can tell pipedag about a table or a view in an external `schema`.
 Only supported by :py:class:`~.SQLTableStore`. Warning -------
- When using a `TableReference`, pipedag has no way of knowing the cache validity - of the external object. Hence, the user should provide a cache function for the - `Task` or version the `Task`. - It is now allowed to specify a `TableReference` to a table in schema of the
+ When using an `ExternalTableReference`, pipedag has no way of knowing the cache + validity of the external object. Hence, the user should provide a cache function + for the `Task` or version the `Task`. + It is not allowed to specify an `ExternalTableReference` to a table in the schema of the
 current stage. Example -------
- You can use a `TableReference` to tell pipedag about a table that exists
+ You can use an `ExternalTableReference` to tell pipedag about a table that exists
 in an external schema:: @materialize(version="1.0") def task(): - return Table(TableReference("external_schema"), "name_of_table") + return Table(ExternalTableReference("name_of_table", "schema")) By using a cache function, you can establish the cache (in-)validity of the external table:: from datetime import date # The external table becomes cache invalid every day at midnight def my_cache_fun(): return date.today().strftime("%Y/%m/%d") @materialize(cache=my_cache_fun) def task(): - return Table(TableReference("external_schema"), "name_of_table") + return Table(ExternalTableReference("name_of_table", "schema")) """ - def __init__(self, external_schema: str): - self.external_schema = external_schema + def __init__(self, name: str, schema: str, db2_shared_lock_allowed: bool = True): + self.name = name + self.schema = schema + self.db2_shared_lock_allowed = db2_shared_lock_allowed def __repr__(self): - return ( - f"" - ) + return f"" # Load SQLTableStore Hooks
diff --git a/src/pydiverse/pipedag/materialize/container.py b/src/pydiverse/pipedag/materialize/container.py index 99b5ec80..dcc0695f 100644 --- a/src/pydiverse/pipedag/materialize/container.py +++ b/src/pydiverse/pipedag/materialize/container.py
@@ -57,6 +57,7 @@ def __init__( self._name = None self.stage: Stage | None = None self.external_schema: str | None = None + self.db2_shared_lock_allowed: bool = True self.obj = obj self.name = name
@@ -80,6 +81,19 @@ def __init__( if not isinstance(col, str): raise indexes_type_error + from pydiverse.pipedag.backend.table.sql import ExternalTableReference + + # ExternalTableReference can reference a table from an external schema + if isinstance(self.obj, ExternalTableReference): + self.external_schema = self.obj.schema + if self.name is not None: + raise ValueError( + "When using an ExternalTableReference, the name of the Table must " + "be set via the ExternalTableReference."
+ ) + self.name = self.obj.name + self.db2_shared_lock_allowed = self.obj.db2_shared_lock_allowed + # cache_key will be overridden shortly before handing over to downstream tasks # that use it to compute their input_hash for cache_invalidation due to input # change diff --git a/src/pydiverse/pipedag/materialize/store.py b/src/pydiverse/pipedag/materialize/store.py index d4bc27d5..c0dd1dea 100644 --- a/src/pydiverse/pipedag/materialize/store.py +++ b/src/pydiverse/pipedag/materialize/store.py @@ -8,7 +8,6 @@ from pydiverse.pipedag import Blob, Stage, Table, backend from pydiverse.pipedag._typing import Materializable, T -from pydiverse.pipedag.backend.table.sql import TableReference from pydiverse.pipedag.context import ConfigContext, RunContext, TaskContext from pydiverse.pipedag.context.run_context import StageState from pydiverse.pipedag.core.config import PipedagConfig @@ -314,9 +313,6 @@ def preparation_mutator(x): if isinstance(x, Table): if x.obj is None: raise TypeError("Underlying table object can't be None") - # TableReference can reference a table from an external schema - if isinstance(x.obj, TableReference): - x.external_schema = x.obj.external_schema tables.append(x) elif isinstance(x, RawSql): if x.sql is None: diff --git a/src/pydiverse/pipedag/util/json.py b/src/pydiverse/pipedag/util/json.py index c07df289..eec62bbb 100644 --- a/src/pydiverse/pipedag/util/json.py +++ b/src/pydiverse/pipedag/util/json.py @@ -50,6 +50,7 @@ def json_default(o): "cache_key": o.cache_key, "materialization_details": o.materialization_details, "external_schema": o.external_schema, + "db2_shared_lock_allowed": o.db2_shared_lock_allowed, } if isinstance(o, RawSql): return { @@ -116,6 +117,7 @@ def get_stage(name: str): tbl.stage = get_stage(d["stage"]) tbl.cache_key = d["cache_key"] tbl.external_schema = d.get("external_schema") + tbl.db2_shared_lock_allowed = d.get("db2_shared_lock_allowed", True) return tbl if type_ == Type.RAW_SQL: raw_sql = RawSql(name=d["name"]) diff --git a/tests/test_table_hooks/test_sql_table_reference.py b/tests/test_table_hooks/test_sql_table_reference.py index 3ef61e1d..2dafead3 100644 --- a/tests/test_table_hooks/test_sql_table_reference.py +++ b/tests/test_table_hooks/test_sql_table_reference.py @@ -7,7 +7,7 @@ import tests.util.tasks_library as m from pydiverse.pipedag import * -from pydiverse.pipedag.backend.table.sql import TableReference +from pydiverse.pipedag.backend.table.sql import ExternalTableReference from pydiverse.pipedag.backend.table.sql.ddl import ( CreateSchema, CreateTableAsSelect, @@ -28,7 +28,7 @@ @pytest.mark.polars def test_table_store(): - @materialize(version="1.1") + @materialize(version="1.0") def in_table(): table_store = ConfigContext.get().store.table_store schema = Schema("user_controlled_schema", prefix="", suffix="") @@ -47,9 +47,9 @@ def in_table(): query, ) ) - return Table(TableReference(external_schema=schema.get()), table_name) + return Table(ExternalTableReference(table_name, schema=schema.get())) - @materialize(version="1.1", input_type=sa.Table) + @materialize(version="1.0", input_type=sa.Table) def in_view(tbl: sa.Table): table_store = ConfigContext.get().store.table_store schema = Schema("user_controlled_schema", prefix="", suffix="") @@ -67,7 +67,7 @@ def in_view(tbl: sa.Table): query, ) ) - return Table(TableReference(external_schema=schema.get()), view_name) + return Table(ExternalTableReference(view_name, schema=schema.get())) @materialize() def expected_out_table(): @@ -124,7 +124,9 @@ def test_bad_table_reference(): 
@materialize() def bad_table_reference(): return Table( - TableReference(external_schema="ext_schema"), "this_table_does_not_exist" + ExternalTableReference( + name="this_table_does_not_exist", schema="ext_schema" + ), ) with Flow() as f: From 1ffb88a715a3347cb46b5c924cb6b3fca3406ae0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20M=C3=BCller?= Date: Thu, 29 Feb 2024 11:40:07 +0100 Subject: [PATCH 12/20] Fix DB2 --- src/pydiverse/pipedag/backend/table/sql/ddl.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/pydiverse/pipedag/backend/table/sql/ddl.py b/src/pydiverse/pipedag/backend/table/sql/ddl.py index 8b096ef8..d81e3f41 100644 --- a/src/pydiverse/pipedag/backend/table/sql/ddl.py +++ b/src/pydiverse/pipedag/backend/table/sql/ddl.py @@ -827,7 +827,11 @@ def visit_copy_table(copy_table: CopyTable, compiler, **kw): query, early_not_null=copy_table.early_not_null, source_tables=[ - dict(name=copy_table.from_name, schema=copy_table.from_schema.get()) + dict( + name=copy_table.from_name, + schema=copy_table.from_schema.get(), + db2_shared_lock_allowed=True, + ) ], suffix=copy_table.suffix, ) From 2b1a8d375b17d5435f42784385c88d037dbc002a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20M=C3=BCller?= Date: Thu, 29 Feb 2024 13:46:58 +0100 Subject: [PATCH 13/20] Improve test for DuckDB --- .../test_sql_table_reference.py | 35 ++++++++----------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/tests/test_table_hooks/test_sql_table_reference.py b/tests/test_table_hooks/test_sql_table_reference.py index 2dafead3..1f2dced8 100644 --- a/tests/test_table_hooks/test_sql_table_reference.py +++ b/tests/test_table_hooks/test_sql_table_reference.py @@ -16,7 +16,6 @@ DropView, Schema, ) -from pydiverse.pipedag.backend.table.sql.dialects import DuckDBTableStore # Parameterize all tests in this file with several instance_id configurations from tests.fixtures.instances import DATABASE_INSTANCES, with_instances @@ -96,25 +95,21 @@ def expected_out_view(): _ = m.assert_table_equal( external_table, expected_external_table, check_dtype=False ) - config = ConfigContext.get() - store = config.store.table_store - # External views in DuckDB are not supported until the following issue is - # resolved: https://github.com/duckdb/duckdb/issues/10322 - if not isinstance(store, DuckDBTableStore): - with Stage("sql_view_reference"): - external_view = in_view(external_table) - expected_external_view = expected_out_view() - _ = m.assert_table_equal( - external_view, expected_external_view, check_dtype=False - ) - external_view_polars = m.noop_polars(external_view) - external_view_lazy_polars = m.noop_lazy_polars(external_view) - _ = m.assert_table_equal( - external_view_polars, expected_external_view, check_dtype=False - ) - _ = m.assert_table_equal( - external_view_lazy_polars, expected_external_view, check_dtype=False - ) + + with Stage("sql_view_reference"): + external_view = in_view(external_table) + expected_external_view = expected_out_view() + _ = m.assert_table_equal( + external_view, expected_external_view, check_dtype=False + ) + external_view_polars = m.noop_polars(external_view) + external_view_lazy_polars = m.noop_lazy_polars(external_view) + _ = m.assert_table_equal( + external_view_polars, expected_external_view, check_dtype=False + ) + _ = m.assert_table_equal( + external_view_lazy_polars, expected_external_view, check_dtype=False + ) assert f.run().successful assert f.run().successful From a5769bc701d276cf346e86d17526ffd408313890 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Nicolas=20M=C3=BCller?= Date: Thu, 29 Feb 2024 18:06:10 +0100 Subject: [PATCH 14/20] Add external nickname test --- tests/test_sql_dialect/test_ibm_db2.py | 49 +++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/tests/test_sql_dialect/test_ibm_db2.py b/tests/test_sql_dialect/test_ibm_db2.py index 56e8d6cd..54f4ff8b 100644 --- a/tests/test_sql_dialect/test_ibm_db2.py +++ b/tests/test_sql_dialect/test_ibm_db2.py @@ -6,10 +6,19 @@ import pytest import sqlalchemy as sa -from pydiverse.pipedag import Flow, RawSql, Stage, Table, materialize +from pydiverse.pipedag import ConfigContext, Flow, RawSql, Stage, Table, materialize +from pydiverse.pipedag.backend.table.sql import ExternalTableReference +from pydiverse.pipedag.backend.table.sql.ddl import ( + CreateSchema, + CreateTableAsSelect, + DropTable, + Schema, +) from tests.fixtures.instances import with_instances +from tests.util.sql import sql_table_expr from tests.util.tasks_library import ( assert_table_equal, + noop_sql, simple_dataframe, simple_lazy_table, ) @@ -30,11 +39,49 @@ def create_nicknames(table: sa.Table): return RawSql(simple_nicknames, "create_nicknames", separator="|") + @materialize(nout=2) + def create_external_nicknames(): + table_store = ConfigContext.get().store.table_store + schema = Schema("user_controlled_schema", prefix="", suffix="") + table_name = "external_table_for_nickname" + table_store.execute(CreateSchema(schema, if_not_exists=True)) + table_store.execute(DropTable(table_name, schema, if_exists=True)) + query = sql_table_expr({"col": [0, 1, 2, 3]}) + table_store.execute( + CreateTableAsSelect( + table_name, + schema, + query, + ) + ) + script_path = Path(__file__).parent / "scripts" / "simple_nicknames.sql" + simple_nicknames = Path(script_path).read_text() + simple_nicknames = simple_nicknames.replace("{{out_schema}}", schema.get()) + simple_nicknames = simple_nicknames.replace("{{out_table}}", table_name) + table_store.execute_raw_sql( + RawSql(simple_nicknames, "create_external_nicknames", separator="|") + ) + + return Table( + ExternalTableReference( + "nick1", schema=schema.get(), db2_shared_lock_allowed=False + ) + ), Table( + ExternalTableReference( + "nick2", schema=schema.get(), db2_shared_lock_allowed=True + ) + ) + with Flow("f") as f: with Stage("stage"): x = simple_dataframe() nicknames = create_nicknames(x) _ = nicknames + nick_1_ref, nick_2_ref = create_external_nicknames() + nick_1_ref_noop = noop_sql(nick_1_ref) + nick_2_ref_noop = noop_sql(nick_2_ref) + assert_table_equal(nick_1_ref, nick_1_ref_noop) + assert_table_equal(nick_2_ref, nick_2_ref_noop) # We run three times to ensure that the nicknames created in the first run # have to be dropped, since the same schema is reused. From c3bdebeee7206e61bae2ec044747b48a4887e64f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20M=C3=BCller?= Date: Thu, 29 Feb 2024 18:22:29 +0100 Subject: [PATCH 15/20] Expand documentation --- src/pydiverse/pipedag/backend/table/sql/sql.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/pydiverse/pipedag/backend/table/sql/sql.py b/src/pydiverse/pipedag/backend/table/sql/sql.py index b262eef8..fd4b6a6b 100644 --- a/src/pydiverse/pipedag/backend/table/sql/sql.py +++ b/src/pydiverse/pipedag/backend/table/sql/sql.py @@ -1286,7 +1286,9 @@ class ExternalTableReference: """Reference to a user-created table. 
By returning a `ExternalTableReference` wrapped in a :py:class:`~.Table` from, - a task you can tell pipedag about a table or a view in an `schema`. + a task you can tell pipedag about a table, a view or DB2 nickname in an external + `schema`. The schema may be a multi-part identifier like "[db_name].[schema_name]" + if the database supports this. It is passed to SQLAlchemy as-is. Only supported by :py:class:`~.SQLTableStore`. @@ -1322,6 +1324,14 @@ def task(): """ def __init__(self, name: str, schema: str, db2_shared_lock_allowed: bool = True): + """ + :param name: The name of the table, view, or DB2 nickname + :param schema: The external schema of the object + :param db2_shared_lock_allowed: Whether to use a shared lock when using + the object in a SQL query. If set to `False`, no lock is used. This is + useful when the user is not allowed to lock the table. + The default is `True`. + """ self.name = name self.schema = schema self.db2_shared_lock_allowed = db2_shared_lock_allowed From 847a07bf856bb7147cd1a30115d5b264e3221e52 Mon Sep 17 00:00:00 2001 From: windiana42 Date: Thu, 7 Mar 2024 10:45:08 +0100 Subject: [PATCH 16/20] Renamed db2_shared_lock_allowed to shared_lock_allowed. Also adjusted documentation to be more generic. Redefined default to False. --- .../pipedag/backend/table/sql/ddl.py | 4 ++-- .../pipedag/backend/table/sql/hooks.py | 2 +- .../pipedag/backend/table/sql/sql.py | 19 +++++++++++-------- .../pipedag/materialize/container.py | 4 ++-- src/pydiverse/pipedag/util/json.py | 4 ++-- tests/test_sql_dialect/test_ibm_db2.py | 4 ++-- 6 files changed, 20 insertions(+), 17 deletions(-) diff --git a/src/pydiverse/pipedag/backend/table/sql/ddl.py b/src/pydiverse/pipedag/backend/table/sql/ddl.py index d81e3f41..a94a64c8 100644 --- a/src/pydiverse/pipedag/backend/table/sql/ddl.py +++ b/src/pydiverse/pipedag/backend/table/sql/ddl.py @@ -706,7 +706,7 @@ def visit_create_table_as_select_ibm_db_sa(create: CreateTableAsSelect, compiler src_tables = [ f"{preparer.format_schema(tbl['schema'])}.{preparer.quote(tbl['name'])}" for tbl in create.source_tables - if tbl["db2_shared_lock_allowed"] + if tbl["shared_lock_allowed"] ] lock_statements += [f"LOCK TABLE {ref} IN SHARE MODE" for ref in src_tables] @@ -830,7 +830,7 @@ def visit_copy_table(copy_table: CopyTable, compiler, **kw): dict( name=copy_table.from_name, schema=copy_table.from_schema.get(), - db2_shared_lock_allowed=True, + shared_lock_allowed=True, ) ], suffix=copy_table.suffix, diff --git a/src/pydiverse/pipedag/backend/table/sql/hooks.py b/src/pydiverse/pipedag/backend/table/sql/hooks.py index 1cc9f18d..ed1839a9 100644 --- a/src/pydiverse/pipedag/backend/table/sql/hooks.py +++ b/src/pydiverse/pipedag/backend/table/sql/hooks.py @@ -63,7 +63,7 @@ def materialize( schema=store.get_schema(tbl.stage.current_name).get() if tbl.external_schema is None else tbl.external_schema, - db2_shared_lock_allowed=tbl.db2_shared_lock_allowed, + shared_lock_allowed=tbl.shared_lock_allowed, ) for tbl in TaskContext.get().input_tables ] diff --git a/src/pydiverse/pipedag/backend/table/sql/sql.py b/src/pydiverse/pipedag/backend/table/sql/sql.py index fd4b6a6b..c4e917fc 100644 --- a/src/pydiverse/pipedag/backend/table/sql/sql.py +++ b/src/pydiverse/pipedag/backend/table/sql/sql.py @@ -1323,18 +1323,21 @@ def task(): return Table(ExternalTableReference("name_of_table", "schema")) """ - def __init__(self, name: str, schema: str, db2_shared_lock_allowed: bool = True): + def __init__(self, name: str, schema: str, shared_lock_allowed: bool = False): """ - 
:param name: The name of the table, view, or DB2 nickname - :param schema: The external schema of the object - :param db2_shared_lock_allowed: Whether to use a shared lock when using - the object in a SQL query. If set to `False`, no lock is used. This is - useful when the user is not allowed to lock the table. - The default is `True`. + :param name: The name of the table, view, or nickname/alias + :param schema: The external schema of the object. A multi-part schema + is allowed with '.' separator as also supported by SQLAlchemy Table + schema argument. + :param shared_lock_allowed: Whether to disable acquiring a shared lock + when using the object in a SQL query. If set to `False`, no lock is + used. This is useful when the user is not allowed to lock the table. + If pipedag does not lock source tables for this dialect, this argument + has no effect. The default is `False`. """ self.name = name self.schema = schema - self.db2_shared_lock_allowed = db2_shared_lock_allowed + self.shared_lock_allowed = shared_lock_allowed def __repr__(self): return f"" diff --git a/src/pydiverse/pipedag/materialize/container.py b/src/pydiverse/pipedag/materialize/container.py index dcc0695f..6b984ab9 100644 --- a/src/pydiverse/pipedag/materialize/container.py +++ b/src/pydiverse/pipedag/materialize/container.py @@ -57,7 +57,7 @@ def __init__( self._name = None self.stage: Stage | None = None self.external_schema: str | None = None - self.db2_shared_lock_allowed: bool = True + self.shared_lock_allowed: bool = True self.obj = obj self.name = name @@ -92,7 +92,7 @@ def __init__( "be set via the ExternalTableReference." ) self.name = self.obj.name - self.db2_shared_lock_allowed = self.obj.db2_shared_lock_allowed + self.shared_lock_allowed = self.obj.shared_lock_allowed # cache_key will be overridden shortly before handing over to downstream tasks # that use it to compute their input_hash for cache_invalidation due to input diff --git a/src/pydiverse/pipedag/util/json.py b/src/pydiverse/pipedag/util/json.py index eec62bbb..33928b6d 100644 --- a/src/pydiverse/pipedag/util/json.py +++ b/src/pydiverse/pipedag/util/json.py @@ -50,7 +50,7 @@ def json_default(o): "cache_key": o.cache_key, "materialization_details": o.materialization_details, "external_schema": o.external_schema, - "db2_shared_lock_allowed": o.db2_shared_lock_allowed, + "shared_lock_allowed": o.shared_lock_allowed, } if isinstance(o, RawSql): return { @@ -117,7 +117,7 @@ def get_stage(name: str): tbl.stage = get_stage(d["stage"]) tbl.cache_key = d["cache_key"] tbl.external_schema = d.get("external_schema") - tbl.db2_shared_lock_allowed = d.get("db2_shared_lock_allowed", True) + tbl.shared_lock_allowed = d.get("shared_lock_allowed", True) return tbl if type_ == Type.RAW_SQL: raw_sql = RawSql(name=d["name"]) diff --git a/tests/test_sql_dialect/test_ibm_db2.py b/tests/test_sql_dialect/test_ibm_db2.py index 54f4ff8b..cc101948 100644 --- a/tests/test_sql_dialect/test_ibm_db2.py +++ b/tests/test_sql_dialect/test_ibm_db2.py @@ -64,11 +64,11 @@ def create_external_nicknames(): return Table( ExternalTableReference( - "nick1", schema=schema.get(), db2_shared_lock_allowed=False + "nick1", schema=schema.get(), shared_lock_allowed=False ) ), Table( ExternalTableReference( - "nick2", schema=schema.get(), db2_shared_lock_allowed=True + "nick2", schema=schema.get(), shared_lock_allowed=True ) ) From 15d3f2af7af8580be08c58245680c73f9ef78148 Mon Sep 17 00:00:00 2001 From: windiana42 Date: Thu, 7 Mar 2024 12:19:42 +0100 Subject: [PATCH 17/20] Added more comments to 
resolve_alias mechanics. --- .../pipedag/backend/table/sql/dialects/ibm_db2.py | 3 +++ .../pipedag/backend/table/sql/dialects/mssql.py | 3 +++ src/pydiverse/pipedag/backend/table/sql/sql.py | 14 ++++++++++++++ 3 files changed, 20 insertions(+) diff --git a/src/pydiverse/pipedag/backend/table/sql/dialects/ibm_db2.py b/src/pydiverse/pipedag/backend/table/sql/dialects/ibm_db2.py index 50ef7e20..ea5a4f22 100644 --- a/src/pydiverse/pipedag/backend/table/sql/dialects/ibm_db2.py +++ b/src/pydiverse/pipedag/backend/table/sql/dialects/ibm_db2.py @@ -128,6 +128,9 @@ def add_indexes( self.execute(query) def resolve_alias(self, table: Table, stage_name: str) -> tuple[str, str]: + # The base implementation already takes care of converting Table objects + # based on ExternalTableReference objects to string table name and schema. + # For normal Table objects, it needs the stage schema name. table_name, schema = super().resolve_alias(table, stage_name) return PipedagDB2Reflection.resolve_alias(self.engine, table_name, schema) diff --git a/src/pydiverse/pipedag/backend/table/sql/dialects/mssql.py b/src/pydiverse/pipedag/backend/table/sql/dialects/mssql.py index e9a037fd..775f3ff1 100644 --- a/src/pydiverse/pipedag/backend/table/sql/dialects/mssql.py +++ b/src/pydiverse/pipedag/backend/table/sql/dialects/mssql.py @@ -216,6 +216,9 @@ def _copy_object_to_transaction( self.execute(definition, conn=conn) def resolve_alias(self, table: Table, stage_name: str): + # The base implementation already takes care of converting Table objects + # based on ExternalTableReference objects to string table name and schema. + # For normal Table objects, it needs the stage schema name. table_name, schema = super().resolve_alias(table, stage_name) return PipedagMSSqlReflection.resolve_alias(self.engine, table_name, schema) diff --git a/src/pydiverse/pipedag/backend/table/sql/sql.py b/src/pydiverse/pipedag/backend/table/sql/sql.py index c4e917fc..6954ca50 100644 --- a/src/pydiverse/pipedag/backend/table/sql/sql.py +++ b/src/pydiverse/pipedag/backend/table/sql/sql.py @@ -1235,6 +1235,20 @@ def retrieve_raw_sql_metadata( ) def resolve_alias(self, table: Table, stage_name: str) -> tuple[str, str]: + """ + Convert Table objects to string table name and schema. + + Take care of ExternalTableReference objects that turn into Table objects + with external_schema not None. For normal Table objects, the stage + schema name is needed. It will be prefixed/postfixed based on config. + Dialect specific implementations will also try to resolve table aliases + or synonyms since they may cause trouble with SQLAlchemy. 
+ + :param table: Table object + :param stage_name: Stage name component to schema (will be different + naming pattern for currently processed transaction stage) + :return: string table name and schema + """ schema = ( self.get_schema(stage_name).get() if table.external_schema is None From a5895b436afb015427c4b200370c4fa7a2ee2022 Mon Sep 17 00:00:00 2001 From: windiana42 Date: Thu, 7 Mar 2024 12:59:29 +0100 Subject: [PATCH 18/20] fix typo --- src/pydiverse/pipedag/backend/table/sql/hooks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pydiverse/pipedag/backend/table/sql/hooks.py b/src/pydiverse/pipedag/backend/table/sql/hooks.py index ed1839a9..66de2967 100644 --- a/src/pydiverse/pipedag/backend/table/sql/hooks.py +++ b/src/pydiverse/pipedag/backend/table/sql/hooks.py @@ -163,7 +163,7 @@ def materialize(cls, store: SQLTableStore, table: Table, stage_name: str): if not has_table: raise ValueError( - f"Not table with name '{table.name}' found in schema " + f"No table with name '{table.name}' found in schema " f"'{table.external_schema}' (reference by ExternalTableReference)." ) From ae1fd3614933972f6971699a107a2b7739624b3e Mon Sep 17 00:00:00 2001 From: windiana42 Date: Thu, 7 Mar 2024 13:09:06 +0100 Subject: [PATCH 19/20] Minor changes to comment and recovery defaults. --- src/pydiverse/pipedag/backend/table/sql/sql.py | 2 +- src/pydiverse/pipedag/util/json.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pydiverse/pipedag/backend/table/sql/sql.py b/src/pydiverse/pipedag/backend/table/sql/sql.py index 6954ca50..55089b58 100644 --- a/src/pydiverse/pipedag/backend/table/sql/sql.py +++ b/src/pydiverse/pipedag/backend/table/sql/sql.py @@ -1310,7 +1310,7 @@ class ExternalTableReference: ------- When using a `ExternalTableReference`, pipedag has no way of knowing the cache validity of the external object. Hence, the user should provide a cache function - for the `Task` or version the `Task`. + for the `Task`. It is now allowed to specify a `ExternalTableReference` to a table in schema of the current stage. diff --git a/src/pydiverse/pipedag/util/json.py b/src/pydiverse/pipedag/util/json.py index 33928b6d..b9dde1b7 100644 --- a/src/pydiverse/pipedag/util/json.py +++ b/src/pydiverse/pipedag/util/json.py @@ -117,7 +117,7 @@ def get_stage(name: str): tbl.stage = get_stage(d["stage"]) tbl.cache_key = d["cache_key"] tbl.external_schema = d.get("external_schema") - tbl.shared_lock_allowed = d.get("shared_lock_allowed", True) + tbl.shared_lock_allowed = d.get("shared_lock_allowed", False) return tbl if type_ == Type.RAW_SQL: raw_sql = RawSql(name=d["name"]) From 918ddc5dd04ec1957f267db97f9417d5ff7b4d3c Mon Sep 17 00:00:00 2001 From: windiana42 <61181806+windiana42@users.noreply.github.com> Date: Thu, 7 Mar 2024 14:02:08 +0100 Subject: [PATCH 20/20] Update src/pydiverse/pipedag/util/json.py --- src/pydiverse/pipedag/util/json.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pydiverse/pipedag/util/json.py b/src/pydiverse/pipedag/util/json.py index b9dde1b7..33928b6d 100644 --- a/src/pydiverse/pipedag/util/json.py +++ b/src/pydiverse/pipedag/util/json.py @@ -117,7 +117,7 @@ def get_stage(name: str): tbl.stage = get_stage(d["stage"]) tbl.cache_key = d["cache_key"] tbl.external_schema = d.get("external_schema") - tbl.shared_lock_allowed = d.get("shared_lock_allowed", False) + tbl.shared_lock_allowed = d.get("shared_lock_allowed", True) return tbl if type_ == Type.RAW_SQL: raw_sql = RawSql(name=d["name"])
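
To make the end result of this patch series concrete, here is a minimal usage sketch of the feature it introduces. The import path and the constructor signature `ExternalTableReference(name, schema, shared_lock_allowed=False)` are taken from the diffs above; the task name, the schema "landing_zone", and the table "orders" are purely illustrative assumptions, and a cache function for the task (omitted here) is advisable because pipedag cannot judge the cache validity of an external object.

    from pydiverse.pipedag import Flow, Stage, Table, materialize
    from pydiverse.pipedag.backend.table.sql import ExternalTableReference


    @materialize()
    def external_orders():
        # "orders" is assumed to already exist in the external schema
        # "landing_zone" (e.g. loaded there by a process outside pipedag);
        # pipedag only records the reference and verifies that the table exists.
        return Table(
            ExternalTableReference(
                "orders",
                schema="landing_zone",
                # keep the default False if the user may not lock the table;
                # on DB2 this suppresses the LOCK TABLE ... IN SHARE MODE statement
                shared_lock_allowed=False,
            )
        )


    with Flow("external_reference_example") as f:
        with Stage("ingest"):
            orders = external_orders()

    f.run()

Downstream tasks can consume `orders` like any other materialized table; the SQLAlchemy hook resolves the reference to the external schema instead of the stage schema, as shown in the resolve_alias changes above.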
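
The effect of `shared_lock_allowed` on DB2 can be summarized with a small standalone sketch. It mirrors the filtering done in visit_create_table_as_select_ibm_db_sa above, but uses plain string formatting instead of SQLAlchemy's identifier preparer, so the quoting is a simplification; the table and schema names are the ones used in the nickname test.

    # Source tables as the CreateTableAsSelect statement receives them.
    source_tables = [
        dict(name="nick1", schema="user_controlled_schema", shared_lock_allowed=False),
        dict(name="nick2", schema="user_controlled_schema", shared_lock_allowed=True),
    ]

    # Only tables that allow the shared lock get a LOCK TABLE statement;
    # nick1 is skipped, e.g. because the user lacks the privilege to lock it.
    lock_statements = [
        f"LOCK TABLE {tbl['schema']}.{tbl['name']} IN SHARE MODE"
        for tbl in source_tables
        if tbl["shared_lock_allowed"]
    ]
    print(lock_statements)
    # ['LOCK TABLE user_controlled_schema.nick2 IN SHARE MODE']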