diff --git a/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/FafLobbyClient.kt b/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/FafLobbyClient.kt index e30176f4..f2a98c53 100644 --- a/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/FafLobbyClient.kt +++ b/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/FafLobbyClient.kt @@ -17,12 +17,9 @@ import reactor.core.publisher.Sinks.EmitResult import reactor.core.scheduler.Schedulers import reactor.netty.Connection import reactor.netty.http.client.HttpClient -import reactor.netty.http.client.WebsocketClientSpec -import reactor.netty.tcp.TcpClient import reactor.util.retry.Retry import reactor.util.retry.Retry.RetrySignal import java.net.InetSocketAddress -import java.net.URI import java.time.Duration import java.util.function.Function @@ -106,7 +103,7 @@ class FafLobbyClient( (emitResult == EmitResult.FAIL_NON_SERIALIZED) } - private val webSocketClient = HttpClient.newConnection() + private val httpClient = HttpClient.newConnection() .resolver(DefaultAddressResolverGroup.INSTANCE) .doOnResolveError { connection, throwable -> LOG.error("Could not find server", throwable) @@ -129,7 +126,7 @@ class FafLobbyClient( }.doOnResolveError { conn, throwable -> LOG.error("Error resolving", throwable) conn.dispose() - }.websocket() + } init { rawEvents.filter { it is ServerPingMessage }.doOnNext { send(ClientPongMessage()) }.subscribe() @@ -151,7 +148,9 @@ class FafLobbyClient( private fun openConnection() { LOG.debug("Opening connection") - webSocketClient + httpClient + .wiretap(config.wiretap) + .websocket() .uri(config.url) .handle { inbound, outbound -> val inboundMono = inbound.receive() @@ -188,7 +187,7 @@ class FafLobbyClient( pingDisposable = pingWithDelay() .subscribeOn(Schedulers.single()) .subscribe() - } + }.log() .then() val outboundMono = outbound.sendString( @@ -218,19 +217,19 @@ class FafLobbyClient( LOG.error("Error during serialization of message {}", it, throwable) Mono.empty() } - } + }.log() ).then() /* The lobby protocol requires two-way communication. If either the outbound or inbound connections complete/close then we are better off closing the connection to the server. This is why we return a mono that completes when one of the connections finishes */ - Mono.firstWithSignal(inboundMono, outboundMono) + Mono.firstWithSignal(inboundMono, outboundMono).log() } .doOnCancel { LOG.info("Connection cancelled") } .doOnSubscribe { LOG.debug("Beginning connection process") connectionStatusSink.emitNext(ConnectionStatus.CONNECTING, retrySerialFailure) - } + }.log() .subscribe(null, { LOG.warn("Error in connection", it) connectionStatusSink.emitNext(ConnectionStatus.DISCONNECTED, retrySerialFailure)