-
-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Implement "Async" RPC #22
Conversation
WalkthroughThe recent updates bring a suite of enhancements focusing on asynchronous and multi-relay RPC communication. Key changes include the introduction of a Changes
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## 4.x #22 +/- ##
============================================
+ Coverage 70.52% 73.37% +2.84%
- Complexity 120 195 +75
============================================
Files 9 12 +3
Lines 302 507 +205
============================================
+ Hits 213 372 +159
- Misses 89 135 +46 ☔ View full report in Codecov by Sentry. |
…pportunities to lose a relay
…ting in lost relays (actually improves performance as well)
Hey @msmakouz , I saw @rustatian 's comment, thank you for the review :) I've just pushed some last changes that were needed after I tried using this implementation in my prod service. It didn't behave very well and once I started down that rabbit hole I realized I had made this more complicated than it really needed to. So I done away with the distinction between occupied relays with/out responses and also removed the limitation from the MultiRelayHelper of not supporting a map (aka non-int indexed array). I'm sorry if you had already started looking through the code, but from what I can tell after having spent a significant amount of time debugging the tests the whole thing should be pretty final now. |
Hey @msmakouz , last (functional) change from my side (hopefully). I fell into the trap of cloning an array of objects, not realizing they get shared between the instances. |
|
@L3tum Hi, thanks for the work done! Do you plan to make any more changes to the code, or can we look at your proposed implementation? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 0
Configuration used: CodeRabbit UI
Files selected for processing (4)
- src/RPC/MultiRPC.php (1 hunks)
- tests/Goridge/MultiRPC.php (1 hunks)
- tests/Goridge/TCPMultiRPCTest.php (1 hunks)
- tests/Goridge/TCPRPCTest.php (1 hunks)
Files skipped from review due to trivial changes (1)
- tests/Goridge/TCPRPCTest.php
Files skipped from review as they are similar to previous changes (2)
- src/RPC/MultiRPC.php
- tests/Goridge/MultiRPC.php
Additional comments: 1
tests/Goridge/TCPMultiRPCTest.php (1)
- 10-15: The class
TCPMultiRPCTest
correctly extendsMultiRPC
and defines constants for socket configuration. Ensure that the socket address, port, and type are appropriately configured for the test environment.
@msmakouz I've updated most of the things the AI complained about. I've looked through the missing test coverage and most of it seems to be in more "abstract" error handling. I'll see if I can add some more tests for that, but I'll do that later |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 2
Configuration used: CodeRabbit UI
Files selected for processing (7)
- src/ConnectedRelayInterface.php (1 hunks)
- src/MultiRelayHelper.php (1 hunks)
- src/RPC/MultiRPC.php (1 hunks)
- src/SocketRelay.php (3 hunks)
- tests/Goridge/MsgPackMultiRPCTest.php (1 hunks)
- tests/Goridge/MultiRPC.php (1 hunks)
- tests/Goridge/MultiRelayHelperTest.php (1 hunks)
Files skipped from review as they are similar to previous changes (5)
- src/MultiRelayHelper.php
- src/RPC/MultiRPC.php
- src/SocketRelay.php
- tests/Goridge/MsgPackMultiRPCTest.php
- tests/Goridge/MultiRPC.php
Hey @msmakouz ! I think I implemented all suggestions by the AI and now I'm only massaging it to re-check, but I don't expect any further issues to pop up. In case you waited for this to review the changes yourself. |
@coderabbitai review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 2
Configuration used: CodeRabbit UI
Files selected for processing (4)
- src/ConnectedRelayInterface.php (1 hunks)
- src/RPC/MultiRPC.php (1 hunks)
- src/SocketRelay.php (4 hunks)
- tests/Goridge/MultiRelayHelperTest.php (1 hunks)
Files skipped from review as they are similar to previous changes (3)
- src/ConnectedRelayInterface.php
- src/RPC/MultiRPC.php
- src/SocketRelay.php
Hello. I think the API can be greatly simplified if we introduce a // an example
interface DeferredResult
{
/**
* Check whether the request is pending.
*/
public function isPending(): bool;
/**
* Await and return the result. Throws an exception if the call failed.s
*
* @throws GoridgeException
* @throws RPCException
* @throws ServiceException
*/
public function getResult(): mixed;
} In addition, I would make all RPC asynchronous right away, then the call to the synchronous method |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 2
Configuration used: CodeRabbit UI
Files selected for processing (4)
- src/MultiRelayHelper.php (1 hunks)
- src/RPC/AbstractRPC.php (1 hunks)
- src/RPC/MultiRPC.php (1 hunks)
- src/RPC/RPC.php (4 hunks)
Files skipped from review as they are similar to previous changes (3)
- src/MultiRelayHelper.php
- src/RPC/AbstractRPC.php
- src/RPC/MultiRPC.php
Additional comments: 2
src/RPC/RPC.php (2)
- 13-13: The
RPC
class now extendsAbstractRPC
, aligning with object-oriented principles to leverage polymorphism and code reuse. This change likely introduces a more structured hierarchy and shared functionality across RPC-related classes.- 49-52: > 📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [43-50]
The static
create
method provides a convenient factory pattern for instantiating theRPC
class with a relay and codec. This method enhances usability by abstracting the creation logic and allowing for easy setup with different configurations. It's a positive change that promotes ease of use and flexibility.
private readonly RelayInterface $relay, | ||
private CodecInterface $codec = new JsonCodec(), | ||
CodecInterface $codec = new JsonCodec(), | ||
) { | ||
} | ||
|
||
/** | ||
* @psalm-pure | ||
*/ | ||
public function withServicePrefix(string $service): RPCInterface | ||
{ | ||
/** @psalm-suppress ImpureVariable */ | ||
$rpc = clone $this; | ||
$rpc->service = $service; | ||
|
||
return $rpc; | ||
} | ||
|
||
/** | ||
* @psalm-pure | ||
*/ | ||
public function withCodec(CodecInterface $codec): RPCInterface | ||
{ | ||
/** @psalm-suppress ImpureVariable */ | ||
$rpc = clone $this; | ||
$rpc->codec = $codec; | ||
|
||
return $rpc; | ||
parent::__construct($codec); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The constructor's signature has been updated to include CodecInterface $codec
with a default value of new JsonCodec()
. This change enforces type safety and dependency injection for the codec used, improving modularity and testability. However, instantiating a default codec within the constructor parameters might not be the best practice for dependency injection, as it couples the RPC
class to the JsonCodec
class.
Consider injecting the codec dependency through the constructor without a default value or using a dependency injection container to manage such dependencies.
// wait for the frame confirmation | ||
$frame = $this->relay->waitFrame(); | ||
|
||
if (\count($frame->options) !== 2) { | ||
if (count($frame->options) !== 2) { | ||
throw new RPCException('Invalid RPC frame, options missing'); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [30-36]
The error handling within the call
method checks for the presence and sequence of options in the received frame. This is a good practice for ensuring the integrity of the communication protocol. However, the reference to self::$seq
suggests that there was a static property for tracking sequence numbers, which has been removed. This could potentially break the sequence validation logic.
Ensure that the sequence tracking mechanism is correctly implemented and integrated with the new class structure.
Hey @roxblnfk , sure that looks good. The question is whether we want to use something custom or use either a |
For now, there's no need to use promises or futures from third-party providers. Moreover, I can't quite imagine how we could use fibers for asynchronous requests without implementing a coroutine. So far, the best option seems to be to avoid all of this and implement our own simple abstraction. We can also expose a public method |
I've used Fibers for that in this repo. A fiber is in the end just a promise so we can use it as a promise. |
I understand your reasoning. I myself use fibers and promises in other projects a lot. Let me remind you that this package (goridge) is the lowest level of abstraction. Therefore, it's highly undesirable to introduce any extra dependencies that might cause conflicts in the future. Our task here is not to drag in some popular promises hoping they will work almost everywhere. Our goal is to provide capabilities for higher levels of abstraction (witch may add promises over). Using fibers would be nice, but only where no additional boilerplate is required. That is, in the |
In that case I would argue that an abstraction such as Here's the patch for the change [1]. As you can see there really isn't that much actual change in there, and arguably no reduction of complexity. The biggest code change are the changes to [1] Index: src/RPC/MultiRPC.php
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/RPC/MultiRPC.php b/src/RPC/MultiRPC.php
--- a/src/RPC/MultiRPC.php (revision c4b41cff3d9269ae999aedb4e206c596a99690eb)
+++ b/src/RPC/MultiRPC.php (date 1708624735013)
@@ -139,34 +139,15 @@
public function call(string $method, mixed $payload, mixed $options = null): mixed
{
- $relayIndex = $this->ensureFreeRelayAvailable();
- $relay = $this->freeRelays[$relayIndex];
-
- $relay->send($this->packFrame($method, $payload));
-
- // wait for the frame confirmation
- $frame = $this->getResponseFromRelay($relay, self::$seq, true);
-
- self::$seq++;
-
- return $this->decodeResponse($frame, $relay, $options);
+ return $this->callAsync($method, $payload)->getResult($options);
}
public function callIgnoreResponse(string $method, mixed $payload): void
{
- $relayIndex = $this->ensureFreeRelayAvailable();
- $relay = $this->freeRelays[$relayIndex];
-
- $relay->send($this->packFrame($method, $payload));
-
- $seq = self::$seq;
- self::$seq++;
- $this->occupiedRelays[$seq] = $relay;
- // Last index so no need for array_pop or stuff
- unset($this->freeRelays[$relayIndex]);
+ $this->callAsync($method, $payload)->ignore();
}
- public function callAsync(string $method, mixed $payload): int
+ public function callAsync(string $method, mixed $payload): DeferredResult
{
// Flush buffer if someone doesn't call getResponse
if (count($this->asyncResponseBuffer) > $this->asyncBufferThreshold) {
@@ -188,7 +169,7 @@
// Last index so no need for array_pop or stuff
unset($this->freeRelays[$relayIndex]);
- return $seq;
+ return new DeferredResult($this, $seq);
}
public function hasResponse(int $seq): bool
@@ -206,8 +187,20 @@
return false;
}
- public function hasResponses(array $seqs): array
+ /**
+ * @param DeferredResult[] $results
+ * @return DeferredResult[]
+ */
+ public function hasResponses(array $results): array
{
+ $seqMap = [];
+ $seqs = [];
+
+ foreach ($results as $result) {
+ $seqMap[$result->id] = $result;
+ $seqs[] = $result->id;
+ }
+
$relays = [];
/** @var array<int, positive-int> $relayIndexToSeq */
$relayIndexToSeq = [];
@@ -233,7 +226,12 @@
$seqsWithResponse[] = $relayIndexToSeq[$relayIndex];
}
- return $seqsWithResponse;
+ $resultsWithResponses = [];
+ foreach ($seqsWithResponse as $seq) {
+ $resultsWithResponses[] = $seqMap[$seq];
+ }
+
+ return $resultsWithResponses;
}
public function getResponse(int $seq, mixed $options = null): mixed
@@ -257,13 +255,15 @@
return $this->decodeResponse($frame, $relay, $options);
}
- public function getResponses(array $seqs, mixed $options = null): iterable
+ public function getResponses(array $results, mixed $options = null): iterable
{
// Quick return
- if (count($seqs) === 0) {
+ if (count($results) === 0) {
return;
}
+ $seqs = array_map(static fn(DeferredResult $result) => $result->id, $results);
+
// Flip the array to use the $seqs for key indexing
$seqsKeyed = [];
|
To be honest, I would keep the original call method the same as before. Yeah, it duplicates a bit, but who cares? It has worked perfectly for 5+ years. It's under the interface, so we can change to a fiber-friendly implementation later. We also don't need DeferredResult, as you saying, it's unnecessary at this level. For now, this PR looks good to me. Can we make sure that tests pass? I would assume that 99% of new use-cases will be on |
@L3tum Added you to the |
Thank you @rustatian ! I'll close this PR then and push a branch to open a new one. Would also remove the mess I made of the code review AI 😆 @wolfy-j Sounds good to me. @msmakouz / @roxblnfk / @wolfy-j I've fixed the tests and also fixed a bug related to |
I've moved the PR to #25 :) |
Implements an "Async" RPC Interface and Implementation using multiple relays to effectively offer non-blocking IO in regards to the Roadrunner communication.
As discussed in the linked discussion, there's a (small but signifcant) delay in the RPC communication to Roadrunner that can have an impact on a service. In my case the service had a nominal response time of ~2ms, went up to 3ms with metric collection and also had a 2ms
kernel.terminate
listener to collect further metrics, so all in all was busy for ~5ms with only ~2ms spent on the actual request->response cycle.As a fix I propose an "async" implementation of the RPC interface. I don't really like calling it async, because it doesn't offer the same versatility as a proper async implementation, but it changes the control flow and thus qualifies for that name, I guess.
The interface has 3 significant new methods as well as a few supporting changes in other code:
hasResponse
andhasAnyResponse
just check if a response has been receivedResults
I've measured the impact in a testcase using the same repro used in the discussion. The results are the following:
As you can see the "good" implementation (this change) is within the normal deviation of the always-ignore implementation and offers an improvement ~10x performance. I've been using a (slightly modified) implementation of this in the aforementioned production service and could cut down the time spent collecting metrics from a total of 3ms to just 0,2ms, with a min/max of respectively 0.1ms and 0.3ms.
I've also added a change (PR following) in roadrunner-kv that uses the
callAsync
method to implement asetAsync
function. The results so far are promising. With a follow-up check on whether thekv.Set
call was successful, the async implementation is significantly faster:Full results for the Metrics calls:
Considerations
socket_select
andstream_select
. I've commented it as well as marked them with@internal
but it's definitely not ideal.The overflow response buffer currently has no upper limit and thus it's somewhat easy to get a memory leak here. I could add a check inImplemented a flush at 1000 entries (realistically the most a normal application should ever see is maybe ~50) -- I've upped this to 10_000 entries because in testing in particular Doctrine caches get hammered extremely hard and 1_000 is not enough.callAsync
to see if the response buffer is above a certain limit (1000?) and if so flush it. I'll probably do so when adding the tests.TODO
I have glanced at the tests in this repo and will try to implement a few for this change if you've given me the go-ahead with the current implementation.Added a number of testsSummary by CodeRabbit
AbstractRPC
with various method and property adjustments for streamlined operation.SocketRelay
andStreamRelay
to public, with added documentation for these changes.SleepEcho
function for testing purposes.