// Patch overview (free-text preamble placed before the first "diff --git" header).
//
// This unified diff touches four files: fdbserver/networktest.actor.cpp, flow/Knobs.cpp,
// flow/Net2.actor.cpp and flow/include/flow/Knobs.h.
//
// * networktest.actor.cpp: adds P2PNetworkTest::run_impl_simple. It prints the listener/remote
//   counts and addresses. If there is at least one listener it accepts a single connection on
//   listeners[0], waits for acceptHandshake() (printing any Error), calls threadSleep(11.0) and
//   returns. Otherwise, if there is at least one remote, it connects to remotes[0] and waits for
//   connectHandshake().
//   NOTE(review): the context line shows run() still returning run_impl(this); no visible hunk
//   calls run_impl_simple - confirm how it is meant to be invoked.
//   NOTE(review): the local `actors` is declared but not used in the added body.
// * Knobs.cpp / Knobs.h: add the double knob TLS_HANDSHAKE_TIMEOUT_SECONDS, initialised to 0,
//   which the inline comment documents as "no timeout".
// * Net2.actor.cpp:
//   - SSLHandshakerThread::Handshake gains a UID debugId (setDebugId) that is passed to the
//     handshake-error TraceEvents.
//   - SSLConnection::setHandshakeTimeout(seconds, debugId) sets SO_RCVTIMEO and SO_SNDTIMEO on the
//     native socket handle. If `closed` is set, the socket is not open, or the handle is negative,
//     it logs N2_SetSSLSocketTimeoutError and returns without touching the socket.
//     NOTE(review): `seconds` is assigned to tv_sec with tv_usec fixed at 0, so any sub-second
//     part is not carried into the timeval; the setsockopt return values are not checked.
//   - doAcceptHandshake / doConnectHandshake: only on the branch that posts the handshake to
//     sslHandshakerPool, and only when the knob is > 0, the timeout is set to
//     max(TLS_HANDSHAKE_TIMEOUT_SECONDS, CONNECTION_MONITOR_TIMEOUT * 1.5). After
//     wait(onHandshook) it is set again with 0 (commented "reset"). The reset sits inside the try
//     block, so it is skipped when wait(onHandshook) throws.
//   - N2_AcceptHandshakeTimeout / N2_ConnectHandshakeTimeout TraceEvents are emitted before
//     connection_failed() is thrown on the CONNECTION_MONITOR_TIMEOUT path.
//   - A `closed` member (default false) is set to true after the existing socket.close /
//     ssl_sock.shutdown calls.
//   - net2_test() receives a whitespace-only formatting change.
//
// NOTE(review): in this copy the patch's original line breaks are not preserved, and template
// argument lists appear to be missing (e.g. `Future run_impl_simple`, `Reference conn1`,
// `static_cast(TaskPriority::DefaultYield)`, `reinterpret_cast(&timeout)`). Presumably this is
// extraction damage - verify against the upstream patch before applying.
diff --git a/fdbserver/networktest.actor.cpp b/fdbserver/networktest.actor.cpp index e3f27ef8527..7200e4a9057 100644 --- a/fdbserver/networktest.actor.cpp +++ b/fdbserver/networktest.actor.cpp @@ -627,6 +627,47 @@ struct P2PNetworkTest { } } + ACTOR static Future run_impl_simple(P2PNetworkTest* self) { + state ActorCollection actors(false); + + self->startTime = now(); + + fmt::print("{0} listeners, {1} remotes, {2} outgoing connections\n", + self->listeners.size(), + self->remotes.size(), + self->connectionsOut); + + for (auto n : self->remotes) { + printf("Remote: %s\n", n.toString().c_str()); + } + + for (auto el : self->listeners) { + printf("Listener: %s\n", el->getListenAddress().toString().c_str()); + } + + if (!self->listeners.empty()) { + state Reference conn1 = wait(self->listeners[0]->accept()); + printf("Server: connected from %s\n", conn1->getPeerAddress().toString().c_str()); + try { + wait(conn1->acceptHandshake()); + printf("Server: connected from %s, handshake done\n", conn1->getPeerAddress().toString().c_str()); + } catch (Error& e) { + printf("Server: handshake error %s\n", e.what()); + } + threadSleep(11.0); + return Void(); + } + + if (!self->remotes.empty()) { + state Reference conn2 = wait(INetworkConnections::net()->connect(self->remotes[0])); + printf("Client: connected to %s\n", self->remotes[0].toString().c_str()); + wait(conn2->connectHandshake()); + printf("Client: connected to %s, handshake done\n", self->remotes[0].toString().c_str()); + } + + return Void(); + } + Future run() { return run_impl(this); } }; diff --git a/flow/Knobs.cpp b/flow/Knobs.cpp index 8cf956eaa55..20d31ea6c91 100644 --- a/flow/Knobs.cpp +++ b/flow/Knobs.cpp @@ -137,6 +137,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) { init( TLS_HANDSHAKE_LIMIT, 1000 ); init( DISABLE_MAINTHREAD_TLS_HANDSHAKE, false ); init( TLS_HANDSHAKE_FLOWLOCK_PRIORITY, static_cast(TaskPriority::DefaultYield) ); + init( TLS_HANDSHAKE_TIMEOUT_SECONDS, 0 ); // 0 
-> no timeout init( NETWORK_TEST_CLIENT_COUNT, 30 ); init( NETWORK_TEST_REPLY_SIZE, 600e3 ); init( NETWORK_TEST_REQUEST_COUNT, 0 ); // 0 -> run forever diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index 9220a753439..d0f6ccf8be7 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -831,11 +831,14 @@ struct SSLHandshakerThread final : IThreadPoolReceiver { void setPeerAddr(const NetworkAddress& addr) { peerAddr = addr; } + void setDebugId(const UID& id) { debugId = id; } + ThreadReturnPromise done; ssl_socket& socket; ssl_socket::handshake_type type; boost::system::error_code err; NetworkAddress peerAddr; + UID debugId; }; void action(Handshake& h) { @@ -850,7 +853,8 @@ struct SSLHandshakerThread final : IThreadPoolReceiver { if (h.err.failed()) { TraceEvent(SevWarn, h.type == ssl_socket::handshake_type::client ? "N2_ConnectHandshakeError"_audit - : "N2_AcceptHandshakeError"_audit) + : "N2_AcceptHandshakeError"_audit, + h.debugId) .detail("PeerAddr", h.getPeerAddress()) .detail("PeerAddress", h.getPeerAddress()) .detail("PeerEndPoint", h.getPeerEndPointAddress()) @@ -864,7 +868,8 @@ struct SSLHandshakerThread final : IThreadPoolReceiver { } catch (...) { TraceEvent(SevWarn, h.type == ssl_socket::handshake_type::client ? 
"N2_ConnectHandshakeUnknownError"_audit - : "N2_AcceptHandshakeUnknownError"_audit) + : "N2_AcceptHandshakeUnknownError"_audit, + h.debugId) .detail("PeerAddr", h.getPeerAddress()) .detail("PeerAddress", h.getPeerAddress()) .detail("PeerEndPoint", h.getPeerEndPointAddress()) @@ -941,8 +946,27 @@ class SSLConnection final : public IConnection, ReferenceCounted init(); } + void setHandshakeTimeout(double seconds, const UID& debugId) { + timeval timeout; + timeout.tv_sec = seconds; + timeout.tv_usec = 0; + int nativeSock = ssl_sock.next_layer().native_handle(); + if (closed || !ssl_sock.next_layer().is_open() || nativeSock < 0) { + TraceEvent(SevWarn, "N2_SetSSLSocketTimeoutError", debugId) + .detail("PeerAddr", peer_address) + .detail("PeerAddress", peer_address) + .detail("Closed", closed) + .detail("NativeSock", nativeSock) + .detail("Message", "Invalid native socket handle"); + return; + } + setsockopt(nativeSock, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast(&timeout), sizeof(timeout)); + setsockopt(nativeSock, SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast(&timeout), sizeof(timeout)); + } + ACTOR static void doAcceptHandshake(Reference self, Promise connected) { state Hold holder; + state bool hasSetHandshakeTimeout = false; try { Future onHandshook; @@ -959,6 +983,14 @@ class SSLConnection final : public IConnection, ReferenceCounted auto handshake = new SSLHandshakerThread::Handshake(self->ssl_sock, boost::asio::ssl::stream_base::server); handshake->setPeerAddr(self->getPeerAddress()); + handshake->setDebugId(self->id); + if (FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS > 0) { + double timeoutSecond = std::max(FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS, + FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT * 1.5); + self->setHandshakeTimeout(timeoutSecond, self->id); + hasSetHandshakeTimeout = true; + // Multiplying by 1.5 to ensure timeout never happens before ssl shutdown + } onHandshook = handshake->done.getFuture(); N2::g_net2->sslHandshakerPool->post(handshake); } else { @@ -970,6 
+1002,9 @@ class SSLConnection final : public IConnection, ReferenceCounted self->ssl_sock.async_handshake(boost::asio::ssl::stream_base::server, std::move(p)); } wait(onHandshook); + if (hasSetHandshakeTimeout) { + self->setHandshakeTimeout(0, self->id); // reset + } wait(delay(0, TaskPriority::Handshake)); connected.send(Void()); } catch (...) { @@ -1012,6 +1047,9 @@ class SSLConnection final : public IConnection, ReferenceCounted } when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) { g_net2->countServerTLSHandshakesTimedout++; + TraceEvent("N2_AcceptHandshakeTimeout", self->id) + .suppressFor(1.0) + .detail("PeerAddress", self->getPeerAddress()); throw connection_failed(); } } @@ -1035,6 +1073,7 @@ class SSLConnection final : public IConnection, ReferenceCounted ACTOR static void doConnectHandshake(Reference self, Promise connected) { state Hold holder; + state bool hasSetHandshakeTimeout = false; try { Future onHandshook; @@ -1058,6 +1097,14 @@ class SSLConnection final : public IConnection, ReferenceCounted auto handshake = new SSLHandshakerThread::Handshake(self->ssl_sock, boost::asio::ssl::stream_base::client); handshake->setPeerAddr(self->getPeerAddress()); + handshake->setDebugId(self->id); + if (FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS > 0) { + double timeoutSecond = std::max(FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS, + FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT * 1.5); + self->setHandshakeTimeout(timeoutSecond, self->id); + hasSetHandshakeTimeout = true; + // Multiplying by 1.5 to ensure timeout never happens before ssl shutdown + } onHandshook = handshake->done.getFuture(); N2::g_net2->sslHandshakerPool->post(handshake); } else { @@ -1069,6 +1116,9 @@ class SSLConnection final : public IConnection, ReferenceCounted self->ssl_sock.async_handshake(boost::asio::ssl::stream_base::client, std::move(p)); } wait(onHandshook); + if (hasSetHandshakeTimeout) { + self->setHandshakeTimeout(0, self->id); // reset + } wait(delay(0, TaskPriority::Handshake)); 
connected.send(Void()); } catch (...) { @@ -1092,6 +1142,9 @@ class SSLConnection final : public IConnection, ReferenceCounted } when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) { g_net2->countClientTLSHandshakesTimedout++; + TraceEvent("N2_ConnectHandshakeTimeout", self->id) + .suppressFor(1.0) + .detail("PeerAddress", self->getPeerAddress()); throw connection_failed(); } } @@ -1210,6 +1263,7 @@ class SSLConnection final : public IConnection, ReferenceCounted NetworkAddress peer_address; Reference> sslContext; bool has_trusted_peer; + bool closed = false; void init() { // Socket settings that have to be set after connect or accept succeeds @@ -1225,6 +1279,7 @@ class SSLConnection final : public IConnection, ReferenceCounted socket.close(closeError); boost::system::error_code shutdownError; ssl_sock.shutdown(shutdownError); + closed = true; } void onReadError(const boost::system::error_code& error) { @@ -2323,7 +2378,7 @@ TEST_CASE("noSim/flow/Net2/onMainThreadFIFO") { return Void(); } -void net2_test(){ +void net2_test() { /* g_network = newNet2(); // for promise serialization below diff --git a/flow/include/flow/Knobs.h b/flow/include/flow/Knobs.h index 2043a5acc9d..c6c789d32ce 100644 --- a/flow/include/flow/Knobs.h +++ b/flow/include/flow/Knobs.h @@ -203,6 +203,7 @@ class FlowKnobs : public KnobsImpl { int TLS_HANDSHAKE_LIMIT; bool DISABLE_MAINTHREAD_TLS_HANDSHAKE; int TLS_HANDSHAKE_FLOWLOCK_PRIORITY; + double TLS_HANDSHAKE_TIMEOUT_SECONDS; int NETWORK_TEST_CLIENT_COUNT; int NETWORK_TEST_REPLY_SIZE;