Commit 2245d6d

chore: use BackedArguments for implementing a pipeline message
Remove duplicate code for collapsing multiple blobs, similar to #6171.

Signed-off-by: Roman Gershman <[email protected]>
1 parent 6ec47d4 commit 2245d6d

3 files changed, +36 −48 lines

src/common/backed_args.h

Lines changed: 10 additions & 0 deletions
@@ -33,6 +33,16 @@ class BackedArguments {
     return std::string_view{storage_.data(), lens_[0]};
   }
 
+  /*auto MakeArgs() const {
+    absl::InlinedVector<std::string_view, 5> args(lens.size());
+    const char* ptr = storage.data();
+    for (size_t i = 0; i < lens.size(); ++i) {
+      args[i] = std::string_view{ptr, lens[i]};
+      ptr += lens[i] + 1;  // +1 for '\0'
+    }
+    return args;
+  }*/
+
   size_t size() const {
     return lens_.size();
   }
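The commented-out MakeArgs above hints at how argument views are reconstructed from the packed buffer. For reference, the rest of this commit relies on cmn::BackedArguments exposing roughly the following interface. This is a sketch inferred from the hunks in this commit, not the actual header; the inline capacities are assumptions, and HeapMemory() is only declared here because its body is not part of the diff.

// Sketch of the BackedArguments interface assumed by this commit (not the real header).
// Arguments are packed back-to-back in storage_, each followed by '\0'; lens_ holds sizes.
#include <cstddef>
#include <cstdint>
#include <string_view>

#include <absl/container/inlined_vector.h>

namespace cmn {

class BackedArguments {
 public:
  // First argument (typically the command name); implementation shown in the hunk above.
  std::string_view Front() const {
    return std::string_view{storage_.data(), lens_[0]};
  }

  size_t size() const {
    return lens_.size();
  }

  // Heap accounting used by the connection code below; body not shown in this diff.
  size_t HeapMemory() const;

 protected:
  absl::InlinedVector<char, 120> storage_;  // inline capacities are assumptions
  absl::InlinedVector<uint32_t, 5> lens_;   // element type matches `uint32_t s` in SetArgs below
};

}  // namespace cmn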

src/facade/dragonfly_connection.cc

Lines changed: 23 additions & 32 deletions
@@ -377,15 +377,18 @@ class PipelineCacheSizeTracker {
 
 thread_local PipelineCacheSizeTracker tl_pipe_cache_sz_tracker;
 
-void Connection::PipelineMessage::SetArgs(const RespVec& args) {
-  auto* next = storage.data();
-  for (size_t i = 0; i < args.size(); ++i) {
-    RespExpr::Buffer buf = args[i].GetBuf();
-    size_t s = buf.size();
+void Connection::PipelineMessage::SetArgs(const RespVec& src, size_t backed_size) {
+  storage_.resize(backed_size);
+  lens_.resize(src.size());
+
+  auto* next = storage_.data();
+  for (size_t i = 0; i < src.size(); ++i) {
+    RespExpr::Buffer buf = src[i].GetBuf();
+    uint32_t s = buf.size();
     if (s)
       memcpy(next, buf.data(), s);
     next[s] = '\0';
-    this->args[i] = MutableSlice(next, s);
+    lens_[i] = s;
     next += (s + 1);
   }
 }
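SetArgs now receives the total backed size from its caller instead of relying on a pre-sized storage vector, so the caller (FromArgs, further down) has to compute it up front. A minimal sketch of that computation, matching the layout SetArgs writes (payload plus one '\0' per argument); the helper name is hypothetical and not part of the patch:

// Hypothetical helper: bytes SetArgs needs for a RespVec (payload + trailing '\0' per arg).
size_t BackedSize(const RespVec& args) {
  size_t total = 0;
  for (const auto& e : args)
    total += e.GetBuf().size() + 1;  // +1 for the '\0' terminator written by SetArgs
  return total;
}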
@@ -425,23 +428,13 @@ Connection::MCPipelineMessage::MCPipelineMessage(MemcacheParser::Command cmd_in,
   }
 }
 
-void Connection::PipelineMessage::Reset(size_t nargs, size_t capacity) {
-  storage.resize(capacity);
-  args.resize(nargs);
-}
-
-size_t Connection::PipelineMessage::StorageCapacity() const {
-  return storage.capacity() + args.capacity();
-}
-
 size_t Connection::MessageHandle::UsedMemory() const {
   struct MessageSize {
     size_t operator()(const PubMessagePtr& msg) {
       return sizeof(PubMessage) + (msg->channel.size() + msg->message.size());
     }
     size_t operator()(const PipelineMessagePtr& msg) {
-      return sizeof(PipelineMessage) + msg->args.capacity() * sizeof(MutableSlice) +
-             msg->storage.capacity();
+      return sizeof(PipelineMessage) + msg->HeapMemory();
     }
     size_t operator()(const MonitorMessage& msg) {
       return msg.capacity();
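UsedMemory, and the pipeline-cache accounting in the hunks below, now delegate to HeapMemory() inherited from BackedArguments. Its body is not part of this diff; a plausible sketch, assuming it mirrors the old StorageCapacity() accounting over the new members:

// Assumed implementation, mirroring the old storage.capacity() + args.capacity() bookkeeping.
size_t BackedArguments::HeapMemory() const {
  return storage_.capacity() * sizeof(char) + lens_.capacity() * sizeof(uint32_t);
}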
@@ -555,11 +548,10 @@ void Connection::AsyncOperations::operator()(const PubMessage& pub_msg) {
 }
 
 void Connection::AsyncOperations::operator()(Connection::PipelineMessage& msg) {
-  DVLOG(2) << "Dispatching pipeline: " << ToSV(msg.args.front());
+  DVLOG(2) << "Dispatching pipeline: " << msg.Front();
 
   ++self->local_stats_.cmds;
-  self->service_->DispatchCommand(ParsedArgs{msg.args}, self->reply_builder_.get(),
-                                  self->cc_.get());
+  self->service_->DispatchCommand(ParsedArgs{msg}, self->reply_builder_.get(), self->cc_.get());
 
   self->last_interaction_ = time(nullptr);
   self->skip_next_squashing_ = false;
@@ -644,7 +636,7 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
   static atomic_uint32_t next_id{1};
 
   constexpr size_t kReqSz = sizeof(Connection::PipelineMessage);
-  static_assert(kReqSz <= 256 && kReqSz >= 200);
+  static_assert(kReqSz <= 256);
 
   switch (protocol) {
     case Protocol::REDIS:
@@ -1537,16 +1529,17 @@ void Connection::SquashPipeline() {
   DCHECK_EQ(reply_builder_->GetProtocol(), Protocol::REDIS);  // Only Redis is supported.
 
   vector<ParsedArgs> squash_cmds;
-  squash_cmds.reserve(dispatch_q_.size());
+
+  squash_cmds.reserve(std::min<uint32_t>(dispatch_q_.size(), pipeline_squash_limit_cached));
 
   uint64_t start = CycleClock::Now();
 
   for (const auto& msg : dispatch_q_) {
     CHECK(holds_alternative<PipelineMessagePtr>(msg.handle))
         << msg.handle.index() << " on " << DebugInfo();
 
-    auto& pmsg = get<PipelineMessagePtr>(msg.handle);
-    squash_cmds.emplace_back(ParsedArgs(pmsg->args));
+    const auto& pmsg = get<PipelineMessagePtr>(msg.handle);
+    squash_cmds.emplace_back(*pmsg);
     if (squash_cmds.size() >= pipeline_squash_limit_cached) {
       // We reached the limit of commands to squash, so we dispatch them.
       break;
@@ -1766,14 +1759,12 @@ Connection::PipelineMessagePtr Connection::FromArgs(const RespVec& args) {
   static_assert(alignof(PipelineMessage) == 8);
 
   PipelineMessagePtr ptr;
-  if (ptr = GetFromPipelinePool(); ptr) {
-    ptr->Reset(args.size(), backed_sz);
-  } else {
+  if (ptr = GetFromPipelinePool(); !ptr) {
     // We must construct in place here, since there is a slice that uses memory locations
-    ptr = make_unique<PipelineMessage>(args.size(), backed_sz);
+    ptr = make_unique<PipelineMessage>();
   }
 
-  ptr->SetArgs(args);
+  ptr->SetArgs(args, backed_sz);
   return ptr;
 }
 
@@ -1782,7 +1773,7 @@ void Connection::ShrinkPipelinePool() {
     return;
 
   if (tl_pipe_cache_sz_tracker.CheckAndUpdateWatermark(pipeline_req_pool_.size())) {
-    stats_->pipeline_cmd_cache_bytes -= pipeline_req_pool_.back()->StorageCapacity();
+    stats_->pipeline_cmd_cache_bytes -= pipeline_req_pool_.back()->HeapMemory();
     pipeline_req_pool_.pop_back();
   }
 }
@@ -1792,7 +1783,7 @@ Connection::PipelineMessagePtr Connection::GetFromPipelinePool() {
    return nullptr;
 
   auto ptr = std::move(pipeline_req_pool_.back());
-  stats_->pipeline_cmd_cache_bytes -= ptr->StorageCapacity();
+  stats_->pipeline_cmd_cache_bytes -= ptr->HeapMemory();
   pipeline_req_pool_.pop_back();
   return ptr;
 }
@@ -1968,7 +1959,7 @@ void Connection::RecycleMessage(MessageHandle msg) {
     pending_pipeline_cmd_cnt_--;
     pending_pipeline_bytes_ -= used_mem;
     if (stats_->pipeline_cmd_cache_bytes < qbp.pipeline_cache_limit) {
-      stats_->pipeline_cmd_cache_bytes += (*pipe)->StorageCapacity();
+      stats_->pipeline_cmd_cache_bytes += (*pipe)->HeapMemory();
       pipeline_req_pool_.push_back(std::move(*pipe));
     }
   }

src/facade/dragonfly_connection.h

Lines changed: 3 additions & 16 deletions
@@ -13,6 +13,7 @@
 #include <utility>
 #include <variant>
 
+#include "common/backed_args.h"
 #include "facade/acl_commands_def.h"
 #include "facade/facade_types.h"
 #include "facade/memcache_parser.h"
@@ -80,22 +81,8 @@ class Connection : public util::Connection {
   };
 
   // Pipeline message, accumulated Redis command to be executed.
-  struct PipelineMessage {
-    PipelineMessage(size_t nargs, size_t capacity) : args(nargs), storage(capacity) {
-    }
-
-    void Reset(size_t nargs, size_t capacity);
-
-    void SetArgs(const RespVec& args);
-
-    size_t StorageCapacity() const;
-
-    // mi_stl_allocator uses mi heap internally.
-    // The capacity is chosen so that we allocate a fully utilized (256 bytes) block.
-    using StorageType = absl::InlinedVector<char, kReqStorageSize>;
-
-    absl::InlinedVector<std::string_view, 6> args;
-    StorageType storage;
+  struct PipelineMessage : public cmn::BackedArguments {
+    void SetArgs(const RespVec& args, size_t backed_size);
   };
 
   // Pipeline message, accumulated Memcached command to be executed.
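With PipelineMessage reduced to a thin BackedArguments subclass, construction no longer needs the nargs/capacity constructor or Reset(). A condensed sketch of the new flow, mirroring the FromArgs hunk above (pool reuse omitted; BackedSize is the hypothetical helper sketched earlier, not code from this patch):

// Sketch: building a pipeline message after this change (simplified from Connection::FromArgs).
Connection::PipelineMessagePtr MakePipelineMessage(const RespVec& args) {
  const size_t backed_sz = BackedSize(args);  // hypothetical helper: payload bytes + one '\0' per arg

  auto ptr = std::make_unique<Connection::PipelineMessage>();  // default-constructible now
  ptr->SetArgs(args, backed_sz);  // resizes storage_/lens_ and copies the blobs
  return ptr;
}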
