diff --git a/src/databricks/labs/lakebridge/reconcile/design/__init__.py b/src/databricks/labs/lakebridge/reconcile/design/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/src/databricks/labs/lakebridge/reconcile/design/expressions.py b/src/databricks/labs/lakebridge/reconcile/design/expressions.py
new file mode 100644
index 000000000..7ca02ba38
--- /dev/null
+++ b/src/databricks/labs/lakebridge/reconcile/design/expressions.py
@@ -0,0 +1,120 @@
+import dataclasses
+import typing as t
+from abc import ABC, abstractmethod
+from functools import reduce
+
+import sqlglot.expressions as e
+from sqlglot.dialects import Dialect as SqlglotDialect
+
+DialectType = t.Union[str, SqlglotDialect, t.Type[SqlglotDialect], None]
+
+
+@dataclasses.dataclass(frozen=True)
+class ExpressionTransformation:
+    func: t.Callable  # TODO: consider narrowing this to t.Type[e.Func]
+    args: dict
+
+
+class AnyExpression(ABC):
+    @abstractmethod
+    def build(self) -> str:
+        pass
+
+
+class ExpressionBuilder(AnyExpression):
+
+    def __init__(self, column_name: str, dialect: str, table_name: str | None = None):
+        self._column_name = column_name
+        self._alias = None
+        self._table_name = table_name
+        self._dialect = dialect
+        self._transformations: list[ExpressionTransformation] = []
+
+    def build(self) -> str:
+        if self._table_name:
+            column = e.Column(this=self._column_name, table=self._table_name)
+        else:
+            column = e.Column(this=self._column_name, quoted=False)
+        aliased = e.Alias(this=column, alias=self._alias) if self._alias else column
+        transformed = self._apply_transformations(aliased)
+        select_stmt = e.select(transformed).sql(dialect=self._dialect)
+        return select_stmt.removeprefix("SELECT ")  # return only the column with its transformations
+
+    def _apply_transformations(self, column: e.Expression) -> e.Expression:
+        exp = column
+        for transformation in self._transformations:
+            exp = transformation.func(this=exp.copy(), **transformation.args)  # TODO: add error handling
+        return exp
+
+    def column_name(self, name: str):
+        self._column_name = name
+        return self
+
+    def alias(self, alias: str | None):
+        self._alias = alias
+        return self
+
+    def table_name(self, name: str):
+        self._table_name = name
+        return self
+
+    def transform(self, func: t.Callable, **kwargs):
+        transform = ExpressionTransformation(func, kwargs)
+        self._transformations.append(transform)
+        return self
+
+    def concat(self, other: "ExpressionBuilder"):
+        raise NotImplementedError
+
+
+class HashExpressionsBuilder(AnyExpression):
+
+    def __init__(self, dialect: str, columns: list[ExpressionBuilder]):
+        self._dialect = dialect
+        self._alias = None
+        self._expressions: list[ExpressionBuilder] = columns
+
+    def build(self) -> str:
+        columns_to_hash = [col.alias(None).build() for col in self._expressions]
+        columns_to_hash_expr = [e.Column(this=col) for col in columns_to_hash]
+        concat_expr = e.Concat(expressions=columns_to_hash_expr)
+        if self._dialect == "oracle":
+            concat_expr = reduce(lambda x, y: e.DPipe(this=x, expression=y), concat_expr.expressions)
+        match self._dialect:  # TODO: implement the remaining dialects
+            case "tsql":
+                hashed = (
+                    "CONVERT(VARCHAR(256), HASHBYTES("
+                    "'SHA2_256', CONVERT(VARCHAR(256), {})), 2)"
+                ).format(concat_expr.sql(dialect=self._dialect))
+                return f"{hashed} AS {self._alias}" if self._alias else hashed
+            case _:
+                sha = e.SHA2(this=concat_expr, length=e.Literal(this=256, is_string=False))
+                if self._alias:
+                    sha = e.Alias(this=sha, alias=self._alias)
+                return sha.sql(dialect=self._dialect)
+
+    def alias(self, alias: str | None):
+        self._alias = alias
+        return self
+
+
+class QueryBuilder:
+
+    def __init__(self, dialect: str, columns: list[AnyExpression]):
+        self._dialect = dialect
+        self._expressions: list[AnyExpression] = columns
+
+    def build(self) -> str:
+        select = [ex.build() for ex in self._expressions]
+        return e.select(*select).from_(":table").sql(dialect=self._dialect)  # ":table" is a placeholder for the fully qualified table name
+
+
+def coalesce(column: ExpressionBuilder, default=0, is_string=False) -> ExpressionBuilder:
+    expressions = [e.Literal(this=default, is_string=is_string)]
+    return column.transform(e.Coalesce, expressions=expressions)
+
+
+def trim(column: ExpressionBuilder) -> ExpressionBuilder:
+    return column.transform(e.Trim)
+
+
+def unix_time(column: ExpressionBuilder):
+    return column.transform(e.TimeStrToUnix)  # placeholder
diff --git a/src/databricks/labs/lakebridge/reconcile/design/normalizers.py b/src/databricks/labs/lakebridge/reconcile/design/normalizers.py
new file mode 100644
index 000000000..b5096a415
--- /dev/null
+++ b/src/databricks/labs/lakebridge/reconcile/design/normalizers.py
@@ -0,0 +1,238 @@
+import dataclasses
+from abc import ABC, abstractmethod
+
+from databricks.labs.lakebridge.reconcile.connectors.dialect_utils import DialectUtils
+from databricks.labs.lakebridge.reconcile.design import expressions as e
+from databricks.labs.lakebridge.reconcile.design.utypes import ExternalType, UType, ColumnTypeName
+
+
+@dataclasses.dataclass(frozen=True)
+class ExternalColumnDefinition:
+    column_name: str
+    data_type: ExternalType
+    encoding: str = "utf-8"
+
+
+@dataclasses.dataclass(frozen=True)
+class DatetimeColumnDefinition(ExternalColumnDefinition):
+    timezone: str = "UTC"
+
+
+class AbstractNormalizer(ABC):
+    @classmethod
+    @abstractmethod
+    def registry_key_family(cls) -> str:
+        pass
+
+    @classmethod
+    @abstractmethod
+    def registry_key(cls) -> str:
+        pass
+
+    @abstractmethod
+    def normalize(self, column: e.ExpressionBuilder, dialect: e.DialectType, column_def: ExternalColumnDefinition) -> e.ExpressionBuilder:
+        pass
+
+
+class UniversalNormalizer(AbstractNormalizer, ABC):
+    @classmethod
+    def registry_key_family(cls) -> str:
+        return "Universal"
+
+
+class HandleNullsAndTrimNormalizer(UniversalNormalizer):
+    @classmethod
+    def registry_key(cls) -> str:
+        return cls.__name__
+
+    def normalize(self, column: e.ExpressionBuilder, dialect: e.DialectType, column_def: ExternalColumnDefinition) -> e.ExpressionBuilder:
+        return e.coalesce(e.trim(column), "__null_recon__", is_string=True)
+
+
+class QuoteIdentifierNormalizer(UniversalNormalizer):
+    @classmethod
+    def registry_key(cls) -> str:
+        return cls.__name__
+
+    def normalize(self, column: e.ExpressionBuilder, dialect: e.DialectType, column_def: ExternalColumnDefinition) -> e.ExpressionBuilder:
+        match dialect:
+            case "oracle": return self._quote_oracle(column, column_def)
+            case "databricks": return self._quote_databricks(column, column_def)
+            case "snowflake": return self._quote_snowflake(column, column_def)
+            case _: return column  # instead of an error, return as-is
+
+    def _quote_oracle(self, column: e.ExpressionBuilder, column_def: ExternalColumnDefinition) -> e.ExpressionBuilder:
+        normalized = DialectUtils.normalize_identifier(
+            column_def.column_name,
+            source_start_delimiter='"',
+            source_end_delimiter='"',
+        ).source_normalized
+        return column.column_name(normalized)
+
+    def _quote_databricks(self, column: e.ExpressionBuilder, column_def: ExternalColumnDefinition) -> e.ExpressionBuilder:
+        normalized = DialectUtils.ansi_quote_identifier(column_def.column_name)
+        return column.column_name(normalized)
+
+    def _quote_snowflake(self, column: e.ExpressionBuilder, column_def:
ExternalColumnDefinition) -> e.ExpressionBuilder: + normalized = DialectUtils.normalize_identifier( + column_def.column_name, + source_start_delimiter='"', + source_end_delimiter='"', + ).source_normalized + return column.column_name(normalized) + + +class AbstractTypeNormalizer(AbstractNormalizer): + @classmethod + def registry_key_family(cls) -> str: + return "ForType" + + @classmethod + @abstractmethod + def utype(cls) -> UType: + pass + + def normalize(self, column: e.ExpressionBuilder, dialect: str, column_def: ExternalColumnDefinition) -> e.ExpressionBuilder: + match dialect: + case "oracle": return self._normalize_oracle(column, column_def) + case "databricks": return self._normalize_databricks(column, column_def) + case "snowflake": return self._normalize_snowflake(column, column_def) + case _: return column # instead of error, return as is + + @abstractmethod + def _normalize_oracle(self, column: e.ExpressionBuilder, column_def: ExternalColumnDefinition) -> e.ExpressionBuilder: + pass + + @abstractmethod + def _normalize_databricks(self, column: e.ExpressionBuilder, column_def: ExternalColumnDefinition) -> e.ExpressionBuilder: + pass + + @abstractmethod + def _normalize_snowflake(self, column: e.ExpressionBuilder, column_def: ExternalColumnDefinition) -> e.ExpressionBuilder: + pass + +class UDatetimeTypeNormalizer(AbstractTypeNormalizer): + """ + transform all dialects to unix time + """ + + @classmethod + def registry_key(cls) -> str: + return cls.utype().name.name + + @classmethod + def utype(cls) -> UType: + return UType(ColumnTypeName.DATETIME) + + def _normalize_oracle(self, column: e.ExpressionBuilder, source_col: ExternalColumnDefinition) -> e.ExpressionBuilder: + return column + + def _normalize_databricks(self, column: e.ExpressionBuilder, source_col: ExternalColumnDefinition) -> e.ExpressionBuilder: + return e.unix_time(column) + + def _normalize_snowflake(self, column: e.ExpressionBuilder, source_col: ExternalColumnDefinition) -> e.ExpressionBuilder: + return column + +class UStringTypeNormalizer(AbstractTypeNormalizer): + + _delegate = HandleNullsAndTrimNormalizer() + + @classmethod + def registry_key(cls) -> str: + return cls.utype().name.name + + @classmethod + def utype(cls) -> UType: + return UType(ColumnTypeName.VARCHAR) + + def _normalize_oracle(self, column: e.ExpressionBuilder, + column_def: ExternalColumnDefinition) -> e.ExpressionBuilder: + return self._delegate.normalize(column, "", column_def) + + def _normalize_databricks(self, column: e.ExpressionBuilder, + column_def: ExternalColumnDefinition) -> e.ExpressionBuilder: + return self._delegate.normalize(column, "", column_def) + + def _normalize_snowflake(self, column: e.ExpressionBuilder, + column_def: ExternalColumnDefinition) -> e.ExpressionBuilder: + return self._delegate.normalize(column, "", column_def) + + +class NormalizersRegistry: + _registry: dict[str,dict[str, AbstractNormalizer]] = {} # can we type this to subclass of AbstractTypeNormalizer + + def register_normalizer(self, normalizer: AbstractNormalizer): # also subclasses + family = self._registry.get(normalizer.registry_key_family(), {}) + if family.get(normalizer.registry_key()): + raise ValueError(f"Normalizer already registered for utype: {normalizer.registry_key_family()},{normalizer.registry_key()}") + if not family: + self._registry[normalizer.registry_key_family()] = {} + self._registry[normalizer.registry_key_family()][normalizer.registry_key()] = normalizer + + def get_type_normalizer(self, name: ColumnTypeName) -> 
AbstractTypeNormalizer | None:
+        return self._registry.get(AbstractTypeNormalizer.registry_key_family(), {}).get(name.name)
+
+    def get_universal_normalizers(self):
+        return self._registry.get(UniversalNormalizer.registry_key_family(), {}).values()
+
+
+class DialectNormalizer(ABC):
+    DbTypeNormalizerType = dict[ColumnTypeName, ColumnTypeName]
+    # or ExternalType to UType. what about extra type information, e.g. scale, precision?
+
+    dialect: e.DialectType
+
+    def __init__(self, registry: NormalizersRegistry):
+        self._registry = registry
+
+    @classmethod
+    def type_normalizers(cls) -> DbTypeNormalizerType:
+        return {
+            ColumnTypeName("DATE"): UDatetimeTypeNormalizer.utype().name,
+            ColumnTypeName("NCHAR"): UStringTypeNormalizer.utype().name,
+            ColumnTypeName("CHAR"): UStringTypeNormalizer.utype().name,
+            ColumnTypeName("VARCHAR"): UStringTypeNormalizer.utype().name,
+            ColumnTypeName("NVARCHAR"): UStringTypeNormalizer.utype().name,
+            ColumnTypeName("VARCHAR2"): UStringTypeNormalizer.utype().name,
+        }
+
+    def normalize(self, column_def: ExternalColumnDefinition) -> e.ExpressionBuilder:
+        start = e.ExpressionBuilder(column_def.column_name, self.dialect)
+        for normalizer in self._registry.get_universal_normalizers():
+            start = normalizer.normalize(start, self.dialect, column_def)
+        utype = self.type_normalizers().get(column_def.data_type.name)
+        if utype:
+            normalizer = self._registry.get_type_normalizer(utype)
+            if normalizer:
+                return normalizer.normalize(start, self.dialect, column_def)
+        return start
+
+
+class OracleNormalizer(DialectNormalizer):
+    dialect = "oracle"
+
+
+class SnowflakeNormalizer(DialectNormalizer):
+    dialect = "snowflake"
+
+
+if __name__ == "__main__":
+    registry = NormalizersRegistry()
+    registry.register_normalizer(UDatetimeTypeNormalizer())
+    registry.register_normalizer(UStringTypeNormalizer())
+    # registry.register_normalizer(HandleNullsAndTrimNormalizer())
+    registry.register_normalizer(QuoteIdentifierNormalizer())
+    oracle = OracleNormalizer(registry)
+    snow = SnowflakeNormalizer(registry)
+
+    column = ExternalColumnDefinition("student_id", ExternalType(ColumnTypeName["NCHAR"]))
+
+    oracle_column = oracle.normalize(column).build()
+    assert oracle_column == "COALESCE(TRIM(\"student_id\"), '__null_recon__')"
+    snow_column = snow.normalize(column).build()
+    assert snow_column == "COALESCE(TRIM(\"student_id\"), '__null_recon__')"
+
+
+"""
+Dimensions a normalization decision depends on:
+1. source system
+2. target system
+3. datatype
+4. encoding
+5. query
+"""
diff --git a/src/databricks/labs/lakebridge/reconcile/design/normalizers.yaml b/src/databricks/labs/lakebridge/reconcile/design/normalizers.yaml
new file mode 100644
index 000000000..394b21043
--- /dev/null
+++ b/src/databricks/labs/lakebridge/reconcile/design/normalizers.yaml
@@ -0,0 +1,136 @@
+version: "1.0.0"
+
+lakebridge:
+  description: >
+    Unified configuration for type normalization rules.
+    Each category defines transformations that standardize data types
+    across dialects before comparison.
+
+  categories:
+    # ------------------------------------------------------------------------
+    # TEMPORAL CATEGORY
+    # ------------------------------------------------------------------------
+    temporal:
+      description: >
+        Temporal types across dialects.
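+      # For illustration: each transformation below carries a per-dialect `sql`
+      # template in which {} stands for the (already normalized) column
+      # expression. A hypothetical Python renderer would do, e.g.:
+      #   "TO_CHAR({}, 'YYYY-MM-DD')".format('"HIRE_DATE"')
+      #   -> TO_CHAR("HIRE_DATE", 'YYYY-MM-DD')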
+ + # Per-category dialect type mapping -> canonical classes + type_map: + canonical_classes: ["DATE", "TIME", "TIMESTAMP", "TIMESTAMP_TZ", "INTERVAL"] + tsql: + DATE: ["DATE"] + TIME: ["TIME"] + TIMESTAMP: ["DATETIME2", "DATETIME", "SMALLDATETIME"] + TIMESTAMP_TZ: ["DATETIMEOFFSET"] + INTERVAL: [] + oracle: + DATE: ["DATE"] # Oracle DATE includes time component. treat as DATE here for normalization. + TIME: [] + TIMESTAMP: ["TIMESTAMP"] + TIMESTAMP_TZ: ["TIMESTAMP WITH TIME ZONE", "TIMESTAMP WITH LOCAL TIME ZONE"] + INTERVAL: ["INTERVAL YEAR TO MONTH", "INTERVAL DAY TO SECOND"] + snowflake: + DATE: ["DATE"] + TIME: ["TIME"] + TIMESTAMP: ["DATETIME", "TIMESTAMP_NTZ"] + TIMESTAMP_TZ: ["TIMESTAMP_TZ", "TIMESTAMP_LTZ"] + INTERVAL: [] + databricks: + DATE: ["DATE"] + TIME: [] + TIMESTAMP: ["TIMESTAMP_NTZ"] + TIMESTAMP_TZ: ["TIMESTAMP"] + INTERVAL: ["INTERVAL"] + + transformations: + + date_to_iso8601: + description: "Normalize DATE to ISO 8601 (YYYY-MM-DD)." + applies_to: ["DATE"] + dialects: + tsql: + sql: "CONVERT(VARCHAR(10), {}, 23)" + snowflake: + sql: "TO_VARCHAR({}, 'YYYY-MM-DD')" + databricks: + sql: "DATE_FORMAT({}, 'yyyy-MM-dd')" + oracle: + sql: "TO_CHAR({}, 'YYYY-MM-DD')" + test_cases: + - name: "standard_date" + input: "2025-10-24" + expected_output: "2025-10-24" + - name: "null_handling" + input: null + expected_output: null + + timestamp_to_iso8601: + description: "Normalize TIMESTAMP (no tz) to ISO 8601 with microseconds." + applies_to: ["TIMESTAMP"] + dialects: + snowflake: + sql: "TO_VARCHAR({}, 'YYYY-MM-DD HH24:MI:SS.FF6')" + oracle: + sql: "TO_CHAR({}, 'YYYY-MM-DD HH24:MI:SS.FF6')" + tsql: + sql: "CONVERT(VARCHAR(27), {}, 126)" + databricks: + sql: "DATE_FORMAT({}, 'yyyy-MM-dd HH:mm:ss.SSSSSS')" + test_cases: + - name: "standard_timestamp" + input: "2025-10-24 13:45:30.123456" + expected_output: "2025-10-24 13:45:30.123456" + + timestamptz_to_iso8601: + description: "Normalize TIMESTAMP with time zone to ISO 8601 with microseconds." + applies_to: ["TIMESTAMP_TZ"] + dialects: + snowflake: + sql: "TO_VARCHAR({}, 'YYYY-MM-DD HH24:MI:SS.FF6 TZH:TZM')" + oracle: + sql: "TO_CHAR({}, 'YYYY-MM-DD HH24:MI:SS.FF6 TZH:TZM')" + tsql: + sql: "REPLACE(CONVERT(VARCHAR(33), CAST({} AS DATETIME2(6)), 126), 'T', ' ')" + databricks: + sql: "DATE_FORMAT({}, 'yyyy-MM-dd HH:mm:ss.SSSSSS ZZZZZ')" + test_cases: + - name: "with_offset" + input: "2025-10-24 13:45:30.123456 +02:00" + expected_output: "2025-10-24 13:45:30.123456 +02:00" + + time_to_hhmmss: + description: "Normalize TIME to HH:MM:SS[.ffffff]." + applies_to: ["TIME"] + dialects: + snowflake: + sql: "TO_VARCHAR({}, 'HH24:MI:SS.FF6')" + tsql: + sql: "FORMAT({}, 'HH:mm:ss.ffffff')" + test_cases: + - name: "standard_time" + input: "13:45:30" + expected_output: "13:45:30" + + numeric: + description: > + Numeric types represent numbers, including integers, decimals, and floating‑point values. 
+
+      # Per-category dialect type mapping -> canonical numeric classes
+      type_map:
+        canonical_classes: ["INTEGER", "DECIMAL", "FLOAT"]
+        tsql: # implement
+          INTEGER: []
+          DECIMAL: []
+          FLOAT: []
+        oracle:
+          INTEGER: []
+          DECIMAL: ["NUMBER"]
+          FLOAT: ["BINARY_DOUBLE", "BINARY_FLOAT"]
+        snowflake: # implement
+          INTEGER: []
+          DECIMAL: []
+          FLOAT: []
+        databricks:
+          INTEGER: ["TINYINT", "SMALLINT", "INT", "BIGINT"]
+          DECIMAL: ["DECIMAL"]
+          FLOAT: ["FLOAT", "DOUBLE"]
diff --git a/src/databricks/labs/lakebridge/reconcile/design/pipeline.py b/src/databricks/labs/lakebridge/reconcile/design/pipeline.py
new file mode 100644
index 000000000..b32878e1a
--- /dev/null
+++ b/src/databricks/labs/lakebridge/reconcile/design/pipeline.py
@@ -0,0 +1,178 @@
+import dataclasses
+from abc import ABC
+
+from pyspark.sql import DataFrame
+from pyspark.sql.functions import col
+
+from databricks.labs.lakebridge.reconcile.connectors.data_source import DataSource
+from databricks.labs.lakebridge.reconcile.design.expressions import HashExpressionsBuilder, DialectType, QueryBuilder
+from databricks.labs.lakebridge.reconcile.design.normalizers import DialectNormalizer, ExternalColumnDefinition, \
+    NormalizersRegistry
+from databricks.labs.lakebridge.reconcile.design.utypes import ExternalType, ColumnTypeName
+from databricks.labs.lakebridge.reconcile.recon_config import Table, Schema
+
+
+@dataclasses.dataclass
+class ReconcileOutput:
+    status: str
+    name: str
+    recon_id: str
+    details: dict
+
+    # custom __init__ maps the boolean outcome onto "success"/"failure"
+    def __init__(self, status: bool, name: str, recon_id: str, details: dict):
+        self.status = "success" if status else "failure"
+        self.name = name
+        self.recon_id = recon_id
+        self.details = details
+
+
+@dataclasses.dataclass
+class ReconcileLayerContext:
+    layer: str
+    dialect: DialectType
+    source: DataSource
+    normalizer: DialectNormalizer
+
+
+@dataclasses.dataclass
+class ReconcileRequest:
+    recon_id: str
+    reconcile_type: str
+    source_context: ReconcileLayerContext
+    target_context: ReconcileLayerContext
+    config: Table
+
+
+class Reconcile:
+
+    def reconcile(self, config: ReconcileRequest) -> ReconcileOutput:
+        pass
+
+
+class ComparisonStrategy(ABC):
+    name: str
+    registry: NormalizersRegistry
+
+    def query_source_data(self, config: ReconcileRequest) -> DataFrame:
+        pass
+
+    def query_target_data(self, config: ReconcileRequest) -> DataFrame:
+        pass
+
+    def compare_data(self, config: ReconcileRequest, source_data: DataFrame, target_data: DataFrame) -> ReconcileOutput:
+        pass
+
+    def execute(self, config: ReconcileRequest) -> ReconcileOutput:
+        return self.compare_data(
+            config,
+            self.query_source_data(config),
+            self.query_target_data(config)
+        )
+
+    @classmethod
+    def get_strategy(cls, name: str) -> "ComparisonStrategy":
+        strategies = {
+            "row": HashBasedComparisonStrategy,
+            "aggregate": AggregateComparisonStrategy,
+            "schema": SchemaComparisonStrategy,
+            "data": DataComparisonStrategy,
+            "all": AllComparisonsStrategy,
+        }
+        strategy_class = strategies.get(name.lower())
+        if not strategy_class:
+            raise ValueError(f"Unknown reconcile strategy name: {name}")
+        return strategy_class()
+
+
+class HashBasedComparisonStrategy(ComparisonStrategy):
+    name = "row"
+
+    hash_column_name = "hash_value_recon"
+
+    def build_query(self, layer: ReconcileLayerContext, schema: list[Schema], config: Table):
+        columns = [ExternalColumnDefinition(s.column_name, ExternalType(ColumnTypeName(s.data_type))) for s in schema]
+        normalized = [layer.normalizer.normalize(c) for c in columns]
+
+        # NOTE: ExpressionBuilder.column_name() is a fluent setter, so read the
+        # private attribute until a proper getter is added.
+        join_columns = [jc for jc in normalized if jc._column_name in
config.join_columns]
+        hashed = HashExpressionsBuilder(layer.dialect, normalized).alias(self.hash_column_name)
+        select_cols = join_columns.copy()
+        select_cols.append(hashed)
+        query = QueryBuilder(layer.dialect, select_cols)
+        return query
+
+    def query_source_data(self, config: ReconcileRequest) -> DataFrame:
+        source_table_fqn = ".".join(["catalog", "schema", config.config.source_name])  # FIXME implement
+        schema = config.source_context.source.get_schema(None, "", source_table_fqn, False)
+        query = self.build_query(config.source_context, schema, config.config)
+        df = config.source_context.source.read_data(None, "", source_table_fqn, query.build(), config.config.jdbc_reader_options)
+        df = df.alias("src")
+        return df
+
+    def query_target_data(self, config: ReconcileRequest) -> DataFrame:
+        target_table_fqn = ".".join(["catalog", "schema", config.config.target_name])  # FIXME implement
+        schema = config.target_context.source.get_schema(None, "", target_table_fqn, False)
+        query = self.build_query(config.target_context, schema, config.config)
+        df = config.target_context.source.read_data(None, "", target_table_fqn, query.build(), config.config.jdbc_reader_options)
+        df = df.alias("tgt")
+        return df
+
+    def compare_data(self, config: ReconcileRequest, source_data: DataFrame, target_data: DataFrame) -> ReconcileOutput:
+        joined = source_data.join(target_data,
+                                  col(f"src.{self.hash_column_name}") == col(f"tgt.{self.hash_column_name}"),
+                                  'outer')
+
+        total = joined.count()
+        successful = total == source_data.count() == target_data.count()  # very basic comparison for now
+        sample = min(1.0, 100 / total) if total else 1.0  # sample fraction must not exceed 1.0
+        return ReconcileOutput(successful, self.name, config.recon_id, {"sample_output": joined.sample(sample, 42).collect()})
+
+
+class AggregateComparisonStrategy(ComparisonStrategy):
+    name = "aggregate"
+
+    def query_source_data(self, config: ReconcileRequest) -> DataFrame:
+        pass
+
+    def query_target_data(self, config: ReconcileRequest) -> DataFrame:
+        pass
+
+    def compare_data(self, config: ReconcileRequest, source_data: DataFrame, target_data: DataFrame) -> ReconcileOutput:
+        pass
+
+
+class SchemaComparisonStrategy(ComparisonStrategy):
+    name = "schema"
+
+    def query_source_data(self, config: ReconcileRequest) -> DataFrame:
+        pass
+
+    def query_target_data(self, config: ReconcileRequest) -> DataFrame:
+        pass
+
+    def compare_data(self, config: ReconcileRequest, source_data: DataFrame, target_data: DataFrame) -> ReconcileOutput:
+        pass
+
+
+class DataComparisonStrategy(ComparisonStrategy):
+    name = "data"
+
+    def execute(self, config: ReconcileRequest) -> ReconcileOutput:
+        pass
+
+
+class AllComparisonsStrategy(ComparisonStrategy):
+    name = "all"
+
+    def execute(self, config: ReconcileRequest) -> ReconcileOutput:
+        schema = SchemaComparisonStrategy().execute(config)
+        row = HashBasedComparisonStrategy().execute(config)
+        column = DataComparisonStrategy().execute(config)
+
+        return ReconcileOutput(
+            status=(
+                self._map_status(schema.status)
+                and self._map_status(row.status)
+                and self._map_status(column.status)
+            ),
+            name="all",
+            recon_id=config.recon_id,
+            details={
+                "schema": schema.details,
+                "row": row.details,
+                "column": column.details
+            }
+        )
+
+    @classmethod
+    def _map_status(cls, status: str) -> bool:
+        return status.lower() == "success"
diff --git a/src/databricks/labs/lakebridge/reconcile/design/type_normalizers.csv b/src/databricks/labs/lakebridge/reconcile/design/type_normalizers.csv
new file mode 100644
index 000000000..1bd108ed3
--- /dev/null
+++ b/src/databricks/labs/lakebridge/reconcile/design/type_normalizers.csv
@@ -0,0 +1,25 @@
+type;dialect;transformation;notes
+string;databricks;COALESCE({}, 'null_recon');already string
+varchar;databricks;COALESCE({}, 'null_recon');alias of string
+char;databricks;COALESCE(RTRIM({}), 'null_recon');trim padding
+timestamp;databricks;COALESCE(date_format({}, "yyyy-MM-dd'T'HH:mm:ss.SSSZ"), 'null_recon');assumes UTC already, literal T must be quoted in Spark datetime patterns
+array;databricks;COALESCE(CONCAT_WS(',', ARRAY_SORT(FILTER({}, x -> x IS NOT NULL))), 'null_recon');filter NULLs then sort then join
+string;snowflake;COALESCE({}, 'null_recon');already string
+varchar;snowflake;COALESCE({}, 'null_recon');alias of string
+text;snowflake;COALESCE({}, 'null_recon');alias of string
+char;snowflake;COALESCE(RTRIM({}), 'null_recon');trim padding
+timestamp_ntz;snowflake;COALESCE(TO_VARCHAR({}::TIMESTAMP_NTZ, 'YYYY-MM-DDTHH24:MI:SS.FF3Z'), 'null_recon');UTC ISO-8601 with millis
+timestamp_tz;snowflake;COALESCE(TO_VARCHAR(CONVERT_TIMEZONE('UTC', {}::TIMESTAMP_TZ), 'YYYY-MM-DDTHH24:MI:SS.FF3Z'), 'null_recon');convert to UTC then format
+timestamp_ltz;snowflake;COALESCE(TO_VARCHAR(CONVERT_TIMEZONE('UTC', {}::TIMESTAMP_LTZ), 'YYYY-MM-DDTHH24:MI:SS.FF3Z'), 'null_recon');convert to UTC then format
+array;snowflake;COALESCE(ARRAY_TO_STRING(ARRAY_SORT(ARRAY_COMPACT({})), ','), 'null_recon');remove NULLs then sort then join
+string;tsql;COALESCE({}, 'null_recon');already string
+varchar;tsql;COALESCE({}, 'null_recon');alias of string
+char;tsql;COALESCE(RTRIM({}), 'null_recon');trim padding
+datetime;tsql;COALESCE(CONVERT(varchar(24), CAST(TODATETIMEOFFSET(CAST({} AS datetime2(3)), '+00:00') AS datetime2(3)), 126) + 'Z', 'null_recon');assume UTC then format
+date;tsql;COALESCE(CONVERT(varchar(10), CAST({} AS date), 23), 'null_recon');YYYY-MM-DD
+time;tsql;COALESCE(CONVERT(varchar(12), CAST({} AS time(3)), 114), 'null_recon');hh:mi:ss:mmm (style 114)
+string;oracle;NVL({}, 'null_recon');already string
+varchar2;oracle;NVL({}, 'null_recon');already string
+nvarchar2;oracle;NVL({}, 'null_recon');already string
+char;oracle;NVL(TRIM({}), 'null_recon');trim padding
+nchar;oracle;NVL(TRIM({}), 'null_recon');trim padding
diff --git a/src/databricks/labs/lakebridge/reconcile/design/utils.py b/src/databricks/labs/lakebridge/reconcile/design/utils.py
new file mode 100644
index 000000000..622425ab8
--- /dev/null
+++ b/src/databricks/labs/lakebridge/reconcile/design/utils.py
@@ -0,0 +1,13 @@
+from enum import Enum
+
+
+class AutoName(Enum):
+    """
+    This is used for creating Enum classes where `auto()` is the string form
+    of the corresponding enum's identifier (e.g. FOO.value results in "FOO").
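+
+    A minimal doctest-style sketch (the Color enum here is hypothetical):
+
+        >>> from enum import auto
+        >>> class Color(AutoName):
+        ...     RED = auto()
+        >>> Color.RED.value
+        'RED'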
+
+    Reference: https://docs.python.org/3/howto/enum.html#using-automatic-values
+    """
+
+    def _generate_next_value_(name, _start, _count, _last_values):
+        return name
diff --git a/src/databricks/labs/lakebridge/reconcile/design/utypes.py b/src/databricks/labs/lakebridge/reconcile/design/utypes.py
new file mode 100644
index 000000000..de02fd906
--- /dev/null
+++ b/src/databricks/labs/lakebridge/reconcile/design/utypes.py
@@ -0,0 +1,149 @@
+"""
+Manage the mappings universally because the types do not diverge much between the systems.
+"""
+import dataclasses
+from abc import ABC
+from enum import auto
+
+from databricks.labs.lakebridge.reconcile.design.utils import AutoName
+
+
+class ColumnTypeName(AutoName):
+    ARRAY = auto()
+    AGGREGATEFUNCTION = auto()
+    SIMPLEAGGREGATEFUNCTION = auto()
+    BIGDECIMAL = auto()
+    BIGINT = auto()
+    BIGSERIAL = auto()
+    BINARY = auto()
+    BIT = auto()
+    BOOLEAN = auto()
+    BPCHAR = auto()
+    CHAR = auto()
+    DATE = auto()
+    DATE32 = auto()
+    DATEMULTIRANGE = auto()
+    DATERANGE = auto()
+    DATETIME = auto()
+    DATETIME2 = auto()
+    DATETIME64 = auto()
+    DECIMAL = auto()
+    DECIMAL32 = auto()
+    DECIMAL64 = auto()
+    DECIMAL128 = auto()
+    DECIMAL256 = auto()
+    DOUBLE = auto()
+    ENUM = auto()
+    ENUM8 = auto()
+    ENUM16 = auto()
+    FIXEDSTRING = auto()
+    FLOAT = auto()
+    GEOGRAPHY = auto()
+    GEOMETRY = auto()
+    POINT = auto()
+    RING = auto()
+    LINESTRING = auto()
+    MULTILINESTRING = auto()
+    POLYGON = auto()
+    MULTIPOLYGON = auto()
+    HLLSKETCH = auto()
+    HSTORE = auto()
+    IMAGE = auto()
+    INET = auto()
+    INT = auto()
+    INT128 = auto()
+    INT256 = auto()
+    INT4MULTIRANGE = auto()
+    INT4RANGE = auto()
+    INT8MULTIRANGE = auto()
+    INT8RANGE = auto()
+    INTERVAL = auto()
+    IPADDRESS = auto()
+    IPPREFIX = auto()
+    IPV4 = auto()
+    IPV6 = auto()
+    JSON = auto()
+    JSONB = auto()
+    LIST = auto()
+    LONGBLOB = auto()
+    LONGTEXT = auto()
+    LOWCARDINALITY = auto()
+    MAP = auto()
+    MEDIUMBLOB = auto()
+    MEDIUMINT = auto()
+    MEDIUMTEXT = auto()
+    MONEY = auto()
+    NAME = auto()
+    NCHAR = auto()
+    NESTED = auto()
+    NULL = auto()
+    NUMMULTIRANGE = auto()
+    NUMRANGE = auto()
+    NVARCHAR = auto()
+    OBJECT = auto()
+    RANGE = auto()
+    ROWVERSION = auto()
+    SERIAL = auto()
+    SET = auto()
+    SMALLDATETIME = auto()
+    SMALLINT = auto()
+    SMALLMONEY = auto()
+    SMALLSERIAL = auto()
+    STRUCT = auto()
+    SUPER = auto()
+    TEXT = auto()
+    TINYBLOB = auto()
+    TINYTEXT = auto()
+    TIME = auto()
+    TIMETZ = auto()
+    TIMESTAMP = auto()
+    TIMESTAMPNTZ = auto()
+    TIMESTAMPLTZ = auto()
+    TIMESTAMPTZ = auto()
+    TIMESTAMP_S = auto()
+    TIMESTAMP_MS = auto()
+    TIMESTAMP_NS = auto()
+    TINYINT = auto()
+    TSMULTIRANGE = auto()
+    TSRANGE = auto()
+    TSTZMULTIRANGE = auto()
+    TSTZRANGE = auto()
+    UBIGINT = auto()
+    UINT = auto()
+    UINT128 = auto()
+    UINT256 = auto()
+    UMEDIUMINT = auto()
+    UDECIMAL = auto()
+    UNION = auto()
+    UNIQUEIDENTIFIER = auto()
+    UNKNOWN = auto()  # Sentinel value, useful for type annotation
+    USERDEFINED = "USER-DEFINED"
+    USMALLINT = auto()
+    UTINYINT = auto()
+    UUID = auto()
+    VARBINARY = auto()
+    VARCHAR = auto()
+    VARCHAR2 = auto()  # ORACLE specific
+    VARIANT = auto()
+    VECTOR = auto()
+    XML = auto()
+    YEAR = auto()
+    TDIGEST = auto()
+
+
+@dataclasses.dataclass(frozen=True)
+class _ColumnType(ABC):
+    name: ColumnTypeName
+
+    def __eq__(self, other):
+        return isinstance(other, _ColumnType) and self.name == other.name
+
+    def __hash__(self):
+        return hash(f"DatabaseType/{self.name}")
+
+
+@dataclasses.dataclass(frozen=True)
+class UType(_ColumnType):
+    # Subtyped to be stricter about where a type comes from;
+    # might be overkill.
+    name: ColumnTypeName
+
+
+@dataclasses.dataclass(frozen=True)
+class ExternalType(_ColumnType):
+    # Can be subtyped if needed to override equality, e.g. combining all Oracle char columns into one.
+    name: ColumnTypeName
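+
+
+if __name__ == "__main__":
+    # Minimal usage sketch (not part of the design proper): _ColumnType equality
+    # compares the type name only, so it also holds across the ExternalType/UType
+    # boundary, while distinct names stay distinct.
+    assert ExternalType(ColumnTypeName.VARCHAR2) == ExternalType(ColumnTypeName.VARCHAR2)
+    assert ExternalType(ColumnTypeName.VARCHAR) == UType(ColumnTypeName.VARCHAR)
+    assert ExternalType(ColumnTypeName.VARCHAR2) != UType(ColumnTypeName.VARCHAR)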