Skip to content

Commit

Permalink
chore(gql_websocket_link): format graphql_transport_ws.dart
Browse files Browse the repository at this point in the history
  • Loading branch information
agufagit committed Jul 23, 2024
1 parent 7a1be07 commit 70a19c3
Showing 1 changed file with 59 additions and 27 deletions.
86 changes: 59 additions & 27 deletions links/gql_websocket_link/lib/src/graphql_transport_ws.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ import "dart:convert" show jsonDecode, jsonEncode;
import "dart:math" show Random;

import "package:gql_exec/gql_exec.dart" show Request, Response;
import "package:gql_link/gql_link.dart" show Link, NextLink, RequestSerializer, ResponseParser;
import "package:gql_websocket_link/src/exceptions.dart" show WebSocketLinkParserException, WebSocketLinkServerException;
import "package:gql_link/gql_link.dart"
show Link, NextLink, RequestSerializer, ResponseParser;
import "package:gql_websocket_link/src/exceptions.dart"
show WebSocketLinkParserException, WebSocketLinkServerException;
import "package:gql_websocket_link/src/messages.dart" show ConnectionAck;
import "package:web_socket_channel/web_socket_channel.dart" show WebSocketChannel;
import "package:web_socket_channel/web_socket_channel.dart"
show WebSocketChannel;

