diff --git a/docs/source/changelog.md b/docs/source/changelog.md index e7fdcea7..67b7dae0 100644 --- a/docs/source/changelog.md +++ b/docs/source/changelog.md @@ -12,6 +12,7 @@ non-nullable columns by default. If neither nullable nor non_nullable are specified, the default `CREATE TABLE as SELECT` is kept unmodified except for primary key columns where some dialects require explicit `NOT NULL` statements. - Fix that unlogged tables were created as logged tables when they were copied as cache valid +- Added `MSSQLMaterializationDetails` class for MSSQL table stores. Currently only one option is supported: `identity_insert`. If set to true, the autoincrement attribute of every column in the table is set to true ## 0.7.2 (2024-03-25) - Disable Kroki links by default. New setting disable_kroki=True allows to still default kroki_url to https://kroki.io. diff --git a/pipedag.yaml b/pipedag.yaml index 44bab860..fb213b8e 100644 --- a/pipedag.yaml +++ b/pipedag.yaml @@ -106,6 +106,20 @@ instances: blob_store: blob_store_connection: no_blob + mssql_materialization_details: + instance_id: mssql_materialization_details + table_store: + table_store_connection: mssql + args: + strict_materialization_details: false + materialization_details: + __any__: + identity_insert: false + with_identity_insert: + identity_insert: true + blob_store: + blob_store_connection: no_blob + ibm_db2: instance_id: pd_ibm_db2 stage_commit_technique: READ_VIEWS diff --git a/src/pydiverse/pipedag/backend/table/sql/ddl.py b/src/pydiverse/pipedag/backend/table/sql/ddl.py index 4728d657..bce9dd94 100644 --- a/src/pydiverse/pipedag/backend/table/sql/ddl.py +++ b/src/pydiverse/pipedag/backend/table/sql/ddl.py @@ -329,6 +329,7 @@ def __init__( column_types: list[str], nullable: bool | list[bool] | None = None, cap_varchar_max: int | None = None, + autoincrement: bool | None = None, ): if not isinstance(nullable, list): nullable = [nullable for _ in column_names] @@ -338,6 +339,7 @@ def __init__( self.column_types = 
column_types self.nullable = nullable self.cap_varchar_max = cap_varchar_max + self.autoincrement = False if autoincrement is None else autoincrement class ChangeColumnNullable(DDLElement): diff --git a/src/pydiverse/pipedag/backend/table/sql/dialects/duckdb.py b/src/pydiverse/pipedag/backend/table/sql/dialects/duckdb.py index e8a6fec5..124cf748 100644 --- a/src/pydiverse/pipedag/backend/table/sql/dialects/duckdb.py +++ b/src/pydiverse/pipedag/backend/table/sql/dialects/duckdb.py @@ -84,7 +84,7 @@ def _execute_materialize( # Create empty table with correct schema cls._dialect_create_empty_table(store, df, table, schema, dtypes) - store.add_indexes_and_set_nullable( + store.add_indexes_and_set_nullable_and_set_autoincrement( table, schema, on_empty_table=True, table_cols=df.columns ) @@ -98,11 +98,8 @@ def _execute_materialize( with duckdb.connect(connection_uri) as conn: conn.execute(f"INSERT INTO {schema_name}.{table_name} SELECT * FROM df") - store.add_indexes_and_set_nullable( - table, - schema, - on_empty_table=False, - table_cols=df.columns, + store.add_indexes_and_set_nullable_and_set_autoincrement( + table, schema, on_empty_table=False, table_cols=df.columns ) diff --git a/src/pydiverse/pipedag/backend/table/sql/dialects/ibm_db2.py b/src/pydiverse/pipedag/backend/table/sql/dialects/ibm_db2.py index 839417ff..0d6a6dfd 100644 --- a/src/pydiverse/pipedag/backend/table/sql/dialects/ibm_db2.py +++ b/src/pydiverse/pipedag/backend/table/sql/dialects/ibm_db2.py @@ -145,15 +145,16 @@ def get_forced_nullability_columns( ] return nullable_cols, non_nullable_cols - def add_indexes_and_set_nullable( + def add_indexes_and_set_nullable_and_set_autoincrement( self, table: Table, schema: Schema, *, on_empty_table: bool | None = None, table_cols: Iterable[str] | None = None, + enable_identity_insert: bool | None = None, ): - super().add_indexes_and_set_nullable( + super().add_indexes_and_set_nullable_and_set_autoincrement( table, schema, on_empty_table=on_empty_table, 
table_cols=table_cols ) table_name = self.engine.dialect.identifier_preparer.quote(table.name) diff --git a/src/pydiverse/pipedag/backend/table/sql/dialects/mssql.py b/src/pydiverse/pipedag/backend/table/sql/dialects/mssql.py index 4260dd49..9ac0bac6 100644 --- a/src/pydiverse/pipedag/backend/table/sql/dialects/mssql.py +++ b/src/pydiverse/pipedag/backend/table/sql/dialects/mssql.py @@ -2,6 +2,7 @@ import re from collections.abc import Iterable +from dataclasses import dataclass from typing import Any import pandas as pd @@ -22,6 +23,32 @@ from pydiverse.pipedag.backend.table.util import DType from pydiverse.pipedag.materialize import Table from pydiverse.pipedag.materialize.container import RawSql +from pydiverse.pipedag.materialize.details import ( + BaseMaterializationDetails, + resolve_materialization_details_label, +) + + +@dataclass(frozen=True) +class MSSQLMaterializationDetails(BaseMaterializationDetails): + """ + Materialization options specific to MSSQL table stores. + + :param identity_insert: Allows explicit values to be inserted + into the identity column of a table (see `identity_insert`_). + + .. 
_identity_insert: + https://learn.microsoft.com/en-us/sql/t-sql/statements/set-identity-insert-transact-sql?view=sql-server-ver16 + """ + + identity_insert: bool = False + + def __post_init__(self): + # Raise instead of assert: asserts are stripped under ``python -O``. + if not isinstance(self.identity_insert, bool): + raise TypeError( + f"identity_insert must be a bool, got {self.identity_insert!r}" + ) class MSSqlTableStore(SQLTableStore): @@ -127,13 +154,14 @@ def get_forced_nullability_columns( # the list of nullable columns as well return self._process_table_nullable_parameters(table, table_cols) - def add_indexes_and_set_nullable( + def add_indexes_and_set_nullable_and_set_autoincrement( self, table: Table, schema: Schema, *, on_empty_table: bool | None = None, table_cols: Iterable[str] | None = None, + enable_identity_insert: bool | None = None, ): if on_empty_table is None or on_empty_table: # Set non-nullable and primary key on empty table @@ -164,6 +192,7 @@ def add_indexes_and_set_nullable( nullable_cols, sql_types, nullable=True, + autoincrement=enable_identity_insert, ) ) sql_types = [types[col] for col in non_nullable_cols] @@ -175,6 +204,7 @@ def add_indexes_and_set_nullable( non_nullable_cols, sql_types, nullable=False, + autoincrement=enable_identity_insert, ) ) if len(key_columns) > 0: @@ -189,8 +219,10 @@ def add_indexes_and_set_nullable( sql_types, nullable=False, cap_varchar_max=1024, + autoincrement=enable_identity_insert, ) ) + self.add_table_primary_key(table, schema) if on_empty_table is None or not on_empty_table: self.add_table_indexes(table, schema) @@ -297,6 +329,30 @@ def resolve_alias(self, table: Table, stage_name: str): table_name, schema = super().resolve_alias(table, stage_name) return PipedagMSSqlReflection.resolve_alias(self.engine, table_name, schema) + def _set_materialization_details( + self, materialization_details: dict[str, dict[str, Any]] | None + ) -> None: + """Store the MSSQL materialization details built from the given mapping.""" + self.materialization_details = ( + MSSQLMaterializationDetails.create_materialization_details_dict( + materialization_details, + self.strict_materialization_details, + self.default_materialization_details, 
self.logger, + ) + ) + + def get_identity_insert(self, materialization_details_label: str | None) -> bool: + """Return the ``identity_insert`` option for the given details label.""" + return MSSQLMaterializationDetails.get_attribute_from_dict( + self.materialization_details, + materialization_details_label, + self.default_materialization_details, + "identity_insert", + self.strict_materialization_details, + self.logger, + ) + @MSSqlTableStore.register_table(pd) class PandasTableHook(PandasTableHook): @@ -311,6 +367,59 @@ def _get_dialect_dtypes(cls, dtypes: dict[str, DType], table: Table[pd.DataFrame } ) + @classmethod + def _execute_materialize( + cls, + df: pd.DataFrame, + store: MSSqlTableStore, + table: Table[pd.DataFrame], + schema: Schema, + dtypes: dict[str, DType], + ): + """ + Materialize ``df`` as ``table``, forwarding the ``identity_insert`` + materialization detail to the index/nullability step. + """ + dtypes = cls._get_dialect_dtypes(dtypes, table) + if table.type_map: + dtypes.update(table.type_map) + + # Resolve the label once; it is needed for validation and lookup. + label = resolve_materialization_details_label(table) + store.check_materialization_details_supported(label) + enable_identity_insert = store.get_identity_insert(label) + + if early := store.dialect_requests_empty_creation(table, is_sql=False): + cls._dialect_create_empty_table(store, df, table, schema, dtypes) + store.add_indexes_and_set_nullable_and_set_autoincrement( + table, + schema, + on_empty_table=True, + table_cols=df.columns, + enable_identity_insert=enable_identity_insert, + ) + + with store.engine_connect() as conn: + with conn.begin(): + if early: + store.lock_table(table, schema, conn) + df.to_sql( + table.name, + conn, + schema=schema.get(), + index=False, + dtype=dtypes, + chunksize=100_000, + if_exists="append" if early else "fail", + ) + store.add_indexes_and_set_nullable_and_set_autoincrement( + table, + schema, + on_empty_table=False if early else None, + table_cols=df.columns, + enable_identity_insert=enable_identity_insert, + ) + try: import ibis diff --git a/src/pydiverse/pipedag/backend/table/sql/dialects/postgres.py b/src/pydiverse/pipedag/backend/table/sql/dialects/postgres.py index 
b567b9cb..501f3c13 100644 --- a/src/pydiverse/pipedag/backend/table/sql/dialects/postgres.py +++ b/src/pydiverse/pipedag/backend/table/sql/dialects/postgres.py @@ -130,7 +130,7 @@ def _execute_materialize( # Create empty table cls._dialect_create_empty_table(store, df, table, schema, dtypes) - store.add_indexes_and_set_nullable( + store.add_indexes_and_set_nullable_and_set_autoincrement( table, schema, on_empty_table=True, table_cols=df.columns ) diff --git a/src/pydiverse/pipedag/backend/table/sql/hooks.py b/src/pydiverse/pipedag/backend/table/sql/hooks.py index 0c158782..e961dec8 100644 --- a/src/pydiverse/pipedag/backend/table/sql/hooks.py +++ b/src/pydiverse/pipedag/backend/table/sql/hooks.py @@ -99,7 +99,9 @@ def materialize( suffix=suffix, ) ) - store.add_indexes_and_set_nullable(table, schema, on_empty_table=True) + store.add_indexes_and_set_nullable_and_set_autoincrement( + table, schema, on_empty_table=True + ) statements = store.lock_table(table, schema) statements += store.lock_source_tables(source_tables) statements += [ @@ -113,7 +115,9 @@ def materialize( statements, truncate_printed_select=True, ) - store.add_indexes_and_set_nullable(table, schema, on_empty_table=False) + store.add_indexes_and_set_nullable_and_set_autoincrement( + table, schema, on_empty_table=False + ) else: statements = store.lock_source_tables(source_tables) statements += [ @@ -126,7 +130,7 @@ def materialize( ) ] store.execute(statements) - store.add_indexes_and_set_nullable(table, schema) + store.add_indexes_and_set_nullable_and_set_autoincrement(table, schema) @classmethod def retrieve( @@ -323,7 +327,7 @@ def _execute_materialize( if early := store.dialect_requests_empty_creation(table, is_sql=False): cls._dialect_create_empty_table(store, df, table, schema, dtypes) - store.add_indexes_and_set_nullable( + store.add_indexes_and_set_nullable_and_set_autoincrement( table, schema, on_empty_table=True, table_cols=df.columns ) @@ -340,7 +344,7 @@ def _execute_materialize( 
chunksize=100_000, if_exists="append" if early else "fail", ) - store.add_indexes_and_set_nullable( + store.add_indexes_and_set_nullable_and_set_autoincrement( table, schema, on_empty_table=False if early else None, diff --git a/src/pydiverse/pipedag/backend/table/sql/sql.py b/src/pydiverse/pipedag/backend/table/sql/sql.py index d0386548..88cc49b3 100644 --- a/src/pydiverse/pipedag/backend/table/sql/sql.py +++ b/src/pydiverse/pipedag/backend/table/sql/sql.py @@ -677,13 +677,14 @@ def dialect_requests_empty_creation(self, table: Table, is_sql: bool) -> bool: _ = is_sql return table.nullable is not None or table.non_nullable is not None - def add_indexes_and_set_nullable( + def add_indexes_and_set_nullable_and_set_autoincrement( self, table: Table, schema: Schema, *, on_empty_table: bool | None = None, table_cols: Iterable[str] | None = None, + enable_identity_insert: bool | None = None, ): if on_empty_table is None or on_empty_table: # By default, we set non-nullable on empty table diff --git a/tests/fixtures/instances.py b/tests/fixtures/instances.py index eab12d74..1ae39251 100644 --- a/tests/fixtures/instances.py +++ b/tests/fixtures/instances.py @@ -22,6 +22,7 @@ "postgres_unlogged": pytest.mark.postgres, "mssql": pytest.mark.mssql, "mssql_pytsql": pytest.mark.mssql, + "mssql_materialization_details": pytest.mark.mssql, "ibm_db2": pytest.mark.ibm_db2, "ibm_db2_avoid_schema": pytest.mark.ibm_db2, "ibm_db2_materialization_details": pytest.mark.ibm_db2, @@ -54,6 +55,7 @@ "postgres_unlogged", "mssql", "mssql_pytsql", + "mssql_materialization_details", "ibm_db2", "ibm_db2_avoid_schema", "ibm_db2_materialization_details", diff --git a/tests/test_identity_insert.py b/tests/test_identity_insert.py new file mode 100644 index 00000000..e7911eff --- /dev/null +++ b/tests/test_identity_insert.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +from pydiverse.pipedag import Flow, Stage + +# Parameterize all tests in this file with several instance_id configurations 
+from tests.fixtures.instances import ( + DATABASE_INSTANCES, + skip_instances, + with_instances, +) +from tests.util import tasks_library as m + +pytestmark = [with_instances(DATABASE_INSTANCES)] + + +@with_instances(DATABASE_INSTANCES, "mssql_materialization_details") +@skip_instances("ibm_db2", "postgres", "duckdb") +def test_identity_insert(): + with Flow("flow") as f: + with Stage("stage"): + _ = m.simple_dataframe() + _ = m.simple_identity_insert() + + assert f.run().successful diff --git a/tests/util/tasks_library.py b/tests/util/tasks_library.py index cab7f011..1b32f7e3 100644 --- a/tests/util/tasks_library.py +++ b/tests/util/tasks_library.py @@ -306,6 +306,19 @@ def simple_table_default_compressed(): ) +@materialize(lazy=True, version="1.0.0") +def simple_identity_insert(): + df = pd.DataFrame( + { + "col1": [0, 1, 2, 3], + "col2": ["0", "1", "2", "3"], + } + ) + return Table( + df, name="identity_insert_on", materialization_details="with_identity_insert" + ) + + @materialize(version="1.0") def pd_dataframe(data: dict[str, list]): return pd.DataFrame(data)