-
Notifications
You must be signed in to change notification settings - Fork 12
/
multiclientRecvVsPoll.cpp
179 lines (169 loc) · 8.58 KB
/
multiclientRecvVsPoll.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#include "include/MulticlientRDMADistinctMrTransport.h"
#include "include/MulticlientRDMARecvTransport.h"
#include "include/MulticlientRDMATransport.h"
#include "include/RdmaTransport.h"
#include "util/Random32.h"
#include "util/bench.h"
#include "util/doNotOptimize.h"
#include <pthread.h>
#include <algorithm>
#include <atomic>
#include <charconv>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <future>
#include <iostream>
#include <iterator>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <string_view>
#include <system_error>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
// The transport server/client classes used below are referenced unqualified
// (presumably they live in l5::transport -- confirm against the headers).
using namespace l5::transport;
// Enables the std::chrono duration literal suffixes (e.g. the `s` suffix).
using namespace std::chrono_literals;
// Port number used to build the connection string in main().
static constexpr uint16_t port = 1234;
// Address the client connects to; main() overwrites it with argv[3] when that argument is given.
static std::string_view ip = "127.0.0.1";
/// Appends `n` freshly allocated elements to a container of smart pointers and runs `init` on each.
///
/// Every element is created with std::make_unique from `args...` and handed to `init` by reference
/// right after it was appended. Only participates in overload resolution when `init(element&)`
/// returns void.
///
/// @param container container whose value_type exposes a `pointer` typedef (e.g. std::unique_ptr<T>)
/// @param n         number of elements to append; capacity for `n` elements is reserved up front
/// @param args      constructor arguments forwarded to every element
/// @param init      callable invoked as `init(T&)` for each new element
template <typename Container, typename Size, typename... Args, typename Initializer, typename = std::enable_if_t<std::is_same_v<std::invoke_result_t<Initializer, std::remove_pointer_t<typename Container::value_type::pointer>&>, void>>>
void emplace_initialize_n(Container& container, Size n, Args&&... args, Initializer&& init) {
    using Element = std::remove_pointer_t<typename Container::value_type::pointer>;

    container.reserve(n);
    for (Size created = 0; created < n; ++created) {
        // emplace_back hands back a reference to the owning pointer that was just stored
        auto& owner = container.emplace_back(std::make_unique<Element>(std::forward<Args>(args)...));
        init(*owner);
    }
}
/// Client half of a single benchmark attempt.
///
/// Generates `numMessages` values with Random32, opens `concurrentInFlight` connections spread
/// over at most hardware_concurrency() threads, and has every thread write values round-robin over
/// its connections while checking that each value read back equals the one that was written.
/// Every thread sends the leading `thisThreadMessages` entries of the shared message vector.
///
/// @param connection         connection string passed to Client::connect
/// @param concurrentInFlight total number of client connections (must be > 0)
/// @param numMessages        total number of messages sent across all threads
/// @throws std::runtime_error when a worker thread has not finished after 10s; otherwise rethrows
///         the first exception stored by a worker (e.g. "unexpected value!")
template <typename Client>
static void runClientOnce(const std::string& connection, const size_t concurrentInFlight, const size_t numMessages) {
    auto rand = Random32();
    auto msgs = std::vector<uint32_t>();
    msgs.reserve(numMessages);
    std::generate_n(std::back_inserter(msgs), numMessages, [&] { return rand.next(); });

    // hardware_concurrency() is allowed to return 0 when the core count is unknown; without the
    // clamp numThreads would become 0 and the divisions below would divide by zero.
    const auto hardwareThreads = std::max<size_t>(1, std::thread::hardware_concurrency());
    const auto numThreads = std::min(concurrentInFlight, hardwareThreads);
    const auto concurrentPerThread = concurrentInFlight / numThreads;
    const auto messagesPerThread = numMessages / numThreads;

    auto threads = std::vector<std::thread>();
    threads.reserve(numThreads);
    auto futures = std::vector<std::future<void>>();
    futures.reserve(numThreads);
    auto connected = std::atomic<size_t>(0);

    for (size_t threadId = 0; threadId < numThreads; ++threadId) {
        // The remainder of each division is handed out one-by-one to the lowest thread ids.
        const bool needsExtraConcurrent = (concurrentInFlight % numThreads) > threadId;
        const bool needsExtraMessage = (numMessages % numThreads) > threadId;
        const size_t thisThreadConcurrent = concurrentPerThread + needsExtraConcurrent;
        const size_t thisThreadMessages = messagesPerThread + needsExtraMessage;

        auto task = std::packaged_task<void()>(
            [thisThreadConcurrent, thisThreadMessages, concurrentInFlight, &msgs, &connection, &connected] {
                auto clients = std::vector<std::unique_ptr<Client>>();
                emplace_initialize_n(clients, thisThreadConcurrent, [&](Client& client) {
                    // Retry connecting every 20ms; give up (rethrow) after more than 1000 failures.
                    for (int i = 0;; ++i) {
                        try {
                            client.connect(connection);
                            break;
                        } catch (...) {
                            if (i > 1000) throw;
                            std::this_thread::sleep_for(std::chrono::milliseconds(20));
                        }
                    }
                    ++connected;
                });

                // sync all clients, so we start after the server finished accepting
                // NOTE(review): this spins without a cancellation point, so pthread_cancel() below
                // presumably cannot interrupt it if another thread never connects -- confirm.
                while (connected != concurrentInFlight)
                    ;

                // Writes that have been sent but whose echo has not been read yet, oldest first.
                auto inFlight = std::deque<std::tuple<Client&, uint32_t>>();
                // Reads the reply for the oldest outstanding write and verifies it.
                const auto completeOldest = [&inFlight] {
                    uint32_t response = 0;
                    auto [finClient, expected] = inFlight.front();
                    inFlight.pop_front();
                    finClient.read(response);
                    if (expected != response) throw std::runtime_error("unexpected value!");
                };

                size_t done = 0;
                for (size_t i = 0; i < thisThreadMessages; ++i) {
                    // Once every connection has one outstanding write, drain one reply per new write.
                    if (i >= thisThreadConcurrent) {
                        completeOldest();
                        ++done;
                    }
                    const auto value = msgs[i];
                    auto& client = *clients[i % thisThreadConcurrent];
                    client.write(value);
                    inFlight.emplace_back(client, value);
                }
                // Collect the replies that are still outstanding.
                for (; done < thisThreadMessages; ++done) {
                    completeOldest();
                }
                std::cout << "#" << std::flush;
            });
        futures.emplace_back(task.get_future());
        threads.emplace_back(std::move(task));
    }

    auto timeout = false;
    for (size_t i = 0; i < futures.size(); ++i) {
        if (futures[i].wait_for(10s) == std::future_status::timeout) {
            pthread_cancel(threads[i].native_handle());
            timeout = true;
        }
    }
    for (auto& thread : threads) { thread.join(); }
    std::cout << std::endl;
    if (timeout) {
        throw std::runtime_error("run took longer than 10s");
    }
    // Without get() an exception stored in a future would be dropped and the failed run would be
    // counted as a success.
    for (auto& future : futures) { future.get(); }
}

