diff --git a/links/gql_websocket_link/lib/src/graphql_transport_ws.dart b/links/gql_websocket_link/lib/src/graphql_transport_ws.dart index dc72592a..9f0050d9 100644 --- a/links/gql_websocket_link/lib/src/graphql_transport_ws.dart +++ b/links/gql_websocket_link/lib/src/graphql_transport_ws.dart @@ -3,13 +3,10 @@ import "dart:convert" show jsonDecode, jsonEncode; import "dart:math" show Random; import "package:gql_exec/gql_exec.dart" show Request, Response; -import "package:gql_link/gql_link.dart" - show Link, NextLink, RequestSerializer, ResponseParser; -import "package:gql_websocket_link/src/exceptions.dart" - show WebSocketLinkParserException, WebSocketLinkServerException; +import "package:gql_link/gql_link.dart" show Link, NextLink, RequestSerializer, ResponseParser; +import "package:gql_websocket_link/src/exceptions.dart" show WebSocketLinkParserException, WebSocketLinkServerException; import "package:gql_websocket_link/src/messages.dart" show ConnectionAck; -import "package:web_socket_channel/web_socket_channel.dart" - show WebSocketChannel; +import "package:web_socket_channel/web_socket_channel.dart" show WebSocketChannel; import "link.dart" show ChannelGenerator, GraphQLSocketMessageDecoder; import "transport_ws_common.dart"; @@ -165,18 +162,15 @@ class TransportWsEventHandler { /// /// Also, the second argument is the optional payload that the server may /// send through the `ConnectionAck` message. - final T? Function(WebSocketChannel socket, Map? payload)? - connected; + final T? Function(WebSocketChannel socket, Map? payload)? connected; /// The first argument communicates whether the ping was received from the server. /// If `false`, the ping was sent by the client. - final T? Function(Map? payload, {required bool received})? - ping; + final T? Function(Map? payload, {required bool received})? ping; /// The first argument communicates whether the pong was received from the server. /// If `false`, the pong was sent by the client. - final T? Function(Map? payload, {required bool received})? - pong; + final T? Function(Map? payload, {required bool received})? pong; /// Called for all **valid** messages received by the client. Mainly useful for /// debugging and logging received messages. @@ -236,8 +230,7 @@ class WebSocketMaker { /// A function that returns the URL of the GraphQL server. /// Will be used to create the [WebSocketChannel]. - const WebSocketMaker.url(FutureOr Function() this.url) - : generator = null; + const WebSocketMaker.url(FutureOr Function() this.url) : generator = null; /// A generator that will be used to create the [WebSocketChannel]. const WebSocketMaker.generator(ChannelGenerator this.generator) : url = null; @@ -443,13 +436,10 @@ class TransportWsClientOptions { final ResponseParser parser; /// A function that encodes the request message to json string before sending it over the network. - final FutureOr Function(TransportWsMessage message) - graphQLSocketMessageEncoder; + final FutureOr Function(TransportWsMessage message) graphQLSocketMessageEncoder; /// The default [graphQLSocketMessageEncoder] that encodes the request message to json string. - static String defaultGraphQLSocketMessageEncoder( - TransportWsMessage message) => - jsonEncode(message); + static String defaultGraphQLSocketMessageEncoder(TransportWsMessage message) => jsonEncode(message); /// A function that decodes the incoming http response to `Map`, /// the decoded map will be then passes to the `RequestSerializer`. @@ -460,9 +450,7 @@ class TransportWsClientOptions { final GraphQLSocketMessageDecoder graphQLSocketMessageDecoder; /// The default [graphQLSocketMessageDecoder] that decodes the request message from a json string. - static Map? defaultGraphQLSocketMessageDecoder( - dynamic message) => - jsonDecode(message as String) as Map?; + static Map? defaultGraphQLSocketMessageDecoder(dynamic message) => jsonDecode(message as String) as Map?; /// A function that logs events within the execution of the [TransportWsClient]. /// Useful for debugging. @@ -509,16 +497,14 @@ class TransportWsClientOptions { } /// non `CloseEvent`s are fatal by default - static bool isFatalConnectionProblemDefault(Object errOrCloseEvent) => - errOrCloseEvent is! LikeCloseEvent; + static bool isFatalConnectionProblemDefault(Object errOrCloseEvent) => errOrCloseEvent is! LikeCloseEvent; /// Generates a v4 UUID to be used as the ID using `Math` /// as the random number generator. Supply your own generator /// in case you need more uniqueness. /// /// Reference: https://gist.github.com/jed/982883 - static String generateUUID() => "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx" - .replaceAllMapped(RegExp("[xy]"), (c) { + static String generateUUID() => "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx".replaceAllMapped(RegExp("[xy]"), (c) { final int r = (_random.nextDouble() * 16).floor() | 0; final v = c.group(0) == "x" ? r : (r & 0x3) | 0x8; return v.toRadixString(16); @@ -625,6 +611,8 @@ class _ConnectionState { // TODO: WebSocketChannel should have a `state` getter and `onStateChange` stream bool isOpen = false; + Map nextOrErrorMsgWaitMap = {}; + /// Checks the `connect` problem and evaluates if the client should retry. bool shouldRetryConnectOrThrow(Object errOrCloseEvent) { options.log?.call("shouldRetryConnectOrThrow $errOrCloseEvent"); @@ -713,11 +701,9 @@ class _ConnectionState { Timer? queuedPing; void enqueuePing() { if (options.keepAlive.inMicroseconds > 0) { - queuedPing - ?.cancel(); // in case where a pong was received before a ping (this is valid behaviour) + queuedPing?.cancel(); // in case where a pong was received before a ping (this is valid behaviour) queuedPing = Timer(options.keepAlive, () async { - final _pingMsg = - await options.graphQLSocketMessageEncoder(PingMessage(null)); + final _pingMsg = await options.graphQLSocketMessageEncoder(PingMessage(null)); if (isOpen) { socket.sink.add(_pingMsg); emitter.emit(TransportWsEvent.ping(null, received: false)); @@ -726,10 +712,8 @@ class _ConnectionState { } } - void Function(Object)? onError = - (Object err) => emitter.emit(TransportWsEvent.error(err)); - void Function(Object)? onClose = - (Object event) => emitter.emit(TransportWsEvent.closed(event)); + void Function(Object)? onError = (Object err) => emitter.emit(TransportWsEvent.error(err)); + void Function(Object)? onClose = (Object event) => emitter.emit(TransportWsEvent.closed(event)); errorOrClosed((errOrEvent) { options.log?.call("errorOrClosed $errOrEvent"); connecting = null; @@ -754,9 +738,7 @@ class _ConnectionState { isOpen = true; try { emitter.emit(TransportWsEvent.opened(socket)); - final payload = options.connectionParams == null - ? null - : await options.connectionParams!(); + final payload = options.connectionParams == null ? null : await options.connectionParams!(); final _initMsg = await options.graphQLSocketMessageEncoder( ConnectionInitMessage(payload), @@ -813,15 +795,17 @@ class _ConnectionState { } // parseMessage(msg!, reviver: options.jsonMessageReviver); if (!isOpen) return; + + // wait for next or error message (result) to be processed before process complete message + while (message is CompleteMessage && nextOrErrorMsgWaitMap.containsKey(message.id)) { + await Future.delayed(const Duration(milliseconds: 100)); + } + emitter.emit(TransportWsEvent.message(message)); if (message is PingMessage || message is PongMessage) { - final msgPayload = message is PingMessage - ? message.payload - : (message as PongMessage).payload; + final msgPayload = message is PingMessage ? message.payload : (message as PongMessage).payload; emitter.emit( - message is PingMessage - ? TransportWsEvent.ping(msgPayload, received: true) - : TransportWsEvent.pong(msgPayload, received: true), + message is PingMessage ? TransportWsEvent.ping(msgPayload, received: true) : TransportWsEvent.pong(msgPayload, received: true), ); // received if (message is PongMessage) { @@ -829,11 +813,9 @@ class _ConnectionState { } else if (!options.disablePong) { // respond with pong on ping socket.sink.add( - await options - .graphQLSocketMessageEncoder(PongMessage(msgPayload)), + await options.graphQLSocketMessageEncoder(PongMessage(msgPayload)), ); - emitter - .emit(TransportWsEvent.pong(msgPayload, received: false)); + emitter.emit(TransportWsEvent.pong(msgPayload, received: false)); } return; // ping and pongs can be received whenever } @@ -920,7 +902,8 @@ class _ConnectionState { waitForReleaseOrThrowOnClose: Future.any([ // wait for released.then((_) { - if (locks == 0) { + // if released, no other operations, and not keep alive, wait for the socket to close + if (locks == 0 && options.keepAlive == Duration.zero) { // and if no more locks are present, complete the connection final complete = () { isOpen = false; @@ -998,10 +981,12 @@ class _Client extends TransportWsClient { final unlisten = emitter.onMessage(id, (message) { if (message is NextMessage) { sink.add(message.payload); + state.nextOrErrorMsgWaitMap.remove(id); } else if (message is ErrorMessage) { errored = true; done = true; sink.addError(message.payload); + state.nextOrErrorMsgWaitMap.remove(id); releaser(); } else if (message is CompleteMessage) { done = true; @@ -1011,9 +996,10 @@ class _Client extends TransportWsClient { socket.sink.add(_subscribeMsg); + state.nextOrErrorMsgWaitMap[id] = null; + releaser = () async { - final _completeMsg = - await options.graphQLSocketMessageEncoder(CompleteMessage(id)); + final _completeMsg = await options.graphQLSocketMessageEncoder(CompleteMessage(id)); if (!done && state.isOpen) { // if not completed already and socket is open, send complete message to server on release socket.sink.add(_completeMsg); @@ -1145,13 +1131,11 @@ TransportWsClient createClient(TransportWsClientOptions options) { _listenersMessage[message.id]?.call(message); } - final Map> listeners = - Map.fromIterables( + final Map> listeners = Map.fromIterables( TransportWsEventType.values, TransportWsEventType.values.map((e) => []), ); - listeners[TransportWsEventType.message]! - .add(TransportWsEventHandler(message: emitMessage)); + listeners[TransportWsEventType.message]!.add(TransportWsEventHandler(message: emitMessage)); final on = options.eventHandlers; if (on != null) { @@ -1193,8 +1177,7 @@ class LikeCloseEvent { }); @override - String toString() => - "LikeCloseEvent(code: $code, reason: $reason, wasClean: $wasClean)"; + String toString() => "LikeCloseEvent(code: $code, reason: $reason, wasClean: $wasClean)"; } bool _isFatalInternalCloseCode(int code) { @@ -1214,8 +1197,7 @@ bool _isFatalInternalCloseCode(int code) { } class TransportWebSocketLink extends Link { - TransportWebSocketLink(TransportWsClientOptions options) - : client = createClient(options); + TransportWebSocketLink(TransportWsClientOptions options) : client = createClient(options); final TransportWsClient client; @override