import "link.dart" show ChannelGenerator, GraphQLSocketMessageDecoder;
import "transport_ws_common.dart";
Expand Down Expand Up @@ -162,15 +165,18 @@ class TransportWsEventHandler<T> {
///
/// Also, the second argument is the optional payload that the server may
/// send through the `ConnectionAck` message.
final T? Function(WebSocketChannel socket, Map<String, Object?>? payload)? connected;
final T? Function(WebSocketChannel socket, Map<String, Object?>? payload)?
connected;

/// The first argument communicates whether the ping was received from the server.
/// If `false`, the ping was sent by the client.
final T? Function(Map<String, Object?>? payload, {required bool received})? ping;
final T? Function(Map<String, Object?>? payload, {required bool received})?
ping;

/// The first argument communicates whether the pong was received from the server.
/// If `false`, the pong was sent by the client.
final T? Function(Map<String, Object?>? payload, {required bool received})? pong;
final T? Function(Map<String, Object?>? payload, {required bool received})?
pong;

/// Called for all **valid** messages received by the client. Mainly useful for
/// debugging and logging received messages.
Expand Down Expand Up @@ -230,7 +236,8 @@ class WebSocketMaker {

/// A function that returns the URL of the GraphQL server.
/// Will be used to create the [WebSocketChannel].
const WebSocketMaker.url(FutureOr<String> Function() this.url) : generator = null;
const WebSocketMaker.url(FutureOr<String> Function() this.url)
: generator = null;

/// A generator that will be used to create the [WebSocketChannel].
const WebSocketMaker.generator(ChannelGenerator this.generator) : url = null;
Expand Down Expand Up @@ -436,10 +443,13 @@ class TransportWsClientOptions {
final ResponseParser parser;

/// A function that encodes the request message to json string before sending it over the network.
final FutureOr<Object> Function(TransportWsMessage message) graphQLSocketMessageEncoder;
final FutureOr<Object> Function(TransportWsMessage message)
graphQLSocketMessageEncoder;

/// The default [graphQLSocketMessageEncoder] that encodes the request message to json string.
static String defaultGraphQLSocketMessageEncoder(TransportWsMessage message) => jsonEncode(message);
static String defaultGraphQLSocketMessageEncoder(
TransportWsMessage message) =>
jsonEncode(message);

/// A function that decodes the incoming http response to `Map<String, dynamic>`,
/// the decoded map will be then passes to the `RequestSerializer`.
Expand All @@ -450,7 +460,9 @@ class TransportWsClientOptions {
final GraphQLSocketMessageDecoder graphQLSocketMessageDecoder;

/// The default [graphQLSocketMessageDecoder] that decodes the request message from a json string.
static Map<String, dynamic>? defaultGraphQLSocketMessageDecoder(dynamic message) => jsonDecode(message as String) as Map<String, dynamic>?;
static Map<String, dynamic>? defaultGraphQLSocketMessageDecoder(
dynamic message) =>
jsonDecode(message as String) as Map<String, dynamic>?;

/// A function that logs events within the execution of the [TransportWsClient].
/// Useful for debugging.
Expand Down Expand Up @@ -497,14 +509,16 @@ class TransportWsClientOptions {
}

/// non `CloseEvent`s are fatal by default
static bool isFatalConnectionProblemDefault(Object errOrCloseEvent) => errOrCloseEvent is! LikeCloseEvent;
static bool isFatalConnectionProblemDefault(Object errOrCloseEvent) =>
errOrCloseEvent is! LikeCloseEvent;

/// Generates a v4 UUID to be used as the ID using `Math`
/// as the random number generator. Supply your own generator
/// in case you need more uniqueness.
///
/// Reference: https://gist.github.com/jed/982883
static String generateUUID() => "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx".replaceAllMapped(RegExp("[xy]"), (c) {
static String generateUUID() => "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx"
.replaceAllMapped(RegExp("[xy]"), (c) {
final int r = (_random.nextDouble() * 16).floor() | 0;
final v = c.group(0) == "x" ? r : (r & 0x3) | 0x8;
return v.toRadixString(16);
Expand Down Expand Up @@ -701,9 +715,11 @@ class _ConnectionState {
Timer? queuedPing;
void enqueuePing() {
if (options.keepAlive.inMicroseconds > 0) {
queuedPing?.cancel(); // in case where a pong was received before a ping (this is valid behaviour)
queuedPing
?.cancel(); // in case where a pong was received before a ping (this is valid behaviour)
queuedPing = Timer(options.keepAlive, () async {
final _pingMsg = await options.graphQLSocketMessageEncoder(PingMessage(null));
final _pingMsg =
await options.graphQLSocketMessageEncoder(PingMessage(null));
if (isOpen) {
socket.sink.add(_pingMsg);
emitter.emit(TransportWsEvent.ping(null, received: false));
Expand All @@ -712,8 +728,10 @@ class _ConnectionState {
}
}

void Function(Object)? onError = (Object err) => emitter.emit(TransportWsEvent.error(err));
void Function(Object)? onClose = (Object event) => emitter.emit(TransportWsEvent.closed(event));
void Function(Object)? onError =
(Object err) => emitter.emit(TransportWsEvent.error(err));
void Function(Object)? onClose =
(Object event) => emitter.emit(TransportWsEvent.closed(event));
errorOrClosed((errOrEvent) {
options.log?.call("errorOrClosed $errOrEvent");
connecting = null;
Expand All @@ -738,7 +756,9 @@ class _ConnectionState {
isOpen = true;
try {
emitter.emit(TransportWsEvent.opened(socket));
final payload = options.connectionParams == null ? null : await options.connectionParams!();
final payload = options.connectionParams == null
? null
: await options.connectionParams!();

final _initMsg = await options.graphQLSocketMessageEncoder(
ConnectionInitMessage(payload),
Expand Down Expand Up @@ -797,25 +817,32 @@ class _ConnectionState {
if (!isOpen) return;

// wait for next or error message (result) to be processed before process complete message
while (message is CompleteMessage && nextOrErrorMsgWaitMap.containsKey(message.id)) {
while (message is CompleteMessage &&
nextOrErrorMsgWaitMap.containsKey(message.id)) {
await Future.delayed(const Duration(milliseconds: 100));
}

emitter.emit(TransportWsEvent.message(message));
if (message is PingMessage || message is PongMessage) {
final msgPayload = message is PingMessage ? message.payload : (message as PongMessage).payload;
final msgPayload = message is PingMessage
? message.payload
: (message as PongMessage).payload;
emitter.emit(
message is PingMessage ? TransportWsEvent.ping(msgPayload, received: true) : TransportWsEvent.pong(msgPayload, received: true),
message is PingMessage
? TransportWsEvent.ping(msgPayload, received: true)
: TransportWsEvent.pong(msgPayload, received: true),
); // received

if (message is PongMessage) {
enqueuePing(); // enqueue next ping (noop if disabled)
} else if (!options.disablePong) {
// respond with pong on ping
socket.sink.add(
await options.graphQLSocketMessageEncoder(PongMessage(msgPayload)),
await options
.graphQLSocketMessageEncoder(PongMessage(msgPayload)),
);
emitter.emit(TransportWsEvent.pong(msgPayload, received: false));
emitter
.emit(TransportWsEvent.pong(msgPayload, received: false));
}
return; // ping and pongs can be received whenever
}
Expand Down Expand Up @@ -999,7 +1026,8 @@ class _Client extends TransportWsClient {
state.nextOrErrorMsgWaitMap[id] = null;

releaser = () async {
final _completeMsg = await options.graphQLSocketMessageEncoder(CompleteMessage(id));
final _completeMsg =
await options.graphQLSocketMessageEncoder(CompleteMessage(id));
if (!done && state.isOpen) {
// if not completed already and socket is open, send complete message to server on release
socket.sink.add(_completeMsg);
Expand Down Expand Up @@ -1131,11 +1159,13 @@ TransportWsClient createClient(TransportWsClientOptions options) {
_listenersMessage[message.id]?.call(message);
}

final Map<TransportWsEventType, List<TransportWsEventHandler>> listeners = Map.fromIterables(
final Map<TransportWsEventType, List<TransportWsEventHandler>> listeners =
Map.fromIterables(
TransportWsEventType.values,
TransportWsEventType.values.map((e) => []),
);
listeners[TransportWsEventType.message]!.add(TransportWsEventHandler<void>(message: emitMessage));
listeners[TransportWsEventType.message]!
.add(TransportWsEventHandler<void>(message: emitMessage));

final on = options.eventHandlers;
if (on != null) {
Expand Down Expand Up @@ -1177,7 +1207,8 @@ class LikeCloseEvent {
});

@override
String toString() => "LikeCloseEvent(code: $code, reason: $reason, wasClean: $wasClean)";
String toString() =>
"LikeCloseEvent(code: $code, reason: $reason, wasClean: $wasClean)";
}

bool _isFatalInternalCloseCode(int code) {
Expand All @@ -1197,7 +1228,8 @@ bool _isFatalInternalCloseCode(int code) {
}

class TransportWebSocketLink extends Link {
TransportWebSocketLink(TransportWsClientOptions options) : client = createClient(options);
TransportWebSocketLink(TransportWsClientOptions options)
: client = createClient(options);
final TransportWsClient client;

@override
Expand Down

0 comments on commit 70a19c3

Please sign in to comment.