Skip to content

Commit

Permalink
Support for more friendly distributed network connections
Browse files Browse the repository at this point in the history
  • Loading branch information
nlighting authored and zichao.zhang committed Jul 13, 2023
1 parent 5b4833b commit 959d86f
Showing 1 changed file with 28 additions and 12 deletions.
40 changes: 28 additions & 12 deletions src/net/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,21 +129,37 @@ seastar::future<> connection_manager::clients_side::start(connection_table& conn
auto num_machs = static_cast<uint32_t>(server_list.size());
return seastar::parallel_for_each(boost::irange(0u, num_machs),[&conn_table, &server_list](unsigned id) mutable {
auto timeout_point = seastar::timer<>::clock::now() + std::chrono::seconds(network_config::get().timeout_seconds());
return seastar::with_timeout(timeout_point, seastar::connect(server_list[id].first)
).then_wrapped([id, &conn_table](seastar::future<seastar::connected_socket> fut) {
auto& sv_list = network_config::get().get_server_list(local_shard_id());
uint32_t target_mid = sv_list[id].second;
try {
conn_table.add(std::make_pair(target_mid, connection{fut.get0()}));
auto& out = conn_table.find(target_mid).out;
auto this_mid = network_config::get().machine_id();
return out.write(reinterpret_cast<const char*>(&this_mid), 4).then([&out] {
return out.flush();
uint32_t target_mid = server_list[id].second;
return seastar::with_timeout(
timeout_point,
seastar::repeat([&conn_table, &server_list, id, target_mid] {
return seastar::connect(server_list[id].first
).then_wrapped([&conn_table, target_mid] (seastar::future<seastar::connected_socket> fut) {
if (fut.failed()) {
fut.ignore_ready_future();
return seastar::sleep(std::chrono::seconds(1)).then([] {
return seastar::make_ready_future<seastar::stop_iteration>(seastar::stop_iteration::no);
});
}
conn_table.add(std::make_pair(target_mid, connection{fut.get0()}));
auto& out = conn_table.find(target_mid).out;
auto this_mid = network_config::get().machine_id();
return out.write(reinterpret_cast<const char*>(&this_mid), 4).then([&out] {
return out.flush();
}).then([] {
return seastar::make_ready_future<seastar::stop_iteration>(seastar::stop_iteration::yes);
});
});
} catch (...) {
fmt::print("[ Connection Failed ] Cannot connect to server {} on machine {}.\n", sv_list[id].first, target_mid);
})
).then_wrapped([&server_list, id, target_mid] (seastar::future<> fut) {
try {
fut.get0();
}
catch (...) {
fmt::print("[ Connection Failed ] Cannot connect to server {} on machine {}.\n", server_list[id].first, target_mid);
return seastar::make_exception_future<>(std::current_exception());
}
return seastar::make_ready_future<>();
});
});
}
Expand Down

0 comments on commit 959d86f

Please sign in to comment.