/// Server half of a single benchmark attempt.
///
/// Prints the "<concurrent><method>" prefix, then on a separate thread constructs the server,
/// accepts `concurrentInFlight` clients and, inside bench(), reads `numMessages` messages and
/// writes each one back to the client it was read from.
///
/// @param connection         connection string passed to the Server constructor
/// @param concurrentInFlight number of clients to accept
/// @param method             label printed right after the client count
/// @param numMessages        number of messages to read and echo
/// @throws std::runtime_error when the server thread has not finished after 10s; otherwise
///         rethrows the exception stored by the server thread, if any
template <typename Server>
static void runServerOnce(const std::string& connection, const size_t concurrentInFlight, const std::string& method, const size_t numMessages) {
    std::cout << concurrentInFlight << method << std::flush;
    auto task = std::packaged_task<void()>([&] {
        // Second constructor argument: client count rounded up to the next multiple of 16.
        // The mask is built as size_t so the upper bits of the count are not cleared.
        auto server = Server(connection, (concurrentInFlight + 15u) & ~size_t(15));
        for (size_t i = 0; i < concurrentInFlight; ++i) { server.accept(); }
        server.finishListen();
        bench(numMessages, [&] {
            for (size_t i = 0; i < numMessages; ++i) {
                uint32_t message = {};
                auto client = server.read(message);
                server.write(client, message);
            }
        });
    });
    auto future = task.get_future();
    auto thread = std::thread(std::move(task));

    auto timeout = false;
    if (future.wait_for(10s) == std::future_status::timeout) {
        pthread_cancel(thread.native_handle());
        timeout = true;
    }
    thread.join();
    if (timeout) {
        throw std::runtime_error("run took longer than 10s");
    }
    // Surface failures of the server thread instead of silently counting the run as done.
    future.get();
}

