
Conversation

@kostasrim
Contributor

@kostasrim kostasrim commented Nov 17, 2025

This PR adds IoLoopV2, which removes fiber-blocking read calls from the flow. This allows the connection fiber to handle other events while recv calls are handled asynchronously from readiness events triggered by a registered multishot poll.

  • add IoLoopV2 that uses non-fiber-blocking async reads via poll registration
  • add a manually triggered workflow to run all the regression tests (both epoll/uring) without tls

Follow-up in a separate PR:

  • Also allow EnableRecvMultishot()

Resolves #6028

Workarounds and Failing tests

  1. test_timeout fails in assert int(info["timeout_disconnects"]) >= 1. It is non-critical and this test has also failed upstream a couple of times.
  2. Added DflyInstance::has_arg to avoid propagating the io_loop_v2 flag to tests that use older versions of dragonfly.
  3. Calls ResetOnRecvHook inside Migrate() instead of OnPreMigrateThread. The reason is that the check in OnPreMigrateThread fails if the connection is closing, and because deregistering the hook is preemptive, this invariant might break. To mitigate this, we deregister inside Migrate() instead, and if the connection is closing afterwards we exit instead of starting the migration.
  4. Added migration_allowed_to_register_ to flag when a migration should register a callback. A connection might migrate before it reaches its asynchronous flow (io_loop_v2), which can lead to double registration errors (one from the migration and one once it reaches io_loop_v2). To respect the synchronous parts of the connection, we use a flag to indicate when a migration should also register a callback.
  5. Corner case in ConsumeInput(); see the comment in: https://github.com/dragonflydb/dragonfly/pull/6069/files#diff-b88cbca1b2c9337ab2a37262f9d516b117f4996da25e2ae173e0e129f2b82b20R1207

}

phase_ = PROCESS;
bool is_iobuf_full = io_buf_.AppendLen() == 0;
Contributor Author

No functional changes from here onwards in comparison to IoLoop

await check_stats()


@pytest.mark.tls
Contributor Author

No changes to the tests. I added the tls label to disable tls tests in the manual workflow I introduced (because tls is broken for now).

Collaborator

yep, so instead - avoid enabling v2 for tls sockets. we have is_tls_ flag for that

Signed-off-by: Kostas Kyrimis <[email protected]>
Signed-off-by: Kostas Kyrimis <[email protected]>
Signed-off-by: Kostas Kyrimis <[email protected]>
Signed-off-by: Kostas Kyrimis <[email protected]>
@kostasrim kostasrim marked this pull request as ready for review December 1, 2025 10:49
@kostasrim
Contributor Author

@kostasrim kostasrim changed the title from "[wip] chore: asynchronous IO for connection fiber" to "chore: asynchronous IO for connection fiber" on Dec 1, 2025
@augmentcode augmentcode bot left a comment

Review completed. 5 suggestions posted.

Comment augment review to trigger a new review at any time.

export ROOT_DIR="${GITHUB_WORKSPACE}/tests/dragonfly/valkey_search"
export UBSAN_OPTIONS=print_stacktrace=1:halt_on_error=1 # to crash on errors
if [[ "${{inputs.df-arg}}" == 'epoll' ]]; then

DF_ARG is only set when inputs.df-arg equals 'epoll', which prevents other values (e.g., the workflow’s 'expiremental_io_loop_v2') from being propagated to pytest.


Collaborator

we already have epoll argument why do you need this?

export FILTER="${{inputs.filter}} and not exclude_epoll"
# Run only replication tests with epoll
timeout 80m pytest -m "$FILTER" --durations=10 --timeout=300 --color=yes --json-report --json-report-file=report.json dragonfly --df force_epoll=true --log-cli-level=INFO || code=$?
timeout 80m pytest -m "$FILTER" $DF_ARG --durations=10 --timeout=300 --color=yes --json-report --json-report-file=report.json dragonfly --df force_epoll=true --df --log-cli-level=INFO || code=$?

There is an extra --df with no value after force_epoll=true in the pytest command, which is likely to break argument parsing.


Collaborator

+1

