Skip to content

Commit 94f3579

Browse files
committed
udp: enforce single receiver at a time
1 parent 83f767c commit 94f3579

File tree

3 files changed

+58
-9
lines changed

3 files changed

+58
-9
lines changed

test/udp_test.cc

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,37 @@ TEST(UdpTest, testBroadcast) {
252252
run_loop(setup);
253253
}
254254

255+
TEST(UdpTest, simultaneousReceiveDies) {
256+
auto setup = [&](const Loop &loop) -> uvco::Promise<void> {
257+
Udp server{loop};
258+
co_await server.bind("::1", 9999, 0);
259+
260+
MultiPromise<std::pair<std::string, AddressHandle>> packets =
261+
server.receiveMany();
262+
263+
EXPECT_DEATH(
264+
{ auto packets2 = server.receiveMany(); }, "== nullptr' failed");
265+
266+
co_await server.close();
267+
};
268+
269+
run_loop(setup);
270+
}
271+
272+
TEST(UdpTest, simultaneousReceiveOneDies) {
273+
auto setup = [&](const Loop &loop) -> uvco::Promise<void> {
274+
Udp server{loop};
275+
co_await server.bind("::1", 9999, 0);
276+
277+
Promise<std::string> packet = server.receiveOne();
278+
EXPECT_DEATH({ auto packet = server.receiveOne(); }, "== nullptr' failed");
279+
280+
co_await server.close();
281+
};
282+
283+
run_loop(setup);
284+
}
285+
255286
TEST(UdpTest, udpNoClose) {
256287
uint64_t counter = 0;
257288
const uv_udp_t *underlying{};

uvco/udp.cc

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ Promise<void> Udp::connect(const AddressHandle &address) {
9898
Promise<void> Udp::send(std::span<char> buffer,
9999
std::optional<AddressHandle> address) {
100100
SendAwaiter_ sendAwaiter{};
101-
uv_udp_send_t req;
101+
uv_udp_send_t req{};
102+
BOOST_ASSERT(uv_req_get_data((uv_req_t *)&req) == nullptr);
102103
uv_req_set_data((uv_req_t *)&req, &sendAwaiter);
103104

104105
std::array<uv_buf_t, 1> bufs{};
@@ -115,14 +116,18 @@ Promise<void> Udp::send(std::span<char> buffer,
115116
const uv_status status =
116117
uv_udp_send(&req, udp_.get(), bufs.begin(), 1, addr, onSendDone);
117118
if (status != 0) {
119+
uv_req_set_data((uv_req_t *)&req, nullptr);
118120
throw UvcoException{status, "uv_udp_send() failed immediately"};
119121
}
120122

121123
const uv_status status_done = co_await sendAwaiter;
122124
if (status_done != 0) {
125+
uv_req_set_data((uv_req_t *)&req, nullptr);
123126
throw UvcoException{status_done, "uv_udp_send() failed while sending"};
124127
}
125128

129+
uv_req_set_data((uv_req_t *)&req, nullptr);
130+
126131
co_return;
127132
}
128133

@@ -133,9 +138,11 @@ Promise<std::string> Udp::receiveOne() {
133138

134139
Promise<std::pair<std::string, AddressHandle>> Udp::receiveOneFrom() {
135140
RecvAwaiter_ awaiter{};
136-
udp_->data = &awaiter;
141+
BOOST_ASSERT(uv_handle_get_data((uv_handle_t *)udp_.get()) == nullptr);
142+
uv_handle_set_data((uv_handle_t *)udp_.get(), &awaiter);
137143
const uv_status status = udpStartReceive();
138144
if (status != 0) {
145+
uv_handle_set_data((uv_handle_t *)udp_.get(), nullptr);
139146
throw UvcoException(status, "uv_udp_recv_start()");
140147
}
141148

@@ -144,19 +151,19 @@ Promise<std::pair<std::string, AddressHandle>> Udp::receiveOneFrom() {
144151
co_await awaiter;
145152

146153
// Any exceptions are thrown in RecvAwaiter_::await_resume
147-
udp_->data = nullptr;
154+
uv_handle_set_data((uv_handle_t *)udp_.get(), nullptr);
148155
co_return std::move(packet.value());
149156
}
150157

151158
MultiPromise<std::pair<std::string, AddressHandle>> Udp::receiveMany() {
152-
FlagGuard receivingGuard{is_receiving_};
153-
154159
RecvAwaiter_ awaiter{};
155160
awaiter.stop_receiving_ = false;
156-
udp_->data = &awaiter;
161+
BOOST_ASSERT(uv_handle_get_data((uv_handle_t *)udp_.get()) == nullptr);
162+
uv_handle_set_data((uv_handle_t *)udp_.get(), &awaiter);
157163

158164
const uv_status status = udpStartReceive();
159165
if (status != 0) {
166+
uv_handle_set_data((uv_handle_t *)udp_.get(), nullptr);
160167
throw UvcoException(status, "receiveMany(): uv_udp_recv_start()");
161168
}
162169

@@ -169,11 +176,12 @@ MultiPromise<std::pair<std::string, AddressHandle>> Udp::receiveMany() {
169176
}
170177
// It's possible that co_yield doesn't resume anymore, therefore clear
171178
// reference to local awaiter.
172-
udp_->data = nullptr;
179+
uv_handle_set_data((uv_handle_t *)udp_.get(), nullptr);
173180
co_yield std::move(buffer.value());
174-
udp_->data = &awaiter;
181+
BOOST_ASSERT(uv_handle_get_data((uv_handle_t *)udp_.get()) == nullptr);
182+
uv_handle_set_data((uv_handle_t *)udp_.get(), &awaiter);
175183
}
176-
udp_->data = nullptr;
184+
uv_handle_set_data((uv_handle_t *)udp_.get(), nullptr);
177185
udpStopReceive();
178186
co_return;
179187
}

uvco/udp.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,24 @@ class Udp {
6060
/// Receive a single UDP packet.
6161
///
6262
/// TODO: use a better-suited buffer type.
63+
///
64+
/// Only one coroutine can be receiving at a time. This is currently enforced
65+
/// by assertions.
6366
Promise<std::string> receiveOne();
6467

6568
/// Receive a single UDP packet and also return the sender's address.
69+
///
70+
/// Only one coroutine can be receiving at a time. This is currently enforced
71+
/// by assertions.
6672
Promise<std::pair<std::string, AddressHandle>> receiveOneFrom();
6773

6874
/// Generate packets received on socket. Call stopReceiveMany() when no more
6975
/// packets are desired; otherwise this will continue indefinitely.
76+
///
77+
/// Only one coroutine can be receiving at a time. This is currently enforced
78+
/// by assertions.
7079
MultiPromise<std::pair<std::string, AddressHandle>> receiveMany();
80+
7181
/// Stop receiving with `receiveMany()` by cancelling the receiving generator
7282
/// coroutine.
7383
void

0 commit comments

Comments
 (0)