Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
d42ac3c
Enable the creation of TableReferences that reference tables in exter…
nicolasmueller Feb 24, 2024
45adc0b
Fix tests on DB2 and MSSQL
nicolasmueller Feb 24, 2024
6d77863
Improve readability
nicolasmueller Feb 24, 2024
8c23cd2
Add `repr` output for `TableReference`
nicolasmueller Feb 24, 2024
02ce6a0
Add changelog entry
nicolasmueller Feb 24, 2024
a7f504d
Disable TableReference to schema of current stage
nicolasmueller Feb 26, 2024
b51a3b7
Add external view to test
nicolasmueller Feb 26, 2024
8cc63d0
Add test for polars hook
nicolasmueller Feb 26, 2024
2384753
Fix tests
nicolasmueller Feb 26, 2024
db9d8ce
Update changelog
nicolasmueller Feb 26, 2024
0f55519
Feedback from @windiana42
nicolasmueller Feb 29, 2024
1ffb88a
Fix DB2
nicolasmueller Feb 29, 2024
24fb6c2
Merge branch 'main' into external_table_reference
nicolasmueller Feb 29, 2024
2b1a8d3
Improve test for DuckDB
nicolasmueller Feb 29, 2024
a5769bc
Add external nickname test
nicolasmueller Feb 29, 2024
7b83775
Merge branch 'main' into external_table_reference
nicolasmueller Feb 29, 2024
c3bdebe
Expand documentation
nicolasmueller Feb 29, 2024
847a07b
Renamed db2_shared_lock_allowed to shared_lock_allowed.
windiana42 Mar 7, 2024
15d3f2a
Added more comments to resolve_alias mechanics.
windiana42 Mar 7, 2024
a5895b4
fix typo
windiana42 Mar 7, 2024
ae1fd36
Minor changes to comment and recovery defaults.
windiana42 Mar 7, 2024
918ddc5
Update src/pydiverse/pipedag/util/json.py
windiana42 Mar 7, 2024
df75a7c
Merge branch 'main' into external_table_reference
nicolasmueller Mar 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/source/changelog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# Changelog

## 0.6.11 (2024-XX-XX)

## 0.7.0 (2024-XX-XX)
- Rework `TableReference` support:
* Rename `TableReference` to `ExternalTableReference`
* Add support for `ExternalTableReference` to point to tables in external (i.e. not managed by `pipedag`) schemas.
* Remove support for `ExternalTableReference` that points to a table in the schema of the current stage. I.e. `ExternalTableReference` can only point to tables in external schemas.
- Fix polars import in `pyproject.toml` when using OS X with rosetta2

## 0.6.10 (2024-02-29)
Expand Down
2 changes: 1 addition & 1 deletion docs/source/reference/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,4 @@ Orchestration Engine
Special Table Types
-------------------

.. autoclass:: pydiverse.pipedag.backend.table.sql.TableReference
.. autoclass:: pydiverse.pipedag.backend.table.sql.ExternalTableReference
4 changes: 2 additions & 2 deletions src/pydiverse/pipedag/backend/table/sql/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from __future__ import annotations

from .sql import SQLTableStore, TableReference
from .sql import ExternalTableReference, SQLTableStore

