Skip to content

Commit ffcd1e4

Browse files
committed
Updated to RxPHP v2
1 parent 1399128 commit ffcd1e4

File tree

10 files changed

+59
-62
lines changed

10 files changed

+59
-62
lines changed

composer.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@
2929
}
3030
},
3131
"require": {
32-
"voryx/event-loop": "^0.2.0",
33-
"reactivex/rxphp": "^1.0.0",
32+
"PHP": "^7.0",
33+
"voryx/event-loop": "^2.0.0",
34+
"reactivex/rxphp": "2.x-dev",
3435
"react/http-client": "^0.4.8"
3536
}
3637
}

examples/http/multiple_gets.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
});
1313
});
1414

15-
$images->subscribeCallback(
15+
$images->subscribe(
1616
function ($data) {
1717
echo "Got Image: ", array_keys($data)[0], PHP_EOL;
1818
},

examples/http/simple_get.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
$source = \Rx\React\Http::get('https://www.google.com/');
66

7-
$source->subscribeCallback(
7+
$source->subscribe(
88
function ($data) {
99
echo $data, PHP_EOL;
1010
},

examples/http/simple_post_json.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
$source = \Rx\React\Http::post('https://www.example.com/', $postData, $headers);
99

10-
$source->subscribeCallback(
10+
$source->subscribe(
1111
function ($data) {
1212
echo $data, PHP_EOL;
1313
},

examples/http/stream_body.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
$start = time();
77
$size = 0;
88

9-
$source->subscribeCallback(
9+
$source->subscribe(
1010
function ($data) use (&$size) {
1111
$size += strlen($data);
1212
echo "\033[1A", 'Downloaded size: ', number_format($size / 1024 / 1024, 2, '.', ''), 'MB', PHP_EOL;

examples/http/stream_twitter.php

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,14 @@ function generateHeader($method, $url, $params = null)
3232
->streamResults()
3333
->share();
3434

35-
$connected = $source->take(1)->doOnNext(function () {
35+
$connected = $source->take(1)->do(function () {
3636
echo 'Connected to twitter, listening in on stream:', PHP_EOL;
3737
});
3838

39+
/** @var \Rx\Observable $allTweets */
3940
$allTweets = $connected
4041
->merge($source)
41-
->lift(function () {
42-
return new \Rx\Extra\Operator\CutOperator(PHP_EOL);
43-
})
42+
->cut(PHP_EOL)
4443
->filter(function ($tweet) {
4544
return strlen(trim($tweet)) > 0;
4645
})
@@ -53,9 +52,9 @@ function generateHeader($method, $url, $params = null)
5352
return is_object($tweet);
5453
})
5554
->filter(function ($tweet) {
56-
return trim($tweet->text) == 'exit();';
55+
return trim($tweet->text) === 'exit();';
5756
})
58-
->doOnNext(function ($twitter) {
57+
->do(function ($twitter) {
5958
echo 'exit(); found, stopping...', PHP_EOL;
6059
});
6160

@@ -78,7 +77,7 @@ function generateHeader($method, $url, $params = null)
7877
return \Rx\React\Http::get("https://atlas.ripe.net/api/v1/measurement/{$id}/");
7978
})->map(function ($data) {
8079
return json_decode($data);
81-
})->subscribeCallback(
80+
})->subscribe(
8281
function ($json) {
8382
echo 'Measurement #', $json->msm_id, ' "', $json->description, '" had ', $json->participant_count, ' nodes involved', PHP_EOL;
8483
},
@@ -99,7 +98,7 @@ function () {
9998
return \Rx\React\Http::get("https://atlas.ripe.net/api/v1/probe/{$id}/");
10099
})->map(function ($data) {
101100
return json_decode($data);
102-
})->subscribeCallback(
101+
})->subscribe(
103102
function ($json) {
104103
echo 'Probe #', $json->id, ' connected since ' . date('r', $json->status_since), PHP_EOL;
105104
},

src/Http.php

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,11 @@
33
namespace Rx\React;
44

55
use Psr\Http\Message\RequestInterface;
6-
use Rx\Observable;
76

87
class Http
98
{
10-
119
public static function request(RequestInterface $request)
1210
{
13-
1411
$method = $request->getMethod();
1512
$url = $request->getUri();
1613
$body = $request->getBody()->getContents();
@@ -22,31 +19,31 @@ public static function request(RequestInterface $request)
2219

2320
public static function get($url, array $headers = [], $protocolVersion = '1.0')
2421
{
25-
return new HttpObservable("GET", $url, null, $headers, $protocolVersion);
22+
return new HttpObservable('GET', $url, null, $headers, $protocolVersion);
2623
}
2724

2825
public static function post($url, $body = null, array $headers = [], $protocolVersion = '1.0')
2926
{
30-
return new HttpObservable("POST", $url, $body, $headers, $protocolVersion);
27+
return new HttpObservable('POST', $url, $body, $headers, $protocolVersion);
3128
}
3229

3330
public static function put($url, $body = null, array $headers = [], $protocolVersion = '1.0')
3431
{
35-
return new HttpObservable("PUT", $url, $body, $headers, $protocolVersion);
32+
return new HttpObservable('PUT', $url, $body, $headers, $protocolVersion);
3633
}
3734

3835
public static function delete($url, array $headers = [], $protocolVersion = '1.0')
3936
{
40-
return new HttpObservable("DELETE", $url, null, $headers, $protocolVersion);
37+
return new HttpObservable('DELETE', $url, null, $headers, $protocolVersion);
4138
}
4239

4340
public static function patch($url, $body = null, array $headers = [], $protocolVersion = '1.0')
4441
{
45-
return new HttpObservable("PATCH", $url, $body, $headers, $protocolVersion);
42+
return new HttpObservable('PATCH', $url, $body, $headers, $protocolVersion);
4643
}
4744

4845
public static function head($url, array $headers = [], $protocolVersion = '1.0')
4946
{
50-
return new HttpObservable("HEAD", $url, null, $headers, $protocolVersion);
47+
return new HttpObservable('HEAD', $url, null, $headers, $protocolVersion);
5148
}
5249
}

src/HttpObservable.php

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,75 +5,73 @@
55
use React\Dns\Resolver\Factory;
66
use React\HttpClient\Response;
77
use Rx\Disposable\CallbackDisposable;
8+
use Rx\DisposableInterface;
89
use Rx\Observable;
910
use Rx\ObserverInterface;
10-
use Rx\Scheduler\ImmediateScheduler;
11+
use Rx\Scheduler;
1112
use Rx\SchedulerInterface;
13+
use WyriHaximus\React\AsyncInteropLoop\AsyncInteropLoop;
1214

1315
class HttpObservable extends Observable
1416
{
15-
16-
/** @var string */
1717
private $method;
1818

19-
/** @var string */
2019
private $url;
2120

22-
/** @var string */
2321
private $body;
2422

25-
/** @var array */
2623
private $headers;
2724

28-
/** @var string */
2925
private $protocolVersion;
3026

31-
/** @var boolean */
3227
private $bufferResults;
3328

34-
/** @var boolean */
3529
private $includeResponse;
3630

31+
private $scheduler;
32+
3733
/** @var \React\HttpClient\Client */
3834
private $client;
3935

40-
public function __construct($method, $url, $body = null, array $headers = [], $protocolVersion = '1.0', $bufferResults = true, $includeResponse = false)
41-
{
36+
public function __construct(
37+
string $method,
38+
string $url,
39+
string $body = null,
40+
array $headers = [],
41+
string $protocolVersion = '1.0',
42+
bool $bufferResults = true,
43+
bool $includeResponse = false,
44+
SchedulerInterface $scheduler = null
45+
) {
4246
$this->method = $method;
4347
$this->url = $url;
4448
$this->body = $body;
4549
$this->headers = $headers;
4650
$this->protocolVersion = $protocolVersion;
4751
$this->bufferResults = $bufferResults;
4852
$this->includeResponse = $includeResponse;
53+
$this->scheduler = $scheduler ?: Scheduler::getDefault();
4954

50-
$loop = \EventLoop\getLoop();
55+
$loop = new AsyncInteropLoop();
5156
$dnsResolverFactory = new Factory();
5257
$dnsResolver = $dnsResolverFactory->createCached('8.8.8.8', $loop);
5358
$factory = new \React\HttpClient\Factory();
5459
$this->client = $factory->create($loop, $dnsResolver);
55-
5660
}
5761

58-
/**
59-
* @param ObserverInterface $observer
60-
* @param SchedulerInterface|null $scheduler
61-
* @return \Rx\Disposable\CompositeDisposable|\Rx\DisposableInterface
62-
*/
63-
public function subscribe(ObserverInterface $observer, SchedulerInterface $scheduler = null)
62+
protected function _subscribe(ObserverInterface $observer): DisposableInterface
6463
{
65-
66-
$scheduler = $scheduler ?: new ImmediateScheduler();
67-
68-
$buffer = '';
69-
$request = $this->client->request($this->method, $this->url, $this->headers, $this->protocolVersion);
64+
$scheduler = $this->scheduler;
65+
$buffer = '';
66+
$request = $this->client->request($this->method, $this->url, $this->headers, $this->protocolVersion);
7067

7168
$request->on('response', function (Response $response) use (&$buffer, $observer, $request, $scheduler) {
7269
$response->on('data', function ($data, Response $response) use (&$buffer, $observer, $request, $scheduler) {
7370

7471
try {
7572
//Http Errors
76-
if ($response->getCode() < 200 || $response->getCode() >= 400) {
73+
$code = $response->getCode();
74+
if ($code < 200 || $code >= 400) {
7775
$error = new HttpResponseException($request, $response, $response->getReasonPhrase(), $response->getCode());
7876
$observer->onError($error);
7977
return;

tests/Functional/Observable/HttpObservableTest.php

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ class HttpObservableTest extends TestCase
1717
*/
1818
public function http_with_buffer()
1919
{
20-
$testData1 = str_repeat("1", 1000);
21-
$testData2 = str_repeat("1", 1000);
22-
$testData3 = str_repeat("1", 1000);
20+
$testData1 = str_repeat("1", 69536);
21+
$testData2 = str_repeat("1", 69536);
22+
$testData3 = str_repeat("1", 69536);
2323

2424
$error = false;
2525

@@ -60,7 +60,7 @@ function () use (&$complete) {
6060
*/
6161
public function http_without_buffer()
6262
{
63-
$testData = str_repeat("1", 1000); //1k, so it does not use the buffer
63+
$testData = str_repeat("1", 69536); //1k, so it does not use the buffer
6464
$error = false;
6565

6666
$method = "GET";
@@ -97,9 +97,9 @@ function () use (&$complete) {
9797
*/
9898
public function http_with_stream()
9999
{
100-
$testData1 = str_repeat("1", 1000);
101-
$testData2 = str_repeat("1", 1000);
102-
$testData3 = str_repeat("1", 1000);
100+
$testData1 = str_repeat("1", 69536);
101+
$testData2 = str_repeat("1", 69536);
102+
$testData3 = str_repeat("1", 69536);
103103

104104
$error = false;
105105
$result = false;
@@ -147,7 +147,7 @@ function () use (&$complete) {
147147
*/
148148
public function http_with_error()
149149
{
150-
$testData = str_repeat("1", 1000); //1k, so it does not use the buffer
150+
$testData = str_repeat("1", 69536); //1k, so it does not use the buffer
151151
$error = false;
152152
$complete = false;
153153

@@ -185,7 +185,7 @@ function () use (&$complete) {
185185
*/
186186
public function http_with_includeResponse()
187187
{
188-
$testData = str_repeat("1", 1000); //1k, so it does not use the buffer
188+
$testData = str_repeat("1", 69536); //1k, so it does not use the buffer
189189
$error = false;
190190
$complete = false;
191191

@@ -226,9 +226,9 @@ function () use (&$complete) {
226226
*/
227227
public function http_with_includeResponse_with_buffer()
228228
{
229-
$testData1 = str_repeat("1", 1000);
230-
$testData2 = str_repeat("1", 1000);
231-
$testData3 = str_repeat("1", 1000);
229+
$testData1 = str_repeat("1", 69536);
230+
$testData2 = str_repeat("1", 69536);
231+
$testData3 = str_repeat("1", 69536);
232232
$complete = false;
233233
$error = false;
234234

tests/Functional/TestCase.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use React\HttpClient\Request;
88
use React\Promise\Promise;
99
use Rx\React\HttpObservable;
10+
use Rx\Scheduler;
1011

1112
class TestCase extends \PHPUnit_Framework_TestCase
1213
{
@@ -20,7 +21,7 @@ public function setUp()
2021
->disableOriginalConstructor()
2122
->getMock();
2223

23-
$this->connector = $this->getMock('React\SocketClient\ConnectorInterface');
24+
$this->connector = $this->createMock('React\SocketClient\ConnectorInterface');
2425

2526
$this->connector->expects($this->once())
2627
->method('create')
@@ -57,7 +58,8 @@ protected function createHttpObservable(Request $request, $method, $url, $body =
5758
'headers' => $headers,
5859
'protocolVersion' => $protocolVersion,
5960
'bufferResults' => $bufferResults,
60-
'includeResponse' => $includeResponse
61+
'includeResponse' => $includeResponse,
62+
'scheduler' => Scheduler::getImmediate()
6163
];
6264

6365
$httpObservable = $reflection->newInstanceWithoutConstructor();

0 commit comments

Comments
 (0)