Skip to content

Commit

Permalink
Merge pull request #23 from roadrunner-php/feature-proto-payloads
Browse files Browse the repository at this point in the history
Proto payloads between RR and PHP Worker
  • Loading branch information
msmakouz committed Apr 1, 2024
2 parents 0bd1ef8 + 8b35fd9 commit 2963562
Show file tree
Hide file tree
Showing 6 changed files with 425 additions and 29 deletions.
10 changes: 6 additions & 4 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@
"ext-json": "*",
"psr/http-factory": "^1.0.1",
"psr/http-message": "^1.0.1 || ^2.0",
"spiral/roadrunner": "^2023.3",
"spiral/roadrunner-worker": "^3.1.0"
"spiral/roadrunner": "^2023.3 || ^2024.1",
"spiral/roadrunner-worker": "^3.5",
"roadrunner-php/roadrunner-api-dto": "^1.6",
"symfony/polyfill-php83": "^1.29"
},
"require-dev": {
"buggregator/trap": "^1.0",
"jetbrains/phpstorm-attributes": "^1.0",
"nyholm/psr7": "^1.3",
"phpunit/phpunit": "^10.0",
Expand All @@ -72,7 +73,8 @@
"analyze": "psalm"
},
"suggest": {
"spiral/roadrunner-cli": "Provides RoadRunner installation and management CLI tools"
"spiral/roadrunner-cli": "Provides RoadRunner installation and management CLI tools",
"ext-protobuf": "Provides Protocol Buffers support. Without it, performance will be lower."
},
"config": {
"sort-packages": true
Expand Down
133 changes: 115 additions & 18 deletions src/HttpWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
namespace Spiral\RoadRunner\Http;

use Generator;
use RoadRunner\HTTP\DTO\V1\HeaderValue;
use RoadRunner\HTTP\DTO\V1\Request as RequestProto;
use RoadRunner\HTTP\DTO\V1\Response;
use Spiral\Goridge\Frame;
use Spiral\RoadRunner\Http\Exception\StreamStoppedException;
use Spiral\RoadRunner\Message\Command\StreamStop;
use Spiral\RoadRunner\Payload;
Expand Down Expand Up @@ -34,6 +38,8 @@
*/
class HttpWorker implements HttpWorkerInterface
{
private static ?int $codec = null;

public function __construct(
private readonly WorkerInterface $worker,
) {
Expand All @@ -56,13 +62,25 @@ public function waitRequest(): ?Request
return null;
}

if (static::$codec === null) {
static::$codec = \json_validate($payload->header) ? Frame::CODEC_JSON : Frame::CODEC_PROTO;
}

if (static::$codec === Frame::CODEC_PROTO) {
$message = new RequestProto();
$message->mergeFromString($payload->header);

return $this->requestFromProto($payload->body, $message);
}

/** @var RequestContext $context */
$context = \json_decode($payload->header, true, 512, \JSON_THROW_ON_ERROR);

return $this->createRequest($payload->body, $context);
return $this->arrayToRequest($payload->body, $context);
}

/**
* @param array<array-key, array<array-key, string>> $headers
* @throws \JsonException
*/
public function respond(int $status, string|Generator $body = '', array $headers = [], bool $endOfStream = true): void
Expand All @@ -76,21 +94,15 @@ public function respond(int $status, string|Generator $body = '', array $headers
return;
}

$head = \json_encode([
'status' => $status,
'headers' => $headers ?: (object)[],
], \JSON_THROW_ON_ERROR);

$this->worker->respond(new Payload($body, $head, $endOfStream));
/** @psalm-suppress TooManyArguments */
$this->worker->respond($this->createRespondPayload($status, $body, $headers, $endOfStream), static::$codec);
}

/**
* @param array<array-key, array<array-key, string>> $headers
*/
private function respondStream(int $status, Generator $body, array $headers = [], bool $endOfStream = true): void
{
$head = \json_encode([
'status' => $status,
'headers' => $headers ?: (object)[],
], \JSON_THROW_ON_ERROR);

$worker = $this->worker instanceof StreamWorkerInterface
? $this->worker->withStreamMode()
: $this->worker;
Expand All @@ -103,7 +115,11 @@ private function respondStream(int $status, Generator $body, array $headers = []
// We don't need to send an empty frame if the stream is not ended
return;
}
$worker->respond(new Payload($content, $head, $endOfStream));
/** @psalm-suppress TooManyArguments */
$worker->respond(
$this->createRespondPayload($status, $content, $headers, $endOfStream),
static::$codec
);
break;
}

