Skip to content

Commit da0e7e7

Browse files
committed
chore: use BackedArguments for implementing a pipeline message
Remove duplicate code for building backed arguments, similarly to #6171 Signed-off-by: Roman Gershman <[email protected]>
1 parent 6ec47d4 commit da0e7e7

File tree

4 files changed

+47
-76
lines changed

4 files changed

+47
-76
lines changed

src/common/backed_args.h

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,15 @@ class BackedArguments {
1818
BackedArguments() {
1919
}
2020

21-
template <typename I> BackedArguments(I begin, I end);
21+
// Construct the arguments from iterator range.
22+
// TODO: In general we could get away without the len argument,
23+
// but that would require fixing base::it::CompoundIterator tu support substraction.
24+
// Similarly, I wish that CompoundIterator support implement -> operator.
25+
template <typename I> BackedArguments(I begin, I end, size_t len) {
26+
Assign(begin, end, len);
27+
}
28+
29+
template <typename I> void Assign(I begin, I end, size_t len);
2230

2331
size_t HeapMemory() const {
2432
size_t s1 = lens_.capacity() <= kLenCap ? 0 : lens_.capacity() * sizeof(uint32_t);
@@ -57,22 +65,29 @@ class BackedArguments {
5765

5866
static_assert(sizeof(BackedArguments) == 128);
5967

60-
template <typename I> BackedArguments::BackedArguments(I begin, I end) {
61-
lens_.reserve(end - begin);
68+
template <typename I> void BackedArguments::Assign(I begin, I end, size_t len) {
69+
lens_.resize(len);
6270
size_t total_size = 0;
71+
unsigned idx = 0;
6372
for (auto it = begin; it != end; ++it) {
64-
lens_.push_back(it->size());
65-
total_size += it->size() + 1; // +1 for '\0'
73+
size_t sz = (*it).size();
74+
lens_[idx++] = sz;
75+
total_size += sz + 1; // +1 for '\0'
6676
}
6777
storage_.resize(total_size);
6878

79+
// Reclaim memory if we have too much allocated.
80+
if (storage_.capacity() > kStorageCap && total_size < storage_.capacity() / 2)
81+
storage_.shrink_to_fit();
82+
6983
char* next = storage_.data();
7084
for (auto it = begin; it != end; ++it) {
71-
if (!it->empty()) {
72-
memcpy(next, it->data(), it->size());
85+
size_t sz = (*it).size();
86+
if (sz > 0) {
87+
memcpy(next, (*it).data(), sz);
7388
}
74-
next[it->size()] = '\0';
75-
next += it->size() + 1;
89+
next[sz] = '\0';
90+
next += sz + 1;
7691
}
7792
}
7893

src/facade/dragonfly_connection.cc

Lines changed: 20 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,10 @@ void UpdateIoBufCapacity(const io::IoBuf& io_buf, ConnectionStats* stats,
152152
}
153153
}
154154

155+
size_t UsedMemoryInternal(const Connection::PipelineMessage& msg) {
156+
return sizeof(msg) + msg.HeapMemory();
157+
}
158+
155159
struct TrafficLogger {
156160
// protects agains closing the file while writing or data races when opening the file.
157161
// Also, makes sure that LogTraffic are executed atomically.
@@ -377,19 +381,6 @@ class PipelineCacheSizeTracker {
377381

378382
thread_local PipelineCacheSizeTracker tl_pipe_cache_sz_tracker;
379383

380-
void Connection::PipelineMessage::SetArgs(const RespVec& args) {
381-
auto* next = storage.data();
382-
for (size_t i = 0; i < args.size(); ++i) {
383-
RespExpr::Buffer buf = args[i].GetBuf();
384-
size_t s = buf.size();
385-
if (s)
386-
memcpy(next, buf.data(), s);
387-
next[s] = '\0';
388-
this->args[i] = MutableSlice(next, s);
389-
next += (s + 1);
390-
}
391-
}
392-
393384
Connection::MCPipelineMessage::MCPipelineMessage(MemcacheParser::Command cmd_in,
394385
std::string_view value_in)
395386
: cmd{std::move(cmd_in)}, value{value_in}, backing_size{0} {
@@ -425,23 +416,13 @@ Connection::MCPipelineMessage::MCPipelineMessage(MemcacheParser::Command cmd_in,
425416
}
426417
}
427418

428-
void Connection::PipelineMessage::Reset(size_t nargs, size_t capacity) {
429-
storage.resize(capacity);
430-
args.resize(nargs);
431-
}
432-
433-
size_t Connection::PipelineMessage::StorageCapacity() const {
434-
return storage.capacity() + args.capacity();
435-
}
436-
437419
size_t Connection::MessageHandle::UsedMemory() const {
438420
struct MessageSize {
439421
size_t operator()(const PubMessagePtr& msg) {
440422
return sizeof(PubMessage) + (msg->channel.size() + msg->message.size());
441423
}
442424
size_t operator()(const PipelineMessagePtr& msg) {
443-
return sizeof(PipelineMessage) + msg->args.capacity() * sizeof(MutableSlice) +
444-
msg->storage.capacity();
425+
return UsedMemoryInternal(*msg);
445426
}
446427
size_t operator()(const MonitorMessage& msg) {
447428
return msg.capacity();
@@ -555,11 +536,10 @@ void Connection::AsyncOperations::operator()(const PubMessage& pub_msg) {
555536
}
556537

557538
void Connection::AsyncOperations::operator()(Connection::PipelineMessage& msg) {
558-
DVLOG(2) << "Dispatching pipeline: " << ToSV(msg.args.front());
539+
DVLOG(2) << "Dispatching pipeline: " << msg.Front();
559540

560541
++self->local_stats_.cmds;
561-
self->service_->DispatchCommand(ParsedArgs{msg.args}, self->reply_builder_.get(),
562-
self->cc_.get());
542+
self->service_->DispatchCommand(ParsedArgs{msg}, self->reply_builder_.get(), self->cc_.get());
563543

564544
self->last_interaction_ = time(nullptr);
565545
self->skip_next_squashing_ = false;
@@ -644,7 +624,7 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
644624
static atomic_uint32_t next_id{1};
645625

646626
constexpr size_t kReqSz = sizeof(Connection::PipelineMessage);
647-
static_assert(kReqSz <= 256 && kReqSz >= 200);
627+
static_assert(kReqSz <= 256);
648628

649629
switch (protocol) {
650630
case Protocol::REDIS:
@@ -1537,16 +1517,17 @@ void Connection::SquashPipeline() {
15371517
DCHECK_EQ(reply_builder_->GetProtocol(), Protocol::REDIS); // Only Redis is supported.
15381518

15391519
vector<ParsedArgs> squash_cmds;
1540-
squash_cmds.reserve(dispatch_q_.size());
1520+
1521+
squash_cmds.reserve(std::min<uint32_t>(dispatch_q_.size(), pipeline_squash_limit_cached));
15411522

15421523
uint64_t start = CycleClock::Now();
15431524

15441525
for (const auto& msg : dispatch_q_) {
15451526
CHECK(holds_alternative<PipelineMessagePtr>(msg.handle))
15461527
<< msg.handle.index() << " on " << DebugInfo();
15471528

1548-
auto& pmsg = get<PipelineMessagePtr>(msg.handle);
1549-
squash_cmds.emplace_back(ParsedArgs(pmsg->args));
1529+
const auto& pmsg = get<PipelineMessagePtr>(msg.handle);
1530+
squash_cmds.emplace_back(*pmsg);
15501531
if (squash_cmds.size() >= pipeline_squash_limit_cached) {
15511532
// We reached the limit of commands to squash, so we dispatch them.
15521533
break;
@@ -1755,25 +1736,15 @@ void Connection::AsyncFiber() {
17551736
}
17561737

17571738
Connection::PipelineMessagePtr Connection::FromArgs(const RespVec& args) {
1758-
DCHECK(!args.empty());
1759-
size_t backed_sz = 0;
1760-
for (const auto& arg : args) {
1761-
CHECK_EQ(RespExpr::STRING, arg.type);
1762-
backed_sz += arg.GetBuf().size() + 1; // for '\0'
1763-
}
1764-
DCHECK(backed_sz);
1765-
1766-
static_assert(alignof(PipelineMessage) == 8);
1767-
17681739
PipelineMessagePtr ptr;
1769-
if (ptr = GetFromPipelinePool(); ptr) {
1770-
ptr->Reset(args.size(), backed_sz);
1771-
} else {
1740+
if (ptr = GetFromPipelinePool(); !ptr) {
17721741
// We must construct in place here, since there is a slice that uses memory locations
1773-
ptr = make_unique<PipelineMessage>(args.size(), backed_sz);
1742+
ptr = make_unique<PipelineMessage>();
17741743
}
17751744

1776-
ptr->SetArgs(args);
1745+
auto map = [](const RespExpr& expr) { return expr.GetView(); };
1746+
auto range = base::it::Transform(map, base::it::Range(args.begin(), args.end()));
1747+
ptr->Assign(range.begin(), range.end(), args.size());
17771748
return ptr;
17781749
}
17791750

@@ -1782,7 +1753,7 @@ void Connection::ShrinkPipelinePool() {
17821753
return;
17831754

17841755
if (tl_pipe_cache_sz_tracker.CheckAndUpdateWatermark(pipeline_req_pool_.size())) {
1785-
stats_->pipeline_cmd_cache_bytes -= pipeline_req_pool_.back()->StorageCapacity();
1756+
stats_->pipeline_cmd_cache_bytes -= UsedMemoryInternal(*pipeline_req_pool_.back());
17861757
pipeline_req_pool_.pop_back();
17871758
}
17881759
}
@@ -1792,7 +1763,7 @@ Connection::PipelineMessagePtr Connection::GetFromPipelinePool() {
17921763
return nullptr;
17931764

17941765
auto ptr = std::move(pipeline_req_pool_.back());
1795-
stats_->pipeline_cmd_cache_bytes -= ptr->StorageCapacity();
1766+
stats_->pipeline_cmd_cache_bytes -= UsedMemoryInternal(*ptr);
17961767
pipeline_req_pool_.pop_back();
17971768
return ptr;
17981769
}
@@ -1968,7 +1939,7 @@ void Connection::RecycleMessage(MessageHandle msg) {
19681939
pending_pipeline_cmd_cnt_--;
19691940
pending_pipeline_bytes_ -= used_mem;
19701941
if (stats_->pipeline_cmd_cache_bytes < qbp.pipeline_cache_limit) {
1971-
stats_->pipeline_cmd_cache_bytes += (*pipe)->StorageCapacity();
1942+
stats_->pipeline_cmd_cache_bytes += UsedMemoryInternal(*(*pipe));
19721943
pipeline_req_pool_.push_back(std::move(*pipe));
19731944
}
19741945
}

src/facade/dragonfly_connection.h

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <utility>
1414
#include <variant>
1515

16+
#include "common/backed_args.h"
1617
#include "facade/acl_commands_def.h"
1718
#include "facade/facade_types.h"
1819
#include "facade/memcache_parser.h"
@@ -80,23 +81,7 @@ class Connection : public util::Connection {
8081
};
8182

8283
// Pipeline message, accumulated Redis command to be executed.
83-
struct PipelineMessage {
84-
PipelineMessage(size_t nargs, size_t capacity) : args(nargs), storage(capacity) {
85-
}
86-
87-
void Reset(size_t nargs, size_t capacity);
88-
89-
void SetArgs(const RespVec& args);
90-
91-
size_t StorageCapacity() const;
92-
93-
// mi_stl_allocator uses mi heap internally.
94-
// The capacity is chosen so that we allocate a fully utilized (256 bytes) block.
95-
using StorageType = absl::InlinedVector<char, kReqStorageSize>;
96-
97-
absl::InlinedVector<std::string_view, 6> args;
98-
StorageType storage;
99-
};
84+
using PipelineMessage = cmn::BackedArguments;
10085

10186
// Pipeline message, accumulated Memcached command to be executed.
10287
struct MCPipelineMessage {

src/server/conn_context.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ static void SendSubscriptionChangedResponse(string_view action, std::optional<st
3636

3737
StoredCmd::StoredCmd(const CommandId* cid, facade::ArgSlice args, facade::ReplyMode mode)
3838
: cid_{cid}, args_{args}, reply_mode_{mode} {
39-
backed_ = std::make_unique<cmn::BackedArguments>(args.begin(), args.end());
39+
backed_ = std::make_unique<cmn::BackedArguments>(args.begin(), args.end(), args.size());
4040
args_ = facade::ParsedArgs{*backed_};
4141
}
4242

0 commit comments

Comments
 (0)