From 1dd387738b5cce0f6a4fb6ff0e2139507ce0c6ca Mon Sep 17 00:00:00 2001 From: varun-edachali-dbx Date: Thu, 10 Jul 2025 07:27:12 +0530 Subject: [PATCH 1/5] remove additional call on success Signed-off-by: varun-edachali-dbx --- src/databricks/sql/backend/sea/backend.py | 48 ++++++++++++------- .../sql/backend/sea/models/__init__.py | 2 - .../sql/backend/sea/models/responses.py | 20 -------- tests/unit/test_sea_backend.py | 7 +-- 4 files changed, 32 insertions(+), 45 deletions(-) diff --git a/src/databricks/sql/backend/sea/backend.py b/src/databricks/sql/backend/sea/backend.py index cfb27adbd..5f6c6b12d 100644 --- a/src/databricks/sql/backend/sea/backend.py +++ b/src/databricks/sql/backend/sea/backend.py @@ -40,7 +40,6 @@ DeleteSessionRequest, StatementParameter, ExecuteStatementResponse, - GetStatementResponse, CreateSessionResponse, ) @@ -323,7 +322,7 @@ def _extract_description_from_manifest( return columns def _results_message_to_execute_response( - self, response: GetStatementResponse + self, response: ExecuteStatementResponse ) -> ExecuteResponse: """ Convert a SEA response to an ExecuteResponse and extract result data. @@ -357,6 +356,28 @@ def _results_message_to_execute_response( return execute_response + def _response_to_result_set( + self, response: ExecuteStatementResponse, cursor: Cursor + ) -> SeaResultSet: + """ + Convert a SEA response to a SeaResultSet. + """ + + # Create and return a SeaResultSet + from databricks.sql.backend.sea.result_set import SeaResultSet + + execute_response = self._results_message_to_execute_response(response) + + return SeaResultSet( + connection=cursor.connection, + execute_response=execute_response, + sea_client=self, + result_data=response.result, + manifest=response.manifest, + buffer_size_bytes=cursor.buffer_size_bytes, + arraysize=cursor.arraysize, + ) + def _check_command_not_in_failed_or_closed_state( self, state: CommandState, command_id: CommandId ) -> None: @@ -491,6 +512,10 @@ def execute_command( if async_op: return None + if response.status.state == CommandState.SUCCEEDED: + # if the response succeeded within the wait_timeout, return the results immediately + return self._response_to_result_set(response, cursor) + self._wait_until_command_done(response) return self.get_execution_result(command_id, cursor) @@ -573,7 +598,7 @@ def get_query_state(self, command_id: CommandId) -> CommandState: ) # Parse the response - response = GetStatementResponse.from_dict(response_data) + response = ExecuteStatementResponse.from_dict(response_data) return response.status.state def get_execution_result( @@ -611,22 +636,9 @@ def get_execution_result( path=self.STATEMENT_PATH_WITH_ID.format(sea_statement_id), data=request.to_dict(), ) - response = GetStatementResponse.from_dict(response_data) - - # Create and return a SeaResultSet - from databricks.sql.backend.sea.result_set import SeaResultSet - - execute_response = self._results_message_to_execute_response(response) + response = ExecuteStatementResponse.from_dict(response_data) - return SeaResultSet( - connection=cursor.connection, - execute_response=execute_response, - sea_client=self, - result_data=response.result, - manifest=response.manifest, - buffer_size_bytes=cursor.buffer_size_bytes, - arraysize=cursor.arraysize, - ) + return self._response_to_result_set(response, cursor) # == Metadata Operations == diff --git a/src/databricks/sql/backend/sea/models/__init__.py b/src/databricks/sql/backend/sea/models/__init__.py index b7c8bd399..e591f0fd4 100644 --- a/src/databricks/sql/backend/sea/models/__init__.py +++ b/src/databricks/sql/backend/sea/models/__init__.py @@ -25,7 +25,6 @@ from databricks.sql.backend.sea.models.responses import ( ExecuteStatementResponse, - GetStatementResponse, CreateSessionResponse, ) @@ -47,6 +46,5 @@ "DeleteSessionRequest", # Response models "ExecuteStatementResponse", - "GetStatementResponse", "CreateSessionResponse", ] diff --git a/src/databricks/sql/backend/sea/models/responses.py b/src/databricks/sql/backend/sea/models/responses.py index 302b32d0c..01526e3a1 100644 --- a/src/databricks/sql/backend/sea/models/responses.py +++ b/src/databricks/sql/backend/sea/models/responses.py @@ -124,26 +124,6 @@ def from_dict(cls, data: Dict[str, Any]) -> "ExecuteStatementResponse": ) -@dataclass -class GetStatementResponse: - """Representation of the response from getting information about a statement.""" - - statement_id: str - status: StatementStatus - manifest: ResultManifest - result: ResultData - - @classmethod - def from_dict(cls, data: Dict[str, Any]) -> "GetStatementResponse": - """Create a GetStatementResponse from a dictionary.""" - return cls( - statement_id=data.get("statement_id", ""), - status=_parse_status(data), - manifest=_parse_manifest(data), - result=_parse_result(data), - ) - - @dataclass class CreateSessionResponse: """Representation of the response from creating a new session.""" diff --git a/tests/unit/test_sea_backend.py b/tests/unit/test_sea_backend.py index 7eae8e5a8..f68f2d88b 100644 --- a/tests/unit/test_sea_backend.py +++ b/tests/unit/test_sea_backend.py @@ -225,7 +225,7 @@ def test_command_execution_sync( mock_http_client._make_request.return_value = execute_response with patch.object( - sea_client, "get_execution_result", return_value="mock_result_set" + sea_client, "_response_to_result_set", return_value="mock_result_set" ) as mock_get_result: result = sea_client.execute_command( operation="SELECT 1", @@ -240,9 +240,6 @@ def test_command_execution_sync( enforce_embedded_schema_correctness=False, ) assert result == "mock_result_set" - cmd_id_arg = mock_get_result.call_args[0][0] - assert isinstance(cmd_id_arg, CommandId) - assert cmd_id_arg.guid == "test-statement-123" # Test with invalid session ID with pytest.raises(ValueError) as excinfo: @@ -357,7 +354,7 @@ def test_command_execution_advanced( mock_http_client._make_request.return_value = execute_response param = {"name": "param1", "value": "value1", "type": "STRING"} - with patch.object(sea_client, "get_execution_result"): + with patch.object(sea_client, "_response_to_result_set"): sea_client.execute_command( operation="SELECT * FROM table WHERE col = :param1", session_id=sea_session_id, From 8374a7500f0e4853411b20b5a75f6be21d8c4c76 Mon Sep 17 00:00:00 2001 From: varun-edachali-dbx Date: Thu, 10 Jul 2025 07:33:12 +0530 Subject: [PATCH 2/5] reduce additional network call after wait Signed-off-by: varun-edachali-dbx --- src/databricks/sql/backend/sea/backend.py | 63 ++++++++++------------- tests/unit/test_sea_backend.py | 2 +- 2 files changed, 28 insertions(+), 37 deletions(-) diff --git a/src/databricks/sql/backend/sea/backend.py b/src/databricks/sql/backend/sea/backend.py index 5f6c6b12d..aedd40798 100644 --- a/src/databricks/sql/backend/sea/backend.py +++ b/src/databricks/sql/backend/sea/backend.py @@ -398,7 +398,7 @@ def _check_command_not_in_failed_or_closed_state( def _wait_until_command_done( self, response: ExecuteStatementResponse - ) -> CommandState: + ) -> ExecuteStatementResponse: """ Wait until a command is done. """ @@ -408,11 +408,12 @@ def _wait_until_command_done( while state in [CommandState.PENDING, CommandState.RUNNING]: time.sleep(self.POLL_INTERVAL_SECONDS) - state = self.get_query_state(command_id) + response = self._poll_query(command_id) + state = response.status.state self._check_command_not_in_failed_or_closed_state(state, command_id) - return state + return response def execute_command( self, @@ -516,8 +517,8 @@ def execute_command( # if the response succeeded within the wait_timeout, return the results immediately return self._response_to_result_set(response, cursor) - self._wait_until_command_done(response) - return self.get_execution_result(command_id, cursor) + response = self._wait_until_command_done(response) + return self._response_to_result_set(response, cursor) def cancel_command(self, command_id: CommandId) -> None: """ @@ -569,18 +570,9 @@ def close_command(self, command_id: CommandId) -> None: data=request.to_dict(), ) - def get_query_state(self, command_id: CommandId) -> CommandState: + def _poll_query(self, command_id: CommandId) -> ExecuteStatementResponse: """ - Get the state of a running query. - - Args: - command_id: Command identifier - - Returns: - CommandState: The current state of the command - - Raises: - ProgrammingError: If the command ID is invalid + Poll for the current command info. """ if command_id.backend_type != BackendType.SEA: @@ -596,9 +588,25 @@ def get_query_state(self, command_id: CommandId) -> CommandState: path=self.STATEMENT_PATH_WITH_ID.format(sea_statement_id), data=request.to_dict(), ) - - # Parse the response response = ExecuteStatementResponse.from_dict(response_data) + + return response + + def get_query_state(self, command_id: CommandId) -> CommandState: + """ + Get the state of a running query. + + Args: + command_id: Command identifier + + Returns: + CommandState: The current state of the command + + Raises: + ProgrammingError: If the command ID is invalid + """ + + response = self._poll_query(command_id) return response.status.state def get_execution_result( @@ -620,24 +628,7 @@ def get_execution_result( ValueError: If the command ID is invalid """ - if command_id.backend_type != BackendType.SEA: - raise ValueError("Not a valid SEA command ID") - - sea_statement_id = command_id.to_sea_statement_id() - if sea_statement_id is None: - raise ValueError("Not a valid SEA command ID") - - # Create the request model - request = GetStatementRequest(statement_id=sea_statement_id) - - # Get the statement result - response_data = self.http_client._make_request( - method="GET", - path=self.STATEMENT_PATH_WITH_ID.format(sea_statement_id), - data=request.to_dict(), - ) - response = ExecuteStatementResponse.from_dict(response_data) - + response = self._poll_query(command_id) return self._response_to_result_set(response, cursor) # == Metadata Operations == diff --git a/tests/unit/test_sea_backend.py b/tests/unit/test_sea_backend.py index f68f2d88b..0724f6016 100644 --- a/tests/unit/test_sea_backend.py +++ b/tests/unit/test_sea_backend.py @@ -327,7 +327,7 @@ def test_command_execution_advanced( mock_http_client._make_request.side_effect = [initial_response, poll_response] with patch.object( - sea_client, "get_execution_result", return_value="mock_result_set" + sea_client, "_response_to_result_set", return_value="mock_result_set" ) as mock_get_result: with patch("time.sleep"): result = sea_client.execute_command( From 70e22f8888b6475e5d8f8e036cb9ad9f6cf9180c Mon Sep 17 00:00:00 2001 From: varun-edachali-dbx Date: Sat, 19 Jul 2025 11:31:01 +0530 Subject: [PATCH 3/5] re-introduce GetStatementResponse Signed-off-by: varun-edachali-dbx --- src/databricks/sql/backend/sea/backend.py | 34 +++++++++++-------- .../sql/backend/sea/models/__init__.py | 2 ++ .../sql/backend/sea/models/responses.py | 20 +++++++++++ 3 files changed, 41 insertions(+), 15 deletions(-) diff --git a/src/databricks/sql/backend/sea/backend.py b/src/databricks/sql/backend/sea/backend.py index 7ecb2a56a..6ea3ff69f 100644 --- a/src/databricks/sql/backend/sea/backend.py +++ b/src/databricks/sql/backend/sea/backend.py @@ -6,6 +6,7 @@ from typing import Any, Dict, Tuple, List, Optional, Union, TYPE_CHECKING, Set from databricks.sql.backend.sea.models.base import ResultManifest +from databricks.sql.backend.sea.models.responses import GetStatementResponse from databricks.sql.backend.sea.utils.constants import ( ALLOWED_SESSION_CONF_TO_DEFAULT_VALUES_MAP, ResultFormat, @@ -323,7 +324,7 @@ def _extract_description_from_manifest( return columns def _results_message_to_execute_response( - self, response: ExecuteStatementResponse + self, response: Union[ExecuteStatementResponse, GetStatementResponse] ) -> ExecuteResponse: """ Convert a SEA response to an ExecuteResponse and extract result data. @@ -358,7 +359,9 @@ def _results_message_to_execute_response( return execute_response def _response_to_result_set( - self, response: ExecuteStatementResponse, cursor: Cursor + self, + response: Union[ExecuteStatementResponse, GetStatementResponse], + cursor: Cursor, ) -> SeaResultSet: """ Convert a SEA response to a SeaResultSet. @@ -399,22 +402,24 @@ def _check_command_not_in_failed_or_closed_state( def _wait_until_command_done( self, response: ExecuteStatementResponse - ) -> ExecuteStatementResponse: + ) -> Union[ExecuteStatementResponse, GetStatementResponse]: """ Wait until a command is done. """ - state = response.status.state - command_id = CommandId.from_sea_statement_id(response.statement_id) + final_response: Union[ExecuteStatementResponse, GetStatementResponse] = response + + state = final_response.status.state + command_id = CommandId.from_sea_statement_id(final_response.statement_id) while state in [CommandState.PENDING, CommandState.RUNNING]: time.sleep(self.POLL_INTERVAL_SECONDS) - response = self._poll_query(command_id) - state = response.status.state + final_response = self._poll_query(command_id) + state = final_response.status.state self._check_command_not_in_failed_or_closed_state(state, command_id) - return response + return final_response def execute_command( self, @@ -516,12 +521,11 @@ def execute_command( if async_op: return None - if response.status.state == CommandState.SUCCEEDED: - # if the response succeeded within the wait_timeout, return the results immediately - return self._response_to_result_set(response, cursor) + final_response: Union[ExecuteStatementResponse, GetStatementResponse] = response + if response.status.state != CommandState.SUCCEEDED: + final_response = self._wait_until_command_done(response) - response = self._wait_until_command_done(response) - return self._response_to_result_set(response, cursor) + return self._response_to_result_set(final_response, cursor) def cancel_command(self, command_id: CommandId) -> None: """ @@ -573,7 +577,7 @@ def close_command(self, command_id: CommandId) -> None: data=request.to_dict(), ) - def _poll_query(self, command_id: CommandId) -> ExecuteStatementResponse: + def _poll_query(self, command_id: CommandId) -> GetStatementResponse: """ Poll for the current command info. """ @@ -591,7 +595,7 @@ def _poll_query(self, command_id: CommandId) -> ExecuteStatementResponse: path=self.STATEMENT_PATH_WITH_ID.format(sea_statement_id), data=request.to_dict(), ) - response = ExecuteStatementResponse.from_dict(response_data) + response = GetStatementResponse.from_dict(response_data) return response diff --git a/src/databricks/sql/backend/sea/models/__init__.py b/src/databricks/sql/backend/sea/models/__init__.py index e591f0fd4..b7c8bd399 100644 --- a/src/databricks/sql/backend/sea/models/__init__.py +++ b/src/databricks/sql/backend/sea/models/__init__.py @@ -25,6 +25,7 @@ from databricks.sql.backend.sea.models.responses import ( ExecuteStatementResponse, + GetStatementResponse, CreateSessionResponse, ) @@ -46,5 +47,6 @@ "DeleteSessionRequest", # Response models "ExecuteStatementResponse", + "GetStatementResponse", "CreateSessionResponse", ] diff --git a/src/databricks/sql/backend/sea/models/responses.py b/src/databricks/sql/backend/sea/models/responses.py index 01526e3a1..302b32d0c 100644 --- a/src/databricks/sql/backend/sea/models/responses.py +++ b/src/databricks/sql/backend/sea/models/responses.py @@ -124,6 +124,26 @@ def from_dict(cls, data: Dict[str, Any]) -> "ExecuteStatementResponse": ) +@dataclass +class GetStatementResponse: + """Representation of the response from getting information about a statement.""" + + statement_id: str + status: StatementStatus + manifest: ResultManifest + result: ResultData + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "GetStatementResponse": + """Create a GetStatementResponse from a dictionary.""" + return cls( + statement_id=data.get("statement_id", ""), + status=_parse_status(data), + manifest=_parse_manifest(data), + result=_parse_result(data), + ) + + @dataclass class CreateSessionResponse: """Representation of the response from creating a new session.""" From 46650836bbaad47af2cc891c519468ffe56b8198 Mon Sep 17 00:00:00 2001 From: varun-edachali-dbx Date: Sat, 19 Jul 2025 11:38:28 +0530 Subject: [PATCH 4/5] remove need for lazy load of SeaResultSet Signed-off-by: varun-edachali-dbx --- src/databricks/sql/backend/sea/backend.py | 6 ++---- src/databricks/sql/backend/sea/result_set.py | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/databricks/sql/backend/sea/backend.py b/src/databricks/sql/backend/sea/backend.py index 5a4f50a83..df7dcfbc2 100644 --- a/src/databricks/sql/backend/sea/backend.py +++ b/src/databricks/sql/backend/sea/backend.py @@ -19,7 +19,8 @@ if TYPE_CHECKING: from databricks.sql.client import Cursor - from databricks.sql.backend.sea.result_set import SeaResultSet + +from databricks.sql.backend.sea.result_set import SeaResultSet from databricks.sql.backend.databricks_client import DatabricksClient from databricks.sql.backend.types import ( @@ -375,9 +376,6 @@ def _response_to_result_set( Convert a SEA response to a SeaResultSet. """ - # Create and return a SeaResultSet - from databricks.sql.backend.sea.result_set import SeaResultSet - execute_response = self._results_message_to_execute_response(response) return SeaResultSet( diff --git a/src/databricks/sql/backend/sea/result_set.py b/src/databricks/sql/backend/sea/result_set.py index b67fc74d4..a6a0a298b 100644 --- a/src/databricks/sql/backend/sea/result_set.py +++ b/src/databricks/sql/backend/sea/result_set.py @@ -4,7 +4,6 @@ import logging -from databricks.sql.backend.sea.backend import SeaDatabricksClient from databricks.sql.backend.sea.models.base import ResultData, ResultManifest from databricks.sql.backend.sea.utils.conversion import SqlTypeConverter @@ -15,6 +14,7 @@ if TYPE_CHECKING: from databricks.sql.client import Connection + from databricks.sql.backend.sea.backend import SeaDatabricksClient from databricks.sql.types import Row from databricks.sql.backend.sea.queue import JsonQueue, SeaResultSetQueueFactory from databricks.sql.backend.types import ExecuteResponse From 137c4f4ef79c2b743ed90a29087ec095631d2289 Mon Sep 17 00:00:00 2001 From: varun-edachali-dbx Date: Sat, 19 Jul 2025 12:40:23 +0530 Subject: [PATCH 5/5] re-organise GetStatementResponse import Signed-off-by: varun-edachali-dbx --- src/databricks/sql/backend/sea/backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/sql/backend/sea/backend.py b/src/databricks/sql/backend/sea/backend.py index df7dcfbc2..42677b903 100644 --- a/src/databricks/sql/backend/sea/backend.py +++ b/src/databricks/sql/backend/sea/backend.py @@ -6,7 +6,6 @@ from typing import Any, Dict, Tuple, List, Optional, Union, TYPE_CHECKING, Set from databricks.sql.backend.sea.models.base import ExternalLink, ResultManifest -from databricks.sql.backend.sea.models.responses import GetStatementResponse from databricks.sql.backend.sea.utils.constants import ( ALLOWED_SESSION_CONF_TO_DEFAULT_VALUES_MAP, ResultFormat, @@ -43,6 +42,7 @@ DeleteSessionRequest, StatementParameter, ExecuteStatementResponse, + GetStatementResponse, CreateSessionResponse, ) from databricks.sql.backend.sea.models.responses import GetChunksResponse