Expand All @@ -117,9 +133,11 @@ private function respondStream(int $status, Generator $body, array $headers = []
return;
}

// Send a chunk of data
$worker->respond(new Payload($content, $head, false));
$head = null;
/**
* Send a chunk of data
* @psalm-suppress TooManyArguments
*/
$worker->respond($this->createRespondPayload($status, $content, $headers, false), static::$codec);

try {
$body->next();
Expand All @@ -134,7 +152,7 @@ private function respondStream(int $status, Generator $body, array $headers = []
/**
* @param RequestContext $context
*/
private function createRequest(string $body, array $context): Request
private function arrayToRequest(string $body, array $context): Request
{
\parse_str($context['rawQuery'], $query);
return new Request(
Expand All @@ -154,6 +172,37 @@ private function createRequest(string $body, array $context): Request
);
}

private function requestFromProto(string $body, RequestProto $message): Request
{
/** @var UploadedFilesList $uploads */
$uploads = \json_decode($message->getUploads(), true) ?? [];
$headers = $this->headerValueToArray($message->getHeader());

\parse_str($message->getRawQuery(), $query);
/** @psalm-suppress ArgumentTypeCoercion, MixedArgumentTypeCoercion */
return new Request(
remoteAddr: $message->getRemoteAddr(),
protocol: $message->getProtocol(),
method: $message->getMethod(),
uri: $message->getUri(),
headers: $this->filterHeaders($headers),
cookies: \array_map(
static fn(array $values) => \implode(',', $values),
$this->headerValueToArray($message->getCookies()),
),
uploads: $uploads,
attributes: [
Request::PARSED_BODY_ATTRIBUTE_NAME => $message->getParsed(),
] + \array_map(
static fn(array $values) => \array_shift($values),
$this->headerValueToArray($message->getAttributes()),
),
query: $query,
body: $message->getParsed() && empty($body) ? '{}' : $body,
parsed: $message->getParsed(),
);
}

/**
* Remove all non-string and empty-string keys
*
Expand All @@ -164,7 +213,7 @@ private function filterHeaders(array $headers): array
{
foreach ($headers as $key => $_) {
if (!\is_string($key) || $key === '') {
// ignore invalid header names or values (otherwise, the worker will be crashed)
// ignore invalid header names or values (otherwise, the worker might be crashed)
// @see: <https://git.io/JzjgJ>
unset($headers[$key]);
}
Expand All @@ -173,4 +222,52 @@ private function filterHeaders(array $headers): array
/** @var HeadersList $headers */
return $headers;
}

/**
* @param \Traversable<non-empty-string, HeaderValue> $message
*/
private function headerValueToArray(\Traversable $message): array
{
$result = [];
/**
* @var non-empty-string $key
* @var HeaderValue $value
*/
foreach ($message as $key => $value) {
$result[$key] = \iterator_to_array($value->getValue());
}

return $result;
}

/**
* @param array<array-key, array<array-key, string>> $headers
* @return array<non-empty-string, HeaderValue>
*/
private function arrayToHeaderValue(array $headers = []): array
{
$result = [];
/**
* @var non-empty-string $key
* @var array<array-key, string> $value
*/
foreach ($headers as $key => $value) {
$result[$key] = new HeaderValue(['value' => $value]);
}

return $result;
}

/**
* @param array<array-key, array<array-key, string>> $headers
*/
private function createRespondPayload(int $status, string $body, array $headers = [], bool $eos = true): Payload
{
$head = static::$codec === Frame::CODEC_PROTO
? (new Response(['status' => $status, 'headers' => $this->arrayToHeaderValue($headers)]))
->serializeToString()
: \json_encode(['status' => $status, 'headers' => $headers ?: (object)[]], \JSON_THROW_ON_ERROR);

return new Payload(body: $body, header: $head, eos: $eos);
}
}
5 changes: 3 additions & 2 deletions src/HttpWorkerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ public function waitRequest(): ?Request;
* If the body is a generator, then each yielded value will be sent as a separated stream chunk.
* Returned value will be sent as a last stream package.
* Note: Stream response is supported by RoadRunner since version 2023.3
* @param HeadersList|array $headers An associative array of the message's headers. Each key MUST be a header name,
* and each value MUST be an array of strings for that header.
* @param HeadersList|array<array-key, array<array-key, string>> $headers $headers An associative array of the
* message's headers. Each key MUST be a header name, and each value MUST be an array of strings for
* that header.
*/
public function respond(int $status, string|Generator $body, array $headers = []): void;
}
Loading

0 comments on commit 2963562

Please sign in to comment.