Skip to content

Commit

Permalink
Use deferred to be full async in retry (#101)
Browse files Browse the repository at this point in the history
Use deferred to be full async in retry
  • Loading branch information
joelwurtz authored and dbu committed Jun 22, 2018
1 parent f6901d0 commit 949ee8a
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
### Changed

- AddPathPlugin no longer add prefix multiple times if a request is restarted - it now only adds the prefix if that request chain has not yet passed through the AddPathPlugin
- RetryPlugin no longer wait for retried requests and use a deferred promise instead

### Fixed

Expand Down
17 changes: 12 additions & 5 deletions spec/Plugin/RetryPluginSpec.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ function it_returns_response(RequestInterface $request, ResponseInterface $respo
}
};

$this->handleRequest($request, $next, function () {})->shouldReturnAnInstanceOf('Http\Client\Promise\HttpFulfilledPromise');
$promise = $this->handleRequest($request, $next, function () {});
$promise->shouldReturnAnInstanceOf('Http\Client\Common\Deferred');
$promise->wait()->shouldReturn($response);
}

function it_throws_exception_on_multiple_exceptions(RequestInterface $request)
Expand All @@ -53,7 +55,7 @@ function it_throws_exception_on_multiple_exceptions(RequestInterface $request)
};

$promise = $this->handleRequest($request, $next, function () {});
$promise->shouldReturnAnInstanceOf('Http\Client\Promise\HttpRejectedPromise');
$promise->shouldReturnAnInstanceOf('Http\Client\Common\Deferred');
$promise->shouldThrow($exception2)->duringWait();
}

Expand All @@ -76,7 +78,7 @@ function it_returns_response_on_second_try(RequestInterface $request, ResponseIn
};

$promise = $this->handleRequest($request, $next, function () {});
$promise->shouldReturnAnInstanceOf('Http\Client\Promise\HttpFulfilledPromise');
$promise->shouldReturnAnInstanceOf('Http\Client\Common\Deferred');
$promise->wait()->shouldReturn($response);
}

Expand All @@ -98,8 +100,13 @@ function it_does_not_keep_history_of_old_failure(RequestInterface $request, Resp
}
};

$this->handleRequest($request, $next, function () {})->shouldReturnAnInstanceOf('Http\Client\Promise\HttpFulfilledPromise');
$this->handleRequest($request, $next, function () {})->shouldReturnAnInstanceOf('Http\Client\Promise\HttpFulfilledPromise');
$promise = $this->handleRequest($request, $next, function () {});
$promise->shouldReturnAnInstanceOf('Http\Client\Common\Deferred');
$promise->wait()->shouldReturn($response);

$promise = $this->handleRequest($request, $next, function () {});
$promise->shouldReturnAnInstanceOf('Http\Client\Common\Deferred');
$promise->wait()->shouldReturn($response);
}

function it_has_an_exponential_default_delay(RequestInterface $request, Exception\HttpException $exception)
Expand Down
131 changes: 131 additions & 0 deletions src/Deferred.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
<?php

namespace Http\Client\Common;

use Http\Client\Exception;
use Http\Promise\Promise;
use Psr\Http\Message\ResponseInterface;

/**
* A deferred allow to return a promise which has not been resolved yet.
*/
class Deferred implements Promise
{
private $value;

private $failure;

private $state;

private $waitCallback;

private $onFulfilledCallbacks;

private $onRejectedCallbacks;

public function __construct(callable $waitCallback)
{
$this->waitCallback = $waitCallback;
$this->state = Promise::PENDING;
$this->onFulfilledCallbacks = [];
$this->onRejectedCallbacks = [];
}

/**
* {@inheritdoc}
*/
public function then(callable $onFulfilled = null, callable $onRejected = null)
{
$deferred = new self($this->waitCallback);

$this->onFulfilledCallbacks[] = function (ResponseInterface $response) use ($onFulfilled, $deferred) {
try {
if (null !== $onFulfilled) {
$response = $onFulfilled($response);
}
$deferred->resolve($response);
} catch (Exception $exception) {
$deferred->reject($exception);
}
};

$this->onRejectedCallbacks[] = function (Exception $exception) use ($onRejected, $deferred) {
try {
if (null !== $onRejected) {
$response = $onRejected($exception);
$deferred->resolve($response);

return;
}
$deferred->reject($exception);
} catch (Exception $newException) {
$deferred->reject($newException);
}
};

return $deferred;
}

/**
* {@inheritdoc}
*/
public function getState()
{
return $this->state;
}

/**
* Resolve this deferred with a Response.
*/
public function resolve(ResponseInterface $response)
{
if (self::PENDING !== $this->state) {
return;
}

$this->value = $response;
$this->state = self::FULFILLED;

foreach ($this->onFulfilledCallbacks as $onFulfilledCallback) {
$onFulfilledCallback($response);
}
}

/**
* Reject this deferred with an Exception.
*/
public function reject(Exception $exception)
{
if (self::PENDING !== $this->state) {
return;
}

$this->failure = $exception;
$this->state = self::REJECTED;

foreach ($this->onRejectedCallbacks as $onRejectedCallback) {
$onRejectedCallback($exception);
}
}

/**
* {@inheritdoc}
*/
public function wait($unwrap = true)
{
if (self::PENDING === $this->state) {
$callback = $this->waitCallback;
$callback();
}

if (!$unwrap) {
return;
}

if (self::FULFILLED === $this->state) {
return $this->value;
}

throw $this->failure;
}
}
27 changes: 22 additions & 5 deletions src/Plugin/RetryPlugin.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Http\Client\Common\Plugin;

use Http\Client\Common\Deferred;
use Http\Client\Common\Plugin;
use Http\Client\Exception;
use Psr\Http\Message\RequestInterface;
Expand Down Expand Up @@ -76,20 +77,31 @@ public function handleRequest(RequestInterface $request, callable $next, callabl
{
$chainIdentifier = spl_object_hash((object) $first);

return $next($request)->then(function (ResponseInterface $response) use ($request, $chainIdentifier) {
$promise = $next($request);
$deferred = new Deferred(function () use ($promise) {
$promise->wait(false);
});

$onFulfilled = function (ResponseInterface $response) use ($chainIdentifier, $deferred) {
if (array_key_exists($chainIdentifier, $this->retryStorage)) {
unset($this->retryStorage[$chainIdentifier]);
}

$deferred->resolve($response);

return $response;
}, function (Exception $exception) use ($request, $next, $first, $chainIdentifier) {
};

$onRejected = function (Exception $exception) use ($request, $next, $onFulfilled, &$onRejected, $chainIdentifier, $deferred) {
if (!array_key_exists($chainIdentifier, $this->retryStorage)) {
$this->retryStorage[$chainIdentifier] = 0;
}

if ($this->retryStorage[$chainIdentifier] >= $this->retry) {
unset($this->retryStorage[$chainIdentifier]);

$deferred->reject($exception);

throw $exception;
}

Expand All @@ -102,10 +114,15 @@ public function handleRequest(RequestInterface $request, callable $next, callabl

// Retry in synchrone
++$this->retryStorage[$chainIdentifier];
$promise = $this->handleRequest($request, $next, $first);

return $promise->wait();
});
$next($request)->then($onFulfilled, $onRejected);

throw $exception;
};

$promise->then($onFulfilled, $onRejected);

return $deferred;
}

/**
Expand Down

0 comments on commit 949ee8a

Please sign in to comment.