Skip to content

Commit

Permalink
Websocket lobby client
Browse files Browse the repository at this point in the history
  • Loading branch information
Sheikah45 committed Oct 9, 2023
1 parent 4a3facc commit 32893c9
Showing 1 changed file with 14 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ import reactor.core.publisher.Sinks.EmitFailureHandler
import reactor.core.publisher.Sinks.EmitResult
import reactor.core.scheduler.Schedulers
import reactor.netty.Connection
import reactor.netty.http.client.HttpClient
import reactor.netty.http.client.WebsocketClientSpec
import reactor.netty.tcp.TcpClient
import reactor.util.retry.Retry
import reactor.util.retry.Retry.RetrySignal
import java.net.InetSocketAddress
import java.net.URI
import java.time.Duration
import java.util.function.Function

Expand All @@ -46,7 +49,8 @@ class FafLobbyClient(

private lateinit var config: Config

private var connection: Connection? = null
private var connection: Connection? = null;
private var connectionDisposable: Disposable? = null
private var pingDisposable: Disposable? = null
private var connecting: Boolean = false

Expand Down Expand Up @@ -82,9 +86,9 @@ class FafLobbyClient(
}

private val loginMono = Mono.defer {
openConnection()
.then(loginResponseMono)
.retryWhen(createRetrySpec(config))
connectionDisposable?.dispose()
connectionDisposable = openConnection()
loginResponseMono.retryWhen(createRetrySpec(config))
}
.doOnError { LOG.error("Error during connection", it); connection?.dispose() }
.doOnCancel { LOG.debug("Login cancelled"); disconnect() }
Expand All @@ -101,8 +105,7 @@ class FafLobbyClient(
(emitResult == EmitResult.FAIL_NON_SERIALIZED)
}

private val client = TcpClient.newConnection()
.resolver(DefaultAddressResolverGroup.INSTANCE)
private val webSocketClient = HttpClient.newConnection().resolver(DefaultAddressResolverGroup.INSTANCE)
.doOnResolveError { connection, throwable ->
LOG.error("Could not find server", throwable)
connection.dispose()
Expand All @@ -123,7 +126,7 @@ class FafLobbyClient(
}.doOnResolveError { conn, throwable ->
LOG.error("Error resolving", throwable)
conn.dispose()
}
}.websocket()

init {
rawEvents.filter { it is ServerPingMessage }.doOnNext { send(ClientPongMessage()) }.subscribe()
Expand All @@ -143,11 +146,9 @@ class FafLobbyClient(
}.subscribe()
}

private fun openConnection(): Mono<out Connection> {
return client
.wiretap(config.wiretap)
.host(config.host)
.port(config.port)
private fun openConnection(): Disposable {
return webSocketClient
.uri(URI("wss", "", config.host, config.port, "", "", ""))
.handle { inbound, outbound ->
val inboundMono = inbound.receive()
.asString(Charsets.UTF_8)
Expand Down Expand Up @@ -218,8 +219,8 @@ class FafLobbyClient(
of the connections finishes */
Mono.firstWithSignal(inboundMono, outboundMono)
}
.connect()
.doOnSubscribe { connectionStatusSink.emitNext(ConnectionStatus.CONNECTING, retrySerialFailure) }
.subscribe()
}

override fun connectAndLogin(config: Config): Mono<Player> {
Expand Down

0 comments on commit 32893c9

Please sign in to comment.