Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions fdbserver/networktest.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,47 @@ struct P2PNetworkTest {
}
}

ACTOR static Future<Void> run_impl_simple(P2PNetworkTest* self) {
state ActorCollection actors(false);

self->startTime = now();

fmt::print("{0} listeners, {1} remotes, {2} outgoing connections\n",
self->listeners.size(),
self->remotes.size(),
self->connectionsOut);

for (auto n : self->remotes) {
printf("Remote: %s\n", n.toString().c_str());
}

for (auto el : self->listeners) {
printf("Listener: %s\n", el->getListenAddress().toString().c_str());
}

if (!self->listeners.empty()) {
state Reference<IConnection> conn1 = wait(self->listeners[0]->accept());
printf("Server: connected from %s\n", conn1->getPeerAddress().toString().c_str());
try {
wait(conn1->acceptHandshake());
printf("Server: connected from %s, handshake done\n", conn1->getPeerAddress().toString().c_str());
} catch (Error& e) {
printf("Server: handshake error %s\n", e.what());
}
threadSleep(11.0);
return Void();
}

if (!self->remotes.empty()) {
state Reference<IConnection> conn2 = wait(INetworkConnections::net()->connect(self->remotes[0]));
printf("Client: connected to %s\n", self->remotes[0].toString().c_str());
wait(conn2->connectHandshake());
printf("Client: connected to %s, handshake done\n", self->remotes[0].toString().c_str());
}

return Void();
}

Future<Void> run() { return run_impl(this); }
};

Expand Down
1 change: 1 addition & 0 deletions flow/Knobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( TLS_HANDSHAKE_LIMIT, 1000 );
init( DISABLE_MAINTHREAD_TLS_HANDSHAKE, false );
init( TLS_HANDSHAKE_FLOWLOCK_PRIORITY, static_cast<int>(TaskPriority::DefaultYield) );
init( TLS_HANDSHAKE_TIMEOUT_SECONDS, 0 ); // 0 -> no timeout
init( NETWORK_TEST_CLIENT_COUNT, 30 );
init( NETWORK_TEST_REPLY_SIZE, 600e3 );
init( NETWORK_TEST_REQUEST_COUNT, 0 ); // 0 -> run forever
Expand Down
61 changes: 58 additions & 3 deletions flow/Net2.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -831,11 +831,14 @@ struct SSLHandshakerThread final : IThreadPoolReceiver {

void setPeerAddr(const NetworkAddress& addr) { peerAddr = addr; }

void setDebugId(const UID& id) { debugId = id; }

ThreadReturnPromise<Void> done;
ssl_socket& socket;
ssl_socket::handshake_type type;
boost::system::error_code err;
NetworkAddress peerAddr;
UID debugId;
};

void action(Handshake& h) {
Expand All @@ -850,7 +853,8 @@ struct SSLHandshakerThread final : IThreadPoolReceiver {
if (h.err.failed()) {
TraceEvent(SevWarn,
h.type == ssl_socket::handshake_type::client ? "N2_ConnectHandshakeError"_audit
: "N2_AcceptHandshakeError"_audit)
: "N2_AcceptHandshakeError"_audit,
h.debugId)
.detail("PeerAddr", h.getPeerAddress())
.detail("PeerAddress", h.getPeerAddress())
.detail("PeerEndPoint", h.getPeerEndPointAddress())
Expand All @@ -864,7 +868,8 @@ struct SSLHandshakerThread final : IThreadPoolReceiver {
} catch (...) {
TraceEvent(SevWarn,
h.type == ssl_socket::handshake_type::client ? "N2_ConnectHandshakeUnknownError"_audit
: "N2_AcceptHandshakeUnknownError"_audit)
: "N2_AcceptHandshakeUnknownError"_audit,
h.debugId)
.detail("PeerAddr", h.getPeerAddress())
.detail("PeerAddress", h.getPeerAddress())
.detail("PeerEndPoint", h.getPeerEndPointAddress())
Expand Down Expand Up @@ -941,8 +946,27 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
init();
}

void setHandshakeTimeout(double seconds, const UID& debugId) {
timeval timeout;
timeout.tv_sec = seconds;
timeout.tv_usec = 0;
int nativeSock = ssl_sock.next_layer().native_handle();
if (closed || !ssl_sock.next_layer().is_open() || nativeSock < 0) {
TraceEvent(SevWarn, "N2_SetSSLSocketTimeoutError", debugId)
.detail("PeerAddr", peer_address)
.detail("PeerAddress", peer_address)
.detail("Closed", closed)
.detail("NativeSock", nativeSock)
.detail("Message", "Invalid native socket handle");
return;
}
setsockopt(nativeSock, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast<const char*>(&timeout), sizeof(timeout));
setsockopt(nativeSock, SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast<const char*>(&timeout), sizeof(timeout));
}

ACTOR static void doAcceptHandshake(Reference<SSLConnection> self, Promise<Void> connected) {
state Hold<int> holder;
state bool hasSetHandshakeTimeout = false;

try {
Future<Void> onHandshook;
Expand All @@ -959,6 +983,14 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
auto handshake =
new SSLHandshakerThread::Handshake(self->ssl_sock, boost::asio::ssl::stream_base::server);
handshake->setPeerAddr(self->getPeerAddress());
handshake->setDebugId(self->id);
if (FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS > 0) {
double timeoutSecond = std::max(FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS,
FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT * 1.5);
self->setHandshakeTimeout(timeoutSecond, self->id);
hasSetHandshakeTimeout = true;
// Mutiplying by 1.5 to ensure timeout never happens before ssl shutdown
}
onHandshook = handshake->done.getFuture();
N2::g_net2->sslHandshakerPool->post(handshake);
} else {
Expand All @@ -970,6 +1002,9 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
self->ssl_sock.async_handshake(boost::asio::ssl::stream_base::server, std::move(p));
}
wait(onHandshook);
if (hasSetHandshakeTimeout) {
self->setHandshakeTimeout(0, self->id); // reset
}
wait(delay(0, TaskPriority::Handshake));
connected.send(Void());
} catch (...) {
Expand Down Expand Up @@ -1012,6 +1047,9 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
}
when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) {
g_net2->countServerTLSHandshakesTimedout++;
TraceEvent("N2_AcceptHandshakeTimeout", self->id)
.suppressFor(1.0)
.detail("PeerAddress", self->getPeerAddress());
throw connection_failed();
}
}
Expand All @@ -1035,6 +1073,7 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>

