Skip to content

Commit abf86d0

Browse files
committed
chore: introduce BackedArguments
In order to support async Dispatch we must use arguments allocated on heap. This PR introduces a basic ParsedArgument class that will be used by the protocol parser. Also, we introduce a new directory that will contain low-level common utilities and classes that can be used by any other components: facade, core and server. We move StringOrView there. Signed-off-by: Roman Gershman <[email protected]>
1 parent c43c845 commit abf86d0

File tree

9 files changed

+242
-164
lines changed

9 files changed

+242
-164
lines changed

src/common/backed_args.h

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Copyright 2025, DragonflyDB authors. All rights reserved.
2+
// See LICENSE for licensing terms.
3+
//
4+
5+
#pragma once
6+
7+
#include <absl/container/inlined_vector.h>
8+
9+
#include <string_view>
10+
11+
namespace cmn {
12+
13+
#ifdef ABSL_HAVE_ADDRESS_SANITIZER
14+
constexpr size_t kReqStorageSize = 88;
15+
#else
16+
constexpr size_t kReqStorageSize = 88;
17+
#endif
18+
19+
class BackedArguments {
20+
public:
21+
BackedArguments() {
22+
}
23+
24+
template <typename I> BackedArguments(I begin, I end);
25+
26+
size_t UsedMemory() const {
27+
return lens_.capacity() * sizeof(uint32_t) + storage_.capacity();
28+
}
29+
30+
// The capacity is chosen so that we allocate a fully utilized (128 bytes) block.
31+
using StorageType = absl::InlinedVector<char, kReqStorageSize>;
32+
33+
std::string_view Front() const {
34+
return std::string_view{storage_.data(), lens_[0]};
35+
}
36+
37+
/*auto MakeArgs() const {
38+
absl::InlinedVector<std::string_view, 5> args(lens.size());
39+
const char* ptr = storage.data();
40+
for (size_t i = 0; i < lens.size(); ++i) {
41+
args[i] = std::string_view{ptr, lens[i]};
42+
ptr += lens[i] + 1; // +1 for '\0'
43+
}
44+
return args;
45+
}*/
46+
47+
size_t size() const {
48+
return lens_.size();
49+
}
50+
51+
size_t elem_len(size_t i) const {
52+
return lens_[i];
53+
}
54+
55+
size_t elem_capacity(size_t i) const {
56+
return elem_len(i) + 1;
57+
}
58+
59+
std::string_view at(uint32_t index, uint32_t offset) const {
60+
const char* ptr = storage_.data() + offset;
61+
return std::string_view{ptr + offset, lens_[index]};
62+
}
63+
64+
protected:
65+
absl::InlinedVector<uint32_t, 5> lens_;
66+
StorageType storage_;
67+
};
68+
69+
static_assert(sizeof(BackedArguments) == 128);
70+
71+
template <typename I> BackedArguments::BackedArguments(I begin, I end) {
72+
lens_.reserve(end - begin);
73+
size_t total_size = 0;
74+
for (auto it = begin; it != end; ++it) {
75+
lens_.push_back(it->size());
76+
total_size += it->size() + 1; // +1 for '\0'
77+
}
78+
storage_.resize(total_size);
79+
80+
char* next = storage_.data();
81+
for (auto it = begin; it != end; ++it) {
82+
if (!it->empty()) {
83+
memcpy(next, it->data(), it->size());
84+
}
85+
next[it->size()] = '\0';
86+
next += it->size() + 1;
87+
}
88+
}
89+
90+
} // namespace cmn

src/facade/dragonfly_connection.cc

