Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement "Async" RPC #22

Closed
wants to merge 28 commits into from
Closed

feat: Implement "Async" RPC #22

wants to merge 28 commits into from

Conversation

L3tum
Copy link
Contributor

@L3tum L3tum commented Feb 5, 2024

Q A
Bugfix?
Breaks BC?
New feature? ✔️ TODO please update "Other Features" section in CHANGELOG.md file
Issues https://github.com/orgs/roadrunner-server/discussions/1850
Docs PR spiral/docs#... TODO?

Implements an "Async" RPC Interface and Implementation using multiple relays to effectively offer non-blocking IO in regards to the Roadrunner communication.

As discussed in the linked discussion, there's a (small but signifcant) delay in the RPC communication to Roadrunner that can have an impact on a service. In my case the service had a nominal response time of ~2ms, went up to 3ms with metric collection and also had a 2ms kernel.terminate listener to collect further metrics, so all in all was busy for ~5ms with only ~2ms spent on the actual request->response cycle.

As a fix I propose an "async" implementation of the RPC interface. I don't really like calling it async, because it doesn't offer the same versatility as a proper async implementation, but it changes the control flow and thus qualifies for that name, I guess.
The interface has 3 significant new methods as well as a few supporting changes in other code:

  • callIgnoreResponse => Calls a Roadrunner method but doesn't wait for the response and ignores it if it's received. This is intended for "best-effort" areas such as Metrics collection, where it is generally accepted if some Metrics get lost on the way
  • callAsync => Calls a Roadrunner method and tracks the response rather than wait for it. The response can then be received with...
  • getResponse => Gets and if needed waits for a response from a previous call to callAsync. The request/response is tracked with the already used $seq. The implementation keeps the response in the socket buffer as long as possible until it's needed to reduce unnecessary delays
  • The supporting methods hasResponse and hasAnyResponse just check if a response has been received

Results
I've measured the impact in a testcase using the same repro used in the discussion. The results are the following:

  • Sync Metrics Call: 1.228ms
  • AlwaysIgnoreResponseRPC (from the repro) Call: 0.1366ms
  • MultiRPC Call (this change): 0.1399ms
    As you can see the "good" implementation (this change) is within the normal deviation of the always-ignore implementation and offers an improvement ~10x performance. I've been using a (slightly modified) implementation of this in the aforementioned production service and could cut down the time spent collecting metrics from a total of 3ms to just 0,2ms, with a min/max of respectively 0.1ms and 0.3ms.

I've also added a change (PR following) in roadrunner-kv that uses the callAsync method to implement a setAsync function. The results so far are promising. With a follow-up check on whether the kv.Set call was successful, the async implementation is significantly faster:

  • "SyncKVImpactCacheStartup": "56.2324ms"
  • "SyncKVImpactNewItem": "0.5328ms"
  • "SyncKVImpactExistingItemSameValue: "0.2113ms"
  • "SyncKVImpactExistingItemNewValue": "0.2006ms"
  • "AsyncKVImpactCacheStartup": "21.7821ms"
  • "AsyncKVImpactNewItem": "0.1146ms"

Full results for the Metrics calls:

{
    "Base": {
        "Max": "0.2063ms",
        "Min": "0.0954ms",
        "Avg": "0.1228ms",
        "Total": "1.228ms"
    },
    "AlwaysIgnoreResponseRPC": {
        "Max": "0.0537ms",
        "Min": "0.0069ms",
        "Avg": "0.01366ms",
        "Total": "0.1366ms"
    },
    "MultiRPC": {
        "Max": "0.0714ms",
        "Min": "0.0061ms",
        "Avg": "0.01399ms",
        "Total": "0.1399ms"
    }
}

