Commit 3002743

Improve Oracle support in reconcile (#2081)
## Changes

### What does this PR do?

* Fix the Oracle JDBC URL: correct the thin-driver syntax and move credentials out of the URL, passing user and password through reader options.
* Hashing/expression pipeline: replace the incorrect `RAWTOHEX(...), 2` with `UTL_I18N.STRING_TO_RAW(..., 'AL32UTF8'), 4` (SHA-256).
* Small fix for Oracle and Snowflake schema compare.
* Default transformations: tweak datatype parsing so Oracle works correctly.
* Log levels: demote noisy warnings to debug/info.
* Infra: add the Oracle and Snowflake JARs in the setup script.
* Tests: extend integration scaffolding for Oracle and Snowflake, add an e2e test.

### Linked issues

Resolves #2057

### Functionality

- [ ] added relevant user documentation

### Tests

- [X] manually tested
- [ ] added unit tests
- [X] added integration tests
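For context on the first bullet, a minimal PySpark sketch of the corrected connection pattern (host, service name, and credentials below are placeholders, not values from this PR): the thin URL carries only host, port, and service, while the credentials travel as reader options.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Thin-driver URL without embedded credentials (hypothetical host/service).
jdbc_url = "jdbc:oracle:thin:@//db.example.com:1521/FREEPDB1"

df = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("driver", "oracle.jdbc.OracleDriver")
    .option("dbtable", "(SELECT * FROM SOURCE_TABLE) tmp")
    .option("user", "system")        # credentials as options, not in the URL
    .option("password", "<secret>")
    .load()
)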
1 parent 7be6421 commit 3002743

File tree

19 files changed · +427 −45 lines changed

Lines changed: 95 additions & 0 deletions

@@ -0,0 +1,95 @@
+#!/usr/bin/env bash
+set -Eeuo pipefail
+
+# Config
+ORACLE_CONTAINER="${ORACLE_CONTAINER:-oracle-free}"
+ORACLE_IMAGE="${ORACLE_IMAGE:-container-registry.oracle.com/database/free:latest-lite}"
+ORACLE_PORT="${ORACLE_PORT:-1521}"
+ORACLE_PWD="${ORACLE_PWD:?export ORACLE_PWD for SYS}"
+ORACLE_SID="${ORACLE_SID:-FREEPDB1}"
+
+# Dependencies
+command -v docker >/dev/null || { echo "Docker not installed" >&2; exit 2; }
+
+# Image present?
+docker image inspect "${ORACLE_IMAGE}" >/dev/null 2>&1 || docker pull "${ORACLE_IMAGE}"
+
+# Start container if needed
+if docker ps --format '{{.Names}}' | grep -qx "${ORACLE_CONTAINER}"; then
+  :
+elif docker ps -a --format '{{.Names}}' | grep -qx "${ORACLE_CONTAINER}"; then
+  docker start "${ORACLE_CONTAINER}" >/dev/null
+else
+  docker run --name "${ORACLE_CONTAINER}" \
+    -p "${ORACLE_PORT}:1521" \
+    -e ORACLE_PWD="${ORACLE_PWD}" \
+    -d "${ORACLE_IMAGE}" >/dev/null
+  echo "Starting Oracle container..."
+fi
+
+echo "Waiting up to 5 minutes for Oracle to be healthy..."
+MAX_WAIT=300; WAIT_INTERVAL=5; waited=0
+while :; do
+  state="$(docker inspect -f '{{.State.Health.Status}}' "${ORACLE_CONTAINER}" 2>/dev/null || true)"
+  echo "health=${state:-unknown} waited=${waited}s"
+  [[ "$state" == "healthy" ]] && break
+  (( waited >= MAX_WAIT )) && { echo "ERROR: Oracle not healthy in 300s" >&2; exit 1; }
+  sleep "$WAIT_INTERVAL"; waited=$((waited + WAIT_INTERVAL))
+done
+echo "Oracle is fully started."
+
+# SQL bootstrap as SYSDBA, target SYSTEM schema
+docker exec -i "${ORACLE_CONTAINER}" bash -lc \
+  "sqlplus -L -s 'sys/${ORACLE_PWD}@localhost:${ORACLE_PORT}/${ORACLE_SID} as sysdba'" <<'SQL'
+WHENEVER SQLERROR EXIT SQL.SQLCODE
+SET ECHO ON FEEDBACK ON PAGESIZE 200 LINESIZE 32767 SERVEROUTPUT ON
+
+-- reconcile queries execute DBMS_CRYPTO
+GRANT EXECUTE ON DBMS_CRYPTO TO SYSTEM;
+
+-- work in SYSTEM, not SYS
+ALTER SESSION SET CURRENT_SCHEMA=SYSTEM;
+
+-- create table if not exists (guard ORA-00955)
+BEGIN
+  EXECUTE IMMEDIATE q'[
+    CREATE TABLE SOURCE_TABLE (
+      ID NUMBER(15,0),
+      DESCR CHAR(30 CHAR),
+      YEAR NUMBER(4,0),
+      DATEE DATE,
+      CONSTRAINT PK_SOURCE_TABLE PRIMARY KEY (ID)
+    )
+  ]';
+EXCEPTION
+  WHEN OTHERS THEN
+    IF SQLCODE != -955 THEN RAISE; END IF;
+END;
+/
+
+-- truncate if exists
+DECLARE n INTEGER;
+BEGIN
+  SELECT COUNT(*) INTO n FROM USER_TABLES WHERE TABLE_NAME='SOURCE_TABLE';
+  IF n=1 THEN EXECUTE IMMEDIATE 'TRUNCATE TABLE SOURCE_TABLE'; END IF;
+END;
+/
+
+-- idempotent load
+MERGE INTO SOURCE_TABLE t
+USING (
+  SELECT 1001 ID, 'Cycle 1' DESCR, 2025 YEAR, DATE '2025-01-01' DATEE FROM DUAL UNION ALL
+  SELECT 1002, 'Cycle 2', 2025, DATE '2025-02-01' FROM DUAL UNION ALL
+  SELECT 1003, 'Cycle 3', 2025, DATE '2025-03-01' FROM DUAL UNION ALL
+  SELECT 1004, 'Cycle 4', 2025, DATE '2025-04-15' FROM DUAL UNION ALL
+  SELECT 1005, 'Cycle 5', 2025, DATE '2025-05-01' FROM DUAL
+) s
+ON (t.ID = s.ID)
+WHEN MATCHED THEN UPDATE SET t.DESCR = RPAD(s.DESCR,30), t.YEAR = s.YEAR, t.DATEE = s.DATEE
+WHEN NOT MATCHED THEN INSERT (ID, DESCR, YEAR, DATEE)
+  VALUES (s.ID, RPAD(s.DESCR,30), s.YEAR, s.DATEE);
+
+COMMIT;
+SQL
+
+echo "Oracle DDL/DML completed."

.github/scripts/setup_spark_remote.sh

Lines changed: 46 additions & 4 deletions

@@ -13,11 +13,8 @@ fi
 
 spark=spark-${version}-bin-hadoop3
 spark_connect="spark-connect_2.12"
-mssql_jdbc_version="1.4.0"
-mssql_jdbc="spark-mssql-connector_2.12-${mssql_jdbc_version}-BETA"
 mkdir -p "${spark}"
 
-
 SERVER_SCRIPT=$HOME/spark/${spark}/sbin/start-connect-server.sh
 
 ## check the spark version already exist ,if not download the respective version
@@ -33,7 +30,18 @@ else
 fi
 
 JARS_DIR=$HOME/spark/${spark}/jars
-MSSQL_JDBC_JAR=$HOME/spark/${spark}/jars/${mssql_jdbc}.jar
+mssql_jdbc_version="1.4.0"
+mssql_jdbc="spark-mssql-connector_2.12-${mssql_jdbc_version}-BETA"
+MSSQL_JDBC_JAR=$JARS_DIR/${mssql_jdbc}.jar
+ORACLE_JDBC_VERSION="19.28.0.0"
+ORACLE_JDBC_JAR="ojdbc8-${ORACLE_JDBC_VERSION}.jar"
+SNOWFLAKE_JDBC_VERSION="3.26.1"
+SNOWFLAKE_JDBC_JAR="snowflake-jdbc-${SNOWFLAKE_JDBC_VERSION}.jar"
+SNOWFLAKE_JDBC_JAR_PATH="$JARS_DIR/$SNOWFLAKE_JDBC_JAR"
+SNOWFLAKE_SPARK_VERSION="2.11.2-spark_3.3"
+SNOWFLAKE_SPARK_JAR="spark-snowflake_2.12-${SNOWFLAKE_SPARK_VERSION}.jar"
+SNOWFLAKE_SPARK_JAR_PATH="$JARS_DIR/$SNOWFLAKE_SPARK_JAR"
+
 if [ -f "${MSSQL_JDBC_JAR}" ];then
   echo "MSSQL JAR already exists"
 else
@@ -50,6 +58,40 @@ else
   wget "https://github.com/microsoft/sql-spark-connector/releases/download/v${mssql_jdbc_version}/${mssql_jdbc}.jar" -O "$JARS_DIR"/${mssql_jdbc}.jar
 fi
 
+if [ -f "$JARS_DIR/$ORACLE_JDBC_JAR" ]; then
+  echo "Oracle JDBC JAR already exists"
+else
+  echo "Downloading Oracle JDBC JAR"
+  wget "https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/${ORACLE_JDBC_VERSION}/ojdbc8-${ORACLE_JDBC_VERSION}.jar" -O "$JARS_DIR/$ORACLE_JDBC_JAR"
+  if [ $? -ne 0 ]; then
+    echo "Failed to download Oracle JDBC JAR"
+    exit 1
+  fi
+fi
+
+
+if [ -f "${SNOWFLAKE_JDBC_JAR_PATH}" ]; then
+  echo "Snowflake JDBC JAR already exists"
+else
+  echo "Downloading Snowflake JDBC JAR"
+  wget "https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/${SNOWFLAKE_JDBC_VERSION}/${SNOWFLAKE_JDBC_JAR}" -O "${SNOWFLAKE_JDBC_JAR_PATH}"
+  if [ $? -ne 0 ]; then
+    echo "Failed to download Snowflake JDBC JAR"
+    exit 1
+  fi
+fi
+if [ -f "${SNOWFLAKE_SPARK_JAR_PATH}" ]; then
+  echo "Snowflake Spark Connector JAR already exists"
+else
+  echo "Downloading Snowflake Spark Connector JAR"
+  wget "https://repo1.maven.org/maven2/net/snowflake/spark-snowflake_2.12/${SNOWFLAKE_SPARK_VERSION}/${SNOWFLAKE_SPARK_JAR}" -O "${SNOWFLAKE_SPARK_JAR_PATH}"
+  if [ $? -ne 0 ]; then
+    echo "Failed to download Snowflake Spark Connector JAR"
+    exit 1
+  fi
+fi
+
+
 cd "${spark}" || exit 1
 ## check spark remote is running,if not start the spark remote
 ## Temporary workaround for Spark Connect server still points to 3.5.5

src/databricks/labs/lakebridge/reconcile/compare.py

Lines changed: 1 addition & 1 deletion

@@ -80,7 +80,7 @@ def reconcile_data(
 
     # Write unmatched df to volume
     df = ReconIntermediatePersist(spark, path).write_and_read_unmatched_df_with_volumes(df)
-    logger.warning(f"Unmatched data was written to {path} successfully")
+    logger.debug(f"Unmatched data was written to {path} successfully")
 
     mismatch = _get_mismatch_data(df, source_alias, target_alias) if report_type in {"all", "data"} else None
 
src/databricks/labs/lakebridge/reconcile/connectors/jdbc_reader.py

Lines changed: 5 additions & 4 deletions

@@ -9,9 +9,9 @@ class JDBCReaderMixin:
     _spark: SparkSession
 
     # TODO update the url
-    def _get_jdbc_reader(self, query, jdbc_url, driver, prepare_query=None):
+    def _get_jdbc_reader(self, query, jdbc_url, driver, additional_options: dict | None = None):
         driver_class = {
-            "oracle": "oracle.jdbc.driver.OracleDriver",
+            "oracle": "oracle.jdbc.OracleDriver",
             "snowflake": "net.snowflake.client.jdbc.SnowflakeDriver",
             "sqlserver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
         }
@@ -21,8 +21,9 @@ def _get_jdbc_reader(self, query, jdbc_url, driver, prepare_query=None):
             .option("driver", driver_class.get(driver, driver))
             .option("dbtable", f"({query}) tmp")
         )
-        if prepare_query is not None:
-            reader = reader.option('prepareQuery', prepare_query)
+        if isinstance(additional_options, dict):
+            for key, value in additional_options.items():
+                reader = reader.option(key, value)
         return reader
 
     @staticmethod
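A toy illustration of the new `additional_options` handling, assuming a locally built Spark session and placeholder connection values (not the library code): any extra JDBC options, such as user, password, or prepareQuery, are applied to the reader in a loop instead of a single hard-coded parameter.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder URL and options; the real values come from secrets at runtime.
additional_options = {"user": "system", "password": "<secret>"}

reader = (
    spark.read.format("jdbc")
    .option("url", "jdbc:oracle:thin:@//db.example.com:1521/FREEPDB1")
    .option("driver", "oracle.jdbc.OracleDriver")
    .option("dbtable", "(SELECT 1 FROM DUAL) tmp")
)
if isinstance(additional_options, dict):
    for key, value in additional_options.items():
        reader = reader.option(key, value)
df = reader.load()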

src/databricks/labs/lakebridge/reconcile/connectors/oracle.py

Lines changed: 9 additions & 5 deletions

@@ -49,8 +49,7 @@ def __init__(
     @property
     def get_jdbc_url(self) -> str:
         return (
-            f"jdbc:{OracleDataSource._DRIVER}:thin:{self._get_secret('user')}"
-            f"/{self._get_secret('password')}@//{self._get_secret('host')}"
+            f"jdbc:{OracleDataSource._DRIVER}:thin:@//{self._get_secret('host')}"
             f":{self._get_secret('port')}/{self._get_secret('database')}"
         )
 
@@ -86,7 +85,7 @@ def get_schema(
         schema_query = re.sub(
             r'\s+',
             ' ',
-            OracleDataSource._SCHEMA_QUERY.format(table=table, owner=schema),
+            OracleDataSource._SCHEMA_QUERY.format(table=table.lower(), owner=schema.lower()),
         )
         try:
             logger.debug(f"Fetching schema using query: \n`{schema_query}`")
@@ -102,14 +101,19 @@ def get_schema(
     @staticmethod
     def _get_timestamp_options() -> dict[str, str]:
         return {
-            "oracle.jdbc.mapDateToTimestamp": "False",
+            "oracle.jdbc.mapDateToTimestamp": "false",
             "sessionInitStatement": "BEGIN dbms_session.set_nls('nls_date_format', "
             "'''YYYY-MM-DD''');dbms_session.set_nls('nls_timestamp_format', '''YYYY-MM-DD "
             "HH24:MI:SS''');END;",
         }
 
     def reader(self, query: str) -> DataFrameReader:
-        return self._get_jdbc_reader(query, self.get_jdbc_url, OracleDataSource._DRIVER)
+        user = self._get_secret('user')
+        password = self._get_secret('password')
+        logger.debug(f"Using user: {user} to connect to Oracle")
+        return self._get_jdbc_reader(
+            query, self.get_jdbc_url, OracleDataSource._DRIVER, {"user": user, "password": password}
+        )
 
     def normalize_identifier(self, identifier: str) -> NormalizedIdentifier:
         normalized = DialectUtils.normalize_identifier(

src/databricks/labs/lakebridge/reconcile/connectors/tsql.py

Lines changed: 1 addition & 1 deletion

@@ -134,7 +134,7 @@ def get_schema(
             return self.log_and_throw_exception(e, "schema", schema_query)
 
     def reader(self, query: str, prepare_query_str="") -> DataFrameReader:
-        return self._get_jdbc_reader(query, self.get_jdbc_url, self._DRIVER, prepare_query_str)
+        return self._get_jdbc_reader(query, self.get_jdbc_url, self._DRIVER, {"prepareQuery": prepare_query_str})
 
     def normalize_identifier(self, identifier: str) -> NormalizedIdentifier:
         return DialectUtils.normalize_identifier(

src/databricks/labs/lakebridge/reconcile/query_builder/base.py

Lines changed: 9 additions & 2 deletions

@@ -1,6 +1,7 @@
 import logging
 from abc import ABC
 
+import sqlglot
 import sqlglot.expressions as exp
 from sqlglot import Dialect, parse_one
 
@@ -135,8 +136,14 @@ def _get_transform(datatype: str):
 
         source_mapping = DataType_transform_mapping.get(source_dialect, {})
 
-        if source_mapping.get(datatype.upper()) is not None:
-            return source_mapping.get(datatype.upper())
+        parsed = datatype
+        try:
+            parsed = exp.DataType.build(datatype, source).this.value
+        except sqlglot.errors.ParseError:
+            logger.warning(f"Could not parse datatype {datatype} for source {source_dialect}")
+
+        if source_mapping.get(parsed) is not None:
+            return source_mapping.get(parsed)
         if source_mapping.get("default") is not None:
            return source_mapping.get("default")
 
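A rough sketch of what the new parsing step does, assuming a recent sqlglot release: the raw source datatype string is normalized to sqlglot's canonical type name before the transform lookup, with the raw string kept as a fallback when parsing fails.

import sqlglot
import sqlglot.expressions as exp

for raw in ("NUMBER(15,0)", "VARCHAR2(30)"):
    parsed = raw
    try:
        # .this is sqlglot's DataType.Type enum; .value is its canonical name.
        parsed = exp.DataType.build(raw, dialect="oracle").this.value
    except sqlglot.errors.ParseError:
        pass  # fall back to the raw string, as the change above does
    print(raw, "->", parsed)

# Expected with a recent sqlglot: NUMBER(15,0) -> DECIMAL, VARCHAR2(30) -> VARCHAR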

src/databricks/labs/lakebridge/reconcile/query_builder/expression_generator.py

Lines changed: 5 additions & 2 deletions

@@ -253,7 +253,7 @@ def _get_is_string(column_types_dict: dict[str, DataType], column_name: str) ->
         exp.DataType.Type.NCHAR.value: [
             partial(anonymous, func="NVL(TRIM(TO_CHAR({})),'_null_recon_')", dialect=get_dialect("oracle"))
         ],
-        exp.DataType.Type.NVARCHAR.value: [
+        exp.DataType.Type.CHAR.value: [
             partial(anonymous, func="NVL(TRIM(TO_CHAR({})),'_null_recon_')", dialect=get_dialect("oracle"))
         ],
     },
@@ -281,7 +281,10 @@ def _get_is_string(column_types_dict: dict[str, DataType], column_name: str) ->
     ),
     get_dialect("oracle"): HashAlgoMapping(
         source=partial(
-            anonymous, func="DBMS_CRYPTO.HASH(RAWTOHEX({}), 2)", is_expr=True, dialect=get_dialect("oracle")
+            anonymous,
+            func="DBMS_CRYPTO.HASH(UTL_I18N.STRING_TO_RAW({}, 'AL32UTF8'), 4)",
+            is_expr=True,
+            dialect=get_dialect("oracle"),
         ),
         target=md5_partial,
     ),
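As a concrete illustration of the new Oracle-side hash expression (the column name below is made up): DBMS_CRYPTO.HASH's second argument selects the algorithm, where 2 is MD5 and 4 is SHA-256, and UTL_I18N.STRING_TO_RAW converts the string to RAW in the AL32UTF8 character set before hashing.

# Template from the mapping above, rendered for a hypothetical column DESCR.
func = "DBMS_CRYPTO.HASH(UTL_I18N.STRING_TO_RAW({}, 'AL32UTF8'), 4)"
print(func.format("DESCR"))
# -> DBMS_CRYPTO.HASH(UTL_I18N.STRING_TO_RAW(DESCR, 'AL32UTF8'), 4)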

src/databricks/labs/lakebridge/reconcile/recon_capture.py

Lines changed: 1 addition & 3 deletions

@@ -60,7 +60,7 @@ def clean_unmatched_df_from_volume(self):
             # workspace_client.dbfs.delete(path, recursive=True)
             empty_df = self.spark.createDataFrame([], schema=StructType([StructField("empty", StringType(), True)]))
             empty_df.write.format("parquet").mode("overwrite").save(self.path)
-            logger.warning(f"Unmatched DF cleaned up from {self.path} successfully.")
+            logger.debug(f"Unmatched DF cleaned up from {self.path} successfully.")
         except PySparkException as e:
             message = f"Error cleaning up unmatched DF from {self.path} volumes --> {e}"
             logger.error(message)
@@ -296,8 +296,6 @@ def _is_mismatch_within_threshold_limits(
     total_mismatch_count = (
         data_reconcile_output.mismatch_count + data_reconcile_output.threshold_output.threshold_mismatch_count
     )
-    logger.info(f"total_mismatch_count : {total_mismatch_count}")
-    logger.warning(f"reconciled_record_count : {record_count}")
     # if the mismatch count is 0 then no need of checking bounds.
     if total_mismatch_count == 0:
         return True

src/databricks/labs/lakebridge/reconcile/schema_compare.py

Lines changed: 7 additions & 1 deletion

@@ -106,6 +106,7 @@ def _validate_parsed_query(cls, source: Dialect, master: SchemaMatchResult) -> N
 
         1. This works by creating two SQL queries. both queries are a create table statement with a single column:
             * first query uses the source column name and datatype to simulate the source.
+            * we escape the ansi name with sqlglot because normalize is not implemented for oracle and snowflake
             * second query uses the same column name as ansi and databricks datatype to simulate databricks.
             * we don't use the databricks column name as it may have been renamed.
             * renaming is checked in the previous step to retrieve the databricks column.
@@ -117,7 +118,8 @@ def _validate_parsed_query(cls, source: Dialect, master: SchemaMatchResult) -> N
         :param master: source and target column names and datatypes computed by previous step.
         """
         target = get_dialect("databricks")
-        source_query = f"create table dummy ({master.source_column_normalized} {master.source_datatype})"
+        source_column_normalized = cls._escape_source_column(source, target, master.source_column_normalized_ansi)
+        source_query = f"create table dummy ({source_column_normalized} {master.source_datatype})"
         converted_source_query = cls._parse(source, target, source_query)
         databricks_query = f"create table dummy ({master.source_column_normalized_ansi} {master.databricks_datatype})"
         converted_databricks_query = cls._parse(target, source, databricks_query)
@@ -141,6 +143,10 @@ def _validate_parsed_query(cls, source: Dialect, master: SchemaMatchResult) -> N
     def _parse(cls, source: Dialect, target: Dialect, source_query: str) -> str:
         return parse_one(source_query, read=source).sql(dialect=target).replace(", ", ",")
 
+    @classmethod
+    def _escape_source_column(cls, source: Dialect, target: Dialect, ansi_column: str) -> str:
+        return parse_one(ansi_column, read=target).sql(dialect=source).replace(", ", ",")
+
     def compare(
         self,
         source_schema: list[Schema],
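A small sqlglot illustration of the identifier escaping described in the docstring, under the assumption that the normalized ANSI name uses Databricks-style backtick quoting (the column name is made up and this is not the library's own test):

from sqlglot import parse_one

ansi_column = "`order id`"  # hypothetical, Databricks-quoted identifier
print(parse_one(ansi_column, read="databricks").sql(dialect="oracle"))
# expected: "order id"  (Oracle double-quoted identifier)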
