From 189dcf773db4a20794001fe82bd930fb79ea504b Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Sat, 16 Sep 2023 23:19:20 +0400 Subject: [PATCH 1/4] Implement Ping-Pong strategy for worker with blocking relay --- composer.json | 2 +- src/Message/Command/Pong.php | 15 ++++++++ src/PayloadFactory.php | 5 +++ src/StreamWorkerInterface.php | 10 +++++ src/Worker.php | 70 ++++++++++++++++++++++++++++++++--- 5 files changed, 96 insertions(+), 6 deletions(-) create mode 100644 src/Message/Command/Pong.php create mode 100644 src/StreamWorkerInterface.php diff --git a/composer.json b/composer.json index 6e37dff..735e7df 100644 --- a/composer.json +++ b/composer.json @@ -41,7 +41,7 @@ "ext-json": "*", "ext-sockets": "*", "psr/log": "^2.0|^3.0", - "spiral/goridge": "^4.0", + "spiral/goridge": "dev-ping-pong as 4.1.0", "spiral/roadrunner": "^2023.1", "composer-runtime-api": "^2.0" }, diff --git a/src/Message/Command/Pong.php b/src/Message/Command/Pong.php new file mode 100644 index 0000000..1731a08 --- /dev/null +++ b/src/Message/Command/Pong.php @@ -0,0 +1,15 @@ +byte10 & Frame::BYTE10_PONG) !== 0) { + return new Pong($payload); + } + return new Payload( \substr($payload, $frame->options[0]), \substr($payload, 0, $frame->options[0]), diff --git a/src/StreamWorkerInterface.php b/src/StreamWorkerInterface.php new file mode 100644 index 0000000..d213d54 --- /dev/null +++ b/src/StreamWorkerInterface.php @@ -0,0 +1,10 @@ + */ -class Worker implements WorkerInterface +class Worker implements StreamWorkerInterface { private const JSON_ENCODE_FLAGS = \JSON_THROW_ON_ERROR | \JSON_PRESERVE_ZERO_FRACTION; /** @var array */ private array $payloads = []; + private bool $streamMode = false; + /** @var int<0, max> Count of frames sent in stream mode */ + private int $framesSent = 0; + private bool $shouldPing = false; + private bool $waitingPong = false; + public function __construct( private readonly RelayInterface $relay, bool $interceptSideEffects = true, @@ -63,18 +71,33 @@ public function waitPayload(): ?Payload case $payload::class === Payload::class: return $payload; case $payload instanceof WorkerStop: + $this->waitingPong = false; return null; case $payload::class === GetProcessId::class: $this->sendProcessId(); - // no break + continue 2; + case $payload instanceof Pong: + $this->waitingPong = false; + continue 2; case $payload instanceof SkipMessage: continue 2; } } } + public function withStreamMode(): static + { + $clone = clone $this; + $clone->streamMode = true; + $clone->framesSent = 0; + $clone->shouldPing = false; + $clone->waitingPong = false; + return $clone; + } + public function respond(Payload $payload): void { + $this->streamMode and ++$this->framesSent; $this->send($payload->body, $payload->header, $payload->eos); } @@ -133,7 +156,7 @@ private function findPayload(string $class = null): ?int } $payload = $this->pullPayload(); - if ($payload === null) { + if ($payload === null || $payload instanceof Pong) { break; } @@ -151,12 +174,28 @@ private function findPayload(string $class = null): ?int */ private function pullPayload(): ?Payload { + if (!$this->waitingPong && $this->relay instanceof BlockedRelayInterface) { + if (!$this->streamMode) { + return null; + } + + $this->haveToPing(); + return null; + } + if (!$this->relay->hasFrame()) { return null; } $frame = $this->relay->waitFrame(); - return PayloadFactory::fromFrame($frame); + $payload = PayloadFactory::fromFrame($frame); + + if ($payload instanceof Pong) { + $this->waitingPong = false; + return null; + } + + return $payload; } private function send(string $body = '', string $header = '', bool $eos = true): void @@ -164,7 +203,11 @@ private function send(string $body = '', string $header = '', bool $eos = true): $frame = new Frame($header . $body, [\strlen($header)]); if (!$eos) { - $frame->byte10 = Frame::BYTE10_STREAM; + $frame->byte10 |= Frame::BYTE10_STREAM; + } + + if ($this->shouldPing) { + $frame->byte10 |= Frame::BYTE10_PING; } $this->sendFrame($frame); @@ -173,6 +216,12 @@ private function send(string $body = '', string $header = '', bool $eos = true): private function sendFrame(Frame $frame): void { try { + if ($this->streamMode && ($frame->byte10 & Frame::BYTE10_STREAM) && $this->shouldPing) { + $frame->byte10 |= Frame::BYTE10_PING; + $this->shouldPing = false; + $this->waitingPong = true; + } + $this->relay->send($frame); } catch (GoridgeException $e) { throw new TransportException($e->getMessage(), $e->getCode(), $e); @@ -221,4 +270,15 @@ private function sendProcessId(): static $this->sendFrame($frame); return $this; } + + private function haveToPing(): void + { + if ($this->waitingPong || $this->framesSent === 0) { + return; + } + + if ($this->framesSent % 5 === 0) { + $this->shouldPing = true; + } + } } From b7f40b5dcb7d333fe6b0c2884a3c6cc1a5ee69f1 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Tue, 3 Oct 2023 19:16:43 +0400 Subject: [PATCH 2/4] Rename the `BlockedRelayInterface` to `BlockingRelayInterface` --- src/Worker.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Worker.php b/src/Worker.php index ffd2ea4..6718349 100644 --- a/src/Worker.php +++ b/src/Worker.php @@ -5,7 +5,7 @@ namespace Spiral\RoadRunner; use Psr\Log\LoggerInterface; -use Spiral\Goridge\BlockedRelayInterface; +use Spiral\Goridge\BlockingRelayInterface; use Spiral\Goridge\Exception\GoridgeException; use Spiral\Goridge\Exception\TransportException; use Spiral\Goridge\Frame; @@ -174,7 +174,7 @@ private function findPayload(string $class = null): ?int */ private function pullPayload(): ?Payload { - if (!$this->waitingPong && $this->relay instanceof BlockedRelayInterface) { + if (!$this->waitingPong && $this->relay instanceof BlockingRelayInterface) { if (!$this->streamMode) { return null; } From d24b5bd19e145624fd6f92a10456bdfd2f8a525e Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Tue, 3 Oct 2023 22:50:02 +0400 Subject: [PATCH 3/4] Set the requirement spiral/goridge to ^4.1.0 --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 735e7df..c143093 100644 --- a/composer.json +++ b/composer.json @@ -41,7 +41,7 @@ "ext-json": "*", "ext-sockets": "*", "psr/log": "^2.0|^3.0", - "spiral/goridge": "dev-ping-pong as 4.1.0", + "spiral/goridge": "^4.1.0", "spiral/roadrunner": "^2023.1", "composer-runtime-api": "^2.0" }, From 075ddc6e4dc1a17dd78aae68a27b902a74656933 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Tue, 3 Oct 2023 22:54:53 +0400 Subject: [PATCH 4/4] Fix psalm issues --- src/Worker.php | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Worker.php b/src/Worker.php index 6718349..c4f0b78 100644 --- a/src/Worker.php +++ b/src/Worker.php @@ -257,8 +257,11 @@ public static function createFromEnvironment( bool $interceptSideEffects = true, LoggerInterface $logger = new Logger(), ): self { + $address = $env->getRelayAddress(); + \assert($address !== '', 'Relay address must be specified in environment'); + return new self( - relay: Relay::create($env->getRelayAddress()), + relay: Relay::create($address), interceptSideEffects: $interceptSideEffects, logger: $logger );