Skip to content

Commit accda3a

Browse files
committed
fix oracle jdbc url
1 parent b8259a7 commit accda3a

File tree

4 files changed

+22
-15
lines changed

4 files changed

+22
-15
lines changed

src/databricks/labs/lakebridge/reconcile/connectors/jdbc_reader.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ class JDBCReaderMixin:
99
_spark: SparkSession
1010

1111
# TODO update the url
12-
def _get_jdbc_reader(self, query, jdbc_url, driver, prepare_query=None):
12+
def _get_jdbc_reader(self, query, jdbc_url, driver, additional_options: dict | None = None):
1313
driver_class = {
1414
"oracle": "oracle.jdbc.driver.OracleDriver",
1515
"snowflake": "net.snowflake.client.jdbc.SnowflakeDriver",
@@ -21,8 +21,9 @@ def _get_jdbc_reader(self, query, jdbc_url, driver, prepare_query=None):
2121
.option("driver", driver_class.get(driver, driver))
2222
.option("dbtable", f"({query}) tmp")
2323
)
24-
if prepare_query is not None:
25-
reader = reader.option('prepareQuery', prepare_query)
24+
if isinstance(additional_options, dict):
25+
for key, value in additional_options.items():
26+
reader = reader.option(key, value)
2627
return reader
2728

2829
@staticmethod

src/databricks/labs/lakebridge/reconcile/connectors/oracle.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,7 @@ def __init__(
4949
@property
5050
def get_jdbc_url(self) -> str:
5151
return (
52-
f"jdbc:{OracleDataSource._DRIVER}:thin:{self._get_secret('user')}"
53-
f"/{self._get_secret('password')}@//{self._get_secret('host')}"
52+
f"jdbc:{OracleDataSource._DRIVER}:thin@//{self._get_secret('host')}"
5453
f":{self._get_secret('port')}/{self._get_secret('database')}"
5554
)
5655

@@ -86,7 +85,7 @@ def get_schema(
8685
schema_query = re.sub(
8786
r'\s+',
8887
' ',
89-
OracleDataSource._SCHEMA_QUERY.format(table=table, owner=schema),
88+
OracleDataSource._SCHEMA_QUERY.format(table=table.lower(), owner=schema.lower()),
9089
)
9190
try:
9291
logger.debug(f"Fetching schema using query: \n`{schema_query}`")
@@ -109,7 +108,12 @@ def _get_timestamp_options() -> dict[str, str]:
109108
}
110109

111110
def reader(self, query: str) -> DataFrameReader:
112-
return self._get_jdbc_reader(query, self.get_jdbc_url, OracleDataSource._DRIVER)
111+
user = self._get_secret('user')
112+
password = self._get_secret('password')
113+
logger.debug(f"Using user: {user} to connect to Oracle")
114+
return self._get_jdbc_reader(
115+
query, self.get_jdbc_url, OracleDataSource._DRIVER, {"user": user, "password": password}
116+
)
113117

114118
def normalize_identifier(self, identifier: str) -> NormalizedIdentifier:
115119
normalized = DialectUtils.normalize_identifier(

src/databricks/labs/lakebridge/reconcile/connectors/tsql.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ def get_schema(
134134
return self.log_and_throw_exception(e, "schema", schema_query)
135135

136136
def reader(self, query: str, prepare_query_str="") -> DataFrameReader:
137-
return self._get_jdbc_reader(query, self.get_jdbc_url, self._DRIVER, prepare_query_str)
137+
return self._get_jdbc_reader(query, self.get_jdbc_url, self._DRIVER, {"prepareQuery": prepare_query_str})
138138

139139
def normalize_identifier(self, identifier: str) -> NormalizedIdentifier:
140140
return DialectUtils.normalize_identifier(

tests/unit/reconcile/connectors/test_oracle.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,14 @@ def test_read_data_with_options():
7272
spark.read.format.assert_called_with("jdbc")
7373
spark.read.format().option.assert_called_with(
7474
"url",
75-
"jdbc:oracle:thin:my_user/my_password@//my_host:777/my_database",
75+
"jdbc:oracle:thin@//my_host:777/my_database",
7676
)
7777
spark.read.format().option().option.assert_called_with("driver", "oracle.jdbc.driver.OracleDriver")
7878
spark.read.format().option().option().option.assert_called_with("dbtable", "(select 1 from data.employee) tmp")
79-
actual_args = spark.read.format().option().option().option().options.call_args.kwargs
80-
expected_args = {
79+
spark.read.format().option().option().option().option.assert_called_with("user", "my_user")
80+
spark.read.format().option().option().option().option().option.assert_called_with("password", "my_password")
81+
jdbc_actual_args = spark.read.format().option().option().option().option().option().options.call_args.kwargs
82+
jdbc_expected_args = {
8183
"numPartitions": 50,
8284
"partitionColumn": "s_nationkey",
8385
"lowerBound": '0',
@@ -88,8 +90,8 @@ def test_read_data_with_options():
8890
r"'''YYYY-MM-DD''');dbms_session.set_nls('nls_timestamp_format', '''YYYY-MM-DD "
8991
r"HH24:MI:SS''');END;",
9092
}
91-
assert actual_args == expected_args
92-
spark.read.format().option().option().option().options().load.assert_called_once()
93+
assert jdbc_actual_args == jdbc_expected_args
94+
spark.read.format().option().option().option().option().option().options().load.assert_called_once()
9395

9496

9597
def test_get_schema():
@@ -141,7 +143,7 @@ def test_read_data_exception_handling():
141143
filters=None,
142144
)
143145

144-
spark.read.format().option().option().option().options().load.side_effect = RuntimeError("Test Exception")
146+
spark.read.format().option().option().option().option().option().options().load.side_effect = RuntimeError("Test Exception")
145147

146148
# Call the read_data method with the Tables configuration and assert that a PySparkException is raised
147149
with pytest.raises(
@@ -156,7 +158,7 @@ def test_get_schema_exception_handling():
156158
engine, spark, ws, scope = initial_setup()
157159
ords = OracleDataSource(engine, spark, ws, scope)
158160

159-
spark.read.format().option().option().option().load.side_effect = RuntimeError("Test Exception")
161+
spark.read.format().option().option().option().option().option().load.side_effect = RuntimeError("Test Exception")
160162

161163
# Call the get_schema method with predefined table, schema, and catalog names and assert that a PySparkException
162164
# is raised

0 commit comments

Comments (0)