Skip to content

Commit

Permalink
Support keyed response for amp
Browse files Browse the repository at this point in the history
  • Loading branch information
jenky committed Sep 27, 2023
1 parent ab7edb4 commit 49ac713
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 51 deletions.
42 changes: 42 additions & 0 deletions src/Concurrency/AmpTask.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php

declare(strict_types=1);

namespace Fansipan\Peak\Concurrency;

/**
* @template T
*/
final class AmpTask
{
/**
* @var T
*/
private mixed $value = null;

/**
* @param \Closure(): T $task
*/
public function __construct(
private readonly string|int $key,
private readonly \Closure $task
) {
}

public function key(): string|int
{
return $this->key;
}

public function value(): mixed
{
return $this->value;
}

public function __invoke(): self
{
$this->value = ($this->task)();

return $this;
}
}
21 changes: 18 additions & 3 deletions src/Concurrency/AmpWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,26 @@ public function __construct(private readonly int $limit = 10)

public function run(iterable $tasks): array
{
$promises = Pipeline::fromIterable($tasks)
$promises = Pipeline::fromIterable(static function () use ($tasks): \Generator {
foreach ($tasks as $key => $task) {
yield new AmpTask($key, $task);
}
})
->concurrent($this->limit)
->unordered()
->map(static fn (\Closure $task) => Amp\async($task));
->map(static fn (AmpTask $task) => Amp\async(\Closure::fromCallable($task)));

return Future\awaitAll($promises)[1] ?? [];
$results = [];

foreach (Future::iterate($promises) as $promise) {
try {
/** @var AmpTask $t */
$t = $promise->await();
$results[$t->key()] = $t->value();
} catch (\Throwable) {
}
}

return $results;
}
}
7 changes: 7 additions & 0 deletions tests/AmpPoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,11 @@ public function test_amp_pool_using_guzzle(): void
{
$this->performConnectorTests($this->createConnector($this->createGuzzleClient()));
}

public function test_amp_pool_keyed_response(): void
{
$this->performKeyedResponseTests(
$this->mockSymfonyClient(new AmpDeferred())
);
}
}
45 changes: 24 additions & 21 deletions tests/DelayTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,20 @@

use Fansipan\Mock\MockResponse;
use Fansipan\Peak\Client\AsyncClientInterface;
use Fansipan\Peak\Client\GuzzleClient;
use Fansipan\Peak\Client\Delayable;
use Fansipan\Peak\Client\ReactClient;
use Fansipan\Peak\Client\SymfonyClient;
use Fansipan\Peak\Concurrency\Deferrable;
use Fansipan\Peak\Concurrency\PslDeferred;
use Fansipan\Peak\Concurrency\ReactDeferred;
use Fansipan\Peak\PoolFactory;
use GuzzleHttp\Client;
use GuzzleHttp\Handler\MockHandler;
use GuzzleHttp\HandlerStack;
use Http\Discovery\Psr17FactoryDiscovery;
use PHPUnit\Framework\TestCase;
use Psr\Http\Message\RequestFactoryInterface;
use Psr\Http\Message\ResponseInterface;
use Symfony\Component\HttpClient\MockHttpClient;

final class DelayTest extends TestCase
{
use TestTrait;

private RequestFactoryInterface $requestFactory;

protected function setUp(): void
Expand All @@ -33,6 +29,26 @@ protected function setUp(): void
$this->requestFactory = Psr17FactoryDiscovery::findRequestFactory();
}

public function test_amp_delay(): void
{
$request = $this->requestFactory->createRequest('GET', 'http://localhost');

$client = $this->mockGuzzleClient(new PslDeferred(), [
MockResponse::create(''),
]);

$reflection = new \ReflectionProperty($client, 'delay');
$reflection->setAccessible(true);

$client->delay(1000);

$this->assertSame(1000, $reflection->getValue($client));

$client->sendRequest($request);

$this->assertSame(0, $reflection->getValue($client));
}