Considerations

  • The whole implementation is "opt-in", which can also be seen in my two follow-up PRs (#1 and #2). I didn't want to force it in case someone depends on the existing implementation. It makes some of the code look a bit messy due to having to support both code paths but I think it's the better solution. I'm open to change it though if you disagree.
  • I had to "open" two previously private fields in the Relay implementations to make more effective use of socket_select and stream_select. I've commented it as well as marked them with @internal but it's definitely not ideal.
  • The overflow response buffer currently has no upper limit and thus it's somewhat easy to get a memory leak here. I could add a check in callAsync to see if the response buffer is above a certain limit (1000?) and if so flush it. I'll probably do so when adding the tests. Implemented a flush at 1000 entries (realistically the most a normal application should ever see is maybe ~50) -- I've upped this to 10_000 entries because in testing in particular Doctrine caches get hammered extremely hard and 1_000 is not enough.

TODO

  • I have glanced at the tests in this repo and will try to implement a few for this change if you've given me the go-ahead with the current implementation. Added a number of tests
  • Add a Changelog.md entry
  • Add documentation for this (?)

Summary by CodeRabbit

  • New Features
    • Introduced multi-relay functionality for enhanced message and relay management.
    • Added asynchronous RPC communication capabilities over multiple relays.
    • Improved RPC functionality with new methods and class structures for more efficient communication and coding practices.
  • Bug Fixes
    • Corrected a typo in RPCInterface method description for clarity.
  • Refactor
    • Updated the RPC class to extend from AbstractRPC with various method and property adjustments for streamlined operation.
    • Modified visibility of certain properties in SocketRelay and StreamRelay to public, with added documentation for these changes.
  • Tests
    • Added a comprehensive test suite for multiple RPC interactions, including exception handling and asynchronous calls.
    • Introduced test cases specifically for MsgPack and TCP communication in the MultiRPC context.
  • Documentation
    • Enhanced internal documentation to explain visibility changes in relay properties.
  • Chores
    • Implemented minor server-side enhancements, including a new SleepEcho function for testing purposes.

Copy link

coderabbitai bot commented Feb 5, 2024

Walkthrough

The recent updates bring a suite of enhancements focusing on asynchronous and multi-relay RPC communication. Key changes include the introduction of a MultiRPC class for managing asynchronous RPC over multiple relays efficiently, along with test expansions covering various scenarios.

Changes

File(s) Change Summary
src/RPC/AbstractRPC.php Introduces AbstractRPC abstract class implementing RPCInterface with various RPC methods.
src/RPC/MultiRPC.php
tests/Goridge/MultiRPC.php
tests/Goridge/MsgPackMultiRPCTest.php
Introduces MultiRPC class for multi-relay RPC and related test cases.
src/RPC/RPC.php Extends AbstractRPC, changes in properties and methods for RPC calls.
src/ConnectedRelayInterface.php Introduces ConnectedRelayInterface for connection management.
src/MultiRelayHelper.php Provides functionality for managing relays in a multi-relay setup.
src/SocketRelay.php Updates visibility of $socket property and implements __clone method.
tests/Goridge/MultiRelayHelperTest.php Adds PHPUnit test case for MultiRelayHelper class.
tests/Goridge/TCPMultiRPCTest.php Defines TCPMultiRPCTest class for TCP communication constants.
tests/Goridge/TCPRPCTest.php Renames class from TPCRPCTest to TCPRPCTest.

🐇🌟
In the world of bytes, where the rabbit does play,
MultiRPC dances, making RPCs its way.
Tests expanded, scenarios galore,
TCP connections now knocking at the door.
With a hop and a skip, the code evolves anew,
Bringing efficiency and joy, through and through.
🌼🐰

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share

Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit-tests for this file.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit tests for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository from git and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit tests.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger a review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • The JSON schema for the configuration file is available here.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/coderabbit-overrides.v2.json

CodeRabbit Discord Community

Join our Discord Community to get help, request features, and share feedback.

@codecov-commenter
Copy link

codecov-commenter commented Feb 5, 2024

Codecov Report

Attention: 54 lines in your changes are missing coverage. Please review.

Comparison is base (a5d6744) 70.52% compared to head (74aa106) 73.37%.
Report is 3 commits behind head on 4.x.

❗ Current head 74aa106 differs from pull request most recent head e19db31. Consider uploading reports for the commit e19db31 to get more accurate results

Files Patch % Lines
src/RPC/MultiRPC.php 77.27% 35 Missing ⚠️
src/MultiRelayHelper.php 61.70% 18 Missing ⚠️
src/RPC/AbstractRPC.php 95.45% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##                4.x      #22      +/-   ##
============================================
+ Coverage     70.52%   73.37%   +2.84%     
- Complexity      120      195      +75     
============================================
  Files             9       12       +3     
  Lines           302      507     +205     
============================================
+ Hits            213      372     +159     
- Misses           89      135      +46     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@L3tum
Copy link
Contributor Author

L3tum commented Feb 7, 2024

Hey @msmakouz , I saw @rustatian 's comment, thank you for the review :)

I've just pushed some last changes that were needed after I tried using this implementation in my prod service. It didn't behave very well and once I started down that rabbit hole I realized I had made this more complicated than it really needed to. So I done away with the distinction between occupied relays with/out responses and also removed the limitation from the MultiRelayHelper of not supporting a map (aka non-int indexed array). I'm sorry if you had already started looking through the code, but from what I can tell after having spent a significant amount of time debugging the tests the whole thing should be pretty final now.

@L3tum
Copy link
Contributor Author

L3tum commented Feb 8, 2024

Hey @msmakouz , last (functional) change from my side (hopefully). I fell into the trap of cloning an array of objects, not realizing they get shared between the instances.
I experimented with making the arrays static instead, but it felt like a bit of a hack, especially in regards to fetching a response from the buffer of another RPC (thus running into codec issues, for example).
Overall I'm a bit disappointed Psalm didn't catch this

@L3tum
Copy link
Contributor Author

L3tum commented Feb 8, 2024

Hey @msmakouz / @rustatian , one thing I noticed now is that the number of Goroutines apparently explodes with this change. I'm not sure where or why because I'm honestly not knowledgable in Go at all. In my case I've noticed a growth from ~500 to ~2000. That's with 100 sockets per Worker with 8 Workers opened. So I guess there's a number of goroutines running per socket or something like that?

The issue is mostly during high load scenarios, cause it spawned up to 15k Goroutines until RR crashed completely.
They do get cleaned up afterwards (without a crash, I mean) so I don't think it'ss a leak.

If I understand Go correctly a socket there spawns two Goroutines (Read & Write), so at 100 sockets would be 200 Goroutines per Worker, with 8 Workers at 1600 Goroutines, so that makes sense. What I don't get is why there's 15k Goroutines when any type of load is applied
Disregard, I did an oopsie and had the MultiRPC not shared between classes

@msmakouz
Copy link
Member

@L3tum Hi, thanks for the work done! Do you plan to make any more changes to the code, or can we look at your proposed implementation?
At first glance, I see that there is insufficient unit test coverage, but I will first look at the implementation itself, if it is already ready.

@msmakouz
Copy link
Member

One thing, there's a few marks I'd like to tell it to "shut up", is there a way for that?

You can leave a comment and click the "Resolve conversation" button if you think the AI is wrong. If you don't have this button, then don't pay attention to this comment.

33

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Status

Actionable comments generated: 0

Configuration used: CodeRabbit UI

Commits Files that changed from the base of the PR and between 812da39 and 74aa106.
Files selected for processing (4)
  • src/RPC/MultiRPC.php (1 hunks)
  • tests/Goridge/MultiRPC.php (1 hunks)
  • tests/Goridge/TCPMultiRPCTest.php (1 hunks)
  • tests/Goridge/TCPRPCTest.php (1 hunks)
Files skipped from review due to trivial changes (1)
  • tests/Goridge/TCPRPCTest.php
Files skipped from review as they are similar to previous changes (2)
  • src/RPC/MultiRPC.php
  • tests/Goridge/MultiRPC.php
Additional comments: 1
tests/Goridge/TCPMultiRPCTest.php (1)
  • 10-15: The class TCPMultiRPCTest correctly extends MultiRPC and defines constants for socket configuration. Ensure that the socket address, port, and type are appropriately configured for the test environment.

@L3tum
Copy link
Contributor Author

L3tum commented Feb 12, 2024

@msmakouz I've updated most of the things the AI complained about. I've looked through the missing test coverage and most of it seems to be in more "abstract" error handling. I'll see if I can add some more tests for that, but I'll do that later

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Status

Actionable comments generated: 2

Configuration used: CodeRabbit UI

Commits Files that changed from the base of the PR and between 74aa106 and 33b234a.
Files selected for processing (7)
  • src/ConnectedRelayInterface.php (1 hunks)
  • src/MultiRelayHelper.php (1 hunks)
  • src/RPC/MultiRPC.php (1 hunks)
  • src/SocketRelay.php (3 hunks)
  • tests/Goridge/MsgPackMultiRPCTest.php (1 hunks)
  • tests/Goridge/MultiRPC.php (1 hunks)
  • tests/Goridge/MultiRelayHelperTest.php (1 hunks)
Files skipped from review as they are similar to previous changes (5)
  • src/MultiRelayHelper.php
  • src/RPC/MultiRPC.php
  • src/SocketRelay.php
  • tests/Goridge/MsgPackMultiRPCTest.php
  • tests/Goridge/MultiRPC.php

tests/Goridge/MultiRelayHelperTest.php Show resolved Hide resolved
src/ConnectedRelayInterface.php Show resolved Hide resolved
@L3tum
Copy link
Contributor Author

L3tum commented Feb 13, 2024

Hey @msmakouz !

I think I implemented all suggestions by the AI and now I'm only massaging it to re-check, but I don't expect any further issues to pop up. In case you waited for this to review the changes yourself.

@L3tum
Copy link
Contributor Author

L3tum commented Feb 13, 2024

@coderabbitai review

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Status

Actionable comments generated: 2

Configuration used: CodeRabbit UI

Commits Files that changed from the base of the PR and between 33b234a and c4b41cf.
Files selected for processing (4)
  • src/ConnectedRelayInterface.php (1 hunks)
  • src/RPC/MultiRPC.php (1 hunks)
  • src/SocketRelay.php (4 hunks)
  • tests/Goridge/MultiRelayHelperTest.php (1 hunks)
Files skipped from review as they are similar to previous changes (3)
  • src/ConnectedRelayInterface.php
  • src/RPC/MultiRPC.php
  • src/SocketRelay.php

tests/Goridge/MultiRelayHelperTest.php Show resolved Hide resolved
tests/Goridge/MultiRelayHelperTest.php Show resolved Hide resolved
@roxblnfk
Copy link
Member

roxblnfk commented Feb 19, 2024

Hello.

I think the API can be greatly simplified if we introduce a DeferredResult object. This object would be returned from the callAsync() method. If the user is interested in the result, they will use DeferredResult. If not, the object will be naturally deleted after the method is executed.

// an example
interface DeferredResult
{
    /**
     * Check whether the request is pending.
     */
    public function isPending(): bool;

    /**
     * Await and return the result. Throws an exception if the call failed.s
     *
     * @throws GoridgeException
     * @throws RPCException
     * @throws ServiceException
     */
    public function getResult(): mixed;
}

In addition, I would make all RPC asynchronous right away, then the call to the synchronous method call() would look like callAsync()->await().

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Status

Actionable comments generated: 2

Configuration used: CodeRabbit UI

Commits Files that changed from the base of the PR and between c4b41cf and e19db31.
Files selected for processing (4)
  • src/MultiRelayHelper.php (1 hunks)
  • src/RPC/AbstractRPC.php (1 hunks)
  • src/RPC/MultiRPC.php (1 hunks)
  • src/RPC/RPC.php (4 hunks)
Files skipped from review as they are similar to previous changes (3)
  • src/MultiRelayHelper.php
  • src/RPC/AbstractRPC.php
  • src/RPC/MultiRPC.php
Additional comments: 2
src/RPC/RPC.php (2)
  • 13-13: The RPC class now extends AbstractRPC, aligning with object-oriented principles to leverage polymorphism and code reuse. This change likely introduces a more structured hierarchy and shared functionality across RPC-related classes.
  • 49-52: > 📝 NOTE

This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [43-50]

The static create method provides a convenient factory pattern for instantiating the RPC class with a relay and codec. This method enhances usability by abstracting the creation logic and allowing for easy setup with different configurations. It's a positive change that promotes ease of use and flexibility.

Comment on lines 17 to +20
private readonly RelayInterface $relay,
private CodecInterface $codec = new JsonCodec(),
CodecInterface $codec = new JsonCodec(),
) {
}

