Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: support RR[>=2023.3] streamed responses #130

Open
wants to merge 4 commits into
base: 3.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ baldinof_road_runner:
kernel_reboot:
strategy: on_exception
allowed_exceptions:
- Spiral\RoadRunner\Http\Exception\StreamStoppedException
- Symfony\Component\HttpKernel\Exception\HttpExceptionInterface
- Symfony\Component\Serializer\Exception\ExceptionInterface
- App\Exception\YourDomainException
Expand Down Expand Up @@ -112,6 +113,11 @@ baldinof_road_runner:
max_jobs_dispersion: 0.2
```

## StreamedResponse

Replace your `StreamedResponse`s with `Baldinof\RoadRunnerBundle\Http\Response\StreamedResponse`. The only difference is
that the `$callback` should be a `\Generator`. If you don't, the Symfony's Streamed Response will be loaded completely to memory
before it's sent!

## Events

Expand Down
131 changes: 131 additions & 0 deletions src/Helpers/StreamedJsonResponseHelper.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
<?php

namespace Baldinof\RoadRunnerBundle\Helpers;

use Symfony\Component\HttpFoundation\StreamedJsonResponse;

// Basically copy of Symfony\Component\HttpFoundation\StreamedJsonResponse
// but adds `yield`ing, instead of `echo`s
class StreamedJsonResponseHelper
{
private static ?\Closure $streamedJsonResponseParameterExtractor = null;

public static function toGenerator(StreamedJsonResponse $response): \Generator

Check failure on line 13 in src/Helpers/StreamedJsonResponseHelper.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.1) (lowest dependencies)

Parameter $response of method Baldinof\RoadRunnerBundle\Helpers\StreamedJsonResponseHelper::toGenerator() has invalid type Symfony\Component\HttpFoundation\StreamedJsonResponse.

Check failure on line 13 in src/Helpers/StreamedJsonResponseHelper.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.2) (lowest dependencies)

Parameter $response of method Baldinof\RoadRunnerBundle\Helpers\StreamedJsonResponseHelper::toGenerator() has invalid type Symfony\Component\HttpFoundation\StreamedJsonResponse.
{
$placeholder = "__symfony_json__";
[$data, $encodingOptions] = self::getStreamedJsonResponseParameterExtractor()($response);

return self::stream($data, $encodingOptions, $placeholder);
}

private static function stream(iterable $data, int $encodingOptions, string $placeholder): \Generator
{
$jsonEncodingOptions = \JSON_THROW_ON_ERROR | $encodingOptions;
$keyEncodingOptions = $jsonEncodingOptions & ~\JSON_NUMERIC_CHECK;

return self::streamData($data, $jsonEncodingOptions, $keyEncodingOptions, $placeholder);
}

private static function streamData(mixed $data, int $jsonEncodingOptions, int $keyEncodingOptions, string $placeholder): \Generator
{
if (\is_array($data)) {
foreach (self::streamArray($data, $jsonEncodingOptions, $keyEncodingOptions, $placeholder) as $item) {
yield $item;
}

return;
}

if (is_iterable($data) && !$data instanceof \JsonSerializable) {
foreach (self::streamIterable($data, $jsonEncodingOptions, $keyEncodingOptions, $placeholder) as $item) {
yield $item;
}

return;
}

yield json_encode($data, $jsonEncodingOptions);
}

private static function streamArray(array $data, int $jsonEncodingOptions, int $keyEncodingOptions, string $placeholder): \Generator
{
$generators = [];

array_walk_recursive($data, function (&$item, $key) use (&$generators, $placeholder) {
if ($placeholder === $key) {
// if the placeholder is already in the structure it should be replaced with a new one that explode
// works like expected for the structure
$generators[] = $key;
}

// generators should be used but for better DX all kind of Traversable and objects are supported
if (\is_object($item)) {
$generators[] = $item;
$item = $placeholder;
} elseif ($placeholder === $item) {
// if the placeholder is already in the structure it should be replaced with a new one that explode
// works like expected for the structure
$generators[] = $item;
}
});

$jsonParts = explode('"' . $placeholder . '"', json_encode($data, $jsonEncodingOptions));

Check failure on line 72 in src/Helpers/StreamedJsonResponseHelper.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.1) (lowest dependencies)

Parameter #2 $string of function explode expects string, string|false given.

Check failure on line 72 in src/Helpers/StreamedJsonResponseHelper.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.1) (stable dependencies)

Parameter #2 $string of function explode expects string, string|false given.

Check failure on line 72 in src/Helpers/StreamedJsonResponseHelper.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.2) (lowest dependencies)

Parameter #2 $string of function explode expects string, string|false given.

Check failure on line 72 in src/Helpers/StreamedJsonResponseHelper.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.2) (stable dependencies)

Parameter #2 $string of function explode expects string, string|false given.

foreach ($generators as $index => $generator) {
// send first and between parts of the structure
yield $jsonParts[$index];

foreach (self::streamData($generator, $jsonEncodingOptions, $keyEncodingOptions, $placeholder) as $child) {
yield $child;
}
}

// send last part of the structure
yield $jsonParts[array_key_last($jsonParts)];
}

private static function streamIterable(iterable $iterable, int $jsonEncodingOptions, int $keyEncodingOptions, string $placeholder): \Generator
{
$isFirstItem = true;
$startTag = '[';

foreach ($iterable as $key => $item) {
if ($isFirstItem) {
$isFirstItem = false;
// depending on the first elements key the generator is detected as a list or map
// we can not check for a whole list or map because that would hurt the performance
// of the streamed response which is the main goal of this response class
if (0 !== $key) {
$startTag = '{';
}

yield $startTag;
} else {
// if not first element of the generic, a separator is required between the elements
yield ',';
}

if ('{' === $startTag) {
yield json_encode((string)$key, $keyEncodingOptions) . ':';
}

foreach (self::streamData($item, $jsonEncodingOptions, $keyEncodingOptions, $placeholder) as $child) {
yield $child;
}
}

if ($isFirstItem) { // indicates that the generator was empty
yield '[';
}

yield '[' === $startTag ? ']' : '}';
}

private static function getStreamedJsonResponseParameterExtractor(): \Closure
{
return self::$streamedJsonResponseParameterExtractor ?? (self::$streamedJsonResponseParameterExtractor = \Closure::bind(static fn(StreamedJsonResponse $binaryFileResponse) => [

Check failure on line 126 in src/Helpers/StreamedJsonResponseHelper.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.1) (lowest dependencies)

Parameter $binaryFileResponse of anonymous function has invalid type Symfony\Component\HttpFoundation\StreamedJsonResponse.

Check failure on line 126 in src/Helpers/StreamedJsonResponseHelper.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.2) (lowest dependencies)

Parameter $binaryFileResponse of anonymous function has invalid type Symfony\Component\HttpFoundation\StreamedJsonResponse.
$binaryFileResponse->data,

Check failure on line 127 in src/Helpers/StreamedJsonResponseHelper.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.1) (lowest dependencies)

Access to property $data on an unknown class Symfony\Component\HttpFoundation\StreamedJsonResponse.

Check failure on line 127 in src/Helpers/StreamedJsonResponseHelper.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.2) (lowest dependencies)

Access to property $data on an unknown class Symfony\Component\HttpFoundation\StreamedJsonResponse.
$binaryFileResponse->encodingOptions,

Check failure on line 128 in src/Helpers/StreamedJsonResponseHelper.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.1) (lowest dependencies)

Access to property $encodingOptions on an unknown class Symfony\Component\HttpFoundation\StreamedJsonResponse.

Check failure on line 128 in src/Helpers/StreamedJsonResponseHelper.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.2) (lowest dependencies)

Access to property $encodingOptions on an unknown class Symfony\Component\HttpFoundation\StreamedJsonResponse.
], null, StreamedJsonResponse::class));

Check failure on line 129 in src/Helpers/StreamedJsonResponseHelper.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.1) (lowest dependencies)

Class Symfony\Component\HttpFoundation\StreamedJsonResponse not found.

Check failure on line 129 in src/Helpers/StreamedJsonResponseHelper.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.2) (lowest dependencies)

Class Symfony\Component\HttpFoundation\StreamedJsonResponse not found.
}
}
90 changes: 90 additions & 0 deletions src/Http/Response/StreamedResponse.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?php

namespace Baldinof\RoadRunnerBundle\Http\Response;

use Symfony\Component\HttpFoundation\Response;

/**
* Basically a copy of Symfony's StreamedResponse,
* but the callback needs to be a Generator
*/
class StreamedResponse extends Response
{
protected ?\Closure $callback = null;
protected bool $streamed = false;

private bool $headersSent = false;

public function __construct(\Generator $callback = null, int $status = 200, array $headers = [])
{
parent::__construct(null, $status, $headers);

if (null !== $callback) {
$this->setCallback($callback);
}
}

public function setCallback(\Generator $callback): static
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should still accept a callable, then in the sendContent we can test if the callback returned a Generator or nothing. And if so consume and echo the generator

{
$this->callback = $callback(...);

Check failure on line 29 in src/Http/Response/StreamedResponse.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.1) (lowest dependencies)

Creating callable from Generator but it's not a callable.

Check failure on line 29 in src/Http/Response/StreamedResponse.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.1) (stable dependencies)

Creating callable from Generator but it's not a callable.

Check failure on line 29 in src/Http/Response/StreamedResponse.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.2) (lowest dependencies)

Creating callable from Generator but it's not a callable.

Check failure on line 29 in src/Http/Response/StreamedResponse.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.2) (stable dependencies)

Creating callable from Generator but it's not a callable.

return $this;
}

public function getCallback(): ?\Generator
{
if (!isset($this->callback)) {
return null;
}

return ($this->callback)(...);

Check failure on line 40 in src/Http/Response/StreamedResponse.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.1) (lowest dependencies)

Method Baldinof\RoadRunnerBundle\Http\Response\StreamedResponse::getCallback() should return Generator|null but returns Closure.

Check failure on line 40 in src/Http/Response/StreamedResponse.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.1) (stable dependencies)

Method Baldinof\RoadRunnerBundle\Http\Response\StreamedResponse::getCallback() should return Generator|null but returns Closure.

Check failure on line 40 in src/Http/Response/StreamedResponse.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.2) (lowest dependencies)

Method Baldinof\RoadRunnerBundle\Http\Response\StreamedResponse::getCallback() should return Generator|null but returns Closure.

Check failure on line 40 in src/Http/Response/StreamedResponse.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.2) (stable dependencies)

Method Baldinof\RoadRunnerBundle\Http\Response\StreamedResponse::getCallback() should return Generator|null but returns Closure.
}

public function sendHeaders(int $statusCode = null): static
{
if ($this->headersSent) {
return $this;
}

if ($statusCode < 100 || $statusCode >= 200) {
$this->headersSent = true;
}

return parent::sendHeaders($statusCode);

Check failure on line 53 in src/Http/Response/StreamedResponse.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.1) (lowest dependencies)

Method Symfony\Component\HttpFoundation\Response::sendHeaders() invoked with 1 parameter, 0 required.

Check failure on line 53 in src/Http/Response/StreamedResponse.php

View workflow job for this annotation

GitHub Actions / Static analysis (PHP 8.2) (lowest dependencies)

Method Symfony\Component\HttpFoundation\Response::sendHeaders() invoked with 1 parameter, 0 required.
}

public function sendContent(): static
{
if ($this->streamed) {
return $this;
}

$this->streamed = true;

if (!isset($this->callback)) {
throw new \LogicException('The Response callback must be set.');
}

foreach (($this->callback)() as $value) {
echo $value;
}

return $this;
}

public function setContent(?string $content): static
{
if (null !== $content) {
throw new \LogicException('The content cannot be set on a StreamedResponse instance.');
}

$this->streamed = true;

return $this;
}

public function getContent(): string|false
{
return false;
}
}
Loading
Loading