public function test_psl_delay(): void
{
$request = $this->requestFactory->createRequest('GET', 'http://localhost');
Expand Down Expand Up @@ -97,7 +113,7 @@ private function runPoolDelayTests(AsyncClientInterface $client, int $totalReque
{
$requests = function (int $total) use ($delay) {
for ($i = 0; $i < $total; $i++) {
yield function (AsyncClientInterface $client) use ($delay): ResponseInterface {
yield function (AsyncClientInterface&Delayable $client) use ($delay): ResponseInterface {
$client->delay($delay);

return $client->sendRequest($this->requestFactory->createRequest('GET', 'http://localhost'));
Expand All @@ -108,17 +124,4 @@ private function runPoolDelayTests(AsyncClientInterface $client, int $totalReque
$pool = PoolFactory::createFromClient($client);
$pool->send($requests($totalRequests));
}

private function mockGuzzleClient(Deferrable $defer, ?array $response = null): GuzzleClient
{
$handler = new MockHandler($response);
$handlerStack = HandlerStack::create($handler);

return new GuzzleClient($defer, new Client(['handler' => $handlerStack]));
}

private function mockSymfonyClient(Deferrable $defer, mixed $response = null): SymfonyClient
{
return new SymfonyClient($defer, new MockHttpClient($response));
}
}
11 changes: 9 additions & 2 deletions tests/PslPoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@ private function createGuzzleClient(): GuzzleClient
return new GuzzleClient(new PslDeferred());
}

public function test_react_pool_using_symfony_http_client(): void
public function test_psl_pool_using_symfony_http_client(): void
{
$this->performConnectorTests($this->createConnector($this->createSymfonyClient()));
}

public function test_react_pool_using_guzzle(): void
public function test_psl_pool_using_guzzle(): void
{
$this->performConnectorTests($this->createConnector($this->createGuzzleClient()));
}

public function test_psl_pool_keyed_response(): void
{
$this->performKeyedResponseTests(
$this->mockSymfonyClient(new PslDeferred())
);
}
}
7 changes: 7 additions & 0 deletions tests/ReactPoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,11 @@ public function test_react_pool_using_guzzle(): void
{
$this->performConnectorTests($this->createConnector($this->createGuzzleClient()));
}

public function test_react_pool_keyed_response(): void
{
$this->performKeyedResponseTests(
$this->mockSymfonyClient(new ReactDeferred())
);
}
}
21 changes: 20 additions & 1 deletion tests/TestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
use Fansipan\Peak\Client\AsyncClientInterface;
use Fansipan\Peak\ClientPool;
use Fansipan\Peak\ConnectorPool;
use Fansipan\Peak\PoolFactory;
use Http\Discovery\Psr17FactoryDiscovery;
use Jenky\Atlas\Contracts\ConnectorInterface;
use Jenky\Atlas\GenericConnector;
use Jenky\Atlas\Middleware\Interceptor;
Expand All @@ -17,7 +19,7 @@

abstract class TestCase extends BaseTestCase
{
use TestRequestTrait;
use TestTrait;

protected function createConnector(?ClientInterface $client = null): ConnectorInterface
{
Expand Down Expand Up @@ -54,4 +56,21 @@ protected function performConnectorTests(ConnectorInterface $connector): void
$this->assertInstanceOf(Response::class, $responses[0]);
$this->assertSame('bar', $responses[0]->header('X-Foo'));
}

protected function performKeyedResponseTests(AsyncClientInterface $client): void
{
$requestFactory = Psr17FactoryDiscovery::findRequestFactory();

$responses = PoolFactory::createFromClient($client)
->send([
'foo' => $requestFactory->createRequest('GET', 'http://localhost/foo'),
'bar' => $requestFactory->createRequest('GET', 'http://localhost/bar'),
]);

$this->assertArrayHasKey('foo', $responses);
$this->assertArrayHasKey('bar', $responses);

$this->assertSame(200, $responses['foo']->getStatusCode());
$this->assertSame(200, $responses['bar']->getStatusCode());
}
}
24 changes: 0 additions & 24 deletions tests/TestRequestTrait.php

This file was deleted.

44 changes: 44 additions & 0 deletions tests/TestTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php

declare(strict_types=1);

namespace Fansipan\Peak\Tests;

use Fansipan\Peak\Client\GuzzleClient;
use Fansipan\Peak\Client\SymfonyClient;
use Fansipan\Peak\Concurrency\Deferrable;
use GuzzleHttp\Client;
use GuzzleHttp\Handler\MockHandler;
use GuzzleHttp\HandlerStack;
use Jenky\Atlas\Util;
use Symfony\Component\HttpClient\MockHttpClient;

trait TestTrait
{
protected function createRequests(int $total): iterable
{
for ($i = 1; $i <= $total; $i++) {
yield new AkamaiTileRequest($i);
}
}

protected function createPsrRequests(int $total): iterable
{
for ($i = 1; $i <= $total; $i++) {
yield Util::request(new AkamaiTileRequest($i));
}
}

protected function mockGuzzleClient(Deferrable $defer, ?array $response = null): GuzzleClient
{
$handler = new MockHandler($response);
$handlerStack = HandlerStack::create($handler);

return new GuzzleClient($defer, new Client(['handler' => $handlerStack]));
}

protected function mockSymfonyClient(Deferrable $defer, mixed $response = null): SymfonyClient
{
return new SymfonyClient($defer, new MockHttpClient($response));
}
}

0 comments on commit 49ac713

Please sign in to comment.