Skip to content

Commit ac0d4c3

Browse files
authored
Merge pull request #121 from xp-forge/feature/websockets
Websockets support
2 parents a6b37c0 + 28ceba1 commit ac0d4c3

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+1899
-349
lines changed

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ Server models
5252
The four server models (*selectable via `-m <model>` on the command line*) are:
5353
5454
* **async** (*the default since 3.0.0*): A single-threaded web server. Handlers can yield control back to the server to serve other clients during lengthy operations such as file up- and downloads.
55-
* **sequential**: Same as above, but blocks until one client's HTTP request handler has finished executing before serving the next request.
5655
* **prefork**: Much like Apache, forks a given number of children to handle HTTP requests. Requires the `pcntl` extension.
5756
* **develop**: As mentioned above, built ontop of the PHP development wenserver. Application code is recompiled and application setup performed from scratch on every request, errors and debug output are handled by the [development console](https://github.com/xp-forge/web/pull/35).
5857

composer.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@
77
"keywords": ["module", "xp"],
88
"require" : {
99
"xp-framework/core": "^12.0 | ^11.0 | ^10.0",
10-
"xp-framework/networking": "^10.1 | ^9.3",
11-
"xp-forge/uri": "^3.0 | ^2.0",
10+
"xp-framework/networking": "^10.1",
11+
"xp-forge/uri": "^3.1",
12+
"xp-forge/websockets": "^4.1",
1213
"php": ">=7.4.0"
1314
},
1415
"require-dev" : {

src/it/php/web/unittest/IntegrationTest.class.php

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
<?php namespace web\unittest;
22

3-
use test\{Assert, After, Test, Values};
3+
use peer\ProtocolException;
4+
use test\{Assert, After, Expect, Test, Values};
5+
use util\Bytes;
6+
use websocket\WebSocket;
47

58
#[StartServer(TestingServer::class)]
69
class IntegrationTest {
7-
const FORM_URLENCODED = 'application/x-www-form-urlencoded';
10+
const FORM_URLENCODED= 'application/x-www-form-urlencoded';
811

912
private $server;
1013

@@ -13,9 +16,10 @@ public function __construct($server) {
1316
$this->server= $server;
1417
}
1518

16-
#[After]
17-
public function shutdown() {
18-
$this->server->shutdown();
19+
/** @return iterable */
20+
private function messages() {
21+
yield ['Test', 'Echo: Test'];
22+
yield [new Bytes([8, 15]), new Bytes([47, 11, 8, 15])];
1923
}
2024

2125
/**
@@ -161,4 +165,34 @@ public function with_large_cookie($length) {
161165
$r= $this->send('GET', '/cookie', '1.0', ['Cookie' => $header]);
162166
Assert::equals((string)strlen($header), $r['body']);
163167
}
168+
169+
#[Test, Values(from: 'messages')]
170+
public function websocket_message($input, $output) {
171+
try {
172+
$ws= new WebSocket($this->server->connection, '/ws');
173+
$ws->connect();
174+
$ws->send($input);
175+
$result= $ws->receive();
176+
} finally {
177+
$ws->close();
178+
}
179+
Assert::equals($output, $result);
180+
}
181+
182+
#[Test, Expect(class: ProtocolException::class, message: 'Connection closed (#1007): Not valid utf-8')]
183+
public function invalid_utf8_passed_to_websocket_text_message() {
184+
try {
185+
$ws= new WebSocket($this->server->connection, '/ws');
186+
$ws->connect();
187+
$ws->send("\xfc");
188+
$ws->receive();
189+
} finally {
190+
$ws->close();
191+
}
192+
}
193+
194+
#[After]
195+
public function shutdown() {
196+
$this->server->shutdown();
197+
}
164198
}

src/it/php/web/unittest/TestingApplication.class.php

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,23 @@
11
<?php namespace web\unittest;
22

3+
use Throwable;
34
use lang\XPClass;
4-
use test\Assert;
5+
use util\Bytes;
6+
use web\handler\WebSocket;
57
use web\{Application, Error};
68

79
class TestingApplication extends Application {
810

911
/** @return var */
1012
public function routes() {
1113
return [
14+
'/ws' => new WebSocket(function($conn, $payload) {
15+
if ($payload instanceof Bytes) {
16+
$conn->send(new Bytes("\057\013{$payload}"));
17+
} else {
18+
$conn->send('Echo: '.$payload);
19+
}
20+
}),
1221
'/status/420' => function($req, $res) {
1322
$res->answer(420, $req->param('message') ?? 'Enhance your calm');
1423
$res->send('Answered with status 420', 'text/plain');
@@ -20,7 +29,7 @@ public function routes() {
2029
},
2130
'/raise/exception' => function($req, $res) {
2231
$class= XPClass::forName(basename($req->uri()->path()));
23-
if ($class->isSubclassOf(\Throwable::class)) throw $class->newInstance('Raised');
32+
if ($class->isSubclassOf(Throwable::class)) throw $class->newInstance('Raised');
2433

2534
// A non-exception class was passed!
2635
$res->answer(200, 'No error');

src/it/php/web/unittest/TestingServer.class.php

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
use peer\server\AsyncServer;
66
use util\cmd\Console;
77
use web\{Environment, Logging};
8-
use xp\web\srv\HttpProtocol;
8+
use xp\web\srv\{Protocol, HttpProtocol, WebSocketProtocol};
99

1010
/**
1111
* Socket server used by integration tests.
@@ -23,10 +23,14 @@ class TestingServer {
2323
public static function main(array $args) {
2424
$application= new TestingApplication(new Environment('test', '.', '.', '.', [], null));
2525
$socket= new ServerSocket('127.0.0.1', $args[0] ?? 0);
26+
$log= new Logging(null);
2627

2728
$s= new AsyncServer();
2829
try {
29-
$s->listen($socket, HttpProtocol::executing($application, new Logging(null)));
30+
$s->listen($socket, Protocol::multiplex()
31+
->serving('http', new HttpProtocol($application, $log))
32+
->serving('websocket', new WebSocketProtocol(null, $log))
33+
);
3034
$s->init();
3135
Console::writeLinef('+ Service %s:%d', $socket->host, $socket->port);
3236
$s->service();

src/main/php/web/Application.class.php

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,14 @@ public function install($arg) {
8080
*
8181
* @param web.Request $request
8282
* @param web.Response $response
83-
* @return var
83+
* @return iterable
8484
*/
8585
public function service($request, $response) {
8686
$seen= [];
8787

8888
// Handle dispatching
8989
dispatch: $result= $this->routing()->handle($request, $response);
90+
$return= null;
9091
if ($result instanceof Traversable) {
9192
foreach ($result as $kind => $argument) {
9293
if ('dispatch' === $kind) {
@@ -96,10 +97,16 @@ public function service($request, $response) {
9697
throw new Error(508, 'Internal redirect loop caused by dispatch to '.$argument);
9798
}
9899
goto dispatch;
100+
} else if ('connection' === $kind) {
101+
$response->header('Connection', 'upgrade');
102+
$response->header('Upgrade', $argument[0]);
103+
$return= $argument;
104+
} else {
105+
yield $kind => $argument;
99106
}
100-
yield $kind => $argument;
101107
}
102108
}
109+
return $return;
103110
}
104111

105112
/** @return string */

src/main/php/web/Logging.class.php

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,33 @@ public function tee($sink) {
5858
}
5959

6060
/**
61-
* Writes a log entry
61+
* Writes an HTTP exchange to the log
6262
*
6363
* @param web.Request $response
6464
* @param web.Response $response
6565
* @param [:var] $hints Optional hints
6666
* @return void
6767
*/
68-
public function log($request, $response, $hints= []) {
69-
$this->sink && $this->sink->log($request, $response, $hints);
68+
public function exchange($request, $response, $hints= []) {
69+
$this->sink && $this->sink->log(
70+
$response->status(),
71+
$request->method(),
72+
$request->uri()->resource(),
73+
$response->trace + $hints
74+
);
75+
}
76+
77+
/**
78+
* Writes a log entry
79+
*
80+
* @param string $status
81+
* @param string $method
82+
* @param string $resource
83+
* @param [:var] $hints Optional hints
84+
* @return void
85+
*/
86+
public function log($status, $method, $resource, $hints= []) {
87+
$this->sink && $this->sink->log($status, $method, $resource, $hints);
7088
}
7189

7290
/**

src/main/php/web/Request.class.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public function __construct(Input $input) {
3030
}
3131

3232
$this->method= $input->method();
33-
$this->uri= (new URI($input->scheme().'://'.$this->header('Host', 'localhost').$input->uri()))->canonicalize();
33+
$this->uri= (new URI($input->scheme().'://'.$this->header('Host', 'localhost').$input->resource()))->canonicalize();
3434
$this->input= $input;
3535
}
3636

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
<?php namespace web\handler;
2+
3+
use web\Handler;
4+
use web\io\EventSink;
5+
use websocket\Listeners;
6+
7+
/**
8+
* WebSocket handler used for routing websocket handshake requests
9+
*
10+
* @test web.unittest.handler.WebSocketTest
11+
* @see https://www.rfc-editor.org/rfc/rfc6455
12+
*/
13+
class WebSocket implements Handler {
14+
const GUID= '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';
15+
16+
private $listener;
17+
18+
/** @param function(websocket.protocol.Connection, string|util.Bytes): var|websocket.Listener $listener */
19+
public function __construct($listener) {
20+
$this->listener= Listeners::cast($listener);
21+
}
22+
23+
/**
24+
* Handles a request
25+
*
26+
* @param web.Request $request
27+
* @param web.Response $response
28+
* @return var
29+
*/
30+
public function handle($request, $response) {
31+
switch ($version= (int)$request->header('Sec-WebSocket-Version')) {
32+
case 13: // RFC 6455
33+
$key= $request->header('Sec-WebSocket-Key');
34+
$response->answer(101);
35+
$response->header('Sec-WebSocket-Accept', base64_encode(sha1($key.self::GUID, true)));
36+
foreach ($this->listener->protocols ?? [] as $protocol) {
37+
$response->header('Sec-WebSocket-Protocol', $protocol, true);
38+
}
39+
break;
40+
41+
case 9: // Reserved version, use for WS <-> SSE translation
42+
$response->answer(200);
43+
$response->header('Content-Type', 'text/event-stream');
44+
$response->header('Transfer-Encoding', 'chunked');
45+
$response->trace('websocket', $request->header('Sec-WebSocket-Id'));
46+
47+
$events= new EventSink($request, $response);
48+
foreach ($events->receive() as $message) {
49+
$this->listener->message($events, $message);
50+
}
51+
return;
52+
53+
case 0:
54+
$response->answer(426);
55+
$response->send('This service requires use of the WebSocket protocol', 'text/plain');
56+
return;
57+
58+
default:
59+
$response->answer(400);
60+
$response->send('This service does not support WebSocket version '.$version, 'text/plain');
61+
return;
62+
}
63+
64+
yield 'connection' => ['websocket', [
65+
'path' => $request->uri()->resource(),
66+
'headers' => $request->headers(),
67+
'listener' => $this->listener,
68+
]];
69+
}
70+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
<?php namespace web\io;
2+
3+
use io\streams\Streams;
4+
use lang\IllegalStateException;
5+
use util\Bytes;
6+
use websocket\protocol\{Opcodes, Connection};
7+
8+
/** @test web.unittest.io.EventSinkTest */
9+
class EventSink extends Connection {
10+
private $request, $out;
11+
12+
/**
13+
* Creates a new event sink
14+
*
15+
* @param web.Request $request
16+
* @param web.Response $response
17+
*/
18+
public function __construct($request, $response) {
19+
$this->request= $request;
20+
$this->out= $response->stream();
21+
parent::__construct(null, null, null, $request->uri()->resource(), $request->headers());
22+
}
23+
24+
/**
25+
* Receives messages
26+
*
27+
* @return iterable
28+
*/
29+
public function receive() {
30+
switch ($mime= $this->request->header('Content-Type')) {
31+
case 'text/plain': yield Opcodes::TEXT => Streams::readAll($this->request->stream()); break;
32+
case 'application/octet-stream': yield Opcodes::BINARY => new Bytes(Streams::readAll($this->request->stream())); break;
33+
default: throw new IllegalStateException('Unexpected content type '.$mime);
34+
}
35+
}
36+
37+
/**
38+
* Sends a websocket message
39+
*
40+
* @param string|util.Bytes $message
41+
* @return void
42+
*/
43+
public function send($message) {
44+
if ($message instanceof Bytes) {
45+
$this->out->write("event: bytes\ndata: ".addcslashes($message, "\r\n")."\n\n");
46+
} else {
47+
$this->out->write("data: ".addcslashes($message, "\r\n")."\n\n");
48+
}
49+
}
50+
51+
/**
52+
* Closes the websocket connection
53+
*
54+
* @param int $code
55+
* @param string $reason
56+
* @return void
57+
*/
58+
public function close($code= 1000, $reason= '') {
59+
$this->out->write("event: close\ndata: ".$code.':'.addcslashes($reason, "\r\n")."\n\n");
60+
}
61+
}

0 commit comments

Comments
 (0)