chore: asynchronous IO for connection fiber #6069
}

phase_ = PROCESS;
bool is_iobuf_full = io_buf_.AppendLen() == 0;
No functional change from here onwards compared to IoLoop.
tests/dragonfly/connection_test.py
Outdated
await check_stats()


@pytest.mark.tls
no changes to the tests. I added the tls label to disable tls tests on the manual workflow I introduced (because tls is broken for now).
yep, so instead - avoid enabling v2 for tls sockets. we have is_tls_ flag for that
Signed-off-by: Kostas Kyrimis <[email protected]>
Review completed. 5 suggestions posted.
export ROOT_DIR="${GITHUB_WORKSPACE}/tests/dragonfly/valkey_search"
export UBSAN_OPTIONS=print_stacktrace=1:halt_on_error=1 # to crash on errors
if [[ "${{inputs.df-arg}}" == 'epoll' ]]; then
DF_ARG is only set when inputs.df-arg equals 'epoll', which prevents other values (e.g., the workflow’s 'expiremental_io_loop_v2') from being propagated to pytest.
we already have epoll argument why do you need this?
  export FILTER="${{inputs.filter}} and not exclude_epoll"
  # Run only replication tests with epoll
- timeout 80m pytest -m "$FILTER" --durations=10 --timeout=300 --color=yes --json-report --json-report-file=report.json dragonfly --df force_epoll=true --log-cli-level=INFO || code=$?
+ timeout 80m pytest -m "$FILTER" $DF_ARG --durations=10 --timeout=300 --color=yes --json-report --json-report-file=report.json dragonfly --df force_epoll=true --df --log-cli-level=INFO || code=$?
There is an extra --df with no value after force_epoll=true in the pytest command, which is likely to break argument parsing.
+1
src/facade/dragonfly_connection.cc
Outdated
io_ec_ = make_error_code(errc::connection_aborted);
}

LOG_IF(FATAL, !io_ec_) << "Recv error: " << strerror(-res) << " errno " << errno;
strerror(-res) uses the negated recv return value instead of errno, so the logged error message text will be incorrect.
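To illustrate the point about the logged text: POSIX `recv` returns -1 on failure and stores the cause in `errno`, so `strerror(-res)` would always describe error 1 (EPERM) no matter what went wrong. The following is only a minimal sketch of the corrected pattern, not the PR's code; `RecvErrorText` is a hypothetical helper introduced for illustration.

```cpp
#include <sys/socket.h>

#include <cerrno>
#include <cstring>
#include <string>

// Hypothetical helper (not from the PR): attempts a recv on `fd` and, on
// failure, returns the message for the real cause taken from errno.
// Returns an empty string when recv succeeds.
std::string RecvErrorText(int fd) {
  char buf[64];
  ssize_t res = recv(fd, buf, sizeof(buf), 0);
  if (res >= 0) {
    return {};
  }
  // Capture errno immediately; later library calls may overwrite it.
  int saved_errno = errno;
  // res is -1 here, so strerror(-res) would describe errno 1 (EPERM)
  // regardless of the actual failure.
  return std::strerror(saved_errno);
}
```

Calling the helper with an invalid descriptor such as -1 should yield the text for EBADF rather than the text for EPERM.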
src/facade/dragonfly_connection.cc
Outdated
[this]() { return io_buf_.InputLen() > 0 || io_ec_ || io_buf_.AppendLen() == 0; });

if (io_ec_) {
LOG_IF(WARNING, cntx()->replica_conn) << "async io error: " << ec;
The log line prints ec, which is default-constructed here (so it never holds an error), instead of the actual I/O error io_ec_.
yep
src/facade/dragonfly_connection.cc
Outdated
#include <absl/strings/str_cat.h>
#include <absl/time/time.h>

#include <condition_variable>
TODO remove the includes. plugin is too aggressive
yes, please
@romange you can start taking a look.
src/facade/dragonfly_connection.cc
Outdated
}

io::MutableBytes buf = io_buf_.AppendBuffer();
int res = recv(socket_->native_handle(), buf.data(), buf.size(), 0);
Please use TryRecv interface
src/facade/dragonfly_connection.cc
Outdated
do {
HandleMigrateRequest();

// We *must* poll again for readiness. The event handler we registered above
It is certainly possible to improve this. 'We must' is too strong a statement, since you could introduce a state variable that tracks whether a read is needed.
Either change it to a TODO or fix it in this PR.
Well, is it a regression to poll unconditionally? I imagine the overhead of an EAGAIN is minimal, so we don't really need a variable for this. I wrote 'must' because I found polling to be the simplest way. I reworded the comment to be less strong. My only question is: what is the reason to prefer a flag over an unconditional poll, assuming EAGAIN is not eating CPU time?
It's a system call, and we add another system call per loop iteration. It's fine for now, but I think we will have to add a state variable to track when this call is needed. I do not know how it affects performance; in fact, we will need to run some benchmarks on this PR. I will do it next week.
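The state-variable idea from this thread could look roughly like the sketch below. It is not taken from the PR: `FakeSocket` and `ReadGate` are hypothetical names, the socket is simulated, and the sketch assumes the readiness notification fires again whenever new data arrives after an EAGAIN.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Hypothetical stand-in for a non-blocking socket; counts simulated syscalls.
struct FakeSocket {
  std::deque<std::string> pending;  // chunks buffered by the "kernel"
  int recv_calls = 0;

  // Returns true and fills `out` if data is available; false models EAGAIN.
  bool TryRecv(std::string* out) {
    ++recv_calls;
    if (pending.empty()) return false;
    *out = std::move(pending.front());
    pending.pop_front();
    return true;
  }
};

// Tracks whether a read attempt is worthwhile, so the loop can skip the
// extra receive call when no notification has arrived.
class ReadGate {
 public:
  // Called from the readiness notification.
  void OnReadable() { maybe_readable_ = true; }

  // Reads until EAGAIN, but only if a notification arrived since the last
  // EAGAIN. Returns the number of chunks appended to `buf`.
  size_t Drain(FakeSocket& sock, std::string& buf) {
    size_t chunks = 0;
    std::string chunk;
    while (maybe_readable_) {
      if (sock.TryRecv(&chunk)) {
        buf += chunk;
        ++chunks;
      } else {
        maybe_readable_ = false;  // EAGAIN: wait for the next notification
      }
    }
    return chunks;
  }

 private:
  bool maybe_readable_ = false;
};
```

With this gate, an iteration that has seen no notification issues no receive call at all, which is the saving discussed above; whether it matters in practice is exactly what the planned benchmarks would have to show.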
src/facade/dragonfly_connection.h
Outdated
util::fb2::Fiber async_fb_; // async fiber (if started)

std::error_code io_ec_;
util::fb2::EventCount io_event_;
do not use EventCount for thread-local synchronization - use CondVarAny instead
src/facade/dragonfly_connection.cc
Outdated
return;
}

LOG_EVERY_T(ERROR, 10) << "Recv error: " << ec;
remove?
src/facade/dragonfly_connection.cc
Outdated
io::Result<size_t> res = socket_->TryRecv(buf);

// error path
if (!res) {
for readability, I would reverse this condition and handle the happy case first. aka
if (res && *res) {
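A minimal sketch of the suggested ordering, with the happy path handled first. `RecvResult` is a hypothetical stand-in for `io::Result<size_t>`, and `RecvOutcome` and `Classify` are names invented for this illustration; treating a successful zero-byte read as a peer shutdown is an assumption based on stream-socket semantics, not something stated in the PR.

```cpp
#include <cstddef>
#include <optional>
#include <system_error>

// Hypothetical stand-in for io::Result<size_t>: a value on success,
// an error code otherwise.
struct RecvResult {
  std::optional<size_t> value;
  std::error_code error;

  explicit operator bool() const { return value.has_value(); }
  size_t operator*() const { return *value; }
};

enum class RecvOutcome { kGotData, kPeerClosed, kWouldBlock, kFailed };

RecvOutcome Classify(const RecvResult& res) {
  if (res && *res) {  // happy path first: some bytes were read
    return RecvOutcome::kGotData;
  }
  if (res) {  // success with zero bytes (assumed: orderly peer shutdown)
    return RecvOutcome::kPeerClosed;
  }
  // Error path: distinguish "no data yet" from real failures.
  if (res.error == std::errc::resource_unavailable_try_again ||
      res.error == std::errc::operation_would_block) {
    return RecvOutcome::kWouldBlock;
  }
  return RecvOutcome::kFailed;
}
```

Putting the success case first keeps the common path at the top of the function and leaves the error handling as a flat tail instead of a nested block.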
tests/dragonfly/pymemcached_test.py
Outdated
from . import dfly_args
from .instance import DflyInstance
from .utility import *
not needed?
src/facade/dragonfly_connection.cc
Outdated
  UpdateIoBufCapacity(io_buf_, stats_, [&]() { io_buf_.EnsureCapacity(64); });
- auto res = IoLoop();
+ variant<error_code, Connection::ParserStatus> res;
+ if (GetFlag(FLAGS_expiremental_io_loop_v2) && !is_tls_) {
introduce a boolean variable instead of duplicating condition below.
augment review
Review completed. 2 suggestions posted.
src/facade/dragonfly_connection.cc
Outdated
}

variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() {
error_code ec;
error_code ec; is declared but never used in IoLoopV2(), which can trigger an unused-variable warning; consider removing it.
VLOG(1) << "Connection::OnShutdown";

BreakOnce(POLLHUP);
io_ec_ = make_error_code(errc::connection_aborted);
I would expect that it's not needed as Connection::Shutdown() should generate a socket notification
It doesn't for some reason; see test_no_tls_on_admin_port.
I want to get this into a green state first, and then I can follow up.
src/facade/dragonfly_connection.cc
Outdated
  void Connection::OnPostMigrateThread() {
- DVLOG(1) << "[" << id_ << "] OnPostMigrateThread";
+ DVLOG(1) << "[" << id_ << "] OnPostMigrateThread " << GetClientId();
it's not needed. GetClientId(); is id_ which is already printed in DVLOG
src/facade/dragonfly_connection.cc
Outdated
const bool io_loop_v2 = GetFlag(FLAGS_experimental_io_loop_v2);
if (io_loop_v2 && !is_tls_ && socket_ && socket_->IsOpen() && allowed_to_register_) {
socket_->RegisterOnRecv([this](const FiberSocketBase::RecvNotification& n) {
CHECK(this);
please remove check(this) - it's always true.
src/facade/dragonfly_connection.cc
Outdated
  }

- void Connection::HandleMigrateRequest() {
+ void Connection::HandleMigrateRequest(bool unregister) {
you do not use unregister
src/facade/dragonfly_connection.cc
Outdated
// Migrations should call RegisterRecv if the connection has reached here once.
// Otherwise, a migration won't register and instead wait for the connection state
// to first reach here and then call RegisterRecv inside IoLoopV2.
allowed_to_register_ = true;
I think allowed_to_register_ is redundant - I did not see how you use it and it's always true inside RegisterRecv
It's needed here because we might otherwise end up calling RegisterOnRecv twice. Everything above the call to IoLoopV2 is synchronous and fiber blocking. So the connection might migrate before it reaches IoLoopV2 and call RegisterOnRecv() as part of the post-migration step (OnPostMigrateThread). Eventually it will reach the IoLoopV2 body, which will try to register again and hit a CHECK failure. I edited the comment.
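The double-registration hazard described in this reply can be sketched as follows. This is only an illustration under stated assumptions: `FakeSocket` and `Conn` are hypothetical classes, the `assert` stands in for the CHECK mentioned above, and the real connection code is considerably more involved.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical socket that, like the one discussed, refuses a second
// registration while a hook is already installed.
class FakeSocket {
 public:
  void RegisterOnRecv(std::function<void()> cb) {
    assert(!cb_ && "double RegisterOnRecv");
    cb_ = std::move(cb);
  }
  void ResetOnRecvHook() { cb_ = nullptr; }
  bool registered() const { return static_cast<bool>(cb_); }

 private:
  std::function<void()> cb_;
};

// Hypothetical connection showing why the guard flag is needed.
class Conn {
 public:
  // Models a thread migration: drop the hook, move, then re-register,
  // but only if the I/O loop has already been entered once.
  void Migrate() {
    sock_.ResetOnRecvHook();
    if (allowed_to_register_) {
      sock_.RegisterOnRecv([] {});
    }
  }

  // Entry into the I/O loop: the first registration happens here.
  void EnterIoLoop() {
    sock_.RegisterOnRecv([] {});
    allowed_to_register_ = true;
  }

  bool registered() const { return sock_.registered(); }

 private:
  FakeSocket sock_;
  bool allowed_to_register_ = false;
};
```

Without the flag, a migration that happens before `EnterIoLoop()` would install the hook early, and the later registration inside the loop would trip the assertion.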
romange left a comment
several minor comments
if (io_loop_v2 && !is_tls_) {
// Everything above the IoLoopV2 is fiber blocking. A connection can migrate before
// it reaches here and will cause a double RegisterOnRecv check fail. To avoid this,
// a migration shall only call RegisterOnRev if it reached the main IoLoopV2 below.
I think it's not enough. Similarly to CancelOnErrorCb - you need to unregister from the polling in OnPreMigrateThread function
Will take a look.
I see. So, I do have it, but inside Migrate() and not in OnPreMigrateThread. The reason is that ResetOnRecvHook() is preemptive, and by the time we wake up the check
CHECK(!cc_->conn_closing);
can fail. To avoid this, I moved it to Migrate() so that if the connection is closing after resetting the hook, we exit instead of moving on with the migration. It's a subtle corner case.
Logically, we should deregister in OnPreMigrate just like we register in OnPostMigrate; however, I don't see a clean way to do this.
we could call ResetOnRecvHook after the check(), after socket_->CancelOnErrorCb();,
but ok - we can check it later.
I do not want you to do more changes in this PR. Can you please summarize TODOs - failing/skipped tests, unresolved issues/workarounds like this one, pubsub failure etc in the PR description so it won't get lost completely after we merge?
> we could call ResetOnRecvHook after the check(), after socket_->CancelOnErrorCb();, but ok - we can check it later.
Yes, but then we would move on with a migration on a closing connection. Doing that seems at least redundant 🤷
> I do not want you to do more changes in this PR. Can you please summarize TODOs - failing/skipped tests, unresolved issues/workarounds like this one, pubsub failure etc in the PR description so it won't get lost completely after we merge?
sure! one sec
@romange done!
This PR adds IoLoopV2 that removes fiber blocking read calls from the flow. This allows the connection fiber to handle other events while recv calls are handled asynchronously from readiness events triggered by a registered multishot poll.
Follow up on a separate PR:
Resolves #6028
Workarounds and Failing tests
assert int(info["timeout_disconnects"]) >= 1. It was non critical and this test has also failed upstream a couple of times.
ConsumeInput() read comment in: https://github.com/dragonflydb/dragonfly/pull/6069/files#diff-b88cbca1b2c9337ab2a37262f9d516b117f4996da25e2ae173e0e129f2b82b20R1207