Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proto payloads between RR and PHP Worker #23

Merged
merged 11 commits into from
Apr 1, 2024
8 changes: 5 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@
"psr/http-factory": "^1.0.1",
"psr/http-message": "^1.0.1 || ^2.0",
"spiral/roadrunner": "^2023.3",
msmakouz marked this conversation as resolved.
Show resolved Hide resolved
"spiral/roadrunner-worker": "^3.1.0"
"spiral/roadrunner-worker": "^3.1",
"roadrunner-php/roadrunner-api-dto": "^1.4",
"symfony/polyfill-php83": "^1.29"
},
"require-dev": {
"buggregator/trap": "^1.0",
"jetbrains/phpstorm-attributes": "^1.0",
"nyholm/psr7": "^1.3",
"phpunit/phpunit": "^10.0",
Expand All @@ -72,7 +73,8 @@
"analyze": "psalm"
},
"suggest": {
"spiral/roadrunner-cli": "Provides RoadRunner installation and management CLI tools"
"spiral/roadrunner-cli": "Provides RoadRunner installation and management CLI tools",
"ext-protobuf": "Provides Protocol Buffers support"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be beneficial to include a recommendation about this extension in the documentation. It's important to emphasize that it impacts performance. Without this extension, JSON operates faster than Proto.

},
"config": {
"sort-packages": true
Expand Down
137 changes: 120 additions & 17 deletions src/HttpWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
namespace Spiral\RoadRunner\Http;

use Generator;
use RoadRunner\HTTP\DTO\V1BETA1\FileUpload;
use RoadRunner\HTTP\DTO\V1BETA1\HeaderValue;
use RoadRunner\HTTP\DTO\V1BETA1\Request as RequestProto;
use RoadRunner\HTTP\DTO\V1BETA1\Response;
use Spiral\RoadRunner\Http\Exception\StreamStoppedException;
use Spiral\RoadRunner\Message\Command\StreamStop;
use Spiral\RoadRunner\Payload;
Expand Down Expand Up @@ -34,6 +38,8 @@
*/
class HttpWorker implements HttpWorkerInterface
{
private static ?bool $isProto = null;

public function __construct(
private readonly WorkerInterface $worker,
) {
Expand All @@ -56,13 +62,21 @@
return null;
}

if ($this->isProtoPayload($payload)) {
$message = new RequestProto();
$message->mergeFromString($payload->header);

Check warning on line 67 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L65-L67

Added lines #L65 - L67 were not covered by tests

return $this->requestFromProto($payload->body, $message);

Check warning on line 69 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L69

Added line #L69 was not covered by tests
}

/** @var RequestContext $context */
$context = \json_decode($payload->header, true, 512, \JSON_THROW_ON_ERROR);

return $this->createRequest($payload->body, $context);
return $this->arrayToRequest($payload->body, $context);

Check warning on line 75 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L75

Added line #L75 was not covered by tests
}

/**
* @param array<array-key, array<array-key, string>> $headers
* @throws \JsonException
*/
public function respond(int $status, string|Generator $body = '', array $headers = [], bool $endOfStream = true): void
Expand All @@ -76,21 +90,14 @@
return;
}

$head = \json_encode([
'status' => $status,
'headers' => $headers ?: (object)[],
], \JSON_THROW_ON_ERROR);

$this->worker->respond(new Payload($body, $head, $endOfStream));
$this->worker->respond($this->createRespondPayload($status, $body, $headers, $endOfStream));

Check warning on line 93 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L93

Added line #L93 was not covered by tests
}

/**
* @param array<array-key, array<array-key, string>> $headers
*/
private function respondStream(int $status, Generator $body, array $headers = [], bool $endOfStream = true): void
{
$head = \json_encode([
'status' => $status,
'headers' => $headers ?: (object)[],
], \JSON_THROW_ON_ERROR);

$worker = $this->worker instanceof StreamWorkerInterface
? $this->worker->withStreamMode()
: $this->worker;
Expand All @@ -103,7 +110,7 @@
// We don't need to send an empty frame if the stream is not ended
return;
}
$worker->respond(new Payload($content, $head, $endOfStream));
$worker->respond($this->createRespondPayload($status, $content, $headers, $endOfStream));
break;
}

Expand All @@ -118,8 +125,7 @@
}

// Send a chunk of data
$worker->respond(new Payload($content, $head, false));
$head = null;
$worker->respond($this->createRespondPayload($status, $content, $headers, false));

