Merged
Changes from all commits (28 commits)
b1ac4eb
fix oracle jdbc url
m-abulazm Oct 6, 2025
95e4fdc
add oracle and snowflake jars to local spark setup
m-abulazm Oct 7, 2025
c4a2ea1
add base for read schema integration specs
m-abulazm Oct 7, 2025
b5f8100
use localhost for `test_oracle_read_schema_happy`
m-abulazm Oct 9, 2025
6e3ddb7
fix schema compare for oracle and snowflake
m-abulazm Oct 9, 2025
2c2295d
add oracle recon test
m-abulazm Oct 9, 2025
73f0230
fix oracle transformations and hashing
m-abulazm Oct 13, 2025
fc43b96
improve recon logging
m-abulazm Oct 13, 2025
ab07c04
improve oracle integration test
m-abulazm Oct 13, 2025
6f937a6
fmt
m-abulazm Oct 13, 2025
fa6f93b
fix outdated specs
m-abulazm Oct 13, 2025
bfffbfc
add error handling in setup oracle spark infra
m-abulazm Oct 13, 2025
e88fa50
use correct oracle jdbc driver class instead of deprecated one
m-abulazm Oct 13, 2025
2222c96
fix outdated spec
m-abulazm Oct 13, 2025
5aba442
add reading secrets from env getter for integration specs
m-abulazm Oct 13, 2025
fa23add
Merge branch 'main' into fix/recon/oracle
m-abulazm Oct 14, 2025
e1b5a83
Merge branch 'main' into fix/recon/oracle
m-abulazm Oct 17, 2025
47ca8e8
use `row` reconcile to catch hashing inconsistencies
m-abulazm Oct 20, 2025
d8266bd
add setup oracle script
m-abulazm Oct 21, 2025
fa8a022
productionize oracle setup
m-abulazm Oct 22, 2025
4f1dec0
Merge branch 'main' into fix/recon/oracle
m-abulazm Oct 22, 2025
7edda10
add test to improve coverage
m-abulazm Oct 27, 2025
1dd8fa4
remove value error check as type checker solves it
m-abulazm Oct 27, 2025
243de22
one more coverage trick
m-abulazm Oct 27, 2025
493f9ec
fmt
m-abulazm Oct 27, 2025
12ea4ad
fix oracle url bug
m-abulazm Oct 28, 2025
0b09ea1
Merge branch 'main' into fix/recon/oracle
m-abulazm Oct 28, 2025
eaa68ad
Merge branch 'main' into fix/recon/oracle
m-abulazm Oct 29, 2025
95 changes: 95 additions & 0 deletions .github/scripts/setup_local_oracle.sh
@@ -0,0 +1,95 @@
#!/usr/bin/env bash
set -Eeuo pipefail

# Config
ORACLE_CONTAINER="${ORACLE_CONTAINER:-oracle-free}"
ORACLE_IMAGE="${ORACLE_IMAGE:-container-registry.oracle.com/database/free:latest-lite}"
ORACLE_PORT="${ORACLE_PORT:-1521}"
ORACLE_PWD="${ORACLE_PWD:?export ORACLE_PWD for SYS}"
ORACLE_SID="${ORACLE_SID:-FREEPDB1}"

# Dependencies
command -v docker >/dev/null || { echo "Docker not installed" >&2; exit 2; }

# Image present?
docker image inspect "${ORACLE_IMAGE}" >/dev/null 2>&1 || docker pull "${ORACLE_IMAGE}"

# Start container if needed
if docker ps --format '{{.Names}}' | grep -qx "${ORACLE_CONTAINER}"; then
  : # container is already running
elif docker ps -a --format '{{.Names}}' | grep -qx "${ORACLE_CONTAINER}"; then
  docker start "${ORACLE_CONTAINER}" >/dev/null
else
  docker run --name "${ORACLE_CONTAINER}" \
    -p "${ORACLE_PORT}:1521" \
    -e ORACLE_PWD="${ORACLE_PWD}" \
    -d "${ORACLE_IMAGE}" >/dev/null
  echo "Starting Oracle container..."
fi

echo "Waiting up to 5 minutes for Oracle to be healthy..."
MAX_WAIT=300; WAIT_INTERVAL=5; waited=0
while :; do
  state="$(docker inspect -f '{{.State.Health.Status}}' "${ORACLE_CONTAINER}" 2>/dev/null || true)"
  echo "health=${state:-unknown} waited=${waited}s"
  [[ "$state" == "healthy" ]] && break
  (( waited >= MAX_WAIT )) && { echo "ERROR: Oracle not healthy in 300s" >&2; exit 1; }
  sleep "$WAIT_INTERVAL"; waited=$((waited + WAIT_INTERVAL))
done
echo "Oracle is fully started."

# SQL bootstrap as SYSDBA, target SYSTEM schema
docker exec -i "${ORACLE_CONTAINER}" bash -lc \
"sqlplus -L -s 'sys/${ORACLE_PWD}@localhost:${ORACLE_PORT}/${ORACLE_SID} as sysdba'" <<'SQL'
WHENEVER SQLERROR EXIT SQL.SQLCODE
SET ECHO ON FEEDBACK ON PAGESIZE 200 LINESIZE 32767 SERVEROUTPUT ON

-- reconcile queries executes DBMS_CRYPTO
GRANT EXECUTE ON DBMS_CRYPTO TO SYSTEM;

-- work in SYSTEM, not SYS
ALTER SESSION SET CURRENT_SCHEMA=SYSTEM;

-- create table if not exists (guard ORA-00955)
BEGIN
  EXECUTE IMMEDIATE q'[
    CREATE TABLE SOURCE_TABLE (
      ID NUMBER(15,0),
      DESCR CHAR(30 CHAR),
      YEAR NUMBER(4,0),
      DATEE DATE,
      CONSTRAINT PK_SOURCE_TABLE PRIMARY KEY (ID)
    )
  ]';
EXCEPTION
  WHEN OTHERS THEN
    IF SQLCODE != -955 THEN RAISE; END IF;
END;
/

-- truncate if exists
DECLARE n INTEGER;
BEGIN
  SELECT COUNT(*) INTO n FROM USER_TABLES WHERE TABLE_NAME='SOURCE_TABLE';
  IF n=1 THEN EXECUTE IMMEDIATE 'TRUNCATE TABLE SOURCE_TABLE'; END IF;
END;
/

-- idempotent load
MERGE INTO SOURCE_TABLE t
USING (
  SELECT 1001 ID, 'Cycle 1' DESCR, 2025 YEAR, DATE '2025-01-01' DATEE FROM DUAL UNION ALL
  SELECT 1002, 'Cycle 2', 2025, DATE '2025-02-01' FROM DUAL UNION ALL
  SELECT 1003, 'Cycle 3', 2025, DATE '2025-03-01' FROM DUAL UNION ALL
  SELECT 1004, 'Cycle 4', 2025, DATE '2025-04-15' FROM DUAL UNION ALL
  SELECT 1005, 'Cycle 5', 2025, DATE '2025-05-01' FROM DUAL
) s
ON (t.ID = s.ID)
WHEN MATCHED THEN UPDATE SET t.DESCR = RPAD(s.DESCR,30), t.YEAR = s.YEAR, t.DATEE = s.DATEE
WHEN NOT MATCHED THEN INSERT (ID, DESCR, YEAR, DATEE)
  VALUES (s.ID, RPAD(s.DESCR,30), s.YEAR, s.DATEE);

COMMIT;
SQL

echo "Oracle DDL/DML completed."
50 changes: 46 additions & 4 deletions .github/scripts/setup_spark_remote.sh
@@ -13,11 +13,8 @@ fi

spark=spark-${version}-bin-hadoop3
spark_connect="spark-connect_2.12"
mssql_jdbc_version="1.4.0"
mssql_jdbc="spark-mssql-connector_2.12-${mssql_jdbc_version}-BETA"
mkdir -p "${spark}"


SERVER_SCRIPT=$HOME/spark/${spark}/sbin/start-connect-server.sh

## check the spark version already exist ,if not download the respective version
@@ -33,7 +30,18 @@
fi

JARS_DIR=$HOME/spark/${spark}/jars
MSSQL_JDBC_JAR=$HOME/spark/${spark}/jars/${mssql_jdbc}.jar
mssql_jdbc_version="1.4.0"
mssql_jdbc="spark-mssql-connector_2.12-${mssql_jdbc_version}-BETA"
MSSQL_JDBC_JAR=$JARS_DIR/${mssql_jdbc}.jar
ORACLE_JDBC_VERSION="19.28.0.0"
ORACLE_JDBC_JAR="ojdbc8-${ORACLE_JDBC_VERSION}.jar"
SNOWFLAKE_JDBC_VERSION="3.26.1"
SNOWFLAKE_JDBC_JAR="snowflake-jdbc-${SNOWFLAKE_JDBC_VERSION}.jar"
SNOWFLAKE_JDBC_JAR_PATH="$JARS_DIR/$SNOWFLAKE_JDBC_JAR"
SNOWFLAKE_SPARK_VERSION="2.11.2-spark_3.3"
SNOWFLAKE_SPARK_JAR="spark-snowflake_2.12-${SNOWFLAKE_SPARK_VERSION}.jar"
SNOWFLAKE_SPARK_JAR_PATH="$JARS_DIR/$SNOWFLAKE_SPARK_JAR"

if [ -f "${MSSQL_JDBC_JAR}" ];then
echo "MSSQL JAR already exists"
else
Expand All @@ -50,6 +58,40 @@ else
wget "https://github.com/microsoft/sql-spark-connector/releases/download/v${mssql_jdbc_version}/${mssql_jdbc}.jar" -O "$JARS_DIR"/${mssql_jdbc}.jar
fi

if [ -f "$JARS_DIR/$ORACLE_JDBC_JAR" ]; then
echo "Oracle JDBC JAR already exists"
else
echo "Downloading Oracle JDBC JAR"
wget "https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/${ORACLE_JDBC_VERSION}/ojdbc8-${ORACLE_JDBC_VERSION}.jar" -O "$JARS_DIR/$ORACLE_JDBC_JAR"
if [ $? -ne 0 ]; then
echo "Failed to download Oracle JDBC JAR"
exit 1
fi
fi


if [ -f "${SNOWFLAKE_JDBC_JAR_PATH}" ]; then
echo "Snowflake JDBC JAR already exists"
else
echo "Downloading Snowflake JDBC JAR"
wget "https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/${SNOWFLAKE_JDBC_VERSION}/${SNOWFLAKE_JDBC_JAR}" -O "${SNOWFLAKE_JDBC_JAR_PATH}"
if [ $? -ne 0 ]; then
echo "Failed to download Snowflake JDBC JAR"
exit 1
fi
fi
if [ -f "${SNOWFLAKE_SPARK_JAR_PATH}" ]; then
echo "Snowflake Spark Connector JAR already exists"
else
echo "Downloading Snowflake Spark Connector JAR"
wget "https://repo1.maven.org/maven2/net/snowflake/spark-snowflake_2.12/${SNOWFLAKE_SPARK_VERSION}/${SNOWFLAKE_SPARK_JAR}" -O "${SNOWFLAKE_SPARK_JAR_PATH}"
if [ $? -ne 0 ]; then
echo "Failed to download Snowflake Spark Connector JAR"
exit 1
fi
fi


cd "${spark}" || exit 1
## check spark remote is running,if not start the spark remote
## Temporary workaround for Spark Connect server still points to 3.5.5
2 changes: 1 addition & 1 deletion src/databricks/labs/lakebridge/reconcile/compare.py
@@ -80,7 +80,7 @@ def reconcile_data(

    # Write unmatched df to volume
    df = ReconIntermediatePersist(spark, path).write_and_read_unmatched_df_with_volumes(df)
    logger.warning(f"Unmatched data was written to {path} successfully")
    logger.debug(f"Unmatched data was written to {path} successfully")

    mismatch = _get_mismatch_data(df, source_alias, target_alias) if report_type in {"all", "data"} else None

@@ -9,9 +9,9 @@ class JDBCReaderMixin:
    _spark: SparkSession

    # TODO update the url
    def _get_jdbc_reader(self, query, jdbc_url, driver, prepare_query=None):
    def _get_jdbc_reader(self, query, jdbc_url, driver, additional_options: dict | None = None):
        driver_class = {
            "oracle": "oracle.jdbc.driver.OracleDriver",
            "oracle": "oracle.jdbc.OracleDriver",
            "snowflake": "net.snowflake.client.jdbc.SnowflakeDriver",
            "sqlserver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
        }
@@ -21,8 +21,9 @@ def _get_jdbc_reader(self, query, jdbc_url, driver, prepare_query=None):
            .option("driver", driver_class.get(driver, driver))
            .option("dbtable", f"({query}) tmp")
        )
        if prepare_query is not None:
            reader = reader.option('prepareQuery', prepare_query)
        if isinstance(additional_options, dict):
            for key, value in additional_options.items():
                reader = reader.option(key, value)
        return reader

    @staticmethod
14 changes: 9 additions & 5 deletions src/databricks/labs/lakebridge/reconcile/connectors/oracle.py
@@ -49,8 +49,7 @@ def __init__(
    @property
    def get_jdbc_url(self) -> str:
        return (
            f"jdbc:{OracleDataSource._DRIVER}:thin:{self._get_secret('user')}"
            f"/{self._get_secret('password')}@//{self._get_secret('host')}"
            f"jdbc:{OracleDataSource._DRIVER}:thin:@//{self._get_secret('host')}"
            f":{self._get_secret('port')}/{self._get_secret('database')}"
        )

@@ -86,7 +85,7 @@ def get_schema(
        schema_query = re.sub(
            r'\s+',
            ' ',
            OracleDataSource._SCHEMA_QUERY.format(table=table, owner=schema),
            OracleDataSource._SCHEMA_QUERY.format(table=table.lower(), owner=schema.lower()),
        )
        try:
            logger.debug(f"Fetching schema using query: \n`{schema_query}`")
@@ -102,14 +101,19 @@ def get_schema(
    @staticmethod
    def _get_timestamp_options() -> dict[str, str]:
        return {
            "oracle.jdbc.mapDateToTimestamp": "False",
            "oracle.jdbc.mapDateToTimestamp": "false",
            "sessionInitStatement": "BEGIN dbms_session.set_nls('nls_date_format', "
            "'''YYYY-MM-DD''');dbms_session.set_nls('nls_timestamp_format', '''YYYY-MM-DD "
            "HH24:MI:SS''');END;",
        }

    def reader(self, query: str) -> DataFrameReader:
        return self._get_jdbc_reader(query, self.get_jdbc_url, OracleDataSource._DRIVER)
        user = self._get_secret('user')
        password = self._get_secret('password')
        logger.debug(f"Using user: {user} to connect to Oracle")
        return self._get_jdbc_reader(
            query, self.get_jdbc_url, OracleDataSource._DRIVER, {"user": user, "password": password}
        )

    def normalize_identifier(self, identifier: str) -> NormalizedIdentifier:
        normalized = DialectUtils.normalize_identifier(
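
The net effect of the connector changes: credentials no longer ride inside the thin URL; they are passed as Spark JDBC reader options through the new additional_options parameter. A rough sketch for illustration (host, port, service, and credentials are placeholders, not values from this change):

# Before: jdbc:oracle:thin:SYSTEM/secret@//localhost:1521/FREEPDB1
# After:  the URL stays credential-free and credentials become reader options.
from pyspark.sql import SparkSession

url = "jdbc:oracle:thin:@//localhost:1521/FREEPDB1"
additional_options = {"user": "SYSTEM", "password": "secret"}

spark = SparkSession.builder.getOrCreate()
reader = (
    spark.read.format("jdbc")
    .option("url", url)
    .option("driver", "oracle.jdbc.OracleDriver")
    .option("dbtable", "(SELECT 1 FROM DUAL) tmp")
)
for key, value in additional_options.items():  # mirrors the loop in _get_jdbc_reader
    reader = reader.option(key, value)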
@@ -134,7 +134,7 @@ def get_schema(
            return self.log_and_throw_exception(e, "schema", schema_query)

    def reader(self, query: str, prepare_query_str="") -> DataFrameReader:
        return self._get_jdbc_reader(query, self.get_jdbc_url, self._DRIVER, prepare_query_str)
        return self._get_jdbc_reader(query, self.get_jdbc_url, self._DRIVER, {"prepareQuery": prepare_query_str})

    def normalize_identifier(self, identifier: str) -> NormalizedIdentifier:
        return DialectUtils.normalize_identifier(
11 changes: 9 additions & 2 deletions src/databricks/labs/lakebridge/reconcile/query_builder/base.py
@@ -1,6 +1,7 @@
import logging
from abc import ABC

import sqlglot
import sqlglot.expressions as exp
from sqlglot import Dialect, parse_one

@@ -135,8 +136,14 @@ def _get_transform(datatype: str):

        source_mapping = DataType_transform_mapping.get(source_dialect, {})

        if source_mapping.get(datatype.upper()) is not None:
            return source_mapping.get(datatype.upper())
        parsed = datatype
        try:
            parsed = exp.DataType.build(datatype, source).this.value
        except sqlglot.errors.ParseError:
            logger.warning(f"Could not parse datatype {datatype} for source {source_dialect}")

        if source_mapping.get(parsed) is not None:
            return source_mapping.get(parsed)
        if source_mapping.get("default") is not None:
            return source_mapping.get("default")
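
The mapping key is now sqlglot's canonical type name rather than the raw source string, so dialect spellings resolve to the same entry regardless of case or precision. A small sketch of the normalization (expected output assumes a recent sqlglot; exact names can vary by version):

import sqlglot.expressions as exp

# Oracle spellings collapse to sqlglot's canonical names, the keys of DataType_transform_mapping.
for raw in ("VARCHAR2(10)", "varchar2(10)", "NUMBER(10,2)"):
    canonical = exp.DataType.build(raw, dialect="oracle").this.value
    print(raw, "->", canonical)
# Expected: VARCHAR2(10) -> VARCHAR, varchar2(10) -> VARCHAR, NUMBER(10,2) -> DECIMAL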
@@ -253,7 +253,7 @@ def _get_is_string(column_types_dict: dict[str, DataType], column_name: str) ->
        exp.DataType.Type.NCHAR.value: [
            partial(anonymous, func="NVL(TRIM(TO_CHAR({})),'_null_recon_')", dialect=get_dialect("oracle"))
        ],
        exp.DataType.Type.NVARCHAR.value: [
        exp.DataType.Type.CHAR.value: [
            partial(anonymous, func="NVL(TRIM(TO_CHAR({})),'_null_recon_')", dialect=get_dialect("oracle"))
        ],
    },
@@ -281,7 +281,10 @@ def _get_is_string(column_types_dict: dict[str, DataType], column_name: str) ->
    ),
    get_dialect("oracle"): HashAlgoMapping(
        source=partial(
            anonymous, func="DBMS_CRYPTO.HASH(RAWTOHEX({}), 2)", is_expr=True, dialect=get_dialect("oracle")
            anonymous,
            func="DBMS_CRYPTO.HASH(UTL_I18N.STRING_TO_RAW({}, 'AL32UTF8'), 4)",
            is_expr=True,
            dialect=get_dialect("oracle"),
        ),
        target=md5_partial,
    ),
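
For reference, DBMS_CRYPTO.HASH's constant 4 is HASH_SH256, so the new source expression hashes the UTF-8 bytes of the value (via UTL_I18N.STRING_TO_RAW with AL32UTF8) instead of applying MD5 to a RAWTOHEX rendering. A sketch of what the partial expands to for a column (the column name is illustrative):

# The "{}" placeholder in the partial is filled with the column expression.
column = "DESCR"
oracle_hash_expr = f"DBMS_CRYPTO.HASH(UTL_I18N.STRING_TO_RAW({column}, 'AL32UTF8'), 4)"
print(oracle_hash_expr)
# DBMS_CRYPTO.HASH(UTL_I18N.STRING_TO_RAW(DESCR, 'AL32UTF8'), 4)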
4 changes: 1 addition & 3 deletions src/databricks/labs/lakebridge/reconcile/recon_capture.py
@@ -60,7 +60,7 @@ def clean_unmatched_df_from_volume(self):
            # workspace_client.dbfs.delete(path, recursive=True)
            empty_df = self.spark.createDataFrame([], schema=StructType([StructField("empty", StringType(), True)]))
            empty_df.write.format("parquet").mode("overwrite").save(self.path)
            logger.warning(f"Unmatched DF cleaned up from {self.path} successfully.")
            logger.debug(f"Unmatched DF cleaned up from {self.path} successfully.")
        except PySparkException as e:
            message = f"Error cleaning up unmatched DF from {self.path} volumes --> {e}"
            logger.error(message)
@@ -296,8 +296,6 @@ def _is_mismatch_within_threshold_limits(
        total_mismatch_count = (
            data_reconcile_output.mismatch_count + data_reconcile_output.threshold_output.threshold_mismatch_count
        )
        logger.info(f"total_mismatch_count : {total_mismatch_count}")
        logger.warning(f"reconciled_record_count : {record_count}")
        # if the mismatch count is 0 then no need of checking bounds.
        if total_mismatch_count == 0:
            return True
@@ -106,6 +106,7 @@ def _validate_parsed_query(cls, source: Dialect, master: SchemaMatchResult) -> N

        1. This works by creating two SQL queries. both queries are a create table statement with a single column:
            * first query uses the source column name and datatype to simulate the source.
            * we escape the ansi name with sqlglot because normalize is not implemented for oracle and snowflake
            * second query uses the same column name as ansi and databricks datatype to simulate databricks.
            * we don't use the databricks column name as it may have been renamed.
            * renaming is checked in the previous step to retrieve the databricks column.
@@ -117,7 +118,8 @@ def _validate_parsed_query(cls, source: Dialect, master: SchemaMatchResult) -> N
        :param master: source and target column names and datatypes computed by previous step.
        """
        target = get_dialect("databricks")
        source_query = f"create table dummy ({master.source_column_normalized} {master.source_datatype})"
        source_column_normalized = cls._escape_source_column(source, target, master.source_column_normalized_ansi)
        source_query = f"create table dummy ({source_column_normalized} {master.source_datatype})"
        converted_source_query = cls._parse(source, target, source_query)
        databricks_query = f"create table dummy ({master.source_column_normalized_ansi} {master.databricks_datatype})"
        converted_databricks_query = cls._parse(target, source, databricks_query)
@@ -141,6 +143,10 @@ def _validate_parsed_query(cls, source: Dialect, master: SchemaMatchResult) -> N
    def _parse(cls, source: Dialect, target: Dialect, source_query: str) -> str:
        return parse_one(source_query, read=source).sql(dialect=target).replace(", ", ",")

    @classmethod
    def _escape_source_column(cls, source: Dialect, target: Dialect, ansi_column: str) -> str:
        return parse_one(ansi_column, read=target).sql(dialect=source).replace(", ", ",")

    def compare(
        self,
        source_schema: list[Schema],
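
_escape_source_column leans on sqlglot to re-quote the ANSI-normalized column name for the source dialect, since identifier normalization is not wired up here for Oracle and Snowflake. A standalone sketch of the same round trip (the backtick-quoted input is illustrative; dialects are passed as strings for brevity):

from sqlglot import parse_one

# Re-quote a Databricks-style identifier for Oracle, mirroring
# parse_one(ansi_column, read=target).sql(dialect=source) above.
ansi_column = "`Order Id`"
escaped = parse_one(ansi_column, read="databricks").sql(dialect="oracle")
print(escaped)  # expected: "Order Id" (double-quoted for Oracle)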
@@ -110,7 +110,7 @@ def recon_one(
        recon_capture: ReconCapture,
        reconcile_config: ReconcileConfig,
        table_conf: Table,
    ):
    ) -> tuple[SchemaReconcileOutput, DataReconcileOutput]:
        normalized_table_conf = NormalizeReconConfigService(
            reconciler.source, reconciler.target
        ).normalize_recon_table_config(table_conf)
@@ -130,6 +130,8 @@
            recon_process_duration,
        )

        return schema_reconcile_output, data_reconcile_output

    @staticmethod
    def _do_recon_one(reconciler: Reconciliation, reconcile_config: ReconcileConfig, table_conf: Table):
        recon_process_duration = ReconcileProcessDuration(start_ts=str(datetime.now()), end_ts=None)
@@ -150,7 +152,7 @@ def _do_recon_one(reconciler: Reconciliation, reconcile_config: ReconcileConfig,
                src_schema=src_schema,
                tgt_schema=tgt_schema,
            )
            logger.warning("Schema comparison is completed.")
            logger.info("Schema comparison is completed.")

        if reconciler.report_type in {"data", "row", "all"}:
            data_reconcile_output = TriggerReconService._run_reconcile_data(
@@ -159,7 +161,7 @@ def _do_recon_one(reconciler: Reconciliation, reconcile_config: ReconcileConfig,
                src_schema=src_schema,
                tgt_schema=tgt_schema,
            )
            logger.warning(f"Reconciliation for '{reconciler.report_type}' report completed.")
            logger.info(f"Reconciliation for '{reconciler.report_type}' report completed.")

        recon_process_duration.end_ts = str(datetime.now())
        return schema_reconcile_output, data_reconcile_output, recon_process_duration
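
Returning the outputs from recon_one is what lets the new integration tests assert on row-level results directly. Consuming it looks roughly like this (call arguments are elided because the full parameter list is not shown in this hunk; only attributes that appear elsewhere in this diff are used):

# Sketch: unpack the new return value and assert a clean reconciliation.
schema_result, data_result = TriggerReconService.recon_one(*recon_args)  # args elided
assert data_result.mismatch_count == 0
assert data_result.threshold_output.threshold_mismatch_count == 0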