Skip to content

Commit 616810d

Browse files
hanidamlajmeta-codesync[bot]
authored andcommitted
WtStreamManager inline comments, lints, refactors, dbg logs, feedback
Summary: * improves variable names for clarity * adds some additional documentation * adds DBG logs in some useful locations Reviewed By: afrind Differential Revision: D84759224 fbshipit-source-id: d6aa13bf3528285ad2037f2304a89aa38f4e8649
1 parent 11cbf9e commit 616810d

File tree

5 files changed

+314
-240
lines changed

5 files changed

+314
-240
lines changed

proxygen/lib/http/coro/test/HttpWtUpstreamTests.cpp

Lines changed: 46 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -184,17 +184,17 @@ TEST(WtStreamManager, BasicSelfBidi) {
184184

185185
WtStreamManager streamManager{detail::WtDir::Client, self, peer, cb};
186186
// 0x00 is the next expected bidi stream id for client
187-
auto bidiRes = streamManager.getBidiHandle(0x00);
187+
auto bidiRes = streamManager.getOrCreateBidiHandle(0x00);
188188
EXPECT_NE(bidiRes.readHandle, nullptr);
189189
EXPECT_NE(bidiRes.writeHandle, nullptr);
190190

191191
// 0x08 is not the next expected bidi stream id for client
192-
bidiRes = streamManager.getBidiHandle(0x08);
192+
bidiRes = streamManager.getOrCreateBidiHandle(0x08);
193193
EXPECT_EQ(bidiRes.readHandle, nullptr);
194194
EXPECT_EQ(bidiRes.writeHandle, nullptr);
195195

196196
// 0x04 is the next expected bidi stream id for client
197-
bidiRes = streamManager.getBidiHandle(0x04);
197+
bidiRes = streamManager.getOrCreateBidiHandle(0x04);
198198
EXPECT_NE(bidiRes.readHandle, nullptr);
199199
EXPECT_NE(bidiRes.writeHandle, nullptr);
200200
}
@@ -207,17 +207,17 @@ TEST(WtStreamManager, BasicSelfUni) {
207207

208208
WtStreamManager streamManager{detail::WtDir::Client, self, peer, cb};
209209
// 0x02 is the next expected uni stream id for client
210-
auto bidiRes = streamManager.getBidiHandle(0x02);
210+
auto bidiRes = streamManager.getOrCreateBidiHandle(0x02);
211211
EXPECT_EQ(bidiRes.readHandle, nullptr); // egress only
212212
EXPECT_NE(bidiRes.writeHandle, nullptr);
213213

214214
// 0x0a is not the next expected uni stream id for client
215-
bidiRes = streamManager.getBidiHandle(0x0a);
215+
bidiRes = streamManager.getOrCreateBidiHandle(0x0a);
216216
EXPECT_EQ(bidiRes.readHandle, nullptr); // egress only
217217
EXPECT_EQ(bidiRes.writeHandle, nullptr);
218218

219219
// 0x06 is the next expected uni stream id for client
220-
bidiRes = streamManager.getBidiHandle(0x06);
220+
bidiRes = streamManager.getOrCreateBidiHandle(0x06);
221221
EXPECT_EQ(bidiRes.readHandle, nullptr); // egress only
222222
EXPECT_NE(bidiRes.writeHandle, nullptr);
223223
}
@@ -230,17 +230,17 @@ TEST(WtStreamManager, BasicPeerBidi) {
230230

231231
WtStreamManager streamManager{detail::WtDir::Client, self, peer, cb};
232232
// 0x01 is the next expected bidi stream for server
233-
auto bidiRes = streamManager.getBidiHandle(0x01);
233+
auto bidiRes = streamManager.getOrCreateBidiHandle(0x01);
234234
EXPECT_NE(bidiRes.readHandle, nullptr);
235235
EXPECT_NE(bidiRes.writeHandle, nullptr);
236236

237237
// 0x09 is not the next expected bidi stream for server
238-
bidiRes = streamManager.getBidiHandle(0x09);
238+
bidiRes = streamManager.getOrCreateBidiHandle(0x09);
239239
EXPECT_EQ(bidiRes.readHandle, nullptr);
240240
EXPECT_EQ(bidiRes.writeHandle, nullptr);
241241

242242
// 0x05 is the next expected bidi stream for server
243-
bidiRes = streamManager.getBidiHandle(0x05);
243+
bidiRes = streamManager.getOrCreateBidiHandle(0x05);
244244
EXPECT_NE(bidiRes.readHandle, nullptr);
245245
EXPECT_NE(bidiRes.writeHandle, nullptr);
246246
}
@@ -253,17 +253,17 @@ TEST(WtStreamManager, BasicPeerUni) {
253253

254254
WtStreamManager streamManager{detail::WtDir::Client, self, peer, cb};
255255
// 0x03 is the next expected uni stream for server
256-
auto bidiRes = streamManager.getBidiHandle(0x03);
256+
auto bidiRes = streamManager.getOrCreateBidiHandle(0x03);
257257
EXPECT_NE(bidiRes.readHandle, nullptr);
258258
EXPECT_EQ(bidiRes.writeHandle, nullptr); // ingress only
259259

260260
// 0x0b is not the next expected uni stream for server
261-
bidiRes = streamManager.getBidiHandle(0x0b);
261+
bidiRes = streamManager.getOrCreateBidiHandle(0x0b);
262262
EXPECT_EQ(bidiRes.readHandle, nullptr);
263263
EXPECT_EQ(bidiRes.writeHandle, nullptr);
264264

265265
// 0x07 is the next expected bidi stream for server
266-
bidiRes = streamManager.getBidiHandle(0x07);
266+
bidiRes = streamManager.getOrCreateBidiHandle(0x07);
267267
EXPECT_NE(bidiRes.readHandle, nullptr);
268268
EXPECT_EQ(bidiRes.writeHandle, nullptr); // ingress only
269269
}
@@ -276,19 +276,19 @@ TEST(WtStreamManager, NextBidiUniHandle) {
276276

277277
WtStreamManager streamManager{detail::WtDir::Client, self, peer, cb};
278278
// next egress handle tests
279-
auto uni = CHECK_NOTNULL(streamManager.nextEgressHandle());
279+
auto uni = CHECK_NOTNULL(streamManager.createEgressHandle());
280280
EXPECT_EQ(uni->getID(), 0x02);
281-
uni = streamManager.nextEgressHandle();
281+
uni = streamManager.createEgressHandle();
282282
EXPECT_EQ(uni->getID(), 0x06);
283283

284284
// next bidi handle test
285-
auto bidi = streamManager.nextBidiHandle();
285+
auto bidi = streamManager.createBidiHandle();
286286
EXPECT_NE(bidi.readHandle, nullptr);
287287
EXPECT_NE(bidi.writeHandle, nullptr);
288288
EXPECT_EQ(bidi.readHandle->getID(), bidi.writeHandle->getID());
289289
EXPECT_EQ(bidi.readHandle->getID(), 0x00);
290290

291-
bidi = streamManager.nextBidiHandle();
291+
bidi = streamManager.createBidiHandle();
292292
EXPECT_NE(bidi.readHandle, nullptr);
293293
EXPECT_NE(bidi.writeHandle, nullptr);
294294
EXPECT_EQ(bidi.readHandle->getID(), bidi.writeHandle->getID());
@@ -306,20 +306,20 @@ TEST(WtStreamManager, StreamLimits) {
306306
WtStreamManager streamManager{detail::WtDir::Client, self, peer, cb};
307307

308308
// a single egress handle should succeed
309-
auto uni = streamManager.nextEgressHandle();
309+
auto uni = streamManager.createEgressHandle();
310310
EXPECT_NE(uni, nullptr);
311311

312312
// a single bidi handle should succeed
313-
auto bidi = streamManager.nextBidiHandle();
313+
auto bidi = streamManager.createBidiHandle();
314314
EXPECT_NE(bidi.readHandle, nullptr);
315315
EXPECT_NE(bidi.writeHandle, nullptr);
316316

317317
// next egress handle should fail
318-
uni = streamManager.nextEgressHandle();
318+
uni = streamManager.createEgressHandle();
319319
EXPECT_EQ(uni, nullptr);
320320

321321
// next bidi handle should fail
322-
bidi = streamManager.nextBidiHandle();
322+
bidi = streamManager.createBidiHandle();
323323
EXPECT_EQ(bidi.readHandle, nullptr);
324324
EXPECT_EQ(bidi.writeHandle, nullptr);
325325

@@ -328,11 +328,11 @@ TEST(WtStreamManager, StreamLimits) {
328328
EXPECT_TRUE(streamManager.onMaxStreams(MaxStreamsUni{2}));
329329

330330
// next egress handle should succeed
331-
uni = CHECK_NOTNULL(streamManager.nextEgressHandle());
331+
uni = CHECK_NOTNULL(streamManager.createEgressHandle());
332332
EXPECT_NE(uni, nullptr);
333333

334334
// next bidi handle should succeed
335-
bidi = streamManager.nextBidiHandle();
335+
bidi = streamManager.createBidiHandle();
336336
EXPECT_NE(bidi.readHandle, nullptr);
337337
EXPECT_NE(bidi.writeHandle, nullptr);
338338
}
@@ -345,9 +345,9 @@ TEST(WtStreamManager, EnqueueIngressData) {
345345
WtStreamManagerCb cb;
346346
WtStreamManager streamManager{detail::WtDir::Client, self, peer, cb};
347347

348-
// next nextBidiHandle should succeed
349-
auto one = streamManager.nextBidiHandle();
350-
auto two = streamManager.nextBidiHandle();
348+
// next createBidiHandle should succeed
349+
auto one = streamManager.createBidiHandle();
350+
auto two = streamManager.createBidiHandle();
351351
CHECK(one.readHandle && one.writeHandle && two.readHandle && two.writeHandle);
352352

353353
constexpr auto kBufLen = 65'535;
@@ -378,9 +378,9 @@ TEST(WtStreamManager, WriteEgressHandle) {
378378
WtStreamManagerCb cb;
379379
WtStreamManager streamManager{detail::WtDir::Client, self, peer, cb};
380380

381-
// next two ::nextBidiHandle should succeed
382-
auto one = streamManager.nextBidiHandle();
383-
auto two = streamManager.nextBidiHandle();
381+
// next two ::createBidiHandle should succeed
382+
auto one = streamManager.createBidiHandle();
383+
auto two = streamManager.createBidiHandle();
384384
CHECK(one.readHandle && one.writeHandle && two.readHandle && two.writeHandle);
385385

386386
constexpr auto kBufLen = 65'535;
@@ -443,8 +443,8 @@ TEST(WtStreamManager, BidiHandleCancellation) {
443443
WtStreamManagerCb cb;
444444
WtStreamManager streamManager{detail::WtDir::Client, self, peer, cb};
445445

446-
// next ::nextBidiHandle should succeed
447-
auto one = streamManager.nextBidiHandle();
446+
// next ::createBidiHandle should succeed
447+
auto one = streamManager.createBidiHandle();
448448
CHECK(one.readHandle && one.writeHandle);
449449

450450
auto res = one.writeHandle->writeStreamData(
@@ -475,8 +475,8 @@ TEST(WtStreamManager, GrantFlowControlCredit) {
475475

476476
constexpr auto kBufLen = 65'535;
477477

478-
// next ::nextBidiHandle should succeed
479-
auto one = streamManager.nextBidiHandle();
478+
// next ::createBidiHandle should succeed
479+
auto one = streamManager.createBidiHandle();
480480
CHECK(one.readHandle && one.writeHandle);
481481
// fills up both conn- & stream-level flow control
482482
EXPECT_TRUE(streamManager.enqueue(*one.readHandle,
@@ -497,8 +497,8 @@ TEST(WtStreamManager, GrantFlowControlCredit) {
497497
EXPECT_EQ(maxStreamData.maxData, kBufLen * 2);
498498
EXPECT_EQ(maxConnData.maxData, kBufLen * 2);
499499

500-
// next ::nextBidiHandle should succeed
501-
auto two = streamManager.nextBidiHandle();
500+
// next ::createBidiHandle should succeed
501+
auto two = streamManager.createBidiHandle();
502502
CHECK(two.readHandle && two.writeHandle);
503503
// fills up both conn- & stream-level flow control
504504
fut = two.readHandle->readStreamData();
@@ -518,8 +518,8 @@ TEST(WtStreamManager, StopSendingResetStreamTest) {
518518
WtStreamManager streamManager{detail::WtDir::Client, self, peer, cb};
519519
constexpr auto kBufLen = 65'535;
520520

521-
// next ::nextBidiHandle should succeed
522-
auto one = streamManager.nextBidiHandle();
521+
// next ::createBidiHandle should succeed
522+
auto one = streamManager.createBidiHandle();
523523
CHECK(one.readHandle && one.writeHandle);
524524
auto id = one.readHandle->getID();
525525

@@ -553,7 +553,7 @@ TEST(WtStreamManager, StopSendingResetStreamTest) {
553553
EXPECT_FALSE(streamManager.hasStreams());
554554

555555
// ::resetStream on a unidirectional egress stream should erase the stream
556-
auto* egress = CHECK_NOTNULL(streamManager.nextEgressHandle());
556+
auto* egress = CHECK_NOTNULL(streamManager.createEgressHandle());
557557
EXPECT_TRUE(streamManager.hasStreams());
558558
egress->resetStream(/*error=*/0); // read side is closed for an egress-only
559559
// handle; bidirectionally complete after
@@ -569,8 +569,8 @@ TEST(WtStreamManager, AwaitWritableTest) {
569569
WtStreamManager streamManager{detail::WtDir::Client, self, peer, cb};
570570

571571
constexpr auto kBufLen = 65'535;
572-
// next ::nextBidiHandle should succeed
573-
auto eh = CHECK_NOTNULL(streamManager.nextEgressHandle());
572+
// next ::createBidiHandle should succeed
573+
auto eh = CHECK_NOTNULL(streamManager.createEgressHandle());
574574

575575
// await writable future should be synchronously ready & equal to kBufLen
576576
// (default egress stream fc)
@@ -608,9 +608,9 @@ TEST(WtStreamManager, WritableStreams) {
608608
constexpr auto kBufLen = 65'535;
609609
constexpr auto kAtMost = std::numeric_limits<uint64_t>::max();
610610

611-
// next two ::nextEgressHandle should succeed
612-
auto one = CHECK_NOTNULL(streamManager.nextEgressHandle());
613-
auto two = CHECK_NOTNULL(streamManager.nextEgressHandle());
611+
// next two ::createEgressHandle should succeed
612+
auto one = CHECK_NOTNULL(streamManager.createEgressHandle());
613+
auto two = CHECK_NOTNULL(streamManager.createEgressHandle());
614614

615615
// 1 byte + eof; next writableStream == one
616616
auto writeRes = one->writeStreamData(
@@ -666,8 +666,8 @@ TEST(WtStreamManager, DrainWtSession) {
666666
streamManager.onDrainSession({});
667667

668668
// all self- and peer-initiated streams will fail
669-
auto one = streamManager.nextBidiHandle();
670-
auto* two = streamManager.nextEgressHandle();
669+
auto one = streamManager.createBidiHandle();
670+
auto* two = streamManager.createEgressHandle();
671671
EXPECT_TRUE(one.readHandle == nullptr && one.writeHandle == nullptr);
672672
EXPECT_TRUE(two == nullptr);
673673
}
@@ -680,10 +680,10 @@ TEST(WtStreamManager, CloseWtSession) {
680680
WtStreamManager streamManager{detail::WtDir::Client, self, peer, cb};
681681

682682
// ensure cancellation source is cancelled when invoked ::onCloseSession
683-
auto one = streamManager.nextBidiHandle();
683+
auto one = streamManager.createBidiHandle();
684684
auto oneRead = one.readHandle->readStreamData();
685685

686-
auto* two = streamManager.nextEgressHandle();
686+
auto* two = streamManager.createEgressHandle();
687687
auto cts = {one.readHandle->getCancelToken(),
688688
one.writeHandle->getCancelToken(),
689689
two->getCancelToken()};

proxygen/lib/http/coro/util/WtEgressContainer.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,16 @@
1111

1212
namespace proxygen::coro {
1313

14-
bool WtEgressContainer::enqueue(std::unique_ptr<folly::IOBuf> data,
15-
bool fin) noexcept {
14+
WtBufferedStreamData::FcRes WtBufferedStreamData::enqueue(
15+
std::unique_ptr<folly::IOBuf> data, bool fin) noexcept {
1616
XCHECK(!fin_) << "enqueue after fin";
1717
auto len = data ? data->computeChainDataLength() : 0;
1818
data_.append(std::move(data)); // ok if nullptr
1919
fin_ = fin;
2020
return window_.buffer(len);
2121
}
2222

23-
WtEgressContainer::DequeueResult WtEgressContainer::dequeue(
23+
WtBufferedStreamData::DequeueResult WtBufferedStreamData::dequeue(
2424
uint64_t atMost) noexcept {
2525
// min of maxBytes and how many bytes remaining in egress window
2626
atMost = std::min({atMost, window_.getAvailable(), data_.chainLength()});

proxygen/lib/http/coro/util/WtEgressContainer.h

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,15 @@ namespace proxygen::coro {
2424
* the application has buffered more than 64KiB bytes.
2525
*/
2626
struct BufferedFlowController {
27-
BufferedFlowController(uint64_t initMax = 0) : window_(initMax) {
27+
explicit BufferedFlowController(uint64_t initMax = 0) : window_(initMax) {
2828
}
2929

3030
// advances the bufferedOffset_ by len bytes – returns true if buffering len
3131
// bytes has exceeded send window
32-
bool buffer(uint64_t len) {
32+
enum FcRes : uint8_t { Unblocked = 0, Blocked = 1 };
33+
[[nodiscard]] FcRes buffer(uint64_t len) {
3334
bufferedOffset_ += len; // TODO(@damlaj) overflow
34-
return isBlocked();
35+
return isBlocked() ? FcRes::Blocked : FcRes::Unblocked;
3536
}
3637

3738
// adv currentOffset_ by len bytes (must be <= bufferedOffset_ after
@@ -41,32 +42,32 @@ struct BufferedFlowController {
4142
CHECK_LE(getCurrentOffset(), getBufferedOffset());
4243
}
4344

44-
bool grant(uint64_t offset) {
45+
[[nodiscard]] bool grant(uint64_t offset) {
4546
return window_.grant(offset);
4647
}
4748

4849
[[nodiscard]] bool isBlocked() const {
4950
return getBufferAvailable() == 0;
5051
}
5152

52-
uint64_t getBufferedOffset() const {
53+
[[nodiscard]] uint64_t getBufferedOffset() const {
5354
return bufferedOffset_;
5455
}
5556

56-
uint64_t getCurrentOffset() const {
57+
[[nodiscard]] uint64_t getCurrentOffset() const {
5758
return window_.getCurrentOffset();
5859
}
5960

60-
uint64_t getMaxOffset() const {
61+
[[nodiscard]] uint64_t getMaxOffset() const {
6162
return window_.getMaxOffset();
6263
}
6364

64-
uint64_t getBufferAvailable() const {
65+
[[nodiscard]] uint64_t getBufferAvailable() const {
6566
const auto bufferedBytes = bufferedOffset_ - getCurrentOffset();
6667
return bufferedBytes >= kMaxEgressBuf ? 0 : (kMaxEgressBuf - bufferedBytes);
6768
}
6869

69-
uint64_t getAvailable() const {
70+
[[nodiscard]] uint64_t getAvailable() const {
7071
return window_.getAvailable();
7172
}
7273

@@ -76,28 +77,30 @@ struct BufferedFlowController {
7677
uint64_t bufferedOffset_{0};
7778
};
7879

79-
class WtEgressContainer {
80+
class WtBufferedStreamData {
8081
public:
81-
WtEgressContainer(uint64_t initMax = 0) : window_(initMax) {
82+
explicit WtBufferedStreamData(uint64_t initMax = 0) : window_(initMax) {
8283
}
8384

8485
const BufferedFlowController& window() {
8586
return window_;
8687
}
8788

8889
/**
89-
* enqueues data into the container's buffer – returns true if the egress is
90-
* now flow control blocked
90+
* enqueues data into the container's buffer – returns FcRes::Blocked if the
91+
* egress is now flow control blocked, FcRes::Unblocked otherwise
9192
*/
92-
bool enqueue(std::unique_ptr<folly::IOBuf> data, bool fin) noexcept;
93+
using FcRes = BufferedFlowController::FcRes;
94+
FcRes enqueue(std::unique_ptr<folly::IOBuf> data, bool fin) noexcept;
9395

9496
struct DequeueResult {
9597
std::unique_ptr<folly::IOBuf> data;
9698
bool fin{false};
9799
};
100+
98101
/**
99-
* dequeues data from the container's buffer, returning min(atMost,
100-
* window_available) bytes
102+
* Dequeues data from the container's buffer, returning min(atMost,
103+
* window_available, bytes_enqueued) bytes.
101104
*/
102105
DequeueResult dequeue(uint64_t atMost) noexcept;
103106

0 commit comments

Comments
 (0)