Skip to content

Commit c066714

Browse files
authored
Merge pull request #2 from RxPHP/2.0
Updated to RxPHP v2 and PHP 7
2 parents 1399128 + 7eb306b commit c066714

File tree

13 files changed

+177
-99
lines changed

13 files changed

+177
-99
lines changed

.travis.yml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,16 @@ language: php
22
sudo: required
33

44
php:
5-
- 5.6
65
- 7
6+
- 7.1
77
- hhvm
88

9+
matrix:
10+
allow_failures:
11+
- php: hhvm
12+
913
install:
1014
- composer install
1115

1216
script:
13-
- phpunit
17+
- vendor/bin/phpunit

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ Install dependencies using [composer](https://getcomposer.org/doc/00-intro.md#do
1919

2020
$source = \Rx\React\Http::get('https://www.example.com/');
2121

22-
$source->subscribeCallback(
22+
$source->subscribe(
2323
function ($data) {
2424
echo $data, PHP_EOL;
2525
},
@@ -41,7 +41,7 @@ $headers = ['Content-Type' => 'application/json'];
4141

4242
$source = \Rx\React\Http::post('https://www.example.com/', $postData, $headers);
4343

44-
$source->subscribeCallback(
44+
$source->subscribe(
4545
function ($data) {
4646
echo $data, PHP_EOL;
4747
},
@@ -68,7 +68,7 @@ $images = \Rx\Observable::fromArray($imageTypes)
6868
});
6969
});
7070

71-
$images->subscribeCallback(
71+
$images->subscribe(
7272
function ($data) {
7373
echo "Got Image: ", array_keys($data)[0], PHP_EOL;
7474
},

composer.json

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,12 @@
2929
}
3030
},
3131
"require": {
32-
"voryx/event-loop": "^0.2.0",
33-
"reactivex/rxphp": "^1.0.0",
32+
"PHP": "^7.0",
33+
"voryx/event-loop": "^2.0",
34+
"reactivex/rxphp": "^2.0",
3435
"react/http-client": "^0.4.8"
36+
},
37+
"require-dev": {
38+
"phpunit/phpunit": "^5.5"
3539
}
3640
}

examples/http/multiple_gets.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
});
1313
});
1414

15-
$images->subscribeCallback(
15+
$images->subscribe(
1616
function ($data) {
1717
echo "Got Image: ", array_keys($data)[0], PHP_EOL;
1818
},

examples/http/simple_get.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
$source = \Rx\React\Http::get('https://www.google.com/');
66

7-
$source->subscribeCallback(
7+
$source->subscribe(
88
function ($data) {
99
echo $data, PHP_EOL;
1010
},

examples/http/simple_post_json.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
$source = \Rx\React\Http::post('https://www.example.com/', $postData, $headers);
99

10-
$source->subscribeCallback(
10+
$source->subscribe(
1111
function ($data) {
1212
echo $data, PHP_EOL;
1313
},

examples/http/stream_body.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
$start = time();
77
$size = 0;
88

9-
$source->subscribeCallback(
9+
$source->subscribe(
1010
function ($data) use (&$size) {
1111
$size += strlen($data);
1212
echo "\033[1A", 'Downloaded size: ', number_format($size / 1024 / 1024, 2, '.', ''), 'MB', PHP_EOL;

examples/http/stream_twitter.php

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,14 @@ function generateHeader($method, $url, $params = null)
3232
->streamResults()
3333
->share();
3434

35-
$connected = $source->take(1)->doOnNext(function () {
35+
$connected = $source->take(1)->do(function () {
3636
echo 'Connected to twitter, listening in on stream:', PHP_EOL;
3737
});
3838

39+
/** @var \Rx\Observable $allTweets */
3940
$allTweets = $connected
4041
->merge($source)
41-
->lift(function () {
42-
return new \Rx\Extra\Operator\CutOperator(PHP_EOL);
43-
})
42+
->cut(PHP_EOL)
4443
->filter(function ($tweet) {
4544
return strlen(trim($tweet)) > 0;
4645
})
@@ -53,9 +52,9 @@ function generateHeader($method, $url, $params = null)
5352
return is_object($tweet);
5453
})
5554
->filter(function ($tweet) {
56-
return trim($tweet->text) == 'exit();';
55+
return trim($tweet->text) === 'exit();';
5756
})
58-
->doOnNext(function ($twitter) {
57+
->do(function ($twitter) {
5958
echo 'exit(); found, stopping...', PHP_EOL;
6059
});
6160

@@ -78,7 +77,7 @@ function generateHeader($method, $url, $params = null)
7877
return \Rx\React\Http::get("https://atlas.ripe.net/api/v1/measurement/{$id}/");
7978
})->map(function ($data) {
8079
return json_decode($data);
81-
})->subscribeCallback(
80+
})->subscribe(
8281
function ($json) {
8382
echo 'Measurement #', $json->msm_id, ' "', $json->description, '" had ', $json->participant_count, ' nodes involved', PHP_EOL;
8483
},
@@ -99,7 +98,7 @@ function () {
9998
return \Rx\React\Http::get("https://atlas.ripe.net/api/v1/probe/{$id}/");
10099
})->map(function ($data) {
101100
return json_decode($data);
102-
})->subscribeCallback(
101+
})->subscribe(
103102
function ($json) {
104103
echo 'Probe #', $json->id, ' connected since ' . date('r', $json->status_since), PHP_EOL;
105104
},

src/Http.php

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,11 @@
33
namespace Rx\React;
44

55
use Psr\Http\Message\RequestInterface;
6-
use Rx\Observable;
76

87
class Http
98
{
10-
11-
public static function request(RequestInterface $request)
9+
public static function request(RequestInterface $request): HttpObservable
1210
{
13-
1411
$method = $request->getMethod();
1512
$url = $request->getUri();
1613
$body = $request->getBody()->getContents();
@@ -20,33 +17,33 @@ public static function request(RequestInterface $request)
2017
return new HttpObservable($method, $url, $body, $headers, $protocolVersion);
2118
}
2219

23-
public static function get($url, array $headers = [], $protocolVersion = '1.0')
20+
public static function get(string $url, array $headers = [], string $protocolVersion = '1.1'): HttpObservable
2421
{
25-
return new HttpObservable("GET", $url, null, $headers, $protocolVersion);
22+
return new HttpObservable('GET', $url, null, $headers, $protocolVersion);
2623
}
2724

28-
public static function post($url, $body = null, array $headers = [], $protocolVersion = '1.0')
25+
public static function post(string $url, string $body = null, array $headers = [], string $protocolVersion = '1.1'): HttpObservable
2926
{
30-
return new HttpObservable("POST", $url, $body, $headers, $protocolVersion);
27+
return new HttpObservable('POST', $url, $body, $headers, $protocolVersion);
3128
}
3229

33-
public static function put($url, $body = null, array $headers = [], $protocolVersion = '1.0')
30+
public static function put(string $url, string $body = null, array $headers = [], string $protocolVersion = '1.1'): HttpObservable
3431
{
35-
return new HttpObservable("PUT", $url, $body, $headers, $protocolVersion);
32+
return new HttpObservable('PUT', $url, $body, $headers, $protocolVersion);
3633
}
3734

38-
public static function delete($url, array $headers = [], $protocolVersion = '1.0')
35+
public static function delete(string $url, array $headers = [], string $protocolVersion = '1.1'): HttpObservable
3936
{
40-
return new HttpObservable("DELETE", $url, null, $headers, $protocolVersion);
37+
return new HttpObservable('DELETE', $url, null, $headers, $protocolVersion);
4138
}
4239

43-
public static function patch($url, $body = null, array $headers = [], $protocolVersion = '1.0')
40+
public static function patch(string $url, string $body = null, array $headers = [], string $protocolVersion = '1.1'): HttpObservable
4441
{
45-
return new HttpObservable("PATCH", $url, $body, $headers, $protocolVersion);
42+
return new HttpObservable('PATCH', $url, $body, $headers, $protocolVersion);
4643
}
4744

48-
public static function head($url, array $headers = [], $protocolVersion = '1.0')
45+
public static function head(string $url, array $headers = [], string $protocolVersion = '1.1'): HttpObservable
4946
{
50-
return new HttpObservable("HEAD", $url, null, $headers, $protocolVersion);
47+
return new HttpObservable('HEAD', $url, null, $headers, $protocolVersion);
5148
}
5249
}

src/HttpObservable.php

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,77 +5,73 @@
55
use React\Dns\Resolver\Factory;
66
use React\HttpClient\Response;
77
use Rx\Disposable\CallbackDisposable;
8+
use Rx\DisposableInterface;
89
use Rx\Observable;
910
use Rx\ObserverInterface;
10-
use Rx\Scheduler\ImmediateScheduler;
11+
use Rx\Scheduler;
1112
use Rx\SchedulerInterface;
1213

1314
class HttpObservable extends Observable
1415
{
15-
16-
/** @var string */
1716
private $method;
1817

19-
/** @var string */
2018
private $url;
2119

22-
/** @var string */
2320
private $body;
2421

25-
/** @var array */
2622
private $headers;
2723

28-
/** @var string */
2924
private $protocolVersion;
3025

31-
/** @var boolean */
3226
private $bufferResults;
3327

34-
/** @var boolean */
3528
private $includeResponse;
3629

30+
private $scheduler;
31+
3732
/** @var \React\HttpClient\Client */
3833
private $client;
3934

40-
public function __construct($method, $url, $body = null, array $headers = [], $protocolVersion = '1.0', $bufferResults = true, $includeResponse = false)
41-
{
35+
public function __construct(
36+
string $method,
37+
string $url,
38+
string $body = null,
39+
array $headers = [],
40+
string $protocolVersion = '1.1',
41+
bool $bufferResults = true,
42+
bool $includeResponse = false,
43+
SchedulerInterface $scheduler = null
44+
) {
4245
$this->method = $method;
4346
$this->url = $url;
4447
$this->body = $body;
4548
$this->headers = $headers;
4649
$this->protocolVersion = $protocolVersion;
4750
$this->bufferResults = $bufferResults;
4851
$this->includeResponse = $includeResponse;
52+
$this->scheduler = $scheduler ?: Scheduler::getDefault();
4953

5054
$loop = \EventLoop\getLoop();
5155
$dnsResolverFactory = new Factory();
5256
$dnsResolver = $dnsResolverFactory->createCached('8.8.8.8', $loop);
5357
$factory = new \React\HttpClient\Factory();
5458
$this->client = $factory->create($loop, $dnsResolver);
55-
5659
}
5760

58-
/**
59-
* @param ObserverInterface $observer
60-
* @param SchedulerInterface|null $scheduler
61-
* @return \Rx\Disposable\CompositeDisposable|\Rx\DisposableInterface
62-
*/
63-
public function subscribe(ObserverInterface $observer, SchedulerInterface $scheduler = null)
61+
protected function _subscribe(ObserverInterface $observer): DisposableInterface
6462
{
65-
66-
$scheduler = $scheduler ?: new ImmediateScheduler();
67-
68-
$buffer = '';
69-
$request = $this->client->request($this->method, $this->url, $this->headers, $this->protocolVersion);
63+
$scheduler = $this->scheduler;
64+
$buffer = '';
65+
$request = $this->client->request($this->method, $this->url, $this->headers, $this->protocolVersion);
7066

7167
$request->on('response', function (Response $response) use (&$buffer, $observer, $request, $scheduler) {
7268
$response->on('data', function ($data, Response $response) use (&$buffer, $observer, $request, $scheduler) {
7369

7470
try {
75-
//Http Errors
76-
if ($response->getCode() < 200 || $response->getCode() >= 400) {
77-
$error = new HttpResponseException($request, $response, $response->getReasonPhrase(), $response->getCode());
78-
$observer->onError($error);
71+
//Buffer the data if we get a http error
72+
$code = $response->getCode();
73+
if ($code < 200 || $code >= 400) {
74+
$buffer .= $data;
7975
return;
8076
}
8177

@@ -101,6 +97,13 @@ public function subscribe(ObserverInterface $observer, SchedulerInterface $sched
10197

10298
$response->on('end', function ($end = null) use (&$buffer, $observer, $request, $response, $scheduler) {
10399

100+
$code = $response->getCode();
101+
if ($code < 200 || $code >= 400) {
102+
$error = new HttpResponseException($request, $response, $response->getReasonPhrase(), $response->getCode(), $buffer);
103+
$observer->onError($error);
104+
return;
105+
}
106+
104107
if ($this->bufferResults) {
105108
$data = $this->includeResponse ? [$buffer, $response, $request] : $buffer;
106109
$scheduler->schedule(function () use ($observer, $data, $scheduler) {

0 commit comments

Comments
 (0)