Skip to content

Commit

Permalink
fix(graphql-transport-ws): ensure result message is processed before …
Browse files Browse the repository at this point in the history
…complete message; do not close connection if keepalive is on
  • Loading branch information
agufagit committed Jul 23, 2024
1 parent dcb03fa commit 7a1be07
Showing 1 changed file with 40 additions and 58 deletions.
98 changes: 40 additions & 58 deletions links/gql_websocket_link/lib/src/graphql_transport_ws.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@ import "dart:convert" show jsonDecode, jsonEncode;
import "dart:math" show Random;

import "package:gql_exec/gql_exec.dart" show Request, Response;
import "package:gql_link/gql_link.dart"
show Link, NextLink, RequestSerializer, ResponseParser;
import "package:gql_websocket_link/src/exceptions.dart"
show WebSocketLinkParserException, WebSocketLinkServerException;
import "package:gql_link/gql_link.dart" show Link, NextLink, RequestSerializer, ResponseParser;
import "package:gql_websocket_link/src/exceptions.dart" show WebSocketLinkParserException, WebSocketLinkServerException;
import "package:gql_websocket_link/src/messages.dart" show ConnectionAck;
import "package:web_socket_channel/web_socket_channel.dart"
show WebSocketChannel;
import "package:web_socket_channel/web_socket_channel.dart" show WebSocketChannel;