/// Executes five successful benchmark runs for one Server/Client transport pair.
///
/// A run that ends with a std::runtime_error (timeout, wrong echo, ...) is reported on stdout and
/// repeated; only successful runs count towards the five. Exceptions of other types propagate to
/// the caller.
///
/// @param isClient           true to act as client, false to act as server
/// @param connection         connection string for the chosen role
/// @param concurrentInFlight number of concurrent client connections; must be at least 1
/// @param method             label the server prints in front of its measurements
/// @throws std::invalid_argument if `concurrentInFlight` is 0
template <typename Server, typename Client>
void doRun(bool isClient, const std::string& connection, size_t concurrentInFlight, const std::string& method) {
    static constexpr auto numMessages = size_t(1e6);
    static constexpr size_t numRuns = 5;

    // Zero connections would divide by zero on the client and leave the server waiting for
    // messages that never arrive, so reject it before starting the retry loop.
    if (concurrentInFlight == 0) {
        throw std::invalid_argument("concurrentInFlight must be at least 1");
    }

    for (size_t run = 0; run < numRuns;) {
        try {
            if (isClient) {
                runClientOnce<Client>(connection, concurrentInFlight, numMessages);
            } else { // server
                runServerOnce<Server>(connection, concurrentInFlight, method, numMessages);
            }
            ++run;
        } catch (const std::runtime_error& e) {
            std::cout << "error: " << e.what() << ", retrying..." << std::endl;
        }
    }
}
/// Benchmark entry point.
///
/// Arguments: `<client / server> [concurrent] [IP]`.
///  - argv[1]: "client" selects the client role; any other value selects the server role.
///  - argv[2]: optional number of concurrent clients (positive integer). When omitted, every
///             count from 1 to 49 is benchmarked.
///  - argv[3]: optional server address for the client, replacing the default in `ip`.
///
/// @return -1 on missing or invalid arguments, 0 otherwise
int main(int argc, char** argv) {
    if (argc < 2) {
        std::cout << "Usage: " << argv[0] << " <client / server> <(concurrent, optional)> <(IP, optional) 127.0.0.1>" << std::endl;
        return -1;
    }
    const auto isClient = std::string_view(argv[1]) == "client";

    auto concurrent = std::optional<size_t>();
    if (argc >= 3) {
        // Parse strictly: atoi() would turn garbage into 0 and negative numbers into huge
        // unsigned values, neither of which is a usable client count.
        const auto arg = std::string_view(argv[2]);
        size_t parsed = 0;
        const auto [end, ec] = std::from_chars(arg.data(), arg.data() + arg.size(), parsed);
        if (ec != std::errc() || end != arg.data() + arg.size() || parsed == 0) {
            std::cerr << "Invalid concurrent value '" << arg << "': expected a positive integer" << std::endl;
            return -1;
        }
        concurrent = parsed;
    }
    if (argc >= 4) ip = argv[3];

    // The client needs "<ip>:<port>", the server only the port.
    const std::string connectionString = isClient ? std::string(ip) + ":" + std::to_string(port) : std::to_string(port);

    // Only the server prints measurements, so only it prints the CSV header.
    if (!isClient) std::cout << "concurrent, method, messages, seconds, msgps, user, kernel, total\n";

    // Benchmarks all three transports, in a fixed order, for one client count.
    const auto runAllTransports = [&](size_t numClients) {
        // MulticlientRDMADistinctMr -> Suitable for *few* clients (x < ???)
        doRun<MulticlientRDMADistinctMrTransportServer, MulticlientRDMADistinctMrTransportClient>(isClient, connectionString, numClients, ", Direct, ");
        // MulticlientRDMADoorbells -> Suitable for *most* clients (??? < x < 9)
        doRun<MulticlientRDMATransportServer, MultiClientRDMATransportClient>(isClient, connectionString, numClients, ", Doorbells, ");
        // MulticlientRDMARecv -> Suitable for *many* clients (9 < x)
        doRun<MulticlientRDMARecvTransportServer, MulticlientRDMARecvTransportClient>(isClient, connectionString, numClients, ", Recv, ");
    };

    if (concurrent) {
        runAllTransports(*concurrent);
    } else {
        for (size_t i = 1; i < 50; ++i) {
            runAllTransports(i);
        }
    }
}