ACTOR static void doConnectHandshake(Reference<SSLConnection> self, Promise<Void> connected) {
state Hold<int> holder;
state bool hasSetHandshakeTimeout = false;

try {
Future<Void> onHandshook;
Expand All @@ -1058,6 +1097,14 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
auto handshake =
new SSLHandshakerThread::Handshake(self->ssl_sock, boost::asio::ssl::stream_base::client);
handshake->setPeerAddr(self->getPeerAddress());
handshake->setDebugId(self->id);
if (FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS > 0) {
double timeoutSecond = std::max(FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS,
FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT * 1.5);
self->setHandshakeTimeout(timeoutSecond, self->id);
hasSetHandshakeTimeout = true;
// Mutiplying by 1.5 to ensure timeout never happens before ssl shutdown
}
onHandshook = handshake->done.getFuture();
N2::g_net2->sslHandshakerPool->post(handshake);
} else {
Expand All @@ -1069,6 +1116,9 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
self->ssl_sock.async_handshake(boost::asio::ssl::stream_base::client, std::move(p));
}
wait(onHandshook);
if (hasSetHandshakeTimeout) {
self->setHandshakeTimeout(0, self->id); // reset
}
wait(delay(0, TaskPriority::Handshake));
connected.send(Void());
} catch (...) {
Expand All @@ -1092,6 +1142,9 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
}
when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) {
g_net2->countClientTLSHandshakesTimedout++;
TraceEvent("N2_ConnectHandshakeTimeout", self->id)
.suppressFor(1.0)
.detail("PeerAddress", self->getPeerAddress());
throw connection_failed();
}
}
Expand Down Expand Up @@ -1210,6 +1263,7 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
NetworkAddress peer_address;
Reference<ReferencedObject<boost::asio::ssl::context>> sslContext;
bool has_trusted_peer;
bool closed = false;

void init() {
// Socket settings that have to be set after connect or accept succeeds
Expand All @@ -1225,6 +1279,7 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
socket.close(closeError);
boost::system::error_code shutdownError;
ssl_sock.shutdown(shutdownError);
closed = true;
}

void onReadError(const boost::system::error_code& error) {
Expand Down Expand Up @@ -2323,7 +2378,7 @@ TEST_CASE("noSim/flow/Net2/onMainThreadFIFO") {
return Void();
}

void net2_test(){
void net2_test() {
/*
g_network = newNet2(); // for promise serialization below

Expand Down
1 change: 1 addition & 0 deletions flow/include/flow/Knobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ class FlowKnobs : public KnobsImpl<FlowKnobs> {
int TLS_HANDSHAKE_LIMIT;
bool DISABLE_MAINTHREAD_TLS_HANDSHAKE;
int TLS_HANDSHAKE_FLOWLOCK_PRIORITY;
double TLS_HANDSHAKE_TIMEOUT_SECONDS;

int NETWORK_TEST_CLIENT_COUNT;
int NETWORK_TEST_REPLY_SIZE;
Expand Down