Skip to content

Commit

Permalink
Merge pull request #12 from utopia-dart/fix-close
Browse files Browse the repository at this point in the history
Improve threading and isolate handling
  • Loading branch information
lohanidamodar committed Mar 23, 2024
2 parents 973232d + f8f150d commit ffd23f3
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 83 deletions.
65 changes: 52 additions & 13 deletions lib/src/http.dart
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
import 'dart:async';
import 'dart:developer' as dev;
import 'dart:io';
import 'dart:isolate';

import 'package:utopia_di/utopia_di.dart';

import 'app_mode.dart';
import 'isolate_entry_point.dart';
import 'isolate_message.dart';
import 'isolate_supervisor.dart';
import 'request.dart';
import 'response.dart';
import 'route.dart';
import 'router.dart';
import 'server.dart';
import 'validation_exception.dart';

final List<IsolateSupervisor> _supervisors = [];

/// Http class used to bootstrap your Http server
/// You need to use one of the server adapters. Currently only
/// Shelf adapter is available
Expand Down Expand Up @@ -42,6 +49,8 @@ class Http {
_router = Router();
}

List<IsolateSupervisor> get supervisors => _supervisors;

/// Server adapter, currently only shelf server is supported
final Server server;

Expand Down Expand Up @@ -69,7 +78,6 @@ class Http {
final List<Hook> _init = [];
final List<Hook> _shutdown = [];
final List<Hook> _options = [];
List<HttpServer> _servers = [];

late final Router _router;

Expand All @@ -87,20 +95,50 @@ class Http {
/// Is application running in staging mode
bool get isStage => mode == AppMode.stage;

/// List of servers running
List<HttpServer> get servers => _servers;

/// Memory cached result for chosen route
Route? route;

/// Start the servers
Future<List<HttpServer>> start() async {
_servers = await server.start(
run,
Future<void> start() async {
_supervisors.clear();
for (int i = 0; i < threads; i++) {
final supervisor = await _spawn(
context: i.toString(),
handler: run,
path: path,
);
_supervisors.add(supervisor);
supervisor.resume();
dev.log('Worker ${i.toString()} ready.', name: 'FINE');
}
}

Future<IsolateSupervisor> _spawn({
required String context,
required Handler handler,
SecurityContext? securityContext,
String? path,
}) async {
final receivePort = ReceivePort();
final message = IsolateMessage(
server: server,
context: context,
handler: run,
securityContext: securityContext,
path: path,
threads: threads,
sendPort: receivePort.sendPort,
);
final isolate = await Isolate.spawn(
entrypoint,
message,
paused: true,
debugName: 'isolate_$context',
);
return IsolateSupervisor(
isolate: isolate,
receivePort: receivePort,
context: message.context,
);
return _servers;
}

/// Initialize a GET route
Expand Down Expand Up @@ -342,6 +380,7 @@ class Http {

/// Run the execution for given request
FutureOr<Response> run(Request request, String context) async {
setResource('context', () => context, context: context);
setResource('request', () => request, context: context);

try {
Expand Down Expand Up @@ -436,10 +475,10 @@ class Http {
mode = null;
}

/// Close all the servers
Future<void> closeServer({bool force = false}) async {
for (final server in _servers) {
await server.close(force: force);
/// Stop servers
Future<void> stop() async {
for (final sup in supervisors) {
sup.stop();
}
}
}
26 changes: 26 additions & 0 deletions lib/src/isolate_entry_point.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import 'dart:developer' as dev;
import 'dart:isolate';

import 'isolate_message.dart';
import 'isolate_supervisor.dart';

Future<void> entrypoint(IsolateMessage message) async {
final ReceivePort receivePort = ReceivePort();
await message.server.start(
message.handler,
path: message.path,
context: message.context,
);

message.sendPort.send(receivePort.sendPort);
receivePort.listen((data) async {
if (data == IsolateSupervisor.messageClose) {
dev.log(
'Received close message on isolate ${message.context}',
name: 'FINE',
);
await message.server.stop();
receivePort.close();
}
});
}
22 changes: 22 additions & 0 deletions lib/src/isolate_message.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import 'dart:io';
import 'dart:isolate' as iso;

import 'server.dart';

class IsolateMessage {
final Handler handler;
final SecurityContext? securityContext;
final String? path;
final String context;
final iso.SendPort sendPort;
final Server server;

IsolateMessage({
required this.server,
required this.handler,
required this.context,
this.path,
this.securityContext,
required this.sendPort,
});
}
34 changes: 34 additions & 0 deletions lib/src/isolate_supervisor.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import 'dart:developer' as dev;
import 'dart:isolate' as iso;

class IsolateSupervisor {
final iso.Isolate isolate;
final iso.ReceivePort receivePort;
final String context;
iso.SendPort? _isolateSendPort;

static const String messageClose = '_CLOSE';

IsolateSupervisor({
required this.isolate,
required this.receivePort,
required this.context,
});

void resume() {
receivePort.listen(listen);
isolate.resume(isolate.pauseCapability!);
}

void stop() {
dev.log('Stopping isolate $context', name: 'FINE');
_isolateSendPort?.send(messageClose);
receivePort.close();
}

void listen(dynamic message) async {
if (message is iso.SendPort) {
_isolateSendPort = message;
}
}
}
7 changes: 5 additions & 2 deletions lib/src/server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ abstract class Server {
Server(this.address, this.port, {this.securityContext});

/// Start the server
Future<List<HttpServer>> start(
Future<void> start(
Handler handler, {
String context = 'utopia',
String? path,
int threads = 1,
});

/// Stop the server
Future<void> stop();
}
99 changes: 34 additions & 65 deletions lib/src/servers/shelf.dart
Original file line number Diff line number Diff line change
@@ -1,93 +1,62 @@
import 'dart:async';
import 'dart:io';
import 'dart:isolate' as iso;

import 'package:shelf/shelf.dart' as shelf;
import 'package:shelf/shelf_io.dart' as shelf_io;
import 'package:shelf_static/shelf_static.dart';

import '../request.dart';
import '../response.dart';
import '../server.dart';

class _IsolateMessage {
final Handler handler;
final SecurityContext? securityContext;
final dynamic address;
final int port;
final String? path;
final String context;

_IsolateMessage({
required this.handler,
required this.address,
required this.port,
required this.context,
this.path,
this.securityContext,
});
}

/// ShelfServer
///
/// Create a server
class ShelfServer extends Server {
static final List<HttpServer> _servers = [];
Handler? handler;
String? path;

HttpServer? _server;
ShelfServer(super.address, super.port, {super.securityContext});

/// Start the server
@override
Future<List<HttpServer>> start(
Future<void> start(
Handler handler, {
String context = 'utopia',
String? path,
int threads = 1,
}) async {
this.handler = handler;
this.path = path;
iso.ReceivePort();
await _spawnOffIsolates(threads);
return _servers;
}
var shelfHandler = (shelf.Request request) => _handleRequest(
request,
context,
handler,
);
if (path != null) {
shelfHandler = shelf.Cascade()
.add(createStaticHandler(path))
.add(
(request) => _handleRequest(
request,
context,
handler,
),
)
.handler;
}

static Future<void> _onIsolateMain(_IsolateMessage message) async {
final server = await shelf_io.serve(
message.path != null
? shelf.Cascade()
.add(createStaticHandler(message.path!))
.add(
(request) =>
_handleRequest(request, message.context, message.handler),
)
.handler
: (request) =>
_handleRequest(request, message.context, message.handler),
message.address,
message.port,
securityContext: message.securityContext,
_server = await shelf_io.serve(
shelfHandler,
address,
port,
securityContext: securityContext,
shared: true,
);
print('Worker ${message.context} ready');
_servers.add(server);
}

Future<void> _spawnOffIsolates(int num) async {
for (var i = 0; i < num; i++) {
await iso.Isolate.spawn<_IsolateMessage>(
_onIsolateMain,
_IsolateMessage(
context: i.toString(),
handler: handler!,
address: address,
port: port,
securityContext: securityContext,
path: path,
),
);
}
/// Stop servers
@override
Future<void> stop() async {
await _server?.close(force: true);
}

static FutureOr<shelf.Response> _handleRequest(
FutureOr<shelf.Response> _handleRequest(
shelf.Request sheflRequest,
String context,
Handler handler,
Expand All @@ -97,7 +66,7 @@ class ShelfServer extends Server {
return _toShelfResponse(response);
}

static Request _fromShelfRequest(shelf.Request shelfRequest) {
Request _fromShelfRequest(shelf.Request shelfRequest) {
return Request(
shelfRequest.method,
shelfRequest.url,
Expand All @@ -108,7 +77,7 @@ class ShelfServer extends Server {
);
}

static shelf.Response _toShelfResponse(Response response) {
shelf.Response _toShelfResponse(Response response) {
final res = shelf.Response(
response.status,
body: response.body,
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: utopia_http
description: A light and easy to get started with HTTP server library for Dart
version: 0.1.0
version: 0.2.0
homepage: https://github.com/utopia-dart/utopia_http

environment:
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/http_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void main() {
test('file upload', fileUpload);

tearDown(() async {
await http?.closeServer();
await http?.stop();
});
});
}
Expand Down Expand Up @@ -67,7 +67,7 @@ void jsonTest() async {
final data = {
"userId": "myuserid",
"email": "[email protected]",
"name": "myname"
"name": "myname",
};
req.write(jsonEncode(data));
final res = await req.close();
Expand Down

0 comments on commit ffd23f3

Please sign in to comment.