diff --git a/lib/src/http.dart b/lib/src/http.dart index d578593..22b55f8 100644 --- a/lib/src/http.dart +++ b/lib/src/http.dart @@ -1,9 +1,14 @@ import 'dart:async'; +import 'dart:developer' as dev; import 'dart:io'; +import 'dart:isolate'; import 'package:utopia_di/utopia_di.dart'; import 'app_mode.dart'; +import 'isolate_entry_point.dart'; +import 'isolate_message.dart'; +import 'isolate_supervisor.dart'; import 'request.dart'; import 'response.dart'; import 'route.dart'; @@ -11,6 +16,8 @@ import 'router.dart'; import 'server.dart'; import 'validation_exception.dart'; +final List _supervisors = []; + /// Http class used to bootstrap your Http server /// You need to use one of the server adapters. Currently only /// Shelf adapter is available @@ -42,6 +49,8 @@ class Http { _router = Router(); } + List get supervisors => _supervisors; + /// Server adapter, currently only shelf server is supported final Server server; @@ -69,7 +78,6 @@ class Http { final List _init = []; final List _shutdown = []; final List _options = []; - List _servers = []; late final Router _router; @@ -87,20 +95,50 @@ class Http { /// Is application running in staging mode bool get isStage => mode == AppMode.stage; - /// List of servers running - List get servers => _servers; - /// Memory cached result for chosen route Route? route; /// Start the servers - Future> start() async { - _servers = await server.start( - run, + Future start() async { + _supervisors.clear(); + for (int i = 0; i < threads; i++) { + final supervisor = await _spawn( + context: i.toString(), + handler: run, + path: path, + ); + _supervisors.add(supervisor); + supervisor.resume(); + dev.log('Worker ${i.toString()} ready.', name: 'FINE'); + } + } + + Future _spawn({ + required String context, + required Handler handler, + SecurityContext? securityContext, + String? path, + }) async { + final receivePort = ReceivePort(); + final message = IsolateMessage( + server: server, + context: context, + handler: run, + securityContext: securityContext, path: path, - threads: threads, + sendPort: receivePort.sendPort, + ); + final isolate = await Isolate.spawn( + entrypoint, + message, + paused: true, + debugName: 'isolate_$context', + ); + return IsolateSupervisor( + isolate: isolate, + receivePort: receivePort, + context: message.context, ); - return _servers; } /// Initialize a GET route @@ -342,6 +380,7 @@ class Http { /// Run the execution for given request FutureOr run(Request request, String context) async { + setResource('context', () => context, context: context); setResource('request', () => request, context: context); try { @@ -436,10 +475,10 @@ class Http { mode = null; } - /// Close all the servers - Future closeServer({bool force = false}) async { - for (final server in _servers) { - await server.close(force: force); + /// Stop servers + Future stop() async { + for (final sup in supervisors) { + sup.stop(); } } } diff --git a/lib/src/isolate_entry_point.dart b/lib/src/isolate_entry_point.dart new file mode 100644 index 0000000..2bfee0d --- /dev/null +++ b/lib/src/isolate_entry_point.dart @@ -0,0 +1,26 @@ +import 'dart:developer' as dev; +import 'dart:isolate'; + +import 'isolate_message.dart'; +import 'isolate_supervisor.dart'; + +Future entrypoint(IsolateMessage message) async { + final ReceivePort receivePort = ReceivePort(); + await message.server.start( + message.handler, + path: message.path, + context: message.context, + ); + + message.sendPort.send(receivePort.sendPort); + receivePort.listen((data) async { + if (data == IsolateSupervisor.messageClose) { + dev.log( + 'Received close message on isolate ${message.context}', + name: 'FINE', + ); + await message.server.stop(); + receivePort.close(); + } + }); +} diff --git a/lib/src/isolate_message.dart b/lib/src/isolate_message.dart new file mode 100644 index 0000000..03440f3 --- /dev/null +++ b/lib/src/isolate_message.dart @@ -0,0 +1,22 @@ +import 'dart:io'; +import 'dart:isolate' as iso; + +import 'server.dart'; + +class IsolateMessage { + final Handler handler; + final SecurityContext? securityContext; + final String? path; + final String context; + final iso.SendPort sendPort; + final Server server; + + IsolateMessage({ + required this.server, + required this.handler, + required this.context, + this.path, + this.securityContext, + required this.sendPort, + }); +} diff --git a/lib/src/isolate_supervisor.dart b/lib/src/isolate_supervisor.dart new file mode 100644 index 0000000..06f0cec --- /dev/null +++ b/lib/src/isolate_supervisor.dart @@ -0,0 +1,34 @@ +import 'dart:developer' as dev; +import 'dart:isolate' as iso; + +class IsolateSupervisor { + final iso.Isolate isolate; + final iso.ReceivePort receivePort; + final String context; + iso.SendPort? _isolateSendPort; + + static const String messageClose = '_CLOSE'; + + IsolateSupervisor({ + required this.isolate, + required this.receivePort, + required this.context, + }); + + void resume() { + receivePort.listen(listen); + isolate.resume(isolate.pauseCapability!); + } + + void stop() { + dev.log('Stopping isolate $context', name: 'FINE'); + _isolateSendPort?.send(messageClose); + receivePort.close(); + } + + void listen(dynamic message) async { + if (message is iso.SendPort) { + _isolateSendPort = message; + } + } +} diff --git a/lib/src/server.dart b/lib/src/server.dart index a9d2e96..74c4d4a 100644 --- a/lib/src/server.dart +++ b/lib/src/server.dart @@ -20,9 +20,12 @@ abstract class Server { Server(this.address, this.port, {this.securityContext}); /// Start the server - Future> start( + Future start( Handler handler, { + String context = 'utopia', String? path, - int threads = 1, }); + + /// Stop the server + Future stop(); } diff --git a/lib/src/servers/shelf.dart b/lib/src/servers/shelf.dart index e7dc330..4c8e053 100644 --- a/lib/src/servers/shelf.dart +++ b/lib/src/servers/shelf.dart @@ -1,93 +1,62 @@ import 'dart:async'; import 'dart:io'; -import 'dart:isolate' as iso; + import 'package:shelf/shelf.dart' as shelf; import 'package:shelf/shelf_io.dart' as shelf_io; import 'package:shelf_static/shelf_static.dart'; + import '../request.dart'; import '../response.dart'; import '../server.dart'; -class _IsolateMessage { - final Handler handler; - final SecurityContext? securityContext; - final dynamic address; - final int port; - final String? path; - final String context; - - _IsolateMessage({ - required this.handler, - required this.address, - required this.port, - required this.context, - this.path, - this.securityContext, - }); -} - /// ShelfServer /// /// Create a server class ShelfServer extends Server { - static final List _servers = []; - Handler? handler; - String? path; - + HttpServer? _server; ShelfServer(super.address, super.port, {super.securityContext}); /// Start the server @override - Future> start( + Future start( Handler handler, { + String context = 'utopia', String? path, - int threads = 1, }) async { - this.handler = handler; - this.path = path; - iso.ReceivePort(); - await _spawnOffIsolates(threads); - return _servers; - } + var shelfHandler = (shelf.Request request) => _handleRequest( + request, + context, + handler, + ); + if (path != null) { + shelfHandler = shelf.Cascade() + .add(createStaticHandler(path)) + .add( + (request) => _handleRequest( + request, + context, + handler, + ), + ) + .handler; + } - static Future _onIsolateMain(_IsolateMessage message) async { - final server = await shelf_io.serve( - message.path != null - ? shelf.Cascade() - .add(createStaticHandler(message.path!)) - .add( - (request) => - _handleRequest(request, message.context, message.handler), - ) - .handler - : (request) => - _handleRequest(request, message.context, message.handler), - message.address, - message.port, - securityContext: message.securityContext, + _server = await shelf_io.serve( + shelfHandler, + address, + port, + securityContext: securityContext, shared: true, ); - print('Worker ${message.context} ready'); - _servers.add(server); } - Future _spawnOffIsolates(int num) async { - for (var i = 0; i < num; i++) { - await iso.Isolate.spawn<_IsolateMessage>( - _onIsolateMain, - _IsolateMessage( - context: i.toString(), - handler: handler!, - address: address, - port: port, - securityContext: securityContext, - path: path, - ), - ); - } + /// Stop servers + @override + Future stop() async { + await _server?.close(force: true); } - static FutureOr _handleRequest( + FutureOr _handleRequest( shelf.Request sheflRequest, String context, Handler handler, @@ -97,7 +66,7 @@ class ShelfServer extends Server { return _toShelfResponse(response); } - static Request _fromShelfRequest(shelf.Request shelfRequest) { + Request _fromShelfRequest(shelf.Request shelfRequest) { return Request( shelfRequest.method, shelfRequest.url, @@ -108,7 +77,7 @@ class ShelfServer extends Server { ); } - static shelf.Response _toShelfResponse(Response response) { + shelf.Response _toShelfResponse(Response response) { final res = shelf.Response( response.status, body: response.body, diff --git a/pubspec.yaml b/pubspec.yaml index 67068eb..ea1c3c3 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,6 +1,6 @@ name: utopia_http description: A light and easy to get started with HTTP server library for Dart -version: 0.1.0 +version: 0.2.0 homepage: https://github.com/utopia-dart/utopia_http environment: diff --git a/test/e2e/http_test.dart b/test/e2e/http_test.dart index edec344..029d2b2 100644 --- a/test/e2e/http_test.dart +++ b/test/e2e/http_test.dart @@ -28,7 +28,7 @@ void main() { test('file upload', fileUpload); tearDown(() async { - await http?.closeServer(); + await http?.stop(); }); }); } @@ -67,7 +67,7 @@ void jsonTest() async { final data = { "userId": "myuserid", "email": "email@gmail.com", - "name": "myname" + "name": "myname", }; req.write(jsonEncode(data)); final res = await req.close();