io_ec_ = make_error_code(errc::connection_aborted);
}

LOG_IF(FATAL, !io_ec_) << "Recv error: " << strerror(-res) << " errno " << errno;

strerror(-res) uses the negated recv return value instead of errno, so the logged error message text will be incorrect.


[this]() { return io_buf_.InputLen() > 0 || io_ec_ || io_buf_.AppendLen() == 0; });

if (io_ec_) {
LOG_IF(WARNING, cntx()->replica_conn) << "async io error: " << ec;

The log line prints ec, which is uninitialized here, instead of the actual I/O error io_ec_.


Collaborator

yep

#include <absl/strings/str_cat.h>
#include <absl/time/time.h>

#include <condition_variable>
Contributor Author

TODO remove the includes. plugin is too aggressive

Collaborator

yes, please

@kostasrim kostasrim requested a review from romange December 1, 2025 10:54
@kostasrim
Contributor Author

@romange you can start taking a look.

  1. I added a workflow + flag to run ioloopv2. The former is to avoid running the tests on both paths every time. Once we slowly migrate I will remove the workflow file -- it's just a temporary solution
  2. I only used epoll based notifications -- will follow up with enable_multi_shot_=true as well
  3. I introduced ioloopv2 to provide clear separation between the two implementations

}

io::MutableBytes buf = io_buf_.AppendBuffer();
int res = recv(socket_->native_handle(), buf.data(), buf.size(), 0);
Collaborator

Please use TryRecv interface

do {
HandleMigrateRequest();

// We *must* poll again for readiness. The event handler we registered above
Collaborator

it is possible to improve for sure. 'We must' is too strong a statement, as you can introduce a state that tracks whether the read is needed and when it's not needed.
Either change to TODO or fix it in this PR.

Contributor Author

Well, is it a regression to poll unconditionally? I imagine the overhead for EAGAIN is minimal, so we don't really need a variable for this. I wrote 'must' because I found poll to be the simplest way. I reworded the comment to be less strong. My only question is: what is the reason to prefer a flag over an unconditional poll, assuming EAGAIN is not eating CPU time?

Collaborator

it's a system call, and we add another system call per loop iteration. It's fine for now, but I think we will have to add a state variable to track when this call is needed. I do not know how it affects performance; in fact, we will need to do some benchmarking tests on this PR. I will do it next week

util::fb2::Fiber async_fb_; // async fiber (if started)

std::error_code io_ec_;
util::fb2::EventCount io_event_;
Collaborator

do not use EventCount for thread-local synchronization - use CondVarAny instead

Signed-off-by: Kostas Kyrimis <[email protected]>
Signed-off-by: Kostas Kyrimis <[email protected]>
@kostasrim kostasrim requested a review from romange December 3, 2025 10:44
return;
}

LOG_EVERY_T(ERROR, 10) << "Recv error: " << ec;
Collaborator

remove?

io::Result<size_t> res = socket_->TryRecv(buf);