__all__ = [
"SQLTableStore",
"TableReference",
"ExternalTableReference",
]
7 changes: 6 additions & 1 deletion src/pydiverse/pipedag/backend/table/sql/ddl.py
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,7 @@ def visit_create_table_as_select_ibm_db_sa(create: CreateTableAsSelect, compiler
src_tables = [
f"{preparer.format_schema(tbl['schema'])}.{preparer.quote(tbl['name'])}"
for tbl in create.source_tables
if tbl["shared_lock_allowed"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to remind myself: this is needed to prevent DB2 dialect from locking external table reference input tables if not allowed.

]
lock_statements += [f"LOCK TABLE {ref} IN SHARE MODE" for ref in src_tables]

Expand Down Expand Up @@ -826,7 +827,11 @@ def visit_copy_table(copy_table: CopyTable, compiler, **kw):
query,
early_not_null=copy_table.early_not_null,
source_tables=[
dict(name=copy_table.from_name, schema=copy_table.from_schema.get())
dict(
name=copy_table.from_name,
schema=copy_table.from_schema.get(),
shared_lock_allowed=True,
)
],
suffix=copy_table.suffix,
)
Expand Down
8 changes: 6 additions & 2 deletions src/pydiverse/pipedag/backend/table/sql/dialects/ibm_db2.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,12 @@ def add_indexes(
)
self.execute(query)

def resolve_alias(self, table, schema):
return PipedagDB2Reflection.resolve_alias(self.engine, table, schema)
def resolve_alias(self, table: Table, stage_name: str) -> tuple[str, str]:
# The base implementation already takes care of converting Table objects
# based on ExternalTableReference objects to string table name and schema.
# For normal Table objects, it needs the stage schema name.
table_name, schema = super().resolve_alias(table, stage_name)
return PipedagDB2Reflection.resolve_alias(self.engine, table_name, schema)

def check_materialization_details_supported(self, label: str | None) -> None:
_ = label
Expand Down
8 changes: 6 additions & 2 deletions src/pydiverse/pipedag/backend/table/sql/dialects/mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,12 @@ def _copy_object_to_transaction(
definition = _mssql_update_definition(conn, name, src_schema, dest_schema)
self.execute(definition, conn=conn)

def resolve_alias(self, table: str, schema: str):
return PipedagMSSqlReflection.resolve_alias(self.engine, table, schema)
def resolve_alias(self, table: Table, stage_name: str):
# The base implementation already takes care of converting Table objects
# based on ExternalTableReference objects to string table name and schema.
# For normal Table objects, it needs the stage schema name.
table_name, schema = super().resolve_alias(table, stage_name)
return PipedagMSSqlReflection.resolve_alias(self.engine, table_name, schema)


@MSSqlTableStore.register_table(pd)
Expand Down
58 changes: 36 additions & 22 deletions src/pydiverse/pipedag/backend/table/sql/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
CreateTableAsSelect,
Schema,
)
from pydiverse.pipedag.backend.table.sql.sql import SQLTableStore, TableReference
from pydiverse.pipedag.backend.table.sql.sql import (
ExternalTableReference,
SQLTableStore,
)
from pydiverse.pipedag.backend.table.util import (
DType,
PandasDTypeBackend,
Expand Down Expand Up @@ -57,7 +60,10 @@ def materialize(
source_tables = [
dict(
name=tbl.name,
schema=store.get_schema(tbl.stage.current_name).get(),
schema=store.get_schema(tbl.stage.current_name).get()
if tbl.external_schema is None
else tbl.external_schema,
shared_lock_allowed=tbl.shared_lock_allowed,
)
for tbl in TaskContext.get().input_tables
]
Expand Down Expand Up @@ -87,10 +93,9 @@ def materialize(

@classmethod
def retrieve(
cls, store, table, stage_name, as_type: type[sa.Table]
cls, store, table: Table, stage_name: str, as_type: type[sa.Table]
) -> sa.sql.expression.Selectable:
schema = store.get_schema(stage_name).get()
table_name, schema = store.resolve_alias(table.name, schema)
table_name, schema = store.resolve_alias(table, stage_name)
alias_name = TaskContext.get().name_disambiguator.get_name(table_name)

for retry_iteration in range(4):
Expand Down Expand Up @@ -126,30 +131,40 @@ def lazy_query_str(cls, store, obj) -> str:


@SQLTableStore.register_table()
class TableReferenceHook(TableHook[SQLTableStore]):
class ExternalTableReferenceHook(TableHook[SQLTableStore]):
@classmethod
def can_materialize(cls, type_) -> bool:
return issubclass(type_, TableReference)
return issubclass(type_, ExternalTableReference)

@classmethod
def can_retrieve(cls, type_) -> bool:
return False

@classmethod
def materialize(cls, store: SQLTableStore, table: Table, stage_name):
# For a table reference, we don't need to materialize anything.
def materialize(cls, store: SQLTableStore, table: Table, stage_name: str):
# For an external table reference, we don't need to materialize anything.
        # This is because any table referenced by a table reference should already exist
# in the schema.
# Instead, we check that the table actually exists.
schema = store.get_schema(stage_name).get()
stage_schema = store.get_schema(stage_name).get()
if table.external_schema.upper() == stage_schema.upper():
raise ValueError(
f"ExternalTableReference '{table.name}' is not allowed to reference "
f"tables in the transaction schema '{stage_schema}' of the current "
"stage."
)
if stage_schema.upper().startswith(table.external_schema.upper() + "__"):
raise ValueError(
f"ExternalTableReference '{table.name}' is not allowed to reference "
f"tables in the schema '{table.external_schema}' of the current stage."
)

inspector = sa.inspect(store.engine)
has_table = inspector.has_table(table.name, schema)
has_table = store.has_table_or_view(table.name, table.external_schema)

if not has_table:
raise ValueError(
f"Not table with name '{table.name}' found in schema '{schema}' "
"(reference by TableReference)."
                f"No table with name '{table.name}' found in schema "
                f"'{table.external_schema}' (referenced by ExternalTableReference)."
)

return
Expand Down Expand Up @@ -193,7 +208,9 @@ def auto_table(cls, obj: pd.DataFrame):
return super().auto_table(obj)

@classmethod
def materialize(cls, store: SQLTableStore, table: Table[pd.DataFrame], stage_name):
def materialize(
cls, store: SQLTableStore, table: Table[pd.DataFrame], stage_name: str
):
df = table.obj.copy(deep=False)
schema = store.get_schema(stage_name)

Expand Down Expand Up @@ -303,8 +320,7 @@ def _build_retrieve_query(
backend: PandasDTypeBackend,
) -> tuple[Any, dict[str, DType]]:
engine = store.engine
schema = store.get_schema(stage_name).get()
table_name, schema = store.resolve_alias(table.name, schema)
table_name, schema = store.resolve_alias(table, stage_name)

sql_table = sa.Table(
table_name,
Expand Down Expand Up @@ -441,7 +457,7 @@ def can_retrieve(cls, type_) -> bool:
return type_ == polars.DataFrame

@classmethod
def materialize(cls, store, table: Table[polars.DataFrame], stage_name):
def materialize(cls, store, table: Table[polars.DataFrame], stage_name: str):
# Materialization for polars happens by first converting the dataframe to
# a pyarrow backed pandas dataframe, and then calling the PandasTableHook
# for materialization.
Expand Down Expand Up @@ -486,8 +502,7 @@ def auto_table(cls, obj: polars.DataFrame):

@classmethod
def _read_db_query(cls, store: SQLTableStore, table: Table, stage_name: str):
schema = store.get_schema(stage_name).get()
table_name, schema = store.resolve_alias(table.name, schema)
table_name, schema = store.resolve_alias(table, stage_name)

t = sa.table(table_name, schema=schema)
q = sa.select("*").select_from(t)
Expand Down Expand Up @@ -767,8 +782,7 @@ def retrieve(
as_type: type[ibis.api.Table],
) -> ibis.api.Table:
conn = cls.conn(store)
schema = store.get_schema(stage_name).get()
table_name, schema = store.resolve_alias(table.name, schema)
table_name, schema = store.resolve_alias(table, stage_name)
for retry_iteration in range(4):
# retry operation since it might have been terminated as a deadlock victim
try:
Expand Down
103 changes: 69 additions & 34 deletions src/pydiverse/pipedag/backend/table/sql/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class SQLTableStore(BaseTableStore):
| ``pdt.lazy.SQLTableImpl``

* - pydiverse.pipedag
- | :py:class:`~.TableReference`
- | :py:class:`~.ExternalTableReference`
-


Expand Down Expand Up @@ -367,7 +367,7 @@ def engine_connect(self) -> sa.Connection:
yield conn
conn.commit()

def get_schema(self, name):
def get_schema(self, name: str) -> Schema:
return Schema(name, self.schema_prefix, self.schema_suffix)

def _execute(self, query, conn: sa.engine.Connection):
Expand Down Expand Up @@ -882,12 +882,15 @@ def _deferred_copy_table(
self.logger.exception(msg)
raise CacheError(msg) from _e

def has_table_or_view(self, name, schema: Schema):
def has_table_or_view(self, name, schema: Schema | str):
if isinstance(schema, Schema):
schema = schema.get()
inspector = sa.inspect(self.engine)
has_table = inspector.has_table(name, schema=schema.get())
has_table = inspector.has_table(name, schema=schema)
# workaround for sqlalchemy backends that fail to find views with has_table
# in particular DuckDB https://github.com/duckdb/duckdb/issues/10322
if not has_table:
has_table = name in inspector.get_view_names(schema=schema.get())
has_table = name in inspector.get_view_names(schema=schema)
return has_table

def _swap_alias_with_table_copy(self, table: Table, table_copy: Table):
Expand Down Expand Up @@ -1231,8 +1234,27 @@ def retrieve_raw_sql_metadata(
task_hash=result.task_hash,
)

def resolve_alias(self, table: str, schema: str) -> tuple[str, str]:
return table, schema
def resolve_alias(self, table: Table, stage_name: str) -> tuple[str, str]:
"""
Convert Table objects to string table name and schema.

Take care of ExternalTableReference objects that turn into Table objects
with external_schema not None. For normal Table objects, the stage
schema name is needed. It will be prefixed/postfixed based on config.
Dialect specific implementations will also try to resolve table aliases
or synonyms since they may cause trouble with SQLAlchemy.

:param table: Table object
:param stage_name: Stage name component to schema (will be different
naming pattern for currently processed transaction stage)
:return: string table name and schema
"""
schema = (
self.get_schema(stage_name).get()
if table.external_schema is None
else table.external_schema
)
return table.name, schema

def get_objects_in_stage(self, stage: Stage):
schema = self.get_schema(stage.transaction_name)
Expand Down Expand Up @@ -1274,52 +1296,65 @@ def get_lock_schema(self) -> Schema:
return self.get_schema(self.LOCK_SCHEMA)


class TableReference:
class ExternalTableReference:
"""Reference to a user-created table.

By returning a `TableReference` wrapped in a :py:class:`~.Table` from a task,
you can tell pipedag that you yourself created a new table inside
the correct schema of the table store.
This may be useful if you need to perform a complex load operation to create
a table (e.g. load a table from an Oracle database into Postgres
using `oracle_fdw`).
    By returning an `ExternalTableReference` wrapped in a :py:class:`~.Table` from
    a task, you can tell pipedag about a table, a view or DB2 nickname in an external
`schema`. The schema may be a multi-part identifier like "[db_name].[schema_name]"
if the database supports this. It is passed to SQLAlchemy as-is.

Only supported by :py:class:`~.SQLTableStore`.

Warning
-------
Using table references is not recommended unless you know what you are doing.
It may lead unexpected behaviour.
It also requires accessing non-public parts of the pipedag API which can
change without notice.
    When using an `ExternalTableReference`, pipedag has no way of knowing the cache
    validity of the external object. Hence, the user should provide a cache function
    for the `Task`.
    It is not allowed to specify an `ExternalTableReference` to a table in the schema
    of the current stage.

Example
-------
Making sure that the table is created in the correct schema is not trivial,
because the schema names usually are different from the stage names.
To get the correct schema name, you must use undocumented and non-public
parts of the pipedag API. To help you get started, we'll provide you with
this example, however, be warned that this might break without notice::
    You can use an `ExternalTableReference` to tell pipedag about a table that exists
in an external schema::

@materialize(version="1.0")
def task():
from pydiverse.pipedag.context import ConfigContext, TaskContext
return Table(ExternalTableReference("name_of_table", "schema"))

table_store = ConfigContext.get().store.table_store
task = TaskContext.get().task
By using a cache function, you can establish the cache (in-)validity of the
external table::

# Name of the schema in which you must create the table
schema_name = table_store.get_schema(task.stage.transaction_name).get()
from datetime import date

# Use the table store's engine to create a table in the correct schema
with table_store.engine.begin() as conn:
conn.execute(...)
# The external table becomes cache invalid every day at midnight
def my_cache_fun():
return date.today().strftime("%Y/%m/%d")

# Return a reference to the newly created table
return Table(TableReference(), "name_of_table")
@materialize(cache=my_cache_fun)
def task():
return Table(ExternalTableReference("name_of_table", "schema"))
"""

pass
def __init__(self, name: str, schema: str, shared_lock_allowed: bool = False):
"""
:param name: The name of the table, view, or nickname/alias
:param schema: The external schema of the object. A multi-part schema
is allowed with '.' separator as also supported by SQLAlchemy Table
schema argument.
        :param shared_lock_allowed: Whether to allow acquiring a shared lock
            when using the object in a SQL query. If set to `False`, no lock is
            acquired. This is useful when the user is not allowed to lock the table.
If pipedag does not lock source tables for this dialect, this argument
has no effect. The default is `False`.
"""
self.name = name
self.schema = schema
self.shared_lock_allowed = shared_lock_allowed

def __repr__(self):
return f"<ExternalTableReference: {hex(id(self))}" f" (schema: {self.schema})>"


# Load SQLTableStore Hooks
Expand Down
Loading