Lines changed: 23 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -377,15 +377,18 @@ class PipelineCacheSizeTracker {
377377

378378
thread_local PipelineCacheSizeTracker tl_pipe_cache_sz_tracker;
379379

380-
void Connection::PipelineMessage::SetArgs(const RespVec& args) {
381-
auto* next = storage.data();
382-
for (size_t i = 0; i < args.size(); ++i) {
383-
RespExpr::Buffer buf = args[i].GetBuf();
384-
size_t s = buf.size();
380+
void Connection::PipelineMessage::SetArgs(const RespVec& src, size_t backed_size) {
381+
storage_.resize(backed_size);
382+
lens_.resize(src.size());
383+
384+
auto* next = storage_.data();
385+
for (size_t i = 0; i < src.size(); ++i) {
386+
RespExpr::Buffer buf = src[i].GetBuf();
387+
uint32_t s = buf.size();
385388
if (s)
386389
memcpy(next, buf.data(), s);
387390
next[s] = '\0';
388-
this->args[i] = MutableSlice(next, s);
391+
lens_[i] = s;
389392
next += (s + 1);
390393
}
391394
}
@@ -425,23 +428,13 @@ Connection::MCPipelineMessage::MCPipelineMessage(MemcacheParser::Command cmd_in,
425428
}
426429
}
427430

428-
void Connection::PipelineMessage::Reset(size_t nargs, size_t capacity) {
429-
storage.resize(capacity);
430-
args.resize(nargs);
431-
}
432-
433-
size_t Connection::PipelineMessage::StorageCapacity() const {
434-
return storage.capacity() + args.capacity();
435-
}
436-
437431
size_t Connection::MessageHandle::UsedMemory() const {
438432
struct MessageSize {
439433
size_t operator()(const PubMessagePtr& msg) {
440434
return sizeof(PubMessage) + (msg->channel.size() + msg->message.size());
441435
}
442436
size_t operator()(const PipelineMessagePtr& msg) {
443-
return sizeof(PipelineMessage) + msg->args.capacity() * sizeof(MutableSlice) +
444-
msg->storage.capacity();
437+
return sizeof(PipelineMessage) + msg->UsedMemory();
445438
}
446439
size_t operator()(const MonitorMessage& msg) {
447440
return msg.capacity();
@@ -555,11 +548,10 @@ void Connection::AsyncOperations::operator()(const PubMessage& pub_msg) {
555548
}
556549

557550
void Connection::AsyncOperations::operator()(Connection::PipelineMessage& msg) {
558-
DVLOG(2) << "Dispatching pipeline: " << ToSV(msg.args.front());
551+
DVLOG(2) << "Dispatching pipeline: " << msg.Front();
559552

560553
++self->local_stats_.cmds;
561-
self->service_->DispatchCommand(ParsedArgs{msg.args}, self->reply_builder_.get(),
562-
self->cc_.get());
554+
self->service_->DispatchCommand(ParsedArgs{msg}, self->reply_builder_.get(), self->cc_.get());
563555

564556
self->last_interaction_ = time(nullptr);
565557
self->skip_next_squashing_ = false;
@@ -644,7 +636,7 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
644636
static atomic_uint32_t next_id{1};
645637

646638
constexpr size_t kReqSz = sizeof(Connection::PipelineMessage);
647-
static_assert(kReqSz <= 256 && kReqSz >= 200);
639+
static_assert(kReqSz <= 256);
648640

649641
switch (protocol) {
650642
case Protocol::REDIS:
@@ -1537,16 +1529,17 @@ void Connection::SquashPipeline() {
15371529
DCHECK_EQ(reply_builder_->GetProtocol(), Protocol::REDIS); // Only Redis is supported.
15381530

15391531
vector<ParsedArgs> squash_cmds;
1540-
squash_cmds.reserve(dispatch_q_.size());
1532+
1533+
squash_cmds.reserve(std::min<uint32_t>(dispatch_q_.size(), pipeline_squash_limit_cached));
15411534

15421535
uint64_t start = CycleClock::Now();
15431536

15441537
for (const auto& msg : dispatch_q_) {
15451538
CHECK(holds_alternative<PipelineMessagePtr>(msg.handle))
15461539
<< msg.handle.index() << " on " << DebugInfo();
15471540

1548-
auto& pmsg = get<PipelineMessagePtr>(msg.handle);
1549-
squash_cmds.emplace_back(ParsedArgs(pmsg->args));
1541+
const auto& pmsg = get<PipelineMessagePtr>(msg.handle);
1542+
squash_cmds.emplace_back(*pmsg);
15501543
if (squash_cmds.size() >= pipeline_squash_limit_cached) {
15511544
// We reached the limit of commands to squash, so we dispatch them.
15521545
break;
@@ -1766,14 +1759,12 @@ Connection::PipelineMessagePtr Connection::FromArgs(const RespVec& args) {
17661759
static_assert(alignof(PipelineMessage) == 8);
17671760

17681761
PipelineMessagePtr ptr;
1769-
if (ptr = GetFromPipelinePool(); ptr) {
1770-
ptr->Reset(args.size(), backed_sz);
1771-
} else {
1762+
if (ptr = GetFromPipelinePool(); !ptr) {
17721763
// We must construct in place here, since there is a slice that uses memory locations
1773-
ptr = make_unique<PipelineMessage>(args.size(), backed_sz);
1764+
ptr = make_unique<PipelineMessage>();
17741765
}
17751766

1776-
ptr->SetArgs(args);
1767+
ptr->SetArgs(args, backed_sz);
17771768
return ptr;
17781769
}
17791770

@@ -1782,7 +1773,7 @@ void Connection::ShrinkPipelinePool() {
17821773
return;
17831774

17841775
if (tl_pipe_cache_sz_tracker.CheckAndUpdateWatermark(pipeline_req_pool_.size())) {
1785-
stats_->pipeline_cmd_cache_bytes -= pipeline_req_pool_.back()->StorageCapacity();
1776+
stats_->pipeline_cmd_cache_bytes -= pipeline_req_pool_.back()->UsedMemory();
17861777
pipeline_req_pool_.pop_back();
17871778
}
17881779
}
@@ -1792,7 +1783,7 @@ Connection::PipelineMessagePtr Connection::GetFromPipelinePool() {
17921783
return nullptr;
17931784

17941785
auto ptr = std::move(pipeline_req_pool_.back());
1795-
stats_->pipeline_cmd_cache_bytes -= ptr->StorageCapacity();
1786+
stats_->pipeline_cmd_cache_bytes -= ptr->UsedMemory();
17961787
pipeline_req_pool_.pop_back();
17971788
return ptr;
17981789
}
@@ -1968,7 +1959,7 @@ void Connection::RecycleMessage(MessageHandle msg) {
19681959
pending_pipeline_cmd_cnt_--;
19691960
pending_pipeline_bytes_ -= used_mem;
19701961
if (stats_->pipeline_cmd_cache_bytes < qbp.pipeline_cache_limit) {
1971-
stats_->pipeline_cmd_cache_bytes += (*pipe)->StorageCapacity();
1962+
stats_->pipeline_cmd_cache_bytes += (*pipe)->UsedMemory();
19721963
pipeline_req_pool_.push_back(std::move(*pipe));
19731964
}
19741965
}

src/facade/dragonfly_connection.h

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <utility>
1414
#include <variant>
1515

16+
#include "common/backed_args.h"
1617
#include "facade/acl_commands_def.h"
1718
#include "facade/facade_types.h"
1819
#include "facade/memcache_parser.h"
@@ -80,22 +81,8 @@ class Connection : public util::Connection {
8081
};
8182

8283
// Pipeline message, accumulated Redis command to be executed.
83-
struct PipelineMessage {
84-
PipelineMessage(size_t nargs, size_t capacity) : args(nargs), storage(capacity) {
85-
}
86-
87-
void Reset(size_t nargs, size_t capacity);
88-
89-
void SetArgs(const RespVec& args);
90-
91-
size_t StorageCapacity() const;
92-
93-
// mi_stl_allocator uses mi heap internally.
94-
// The capacity is chosen so that we allocate a fully utilized (256 bytes) block.
95-
using StorageType = absl::InlinedVector<char, kReqStorageSize>;
96-
97-
absl::InlinedVector<std::string_view, 6> args;
98-
StorageType storage;
84+
struct PipelineMessage : public cmn::BackedArguments {
85+
void SetArgs(const RespVec& args, size_t backed_size);
9986
};
10087

10188
// Pipeline message, accumulated Memcached command to be executed.

src/facade/facade_types.h

Lines changed: 82 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <variant>
1414

1515
#include "base/iterator.h"
16+
#include "common/backed_args.h"
1617
#include "facade/op_status.h"
1718

1819
namespace facade {
@@ -45,31 +46,102 @@ class ParsedArgs {
4546
public:
4647
ParsedArgs() = default;
4748

48-
explicit ParsedArgs(ArgSlice args) : args_(args) { // NOLINT google-explicit-constructor
49+
ParsedArgs(const cmn::BackedArguments& bargs) // NOLINT google-explicit-constructor
50+
: args_(&bargs) {
4951
}
5052

53+
// TODO: to remove this constructor in favor of BackedArguments.
54+
// The only reason to keep it is because our RedisParser returns ArgSlice with memory
55+
// held elsewhere. We will have to use BackedArguments everywhere to support async invocations.
56+
ParsedArgs(ArgSlice slice) // NOLINT google-explicit-constructor
57+
: args_(slice) {
58+
}
59+
60+
ParsedArgs(const ParsedArgs& other) = default;
61+
ParsedArgs& operator=(const ParsedArgs& bargs) = default;
62+
5163
size_t size() const {
52-
return args_.size();
64+
return std::visit([](const auto& args) { return args.size(); }, args_);
5365
}
5466

5567
bool empty() const {
56-
return args_.empty();
68+
return size() == 0;
5769
}
5870

5971
ParsedArgs Tail() const {
60-
return ParsedArgs{args_.subspan(1)};
72+
return std::visit([](const auto& args) { return args.Tail(); }, args_);
6173
}
6274

6375
std::string_view Front() const {
64-
return args_.front();
76+
return std::visit([](const auto& args) { return args.front(); }, args_);
6577
}
6678

67-
ArgSlice ToSlice() const {
68-
return args_;
79+
ArgSlice ToSlice(CmdArgVec* scratch) const {
80+
return std::visit([scratch](const auto& args) { return args.ToSlice(scratch); }, args_);
81+
}
82+
83+
size_t UsedMemory() const {
84+
return std::visit([](const auto& args) { return args.UsedMemory(); }, args_);
6985
}
7086

7187
private:
72-
absl::Span<const std::string_view> args_;
88+
struct WrapperBacked {
89+
WrapperBacked(const cmn::BackedArguments* args) : args_(args) { // NOLINT
90+
}
91+
92+
const cmn::BackedArguments* args_;
93+
uint32_t index_ = 0;
94+
uint32_t ofs_bytes_ = 0;
95+
96+
ParsedArgs Tail() const {
97+
ParsedArgs res(*args_);
98+
WrapperBacked* wb = std::get_if<WrapperBacked>(&res.args_);
99+
wb->index_ = index_ + 1;
100+
wb->ofs_bytes_ = ofs_bytes_ + args_->elem_capacity(index_);
101+
return res;
102+
};
103+
104+
size_t size() const {
105+
return args_->size() - index_;
106+
}
107+
108+
std::string_view front() const {
109+
return args_->at(index_, ofs_bytes_);
110+
}
111+
112+
ArgSlice ToSlice(CmdArgVec* scratch) const {
113+
scratch->resize(size());
114+
size_t offset = ofs_bytes_;
115+
for (size_t i = 0; i < scratch->size(); ++i) {
116+
(*scratch)[i] = args_->at(index_ + i, offset);
117+
offset += args_->elem_capacity(index_ + i);
118+
}
119+
return ArgSlice{scratch->data(), scratch->size()};
120+
}
121+
122+
size_t UsedMemory() const {
123+
return args_->UsedMemory();
124+
}
125+
};
126+
127+
struct Slice : public ArgSlice {
128+
using ArgSlice::ArgSlice;
129+
Slice(ArgSlice other) : ArgSlice(other) { // NOLINT
130+
}
131+
132+
ParsedArgs Tail() const {
133+
return ParsedArgs{subspan(1)};
134+
}
135+
136+
ArgSlice ToSlice(std::vector<std::string_view>* /*scratch*/) const {
137+
return *this;
138+
}
139+
140+
constexpr size_t UsedMemory() const {
141+
return 0;
142+
}
143+
};
144+
std::variant<Slice, WrapperBacked> args_;
73145
};
74146

75147
inline std::string_view ToSV(std::string_view slice) {
@@ -188,7 +260,8 @@ struct ReplyStats {
188260
absl::flat_hash_map<std::string, uint64_t> err_count;
189261
size_t script_error_count = 0;
190262

191-
// This variable can be updated directly from shard threads when they allocate memory for replies.
263+
// This variable can be updated directly from shard threads when they allocate memory for
264+
// replies.
192265
std::atomic<size_t> squashing_current_reply_size{0};
193266

194267
ReplyStats() = default;

0 commit comments

Comments
 (0)