/**
* @psalm-pure
*/
public function withServicePrefix(string $service): RPCInterface
{
/** @psalm-suppress ImpureVariable */
$rpc = clone $this;
$rpc->service = $service;

return $rpc;
}

/**
* @psalm-pure
*/
public function withCodec(CodecInterface $codec): RPCInterface
{
/** @psalm-suppress ImpureVariable */
$rpc = clone $this;
$rpc->codec = $codec;

return $rpc;
parent::__construct($codec);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constructor's signature has been updated to include CodecInterface $codec with a default value of new JsonCodec(). This change enforces type safety and dependency injection for the codec used, improving modularity and testability. However, instantiating a default codec within the constructor parameters might not be the best practice for dependency injection, as it couples the RPC class to the JsonCodec class.

Consider injecting the codec dependency through the constructor without a default value or using a dependency injection container to manage such dependencies.

Comment on lines 27 to 33
// wait for the frame confirmation
$frame = $this->relay->waitFrame();

if (\count($frame->options) !== 2) {
if (count($frame->options) !== 2) {
throw new RPCException('Invalid RPC frame, options missing');
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [30-36]

The error handling within the call method checks for the presence and sequence of options in the received frame. This is a good practice for ensuring the integrity of the communication protocol. However, the reference to self::$seq suggests that there was a static property for tracking sequence numbers, which has been removed. This could potentially break the sequence validation logic.

Ensure that the sequence tracking mechanism is correctly implemented and integrated with the new class structure.

@L3tum
Copy link
Contributor Author

L3tum commented Feb 19, 2024

Hello.

I think the API can be greatly simplified if we introduce a DeferredResult object. This object would be returned from the callAsync() method. If the user is interested in the result, they will use DeferredResult. If not, the object will be naturally deleted after the method is executed.

// an example
interface DeferredResult
{
    /**
     * Check whether the request is pending.
     */
    public function isPending(): bool;

    /**
     * Await and return the result. Throws an exception if the call failed.s
     *
     * @throws GoridgeException
     * @throws RPCException
     * @throws ServiceException
     */
    public function getResult(): mixed;
}

In addition, I would make all RPC asynchronous right away, then the call to the synchronous method call() would look like callAsync()->await().

Hey @roxblnfk , sure that looks good. The question is whether we want to use something custom or use either a Promise library so it can be adapted into other frameworks, or even Fiber directly. We'd have to put some logic into its destructor though to manage the state in MultiRPC, since it would need to delete its entry in seqToRelayMap at least. It would stretch out some of the code and simplify other parts so overall it'd be a good change I think.

@roxblnfk
Copy link
Member

For now, there's no need to use promises or futures from third-party providers. Moreover, I can't quite imagine how we could use fibers for asynchronous requests without implementing a coroutine. So far, the best option seems to be to avoid all of this and implement our own simple abstraction.

We can also expose a public method process() so that it can be called from a coroutine at a higher level of abstraction. However, to make everything work without a coroutine, the process() will have to be called with each call/callAsync invocation.

@rustatian rustatian marked this pull request as draft February 20, 2024 19:50
@L3tum
Copy link
Contributor Author

L3tum commented Feb 22, 2024

I've used Fibers for that in this repo. A fiber is in the end just a promise so we can use it as a promise.
Regarding using a promise lib, the idea behind that is to enable interop with all the frameworks that roadrunner supports. We'd still write our own Promise, but the shared interface would enable it to work with others

@roxblnfk
Copy link
Member

I've used Fibers for that in this repo. A fiber is in the end just a promise so we can use it as a promise. Regarding using a promise lib, the idea behind that is to enable interop with all the frameworks that roadrunner supports. We'd still write our own Promise, but the shared interface would enable it to work with others

I understand your reasoning. I myself use fibers and promises in other projects a lot.

Let me remind you that this package (goridge) is the lowest level of abstraction. Therefore, it's highly undesirable to introduce any extra dependencies that might cause conflicts in the future.

Our task here is not to drag in some popular promises hoping they will work almost everywhere. Our goal is to provide capabilities for higher levels of abstraction (witch may add promises over).

Using fibers would be nice, but only where no additional boilerplate is required. That is, in the call() method (which would wait for the execution of callAsync() and may call Fiber::suspend() between handler calls). Fibers inside callAsync() would make this method synchronous in the context in which it is called. Therefore, we need to proceed carefully and flexibly. And fibers can always be added if necessary.

@L3tum
Copy link
Contributor Author

L3tum commented Feb 22, 2024

In that case I would argue that an abstraction such as DeferredResult (or w/e) just adds overhead when it isn't needed. In my tests the repeated object instantiation alone can make up around 20% of performance regression. And adding an object pool just isn't really practical for that type of object in PHP, since we can't access the reference count.

Here's the patch for the change [1]. As you can see there really isn't that much actual change in there, and arguably no reduction of complexity. The biggest code change are the changes to hasResponses and getResponses, which would be kind of awkward with this API. The simplification of the three call* methods is nice, but there's a reason why they are separate, since unifying them also adds a few unnecessary array operations, which are pretty bad for performance.

[1]

Index: src/RPC/MultiRPC.php
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/RPC/MultiRPC.php b/src/RPC/MultiRPC.php
--- a/src/RPC/MultiRPC.php	(revision c4b41cff3d9269ae999aedb4e206c596a99690eb)
+++ b/src/RPC/MultiRPC.php	(date 1708624735013)
@@ -139,34 +139,15 @@
 
     public function call(string $method, mixed $payload, mixed $options = null): mixed
     {
-        $relayIndex = $this->ensureFreeRelayAvailable();
-        $relay = $this->freeRelays[$relayIndex];
-
-        $relay->send($this->packFrame($method, $payload));
-
-        // wait for the frame confirmation
-        $frame = $this->getResponseFromRelay($relay, self::$seq, true);
-
-        self::$seq++;
-
-        return $this->decodeResponse($frame, $relay, $options);
+        return $this->callAsync($method, $payload)->getResult($options);
     }
 
     public function callIgnoreResponse(string $method, mixed $payload): void
     {
-        $relayIndex = $this->ensureFreeRelayAvailable();
-        $relay = $this->freeRelays[$relayIndex];
-
-        $relay->send($this->packFrame($method, $payload));
-
-        $seq = self::$seq;
-        self::$seq++;
-        $this->occupiedRelays[$seq] = $relay;
-        // Last index so no need for array_pop or stuff
-        unset($this->freeRelays[$relayIndex]);
+        $this->callAsync($method, $payload)->ignore();
     }
 
-    public function callAsync(string $method, mixed $payload): int
+    public function callAsync(string $method, mixed $payload): DeferredResult
     {
         // Flush buffer if someone doesn't call getResponse
         if (count($this->asyncResponseBuffer) > $this->asyncBufferThreshold) {
@@ -188,7 +169,7 @@
         // Last index so no need for array_pop or stuff
         unset($this->freeRelays[$relayIndex]);
 
-        return $seq;
+        return new DeferredResult($this, $seq);
     }
 
     public function hasResponse(int $seq): bool
@@ -206,8 +187,20 @@
         return false;
     }
 
-    public function hasResponses(array $seqs): array
+    /**
+     * @param DeferredResult[] $results
+     * @return DeferredResult[]
+     */
+    public function hasResponses(array $results): array
     {
+        $seqMap = [];
+        $seqs = [];
+
+        foreach ($results as $result) {
+            $seqMap[$result->id] = $result;
+            $seqs[] = $result->id;
+        }
+
         $relays = [];
         /** @var array<int, positive-int> $relayIndexToSeq */
         $relayIndexToSeq = [];
@@ -233,7 +226,12 @@
             $seqsWithResponse[] = $relayIndexToSeq[$relayIndex];
         }
 
-        return $seqsWithResponse;
+        $resultsWithResponses = [];
+        foreach ($seqsWithResponse as $seq) {
+            $resultsWithResponses[] = $seqMap[$seq];
+        }
+
+        return $resultsWithResponses;
     }
 
     public function getResponse(int $seq, mixed $options = null): mixed
@@ -257,13 +255,15 @@
         return $this->decodeResponse($frame, $relay, $options);
     }
 
-    public function getResponses(array $seqs, mixed $options = null): iterable
+    public function getResponses(array $results, mixed $options = null): iterable
     {
         // Quick return
-        if (count($seqs) === 0) {
+        if (count($results) === 0) {
             return;
         }
 
+        $seqs = array_map(static fn(DeferredResult $result) => $result->id, $results);
+
         // Flip the array to use the $seqs for key indexing
         $seqsKeyed = [];
 

@wolfy-j
Copy link
Contributor

wolfy-j commented Feb 22, 2024

To be honest, I would keep the original call method the same as before. Yeah, it duplicates a bit, but who cares? It has worked perfectly for 5+ years. It's under the interface, so we can change to a fiber-friendly implementation later. We also don't need DeferredResult, as you saying, it's unnecessary at this level.

For now, this PR looks good to me. Can we make sure that tests pass?

I would assume that 99% of new use-cases will be on callIgnoreResponse => push and forget. Low-level getResponses, and has responses will be used while integrating into async frameworks.

@rustatian
Copy link
Contributor

@L3tum Added you to the contributors team. You won't need to create a fork to push a PR to this repo. Please don't hesitate to ping me if you want to contribute more, so, I'll try you make your life with forks easier 😄

@L3tum
Copy link
Contributor Author

L3tum commented Feb 23, 2024

Thank you @rustatian !

I'll close this PR then and push a branch to open a new one. Would also remove the mess I made of the code review AI 😆

@wolfy-j Sounds good to me.

@msmakouz / @roxblnfk / @wolfy-j I've fixed the tests and also fixed a bug related to cloneing the MultiRPC. In particular the issue stems from the usage of it in the Cache class in roadrunner-php/kv, but also others that (would) make use of it. While the cloning was handled alright, each clone would increase the number of relays created. In my prod service for example I've used 7 different cache services. The result was that each worker created 400 relays, at 12 workers per container so 4800 relays/sockets, with 2 goroutines per socket so 9600 goroutines. That exploded the memory usage to an insane ~1,5GB entirely in socket buffers.
Because of that I had to change a lot of tests, but the implementation itself just moved to static fields so I hope it doesn't add too much burden on you to review that. I'll tag you on the new PR once that is ready :)

@L3tum
Copy link
Contributor Author

L3tum commented Feb 23, 2024

I've moved the PR to #25 :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants