Skip to content

Commit 7311918

Browse files
committed
Updated React Http to v 0.5
Refactored to use socket connector
1 parent d647dc9 commit 7311918

File tree

12 files changed

+179
-115
lines changed

12 files changed

+179
-115
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
"PHP": "^7.0",
3838
"voryx/event-loop": "^2.0",
3939
"reactivex/rxphp": "^2.0",
40-
"react/http-client": "^0.4.8"
40+
"react/http-client": "^0.5.0"
4141
},
4242
"require-dev": {
4343
"phpunit/phpunit": "^5.5"

examples/http/advanced_get.php

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
include __DIR__ . '/../../vendor/autoload.php';
4+
5+
$loop = \React\EventLoop\Factory::create();
6+
7+
\Rx\Scheduler::setDefaultFactory(function () use ($loop) {
8+
return new \Rx\Scheduler\EventLoopScheduler($loop);
9+
});
10+
11+
$connector = new \React\Socket\Connector($loop, ['dns' => (new React\Dns\Resolver\Factory())->create('4.2.2.2', $loop)]);
12+
13+
$source = (new \Rx\React\Client($loop, $connector))->request('GET', 'https://www.example.com/');
14+
15+
$source->subscribe(
16+
function ($data) {
17+
echo $data, PHP_EOL;
18+
},
19+
function (\Throwable $e) {
20+
echo $e->getMessage(), PHP_EOL;
21+
},
22+
function () {
23+
echo 'completed', PHP_EOL;
24+
}
25+
);
26+
27+
$loop->run();

examples/http/multiple_gets.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
include __DIR__ . '/../../vendor/autoload.php';
44

55

6-
$imageTypes = ["png", "jpeg", "webp"];
6+
$imageTypes = ['png', 'jpeg', 'webp'];
77

88
$images = \Rx\Observable::fromArray($imageTypes)
99
->flatMap(function ($type) {
@@ -14,12 +14,12 @@
1414

1515
$images->subscribe(
1616
function ($data) {
17-
echo "Got Image: ", array_keys($data)[0], PHP_EOL;
17+
echo 'Got Image: ', array_keys($data)[0], PHP_EOL;
1818
},
19-
function (\Exception $e) {
19+
function (\Throwable $e) {
2020
echo $e->getMessage(), PHP_EOL;
2121
},
2222
function () {
23-
echo "completed", PHP_EOL;
23+
echo 'completed', PHP_EOL;
2424
}
2525
);

examples/http/simple_get.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@
88
function ($data) {
99
echo $data, PHP_EOL;
1010
},
11-
function (\Exception $e) {
11+
function (\Throwable $e) {
1212
echo $e->getMessage(), PHP_EOL;
1313
},
1414
function () {
15-
echo "completed", PHP_EOL;
15+
echo 'completed', PHP_EOL;
1616
}
1717
);

examples/http/simple_post_json.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
include __DIR__ . '/../../vendor/autoload.php';
44

5-
$postData = json_encode(["test" => "data"]);
5+
$postData = json_encode(['test' => 'data']);
66
$headers = ['Content-Type' => 'application/json'];
77

88
$source = \Rx\React\Http::post('https://www.example.com/', $postData, $headers);
@@ -11,10 +11,10 @@
1111
function ($data) {
1212
echo $data, PHP_EOL;
1313
},
14-
function (\Exception $e) {
14+
function (\Throwable $e) {
1515
echo $e->getMessage(), PHP_EOL;
1616
},
1717
function () {
18-
echo "completed", PHP_EOL;
18+
echo 'completed', PHP_EOL;
1919
}
2020
);

examples/http/stream_body.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ function ($data) use (&$size) {
1111
$size += strlen($data);
1212
echo "\033[1A", 'Downloaded size: ', number_format($size / 1024 / 1024, 2, '.', ''), 'MB', PHP_EOL;
1313
},
14-
function (\Exception $e) {
14+
function (\Throwable $e) {
1515
echo $e->getMessage();
1616
},
1717
function () use (&$size, $start) {

examples/http/stream_twitter.php

Lines changed: 38 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
//Requires jacobkiers/oauth and rx/operator-extras
44

5+
use Rx\Observable;
6+
use Rx\React\Http;
7+
use Rx\React\HttpResponseException;
8+
59
include __DIR__ . '/../../vendor/autoload.php';
610

711
const TWITTER_USER_ID = -1; // Use http://gettwitterid.com/ to get the wanted twitter ID
@@ -28,29 +32,27 @@ function generateHeader($method, $url, $params = null)
2832
'Content-Length' => strlen($postData),
2933
];
3034

31-
$source = \Rx\React\Http::post($url, $postData, $headers, '1.1')
35+
$source = Http::post($url, $postData, $headers)
3236
->streamResults()
3337
->share();
3438

35-
$connected = $source->take(1)->do(function () {
36-
echo 'Connected to twitter, listening in on stream:', PHP_EOL;
37-
});
39+
$connected = $source
40+
->take(1)
41+
->do(function () {
42+
echo 'Connected to twitter, listening in on stream:', PHP_EOL;
43+
});
3844

39-
/** @var \Rx\Observable $allTweets */
45+
/** @var Observable $allTweets */
4046
$allTweets = $connected
4147
->merge($source)
4248
->cut(PHP_EOL)
4349
->filter(function ($tweet) {
4450
return strlen(trim($tweet)) > 0;
4551
})
46-
->map(function ($tweet) {
47-
return json_decode($tweet);
48-
});
52+
->map('json_decode');
4953

5054
$endTwitterStream = $allTweets
51-
->filter(function ($tweet) {
52-
return is_object($tweet);
53-
})
55+
->filter('is_object')
5456
->filter(function ($tweet) {
5557
return trim($tweet->text) === 'exit();';
5658
})
@@ -65,47 +67,51 @@ function generateHeader($method, $url, $params = null)
6567
$tweets = $usersTweets->takeUntil($endTwitterStream);
6668

6769
$urls = $tweets->flatMap(function ($tweet) {
68-
return \Rx\Observable::fromArray($tweet->entities->urls);
70+
return Observable::fromArray($tweet->entities->urls);
6971
});
7072

7173
$measurementsSubscription = $urls
7274
->filter(function ($url) {
73-
return substr($url->expanded_url, 0, 36) == 'https://atlas.ripe.net/measurements/';
74-
})->map(function ($url) {
75+
return 0 === strpos($url->expanded_url, 'https://atlas.ripe.net/measurements/');
76+
})
77+
->map(function ($url) {
7578
return trim(substr($url->expanded_url, 36), '/');
76-
})->flatMap(function ($id) {
77-
return \Rx\React\Http::get("https://atlas.ripe.net/api/v1/measurement/{$id}/");
78-
})->map(function ($data) {
79-
return json_decode($data);
80-
})->subscribe(
79+
})
80+
->flatMap(function ($id) {
81+
return Http::get("https://atlas.ripe.net/api/v1/measurement/{$id}/");
82+
})
83+
->map('json_decode')
84+
->subscribe(
8185
function ($json) {
8286
echo 'Measurement #', $json->msm_id, ' "', $json->description, '" had ', $json->participant_count, ' nodes involved', PHP_EOL;
8387
},
84-
function (\Rx\React\HttpResponseException $e) {
85-
echo "Error: ", $e->getMessage(), PHP_EOL;
88+
function (HttpResponseException $e) {
89+
echo 'Error: ', $e->getMessage(), PHP_EOL;
8690
},
8791
function () {
88-
echo "complete", PHP_EOL;
92+
echo 'complete', PHP_EOL;
8993
}
9094
);
9195

9296
$probesSubscription = $urls
9397
->filter(function ($url) {
94-
return substr($url->expanded_url, 0, 30) == 'https://atlas.ripe.net/probes/';
95-
})->map(function ($url) {
98+
return 0 === strpos($url->expanded_url, 'https://atlas.ripe.net/probes/');
99+
})
100+
->map(function ($url) {
96101
return trim(substr($url->expanded_url, 30), '/');
97-
})->flatMap(function ($id) {
98-
return \Rx\React\Http::get("https://atlas.ripe.net/api/v1/probe/{$id}/");
99-
})->map(function ($data) {
100-
return json_decode($data);
101-
})->subscribe(
102+
})
103+
->flatMap(function ($id) {
104+
return Http::get("https://atlas.ripe.net/api/v1/probe/{$id}/");
105+
})
106+
->map('json_decode')
107+
->subscribe(
102108
function ($json) {
103109
echo 'Probe #', $json->id, ' connected since ' . date('r', $json->status_since), PHP_EOL;
104110
},
105-
function (\Exception $e) {
106-
echo "Error: ", $e->getMessage(), PHP_EOL;
111+
function (\Throwable $e) {
112+
echo 'Error: ', $e->getMessage(), PHP_EOL;
107113
},
108114
function () {
109-
echo "complete", PHP_EOL;
115+
echo 'complete', PHP_EOL;
110116
}
111117
);

src/Client.php

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<?php
2+
3+
namespace Rx\React;
4+
5+
use function EventLoop\getLoop;
6+
use function EventLoop\setLoop;
7+
use React\EventLoop\LoopInterface;
8+
use React\HttpClient\Client as HttpClient;
9+
use React\Socket\Connector;
10+
use React\Socket\ConnectorInterface;
11+
12+
final class Client
13+
{
14+
private $client;
15+
16+
public function __construct(LoopInterface $loop = null, ConnectorInterface $connector = null)
17+
{
18+
if ($loop) {
19+
setLoop($loop);
20+
}
21+
22+
$loop = getLoop();
23+
$connector = $connector ?: new Connector($loop);
24+
$this->client = new HttpClient($loop, $connector);
25+
}
26+
27+
public function request(string $method, string $url, string $body = null, array $headers = [], string $protocolVersion = '1.1'): HttpObservable
28+
{
29+
return new HttpObservable($method, $url, $body, $headers, $protocolVersion, $this->client);
30+
}
31+
}

src/Http.php

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
use Psr\Http\Message\RequestInterface;
66

7-
class Http
7+
final class Http
88
{
99
public static function request(RequestInterface $request): HttpObservable
1010
{
@@ -14,36 +14,36 @@ public static function request(RequestInterface $request): HttpObservable
1414
$headers = $request->getHeaders();
1515
$protocolVersion = $request->getProtocolVersion();
1616

17-
return new HttpObservable($method, $url, $body, $headers, $protocolVersion);
17+
return (new Client)->request($method, $url, $body, $headers, $protocolVersion);
1818
}
1919

2020
public static function get(string $url, array $headers = [], string $protocolVersion = '1.1'): HttpObservable
2121
{
22-
return new HttpObservable('GET', $url, null, $headers, $protocolVersion);
22+
return (new Client)->request('GET', $url, null, $headers, $protocolVersion);
2323
}
2424

2525
public static function post(string $url, string $body = null, array $headers = [], string $protocolVersion = '1.1'): HttpObservable
2626
{
27-
return new HttpObservable('POST', $url, $body, $headers, $protocolVersion);
27+
return (new Client)->request('POST', $url, $body, $headers, $protocolVersion);
2828
}
2929

3030
public static function put(string $url, string $body = null, array $headers = [], string $protocolVersion = '1.1'): HttpObservable
3131
{
32-
return new HttpObservable('PUT', $url, $body, $headers, $protocolVersion);
32+
return (new Client)->request('PUT', $url, $body, $headers, $protocolVersion);
3333
}
3434

3535
public static function delete(string $url, array $headers = [], string $protocolVersion = '1.1'): HttpObservable
3636
{
37-
return new HttpObservable('DELETE', $url, null, $headers, $protocolVersion);
37+
return (new Client)->request('DELETE', $url, null, $headers, $protocolVersion);
3838
}
3939

4040
public static function patch(string $url, string $body = null, array $headers = [], string $protocolVersion = '1.1'): HttpObservable
4141
{
42-
return new HttpObservable('PATCH', $url, $body, $headers, $protocolVersion);
42+
return (new Client)->request('PATCH', $url, $body, $headers, $protocolVersion);
4343
}
4444

4545
public static function head(string $url, array $headers = [], string $protocolVersion = '1.1'): HttpObservable
4646
{
47-
return new HttpObservable('HEAD', $url, null, $headers, $protocolVersion);
47+
return (new Client)->request('HEAD', $url, null, $headers, $protocolVersion);
4848
}
4949
}

0 commit comments

Comments
 (0)