@@ -152,6 +152,10 @@ void UpdateIoBufCapacity(const io::IoBuf& io_buf, ConnectionStats* stats,
152152 }
153153}
154154
155+ size_t UsedMemoryInternal (const Connection::PipelineMessage& msg) {
156+ return sizeof (msg) + msg.HeapMemory ();
157+ }
158+
155159struct TrafficLogger {
156160 // protects agains closing the file while writing or data races when opening the file.
157161 // Also, makes sure that LogTraffic are executed atomically.
@@ -377,19 +381,6 @@ class PipelineCacheSizeTracker {
377381
378382thread_local PipelineCacheSizeTracker tl_pipe_cache_sz_tracker;
379383
380- void Connection::PipelineMessage::SetArgs (const RespVec& args) {
381- auto * next = storage.data ();
382- for (size_t i = 0 ; i < args.size (); ++i) {
383- RespExpr::Buffer buf = args[i].GetBuf ();
384- size_t s = buf.size ();
385- if (s)
386- memcpy (next, buf.data (), s);
387- next[s] = ' \0 ' ;
388- this ->args [i] = MutableSlice (next, s);
389- next += (s + 1 );
390- }
391- }
392-
393384Connection::MCPipelineMessage::MCPipelineMessage (MemcacheParser::Command cmd_in,
394385 std::string_view value_in)
395386 : cmd{std::move (cmd_in)}, value{value_in}, backing_size{0 } {
@@ -425,23 +416,13 @@ Connection::MCPipelineMessage::MCPipelineMessage(MemcacheParser::Command cmd_in,
425416 }
426417}
427418
428- void Connection::PipelineMessage::Reset (size_t nargs, size_t capacity) {
429- storage.resize (capacity);
430- args.resize (nargs);
431- }
432-
433- size_t Connection::PipelineMessage::StorageCapacity () const {
434- return storage.capacity () + args.capacity ();
435- }
436-
437419size_t Connection::MessageHandle::UsedMemory () const {
438420 struct MessageSize {
439421 size_t operator ()(const PubMessagePtr& msg) {
440422 return sizeof (PubMessage) + (msg->channel .size () + msg->message .size ());
441423 }
442424 size_t operator ()(const PipelineMessagePtr& msg) {
443- return sizeof (PipelineMessage) + msg->args .capacity () * sizeof (MutableSlice) +
444- msg->storage .capacity ();
425+ return UsedMemoryInternal (*msg);
445426 }
446427 size_t operator ()(const MonitorMessage& msg) {
447428 return msg.capacity ();
@@ -555,11 +536,10 @@ void Connection::AsyncOperations::operator()(const PubMessage& pub_msg) {
555536}
556537
557538void Connection::AsyncOperations::operator ()(Connection::PipelineMessage& msg) {
558- DVLOG (2 ) << " Dispatching pipeline: " << ToSV ( msg.args . front () );
539+ DVLOG (2 ) << " Dispatching pipeline: " << msg.Front ( );
559540
560541 ++self->local_stats_ .cmds ;
561- self->service_ ->DispatchCommand (ParsedArgs{msg.args }, self->reply_builder_ .get (),
562- self->cc_ .get ());
542+ self->service_ ->DispatchCommand (ParsedArgs{msg}, self->reply_builder_ .get (), self->cc_ .get ());
563543
564544 self->last_interaction_ = time (nullptr );
565545 self->skip_next_squashing_ = false ;
@@ -644,7 +624,7 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
644624 static atomic_uint32_t next_id{1 };
645625
646626 constexpr size_t kReqSz = sizeof (Connection::PipelineMessage);
647- static_assert (kReqSz <= 256 && kReqSz >= 200 );
627+ static_assert (kReqSz <= 256 );
648628
649629 switch (protocol) {
650630 case Protocol::REDIS:
@@ -1536,36 +1516,32 @@ void Connection::SquashPipeline() {
15361516 DCHECK_EQ (dispatch_q_.size (), pending_pipeline_cmd_cnt_);
15371517 DCHECK_EQ (reply_builder_->GetProtocol (), Protocol::REDIS); // Only Redis is supported.
15381518
1539- vector<ParsedArgs> squash_cmds;
1540- squash_cmds.reserve (dispatch_q_.size ());
1519+ unsigned pipeline_count = std::min<uint32_t >(dispatch_q_.size (), pipeline_squash_limit_cached);
15411520
15421521 uint64_t start = CycleClock::Now ();
15431522
1544- for (const auto & msg : dispatch_q_) {
1545- CHECK (holds_alternative<PipelineMessagePtr>(msg.handle ))
1546- << msg.handle .index () << " on " << DebugInfo ();
1523+ // We use indexes as iterators are invalidated when pushing into the queue.
1524+ auto get_next_fn = [i = 0 , this ]() mutable -> ParsedArgs {
1525+ const auto & elem = dispatch_q_[i++];
1526+ CHECK (holds_alternative<PipelineMessagePtr>(elem.handle ));
1527+ const auto & pmsg = get<PipelineMessagePtr>(elem.handle );
15471528
1548- auto & pmsg = get<PipelineMessagePtr>(msg.handle );
1549- squash_cmds.emplace_back (ParsedArgs (pmsg->args ));
1550- if (squash_cmds.size () >= pipeline_squash_limit_cached) {
1551- // We reached the limit of commands to squash, so we dispatch them.
1552- break ;
1553- }
1554- }
1529+ return *pmsg;
1530+ };
15551531
15561532 // async_dispatch is a guard to prevent concurrent writes into reply_builder_, hence
15571533 // it must guard the Flush() as well.
15581534 cc_->async_dispatch = true ;
15591535
15601536 DispatchManyResult result =
1561- service_->DispatchManyCommands (absl::MakeSpan (squash_cmds) , reply_builder_.get (), cc_.get ());
1537+ service_->DispatchManyCommands (get_next_fn, pipeline_count , reply_builder_.get (), cc_.get ());
15621538
15631539 uint32_t dispatched = result.processed ;
15641540 uint64_t before_flush = CycleClock::Now ();
15651541 //
15661542 // TODO: to investigate if always flushing will improve P99 latency because otherwise we
15671543 // wait for the next batch to finish before fully flushing the current response.
1568- if (pending_pipeline_cmd_cnt_ == squash_cmds. size () ||
1544+ if (pending_pipeline_cmd_cnt_ == pipeline_count ||
15691545 always_flush_pipeline_cached) { // Flush if no new commands appeared
15701546 reply_builder_->Flush ();
15711547 reply_builder_->SetBatchMode (false ); // in case the next dispatch is sync
@@ -1597,7 +1573,7 @@ void Connection::SquashPipeline() {
15971573 dispatch_q_.erase (it, it + dispatched);
15981574
15991575 // If interrupted due to pause, fall back to regular dispatch
1600- skip_next_squashing_ = dispatched != squash_cmds. size () ;
1576+ skip_next_squashing_ = dispatched != pipeline_count ;
16011577}
16021578
16031579void Connection::ClearPipelinedMessages () {
@@ -1755,25 +1731,15 @@ void Connection::AsyncFiber() {
17551731}
17561732
17571733Connection::PipelineMessagePtr Connection::FromArgs (const RespVec& args) {
1758- DCHECK (!args.empty ());
1759- size_t backed_sz = 0 ;
1760- for (const auto & arg : args) {
1761- CHECK_EQ (RespExpr::STRING, arg.type );
1762- backed_sz += arg.GetBuf ().size () + 1 ; // for '\0'
1763- }
1764- DCHECK (backed_sz);
1765-
1766- static_assert (alignof (PipelineMessage) == 8 );
1767-
17681734 PipelineMessagePtr ptr;
1769- if (ptr = GetFromPipelinePool (); ptr) {
1770- ptr->Reset (args.size (), backed_sz);
1771- } else {
1735+ if (ptr = GetFromPipelinePool (); !ptr) {
17721736 // We must construct in place here, since there is a slice that uses memory locations
1773- ptr = make_unique<PipelineMessage>(args. size (), backed_sz );
1737+ ptr = make_unique<PipelineMessage>();
17741738 }
17751739
1776- ptr->SetArgs (args);
1740+ auto map = [](const RespExpr& expr) { return expr.GetView (); };
1741+ auto range = base::it::Transform (map, base::it::Range (args.begin (), args.end ()));
1742+ ptr->Assign (range.begin (), range.end (), args.size ());
17771743 return ptr;
17781744}
17791745
@@ -1782,7 +1748,7 @@ void Connection::ShrinkPipelinePool() {
17821748 return ;
17831749
17841750 if (tl_pipe_cache_sz_tracker.CheckAndUpdateWatermark (pipeline_req_pool_.size ())) {
1785- stats_->pipeline_cmd_cache_bytes -= pipeline_req_pool_.back ()-> StorageCapacity ( );
1751+ stats_->pipeline_cmd_cache_bytes -= UsedMemoryInternal (* pipeline_req_pool_.back ());
17861752 pipeline_req_pool_.pop_back ();
17871753 }
17881754}
@@ -1792,7 +1758,7 @@ Connection::PipelineMessagePtr Connection::GetFromPipelinePool() {
17921758 return nullptr ;
17931759
17941760 auto ptr = std::move (pipeline_req_pool_.back ());
1795- stats_->pipeline_cmd_cache_bytes -= ptr-> StorageCapacity ( );
1761+ stats_->pipeline_cmd_cache_bytes -= UsedMemoryInternal (*ptr );
17961762 pipeline_req_pool_.pop_back ();
17971763 return ptr;
17981764}
@@ -1968,7 +1934,7 @@ void Connection::RecycleMessage(MessageHandle msg) {
19681934 pending_pipeline_cmd_cnt_--;
19691935 pending_pipeline_bytes_ -= used_mem;
19701936 if (stats_->pipeline_cmd_cache_bytes < qbp.pipeline_cache_limit ) {
1971- stats_->pipeline_cmd_cache_bytes += (* pipe)-> StorageCapacity ( );
1937+ stats_->pipeline_cmd_cache_bytes += UsedMemoryInternal (*(* pipe));
19721938 pipeline_req_pool_.push_back (std::move (*pipe));
19731939 }
19741940 }
0 commit comments