diff --git a/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/FafLobbyClient.kt b/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/FafLobbyClient.kt index 1b76b69d..917d6bd5 100644 --- a/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/FafLobbyClient.kt +++ b/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/FafLobbyClient.kt @@ -16,10 +16,13 @@ import reactor.core.publisher.Sinks.EmitFailureHandler import reactor.core.publisher.Sinks.EmitResult import reactor.core.scheduler.Schedulers import reactor.netty.Connection +import reactor.netty.http.client.HttpClient +import reactor.netty.http.client.WebsocketClientSpec import reactor.netty.tcp.TcpClient import reactor.util.retry.Retry import reactor.util.retry.Retry.RetrySignal import java.net.InetSocketAddress +import java.net.URI import java.time.Duration import java.util.function.Function @@ -46,7 +49,8 @@ class FafLobbyClient( private lateinit var config: Config - private var connection: Connection? = null + private var connection: Connection? = null; + private var connectionDisposable: Disposable? = null private var pingDisposable: Disposable? = null private var connecting: Boolean = false @@ -82,9 +86,9 @@ class FafLobbyClient( } private val loginMono = Mono.defer { - openConnection() - .then(loginResponseMono) - .retryWhen(createRetrySpec(config)) + connectionDisposable?.dispose() + connectionDisposable = openConnection() + loginResponseMono.retryWhen(createRetrySpec(config)) } .doOnError { LOG.error("Error during connection", it); connection?.dispose() } .doOnCancel { LOG.debug("Login cancelled"); disconnect() } @@ -101,8 +105,7 @@ class FafLobbyClient( (emitResult == EmitResult.FAIL_NON_SERIALIZED) } - private val client = TcpClient.newConnection() - .resolver(DefaultAddressResolverGroup.INSTANCE) + private val webSocketClient = HttpClient.newConnection().resolver(DefaultAddressResolverGroup.INSTANCE) .doOnResolveError { connection, throwable -> LOG.error("Could not find server", throwable) connection.dispose() @@ -123,7 +126,7 @@ class FafLobbyClient( }.doOnResolveError { conn, throwable -> LOG.error("Error resolving", throwable) conn.dispose() - } + }.websocket() init { rawEvents.filter { it is ServerPingMessage }.doOnNext { send(ClientPongMessage()) }.subscribe() @@ -143,11 +146,9 @@ class FafLobbyClient( }.subscribe() } - private fun openConnection(): Mono { - return client - .wiretap(config.wiretap) - .host(config.host) - .port(config.port) + private fun openConnection(): Disposable { + return webSocketClient + .uri(URI("wss", "", config.host, config.port, "", "", "")) .handle { inbound, outbound -> val inboundMono = inbound.receive() .asString(Charsets.UTF_8) @@ -218,8 +219,8 @@ class FafLobbyClient( of the connections finishes */ Mono.firstWithSignal(inboundMono, outboundMono) } - .connect() .doOnSubscribe { connectionStatusSink.emitNext(ConnectionStatus.CONNECTING, retrySerialFailure) } + .subscribe() } override fun connectAndLogin(config: Config): Mono {