// error path
if (!res) {
Collaborator

for readability, I would reverse this condition and handle the happy case first. aka
if (res && *res) {


from . import dfly_args
from .instance import DflyInstance
from .utility import *
Collaborator

@romange romange Dec 3, 2025

not needed?

UpdateIoBufCapacity(io_buf_, stats_, [&]() { io_buf_.EnsureCapacity(64); });
auto res = IoLoop();
variant<error_code, Connection::ParserStatus> res;
if (GetFlag(FLAGS_expiremental_io_loop_v2) && !is_tls_) {
Collaborator

introduce a boolean variable instead of duplicating condition below.

@kostasrim kostasrim requested a review from romange December 10, 2025 09:39
@kostasrim
Contributor Author

augment review

@kostasrim
Contributor Author

kostasrim commented Dec 10, 2025

@augmentcode augmentcode bot left a comment

Review completed. 2 suggestions posted.

Comment augment review to trigger a new review at any time.

}

variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() {
error_code ec;

error_code ec; is declared but never used in IoLoopV2(), which can trigger an unused-variable warning; consider removing it.


VLOG(1) << "Connection::OnShutdown";

BreakOnce(POLLHUP);
io_ec_ = make_error_code(errc::connection_aborted);
Collaborator

I would expect that it's not needed as Connection::Shutdown() should generate a socket notification

Contributor Author

it doesn't for some reason -- test_no_tls_on_admin_port

I want to get this in a green state and then I can follow up


void Connection::OnPostMigrateThread() {
DVLOG(1) << "[" << id_ << "] OnPostMigrateThread";
DVLOG(1) << "[" << id_ << "] OnPostMigrateThread " << GetClientId();
Collaborator

it's not needed. GetClientId() is id_, which is already printed in DVLOG

const bool io_loop_v2 = GetFlag(FLAGS_experimental_io_loop_v2);
if (io_loop_v2 && !is_tls_ && socket_ && socket_->IsOpen() && allowed_to_register_) {
socket_->RegisterOnRecv([this](const FiberSocketBase::RecvNotification& n) {
CHECK(this);
Collaborator

please remove check(this) - it's always true.

}

void Connection::HandleMigrateRequest() {
void Connection::HandleMigrateRequest(bool unregister) {
Collaborator

you do not use unregister

// Migrations should call RegisterRecv if the connection has reached here once.
// Otherwise, a migration won't register and instead wait for the connection state
// to first reach here and then call RegisterRecv inside IoLoopV2.
allowed_to_register_ = true;
Collaborator

I think allowed_to_register_ is redundant - I did not see how you use it and it's always true inside RegisterRecv

Contributor Author

@kostasrim kostasrim Dec 10, 2025

It's needed here as we might end up calling RegisterOnRecv twice. Everything above the call to IoLoopV2 is synchronous and fiber blocking. So the connection might migrate before it reaches IoLoopV2 and call RegisterOnRecv() as part of the post-migration. Eventually, it will reach the IoLoopV2 body, which will try to register again and check-fail. I edited the comment.

Collaborator

@romange romange left a comment

several minor comments

if (io_loop_v2 && !is_tls_) {
// Everything above the IoLoopV2 is fiber blocking. A connection can migrate before
// it reaches here and will cause a double RegisterOnRecv check fail. To avoid this,
// a migration shall only call RegisterOnRecv if it reached the main IoLoopV2 below.
Collaborator

I think it's not enough. Similarly to CancelOnErrorCb - you need to unregister from the polling in OnPreMigrateThread function

Contributor Author

Will take a look

Contributor Author

I see. So, I do have it, but inside Migrate() and not in OnPreMigrateThread. The reason is that ResetOnRecvHook() is preemptive and by the time we wake up the check:

 658   CHECK(!cc_->conn_closing);

Can fail. To avoid this, I moved it to Migrate() such that if, after resetting the hook, the connection is closing, we exit instead of moving on with the migration. It's a subtle corner case.

Logically, we should deregister in OnPreMigrate just like we register on OnPostMigrate -- however I don't see a clean way to do this

Collaborator

we could call ResetOnRecvHook after the check(), after socket_->CancelOnErrorCb();,
but ok - we can check it later.

Collaborator

@romange romange Dec 11, 2025

I do not want you to do more changes in this PR. Can you please summarize
TODOs - failing/skipped tests, unresolved issues/workarounds like this one, pubsub failure etc in the PR description so it won't get lost completely after we merge?

Contributor Author

we could call ResetOnRecvHook after the check(), after socket_->CancelOnErrorCb();,
but ok - we can check it later.

Yes, but then we would move on with a migration on a closing connection. Doing that seems at least redundant 🤷

Contributor Author

I do not want you to do more changes in this PR. Can you please summarize TODOs - failing/skipped tests, unresolved issues/workarounds like this one, pubsub failure etc in the PR description so it won't get lost completely after we merge?

sure! one sec

Contributor Author

@romange done!

@kostasrim kostasrim merged commit 128df73 into main Dec 11, 2025
14 of 15 checks passed
@kostasrim kostasrim deleted the kpr31 branch December 11, 2025 09:40
Successfully merging this pull request may close these issues: Integrate and test OnRecv in ConnFb