Skip to content

Commit

Permalink
Fix sequence counting (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
shellphy authored Jun 26, 2024
1 parent d5303e6 commit ab4b100
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,4 @@ jobs:
token: ${{ secrets.CODECOV_TOKEN }}
file: ./coverage.xml
flags: php
fail_ci_if_error: false
fail_ci_if_error: false
23 changes: 23 additions & 0 deletions examples/swoole.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

declare(strict_types=1);

use Spiral\Goridge;
use Swoole\Coroutine as Co;
use Swoole\Coroutine\Barrier;

require 'vendor/autoload.php';

Co::set(['hook_flags'=> SWOOLE_HOOK_ALL]);
Co\Run(function () {
$barrier = Barrier::make();
for ($i = 0; $i < 3; $i++) {
go(function () use ($barrier) {
$rpc = new Goridge\RPC\RPC(
Goridge\Relay::create('tcp://127.0.0.1:6001')
);
echo $rpc->call('App.Hi', 'Antony');
});
}
Barrier::wait($barrier);
});
17 changes: 6 additions & 11 deletions src/RPC/RPC.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ class RPC implements RPCInterface
*/
private ?string $service = null;

/**
* @var positive-int
*/
private static int $seq = 1;

/**
* @param RelayInterface $relay
* @param CodecInterface|null $codec
Expand Down Expand Up @@ -76,7 +71,9 @@ public function withCodec(CodecInterface $codec): RPCInterface
*/
public function call(string $method, $payload, $options = null)
{
$this->relay->send($this->packFrame($method, $payload));
$seq = $this->relay->getNextSeq();

$this->relay->send($this->packFrame($method, $payload, $seq));

// wait for the frame confirmation
$frame = $this->relay->waitFrame();
Expand All @@ -85,12 +82,10 @@ public function call(string $method, $payload, $options = null)
throw new RPCException('Invalid RPC frame, options missing');
}

if ($frame->options[0] !== self::$seq) {
if ($frame->options[0] !== $seq) {
throw new RPCException('Invalid RPC frame, sequence mismatch');
}

self::$seq++;

return $this->decodeResponse($frame, $options);
}

Expand Down Expand Up @@ -163,13 +158,13 @@ private function decodeResponse(Frame $frame, $options = null)
* @param mixed $payload
* @return Frame
*/
private function packFrame(string $method, $payload): Frame
private function packFrame(string $method, $payload, int $seq): Frame
{
if ($this->service !== null) {
$method = $this->service . '.' . \ucfirst($method);
}

$body = $method . $this->codec->encode($payload);
return new Frame($body, [self::$seq, \strlen($method)], $this->codec->getIndex());
return new Frame($body, [$seq, \strlen($method)], $this->codec->getIndex());
}
}
13 changes: 13 additions & 0 deletions src/Relay.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ abstract class Relay implements RelayInterface
public const PIPES = 'pipes';
protected const CONNECTION_EXP = '/(?P<protocol>[^:\/]+):\/\/(?P<arg1>[^:]+)(:(?P<arg2>[^:]+))?/';

/**
* @var int
*/
private int $seq = 1;

/**
* Create relay using string address.
*
Expand Down Expand Up @@ -93,4 +98,12 @@ private static function openOut(string $output)

return $resource;
}

/**
* @return int
*/
public function getNextSeq(): int
{
return $this->seq++;
}
}
5 changes: 5 additions & 0 deletions src/RelayInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,9 @@ public function waitFrame(): Frame;
* @param Frame $frame
*/
public function send(Frame $frame): void;

/**
* @return int
*/
public function getNextSeq(): int;
}

0 comments on commit ab4b100

Please sign in to comment.