@@ -60,13 +60,15 @@ void JsonRpcConnection::Start()
6060
6161void JsonRpcConnection::HandleIncomingMessages (boost::asio::yield_context yc)
6262{
63+ namespace ch = std::chrono;
64+
6365 m_Stream->next_layer ().SetSeen (&m_Seen);
6466
6567 for (;;) {
66- String message ;
68+ String jsonString ;
6769
6870 try {
69- message = JsonRpc::ReadMessage (m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024 );
71+ jsonString = JsonRpc::ReadMessage (m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024 );
7072 } catch (const std::exception& ex) {
7173 Log (m_ShuttingDown ? LogDebug : LogNotice, " JsonRpcConnection" )
7274 << " Error while reading JSON-RPC message for identity '" << m_Identity
@@ -76,20 +78,55 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
7678 }
7779
7880 m_Seen = Utility::GetTime ();
81+ if (m_Endpoint) {
82+ m_Endpoint->AddMessageReceived (jsonString.GetLength ());
83+ }
84+
85+ auto start (ch::steady_clock::now ());
86+ ch::steady_clock::duration cpuBoundDuration, totalDuration;
87+
88+ String rpcMethod (" UNKNOWN" );
89+ String diagnosticInfo; // Contains the diagnostic/debug information in case of an error.
7990
8091 try {
92+ Defer extractTotalDuration ([&start, &totalDuration]() {
93+ totalDuration = ch::steady_clock::now () - start;
94+ });
95+
8196 CpuBoundWork handleMessage (yc);
8297
98+ // Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
99+ cpuBoundDuration = ch::steady_clock::now () - start;
100+
101+ Dictionary::Ptr message = JsonRpc::DecodeMessage (jsonString);
102+ if (auto method = message->Get (" method" ); !method.IsEmpty ()) {
103+ rpcMethod = method;
104+ }
105+
83106 MessageHandler (message);
84107
85108 l_TaskStats.InsertValue (Utility::GetTime (), 1 );
86109 } catch (const std::exception& ex) {
87- Log (m_ShuttingDown ? LogDebug : LogWarning, " JsonRpcConnection" )
88- << " Error while processing JSON-RPC message for identity '" << m_Identity
89- << " ': " << DiagnosticInformation (ex);
110+ diagnosticInfo = DiagnosticInformation (ex);
111+ }
90112
91- break ;
113+ auto severity = LogDebug;
114+ if (totalDuration >= ch::seconds (5 ) || (!m_ShuttingDown && !diagnosticInfo.IsEmpty ())) {
115+ // Either this RPC message took an unexpectedly long time to process, or a fatal error
116+ // has occurred, so promote the log entry from debug to warning.
117+ severity = LogWarning;
118+ }
119+
120+ Log statsLog (severity, " JsonRpcConnection" );
121+ statsLog << (!diagnosticInfo.IsEmpty () ? " Error while processing" : " Processing" ) << " JSON-RPC '"
122+ << rpcMethod << " ' message for identity '" << m_Identity << " ' " ;
123+
124+ if (cpuBoundDuration >= ch::seconds (1 )) {
125+ statsLog << " waited " << ch::duration_cast<ch::milliseconds>(cpuBoundDuration).count () << " ms on semaphore and " ;
92126 }
127+
128+ statsLog << " took total " << ch::duration_cast<ch::milliseconds>(totalDuration).count ()
129+ << " ms" << (diagnosticInfo.IsEmpty () ? " ." : " : " +diagnosticInfo);
93130 }
94131
95132 Disconnect ();
@@ -245,10 +282,8 @@ void JsonRpcConnection::Disconnect()
245282 });
246283}
247284
248- void JsonRpcConnection::MessageHandler (const String& jsonString )
285+ void JsonRpcConnection::MessageHandler (const Dictionary::Ptr& message )
249286{
250- Dictionary::Ptr message = JsonRpc::DecodeMessage (jsonString);
251-
252287 if (m_Endpoint && message->Contains (" ts" )) {
253288 double ts = message->Get (" ts" );
254289
@@ -267,8 +302,6 @@ void JsonRpcConnection::MessageHandler(const String& jsonString)
267302 origin->FromZone = m_Endpoint->GetZone ();
268303 else
269304 origin->FromZone = Zone::GetByName (message->Get (" originZone" ));
270-
271- m_Endpoint->AddMessageReceived (jsonString.GetLength ());
272305 }
273306
274307 Value vmethod;
0 commit comments