import "link.dart" show ChannelGenerator, GraphQLSocketMessageDecoder;
import "transport_ws_common.dart";
Expand Down Expand Up @@ -165,18 +162,15 @@ class TransportWsEventHandler<T> {
///
/// Also, the second argument is the optional payload that the server may
/// send through the `ConnectionAck` message.
final T? Function(WebSocketChannel socket, Map<String, Object?>? payload)?
connected;
final T? Function(WebSocketChannel socket, Map<String, Object?>? payload)? connected;

/// The first argument communicates whether the ping was received from the server.
/// If `false`, the ping was sent by the client.
final T? Function(Map<String, Object?>? payload, {required bool received})?
ping;
final T? Function(Map<String, Object?>? payload, {required bool received})? ping;

/// The first argument communicates whether the pong was received from the server.
/// If `false`, the pong was sent by the client.
final T? Function(Map<String, Object?>? payload, {required bool received})?
pong;
final T? Function(Map<String, Object?>? payload, {required bool received})? pong;

/// Called for all **valid** messages received by the client. Mainly useful for
/// debugging and logging received messages.
Expand Down Expand Up @@ -236,8 +230,7 @@ class WebSocketMaker {

/// A function that returns the URL of the GraphQL server.
/// Will be used to create the [WebSocketChannel].
const WebSocketMaker.url(FutureOr<String> Function() this.url)
: generator = null;
const WebSocketMaker.url(FutureOr<String> Function() this.url) : generator = null;

/// A generator that will be used to create the [WebSocketChannel].
const WebSocketMaker.generator(ChannelGenerator this.generator) : url = null;
Expand Down Expand Up @@ -443,13 +436,10 @@ class TransportWsClientOptions {
final ResponseParser parser;

/// A function that encodes the request message to json string before sending it over the network.
final FutureOr<Object> Function(TransportWsMessage message)
graphQLSocketMessageEncoder;
final FutureOr<Object> Function(TransportWsMessage message) graphQLSocketMessageEncoder;

/// The default [graphQLSocketMessageEncoder] that encodes the request message to json string.
static String defaultGraphQLSocketMessageEncoder(
TransportWsMessage message) =>
jsonEncode(message);
static String defaultGraphQLSocketMessageEncoder(TransportWsMessage message) => jsonEncode(message);

/// A function that decodes the incoming http response to `Map<String, dynamic>`,
/// the decoded map will be then passes to the `RequestSerializer`.
Expand All @@ -460,9 +450,7 @@ class TransportWsClientOptions {
final GraphQLSocketMessageDecoder graphQLSocketMessageDecoder;

/// The default [graphQLSocketMessageDecoder] that decodes the request message from a json string.
static Map<String, dynamic>? defaultGraphQLSocketMessageDecoder(
dynamic message) =>
jsonDecode(message as String) as Map<String, dynamic>?;
static Map<String, dynamic>? defaultGraphQLSocketMessageDecoder(dynamic message) => jsonDecode(message as String) as Map<String, dynamic>?;

/// A function that logs events within the execution of the [TransportWsClient].
/// Useful for debugging.
Expand Down Expand Up @@ -509,16 +497,14 @@ class TransportWsClientOptions {
}

/// non `CloseEvent`s are fatal by default
static bool isFatalConnectionProblemDefault(Object errOrCloseEvent) =>
errOrCloseEvent is! LikeCloseEvent;
static bool isFatalConnectionProblemDefault(Object errOrCloseEvent) => errOrCloseEvent is! LikeCloseEvent;

/// Generates a v4 UUID to be used as the ID using `Math`
/// as the random number generator. Supply your own generator
/// in case you need more uniqueness.
///
/// Reference: https://gist.github.com/jed/982883
static String generateUUID() => "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx"
.replaceAllMapped(RegExp("[xy]"), (c) {
static String generateUUID() => "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx".replaceAllMapped(RegExp("[xy]"), (c) {
final int r = (_random.nextDouble() * 16).floor() | 0;
final v = c.group(0) == "x" ? r : (r & 0x3) | 0x8;
return v.toRadixString(16);
Expand Down Expand Up @@ -625,6 +611,8 @@ class _ConnectionState {
// TODO: WebSocketChannel should have a `state` getter and `onStateChange` stream
bool isOpen = false;

Map<String, Object?> nextOrErrorMsgWaitMap = {};

/// Checks the `connect` problem and evaluates if the client should retry.
bool shouldRetryConnectOrThrow(Object errOrCloseEvent) {
options.log?.call("shouldRetryConnectOrThrow $errOrCloseEvent");
Expand Down Expand Up @@ -713,11 +701,9 @@ class _ConnectionState {
Timer? queuedPing;
void enqueuePing() {
if (options.keepAlive.inMicroseconds > 0) {
queuedPing
?.cancel(); // in case where a pong was received before a ping (this is valid behaviour)
queuedPing?.cancel(); // in case where a pong was received before a ping (this is valid behaviour)
queuedPing = Timer(options.keepAlive, () async {
final _pingMsg =
await options.graphQLSocketMessageEncoder(PingMessage(null));
final _pingMsg = await options.graphQLSocketMessageEncoder(PingMessage(null));
if (isOpen) {
socket.sink.add(_pingMsg);
emitter.emit(TransportWsEvent.ping(null, received: false));
Expand All @@ -726,10 +712,8 @@ class _ConnectionState {
}
}

void Function(Object)? onError =
(Object err) => emitter.emit(TransportWsEvent.error(err));
void Function(Object)? onClose =
(Object event) => emitter.emit(TransportWsEvent.closed(event));
void Function(Object)? onError = (Object err) => emitter.emit(TransportWsEvent.error(err));
void Function(Object)? onClose = (Object event) => emitter.emit(TransportWsEvent.closed(event));
errorOrClosed((errOrEvent) {
options.log?.call("errorOrClosed $errOrEvent");
connecting = null;
Expand All @@ -754,9 +738,7 @@ class _ConnectionState {
isOpen = true;
try {
emitter.emit(TransportWsEvent.opened(socket));
final payload = options.connectionParams == null
? null
: await options.connectionParams!();
final payload = options.connectionParams == null ? null : await options.connectionParams!();

final _initMsg = await options.graphQLSocketMessageEncoder(
ConnectionInitMessage(payload),
Expand Down Expand Up @@ -813,27 +795,27 @@ class _ConnectionState {
}
// parseMessage(msg!, reviver: options.jsonMessageReviver);
if (!isOpen) return;

// wait for next or error message (result) to be processed before process complete message
while (message is CompleteMessage && nextOrErrorMsgWaitMap.containsKey(message.id)) {
await Future.delayed(const Duration(milliseconds: 100));
}

emitter.emit(TransportWsEvent.message(message));
if (message is PingMessage || message is PongMessage) {
final msgPayload = message is PingMessage
? message.payload
: (message as PongMessage).payload;
final msgPayload = message is PingMessage ? message.payload : (message as PongMessage).payload;
emitter.emit(
message is PingMessage
? TransportWsEvent.ping(msgPayload, received: true)
: TransportWsEvent.pong(msgPayload, received: true),
message is PingMessage ? TransportWsEvent.ping(msgPayload, received: true) : TransportWsEvent.pong(msgPayload, received: true),
); // received

if (message is PongMessage) {
enqueuePing(); // enqueue next ping (noop if disabled)
} else if (!options.disablePong) {
// respond with pong on ping
socket.sink.add(
await options
.graphQLSocketMessageEncoder(PongMessage(msgPayload)),
await options.graphQLSocketMessageEncoder(PongMessage(msgPayload)),
);
emitter
.emit(TransportWsEvent.pong(msgPayload, received: false));
emitter.emit(TransportWsEvent.pong(msgPayload, received: false));
}
return; // ping and pongs can be received whenever
}
Expand Down Expand Up @@ -920,7 +902,8 @@ class _ConnectionState {
waitForReleaseOrThrowOnClose: Future.any([
// wait for
released.then((_) {
if (locks == 0) {
// if released, no other operations, and not keep alive, wait for the socket to close
if (locks == 0 && options.keepAlive == Duration.zero) {
// and if no more locks are present, complete the connection
final complete = () {
isOpen = false;
Expand Down Expand Up @@ -998,10 +981,12 @@ class _Client extends TransportWsClient {
final unlisten = emitter.onMessage(id, (message) {
if (message is NextMessage) {
sink.add(message.payload);
state.nextOrErrorMsgWaitMap.remove(id);
} else if (message is ErrorMessage) {
errored = true;
done = true;
sink.addError(message.payload);
state.nextOrErrorMsgWaitMap.remove(id);
releaser();
} else if (message is CompleteMessage) {
done = true;
Expand All @@ -1011,9 +996,10 @@ class _Client extends TransportWsClient {

socket.sink.add(_subscribeMsg);

state.nextOrErrorMsgWaitMap[id] = null;

releaser = () async {
final _completeMsg =
await options.graphQLSocketMessageEncoder(CompleteMessage(id));
final _completeMsg = await options.graphQLSocketMessageEncoder(CompleteMessage(id));
if (!done && state.isOpen) {
// if not completed already and socket is open, send complete message to server on release
socket.sink.add(_completeMsg);
Expand Down Expand Up @@ -1145,13 +1131,11 @@ TransportWsClient createClient(TransportWsClientOptions options) {
_listenersMessage[message.id]?.call(message);
}

final Map<TransportWsEventType, List<TransportWsEventHandler>> listeners =
Map.fromIterables(
final Map<TransportWsEventType, List<TransportWsEventHandler>> listeners = Map.fromIterables(
TransportWsEventType.values,
TransportWsEventType.values.map((e) => []),
);
listeners[TransportWsEventType.message]!
.add(TransportWsEventHandler<void>(message: emitMessage));
listeners[TransportWsEventType.message]!.add(TransportWsEventHandler<void>(message: emitMessage));

final on = options.eventHandlers;
if (on != null) {
Expand Down Expand Up @@ -1193,8 +1177,7 @@ class LikeCloseEvent {
});

@override
String toString() =>
"LikeCloseEvent(code: $code, reason: $reason, wasClean: $wasClean)";
String toString() => "LikeCloseEvent(code: $code, reason: $reason, wasClean: $wasClean)";
}

bool _isFatalInternalCloseCode(int code) {
Expand All @@ -1214,8 +1197,7 @@ bool _isFatalInternalCloseCode(int code) {
}

class TransportWebSocketLink extends Link {
TransportWebSocketLink(TransportWsClientOptions options)
: client = createClient(options);
TransportWebSocketLink(TransportWsClientOptions options) : client = createClient(options);
final TransportWsClient client;

@override
Expand Down

0 comments on commit 7a1be07

Please sign in to comment.