@@ -705,3 +705,142 @@ async def mock_server():
705705 await session .initialize ()
706706
707707 await session .call_tool (name = mocked_tool .name , arguments = {"foo" : "bar" }, meta = meta )
708+
709+
710+ @pytest .mark .anyio
711+ async def test_default_message_handler_logs_exceptions (caplog : pytest .LogCaptureFixture ):
712+ """Test that the default message handler logs exceptions instead of silently swallowing them.
713+
714+ When an exception (e.g. a transport error) is delivered through the read stream,
715+ the default handler should log it at WARNING level so the error is observable.
716+ Previously, exceptions were silently discarded, making transport failures
717+ impossible to diagnose.
718+ """
719+ client_to_server_send , client_to_server_receive = anyio .create_memory_object_stream [SessionMessage ](1 )
720+ server_to_client_send , server_to_client_receive = anyio .create_memory_object_stream [SessionMessage | Exception ](1 )
721+
722+ async def mock_server ():
723+ # Receive the initialization request
724+ session_message = await client_to_server_receive .receive ()
725+ jsonrpc_request = session_message .message
726+ assert isinstance (jsonrpc_request , JSONRPCRequest )
727+
728+ result = InitializeResult (
729+ protocol_version = LATEST_PROTOCOL_VERSION ,
730+ capabilities = ServerCapabilities (),
731+ server_info = Implementation (name = "mock-server" , version = "0.1.0" ),
732+ )
733+
734+ # Send init response
735+ await server_to_client_send .send (
736+ SessionMessage (
737+ JSONRPCResponse (
738+ jsonrpc = "2.0" ,
739+ id = jsonrpc_request .id ,
740+ result = result .model_dump (by_alias = True , mode = "json" , exclude_none = True ),
741+ )
742+ )
743+ )
744+
745+ # Receive initialized notification
746+ await client_to_server_receive .receive ()
747+
748+ # Inject an exception into the read stream (simulating a transport error)
749+ await server_to_client_send .send (RuntimeError ("SSE stream read timeout" ))
750+
751+ # Close the stream so the session can exit cleanly
752+ await server_to_client_send .aclose ()
753+
754+ async with (
755+ ClientSession (
756+ server_to_client_receive ,
757+ client_to_server_send ,
758+ # Use the default message_handler (no override)
759+ ) as session ,
760+ anyio .create_task_group () as tg ,
761+ client_to_server_send ,
762+ client_to_server_receive ,
763+ server_to_client_send ,
764+ server_to_client_receive ,
765+ ):
766+ tg .start_soon (mock_server )
767+ await session .initialize ()
768+
769+ # Wait for the receive loop to process the exception
770+ await anyio .sleep (0.1 )
771+
772+ # Verify the exception was logged instead of silently swallowed
773+ warning_records = [r for r in caplog .records if "Unhandled exception in message handler" in r .message ]
774+ assert len (warning_records ) >= 1
775+ # The exception details are attached via exc_info, visible in the formatted output
776+ assert warning_records [0 ].exc_info is not None
777+ assert warning_records [0 ].exc_info [1 ] is not None
778+ assert "SSE stream read timeout" in str (warning_records [0 ].exc_info [1 ])
779+
780+
781+ @pytest .mark .anyio
782+ async def test_custom_message_handler_can_suppress_exceptions ():
783+ """Test that a custom message handler can suppress exceptions if desired."""
784+ client_to_server_send , client_to_server_receive = anyio .create_memory_object_stream [SessionMessage ](1 )
785+ server_to_client_send , server_to_client_receive = anyio .create_memory_object_stream [SessionMessage | Exception ](1 )
786+
787+ suppressed_exceptions : list [Exception ] = []
788+
789+ async def suppressing_handler (
790+ message : RequestResponder [types .ServerRequest , types .ClientResult ] | types .ServerNotification | Exception ,
791+ ) -> None :
792+ if isinstance (message , Exception ):
793+ suppressed_exceptions .append (message )
794+ # Intentionally NOT re-raising — old silent behavior
795+
796+ async def mock_server ():
797+ # Receive the initialization request
798+ session_message = await client_to_server_receive .receive ()
799+ jsonrpc_request = session_message .message
800+ assert isinstance (jsonrpc_request , JSONRPCRequest )
801+
802+ result = InitializeResult (
803+ protocol_version = LATEST_PROTOCOL_VERSION ,
804+ capabilities = ServerCapabilities (),
805+ server_info = Implementation (name = "mock-server" , version = "0.1.0" ),
806+ )
807+
808+ # Send init response
809+ await server_to_client_send .send (
810+ SessionMessage (
811+ JSONRPCResponse (
812+ jsonrpc = "2.0" ,
813+ id = jsonrpc_request .id ,
814+ result = result .model_dump (by_alias = True , mode = "json" , exclude_none = True ),
815+ )
816+ )
817+ )
818+
819+ # Receive initialized notification
820+ await client_to_server_receive .receive ()
821+
822+ # Inject an exception, then close the stream
823+ await server_to_client_send .send (RuntimeError ("transport error" ))
824+ await server_to_client_send .aclose ()
825+
826+ async with (
827+ ClientSession (
828+ server_to_client_receive ,
829+ client_to_server_send ,
830+ message_handler = suppressing_handler ,
831+ ) as session ,
832+ anyio .create_task_group () as tg ,
833+ client_to_server_send ,
834+ client_to_server_receive ,
835+ server_to_client_send ,
836+ server_to_client_receive ,
837+ ):
838+ tg .start_soon (mock_server )
839+ await session .initialize ()
840+
841+ # Give the receive loop time to process the exception
842+ await anyio .sleep (0.1 )
843+
844+ # The custom handler captured the exception instead of crashing
845+ assert len (suppressed_exceptions ) == 1
846+ assert str (suppressed_exceptions [0 ]) == "transport error"
0 commit comments