try {
$body->next();
Expand All @@ -134,7 +140,7 @@
/**
* @param RequestContext $context
*/
private function createRequest(string $body, array $context): Request
private function arrayToRequest(string $body, array $context): Request

Check warning on line 143 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L143

Added line #L143 was not covered by tests
{
\parse_str($context['rawQuery'], $query);
return new Request(
Expand All @@ -154,6 +160,46 @@
);
}

private function requestFromProto(string $body, RequestProto $message): Request

Check warning on line 163 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L163

Added line #L163 was not covered by tests
{
$headers = $this->headerValueToArray($message->getHeader());
$uploadedFiles = [];

Check warning on line 166 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L165-L166

Added lines #L165 - L166 were not covered by tests

/**
* @var FileUpload $uploads
*/
foreach ($message->getUploads()?->getList() ?? [] as $uploads) {
$uploadedFiles[$uploads->getName()] = [
'name' => $uploads->getName(),
'mime' => $uploads->getMime(),
'size' => (int) $uploads->getSize(),
'error' => (int) $uploads->getError(),
'tmpName' => $uploads->getTempFilename(),
];

Check warning on line 178 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L171-L178

Added lines #L171 - L178 were not covered by tests
}

\parse_str($message->getRawQuery(), $query);

Check warning on line 181 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L181

Added line #L181 was not covered by tests
/** @psalm-suppress ArgumentTypeCoercion, MixedArgumentTypeCoercion */
return new Request(
remoteAddr: $message->getRemoteAddr(),
protocol: $message->getProtocol(),
method: $message->getMethod(),
uri: $message->getUri(),
headers: $this->filterHeaders($headers),
cookies: \array_map(
static fn(array $values) => \implode(',', $values),
$this->headerValueToArray($message->getCookies()),
),
uploads: $uploadedFiles,
attributes: [
Request::PARSED_BODY_ATTRIBUTE_NAME => $message->getParsed(),
] + \iterator_to_array($message->getAttributes()),
query: $query,
body: $body,
parsed: $message->getParsed(),

Check warning on line 199 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L183-L199

Added lines #L183 - L199 were not covered by tests
);
}

/**
* Remove all non-string and empty-string keys
*
Expand All @@ -164,7 +210,7 @@
{
foreach ($headers as $key => $_) {
if (!\is_string($key) || $key === '') {
// ignore invalid header names or values (otherwise, the worker will be crashed)
// ignore invalid header names or values (otherwise, the worker might be crashed)
// @see: <https://git.io/JzjgJ>
unset($headers[$key]);
}
Expand All @@ -173,4 +219,61 @@
/** @var HeadersList $headers */
return $headers;
}

/**
* @param \Traversable<non-empty-string, HeaderValue> $message
*/
private function headerValueToArray(\Traversable $message): array

Check warning on line 226 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L226

Added line #L226 was not covered by tests
{
$result = [];

Check warning on line 228 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L228

Added line #L228 was not covered by tests
/**
* @var non-empty-string $key
* @var HeaderValue $value
*/
foreach ($message as $key => $value) {
$result[$key] = \iterator_to_array($value->getValue());

Check warning on line 234 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L233-L234

Added lines #L233 - L234 were not covered by tests
}

return $result;

Check warning on line 237 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L237

Added line #L237 was not covered by tests
}

/**
* @param array<array-key, array<array-key, string>> $headers
* @return array<non-empty-string, HeaderValue>
*/
private function arrayToHeaderValue(array $headers = []): array

Check warning on line 244 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L244

Added line #L244 was not covered by tests
{
$result = [];

Check warning on line 246 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L246

Added line #L246 was not covered by tests
/**
* @var non-empty-string $key
* @var array<array-key, string> $value
*/
foreach ($headers as $key => $value) {
$result[$key] = new HeaderValue(['value' => $value]);

Check warning on line 252 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L251-L252

Added lines #L251 - L252 were not covered by tests
}

return $result;

Check warning on line 255 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L255

Added line #L255 was not covered by tests
}

/**
* @param array<array-key, array<array-key, string>> $headers
*/
private function createRespondPayload(int $status, string $body, array $headers = [], bool $eos = true): Payload
{
$head = static::$isProto
? (new Response(['status' => $status, 'headers' => $this->arrayToHeaderValue($headers)]))
->serializeToString()

Check warning on line 265 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L264-L265

Added lines #L264 - L265 were not covered by tests
: \json_encode(['status' => $status, 'headers' => $headers ?: (object)[]], \JSON_THROW_ON_ERROR);

return new Payload(body: $body, header: $head, eos: $eos);
}

private function isProtoPayload(Payload $payload): bool

Check warning on line 271 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L271

Added line #L271 was not covered by tests
{
if (static::$isProto === null) {
static::$isProto = !json_validate($payload->header);

Check warning on line 274 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L273-L274

Added lines #L273 - L274 were not covered by tests
}

return static::$isProto;

Check warning on line 277 in src/HttpWorker.php

View check run for this annotation

Codecov / codecov/patch

src/HttpWorker.php#L277

Added line #L277 was not covered by tests
}
}
5 changes: 3 additions & 2 deletions src/HttpWorkerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ public function waitRequest(): ?Request;
* If the body is a generator, then each yielded value will be sent as a separated stream chunk.
* Returned value will be sent as a last stream package.
* Note: Stream response is supported by RoadRunner since version 2023.3
* @param HeadersList|array $headers An associative array of the message's headers. Each key MUST be a header name,
* and each value MUST be an array of strings for that header.
* @param HeadersList|array<array-key, array<array-key, string>> $headers $headers An associative array of the
* message's headers. Each key MUST be a header name, and each value MUST be an array of strings for
* that header.
*/
public function respond(int $status, string|Generator $body, array $headers = []): void;
}
Loading