Skip to content

Commit

Permalink
new implementation of internals
Browse files Browse the repository at this point in the history
* support for multiplexed transport
* simpler limiter implementation
  • Loading branch information
whyoleg committed Mar 13, 2024
1 parent d5aa453 commit bc2c384
Show file tree
Hide file tree
Showing 53 changed files with 2,169 additions and 1,487 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@ import io.rsocket.kotlin.payload.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// for responder: metadata/payload is only accessible during execution of a function, or until Flow is consumed
public interface RSocket : CoroutineScope {

public suspend fun metadataPush(metadata: ByteReadPacket) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2023 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,20 +14,12 @@
* limitations under the License.
*/

package io.rsocket.kotlin.internal
package io.rsocket.kotlin

import io.ktor.utils.io.core.*
import kotlinx.coroutines.channels.*

internal inline fun <T : Closeable, R> T.closeOnError(block: (T) -> R): R {
try {
return block(this)
} catch (e: Throwable) {
close()
throw e
}
}

internal fun <E : Closeable> SendChannel<E>.safeTrySend(element: E) {
trySend(element).onFailure { element.close() }
// TODO: Make public
internal enum class RSocketOperationType {
FireAndForget,
RequestResponse,
RequestStream,
RequestChannel,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin

// TODO: Make public, deciding on naming, drop Version in frame.io
internal data class RSocketProtocolVersion(
val major: Int,
val minor: Int,
) : Comparable<RSocketProtocolVersion> {

init {
check(major >= 0) { "Major version component should be non-negative but was $major" }
check(minor >= 0) { "Minor version component should be non-negative: but was $minor" }
}

override fun compareTo(other: RSocketProtocolVersion): Int {
return when (val majorResult = major.compareTo(other.major)) {
0 -> minor.compareTo(other.minor)
else -> majorResult
}
}

override fun toString(): String = "$major.$minor"

companion object {
val V1: RSocketProtocolVersion = RSocketProtocolVersion(1, 0)
val V2: RSocketProtocolVersion = RSocketProtocolVersion(2, 0)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.connection

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.keepalive.*
import io.rsocket.kotlin.payload.*

// send/receive setup, resume, resume ok, lease, error
internal interface ConnectionEstablishmentContext {
// only setup|lease|resume|resume_ok|error frames
suspend fun receiveFrame(): Frame

suspend fun sendSetup(
version: Version,
honorLease: Boolean,
keepAlive: KeepAlive,
resumeToken: ByteReadPacket?,
payloadMimeType: PayloadMimeType,
payload: Payload,
)
}

internal interface ConnectionEstablishmentHandler {
val isClient: Boolean
suspend fun establishConnection(context: ConnectionEstablishmentContext): ConnectionConfig
}

internal abstract class AbstractConnectionEstablishmentContext(
private val bufferPool: BufferPool,
) : ConnectionEstablishmentContext {
protected abstract suspend fun sendFrame(frame: ByteReadPacket)
protected abstract suspend fun receiveFrameRaw(): ByteReadPacket
private suspend fun sendFrame(frame: Frame): Unit = sendFrame(frame.toPacket(bufferPool))
override suspend fun receiveFrame(): Frame = receiveFrameRaw().readFrame(bufferPool)

override suspend fun sendSetup(
version: Version,
honorLease: Boolean,
keepAlive: KeepAlive,
resumeToken: ByteReadPacket?,
payloadMimeType: PayloadMimeType,
payload: Payload,
): Unit = sendFrame(SetupFrame(version, honorLease, keepAlive, resumeToken, payloadMimeType, payload))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.connection

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.internal.*
import kotlinx.coroutines.*

internal interface ConnectionInbound {
fun receiveMetadataPush(metadata: ByteReadPacket)
fun receiveKeepAlive(respond: Boolean, data: ByteReadPacket, lastPosition: Long)
fun receiveLease(ttl: Int, numberOfRequests: Int, metadata: ByteReadPacket?)
fun receiveError(cause: Throwable)
}

internal class ConnectionFrameHandler(private val inbound: ConnectionInbound) {
fun handleFrame(frame: Frame): Unit = when (frame) {
is MetadataPushFrame -> inbound.receiveMetadataPush(frame.metadata)
is KeepAliveFrame -> inbound.receiveKeepAlive(frame.respond, frame.data, frame.lastPosition)
is ErrorFrame -> inbound.receiveError(frame.throwable)
is LeaseFrame -> inbound.receiveLease(frame.ttl, frame.numberOfRequests, frame.metadata)
// ignore other frames
else -> frame.close()
}
}

internal class ConnectionInboundImpl(
private val sessionScope: CoroutineScope,
private val requestsScope: CoroutineScope,
private val responder: RSocket,
private val keepAliveHandler: KeepAliveHandler,
) : ConnectionInbound {
override fun receiveMetadataPush(metadata: ByteReadPacket) {
requestsScope.launch {
metadata.use { responder.metadataPush(it) }
}
}

override fun receiveKeepAlive(respond: Boolean, data: ByteReadPacket, lastPosition: Long) {
keepAliveHandler.receive(data, respond)
}

override fun receiveLease(ttl: Int, numberOfRequests: Int, metadata: ByteReadPacket?) {
metadata?.close()
sessionScope.cancel("Lease is not supported")
}

override fun receiveError(cause: Throwable) {
sessionScope.cancel("Session received an error", cause)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.connection

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.internal.io.*

internal interface ConnectionOutbound {
suspend fun sendMetadataPush(metadata: ByteReadPacket)
suspend fun sendKeepAlive(respond: Boolean, data: ByteReadPacket, lastPosition: Long)
suspend fun sendError(cause: Throwable)
}

internal abstract class AbstractConnectionOutbound(
private val bufferPool: BufferPool,
) : ConnectionOutbound {
protected abstract suspend fun sendFrame(frame: ByteReadPacket)
private suspend fun sendFrame(frame: Frame): Unit = sendFrame(frame.toPacket(bufferPool))

override suspend fun sendError(cause: Throwable) {
sendFrame(ErrorFrame(0, cause))
}

override suspend fun sendMetadataPush(metadata: ByteReadPacket) {
sendFrame(MetadataPushFrame(metadata))
}

override suspend fun sendKeepAlive(respond: Boolean, data: ByteReadPacket, lastPosition: Long) {
sendFrame(KeepAliveFrame(respond, lastPosition, data))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package io.rsocket.kotlin.core

import io.rsocket.kotlin.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.connection.*
import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.internal.io.*
Expand Down Expand Up @@ -61,39 +61,36 @@ public class RSocketConnector internal constructor(
}

private suspend fun connectOnce(transport: RSocketClientTarget): RSocket {
val connection = transport.createSession().logging(frameLogger, bufferPool)
check(connection is RSocketTransportSession.Sequential) { "multiplexed is not yet supported" }
val connectionConfig = try {
connectionConfigProvider()
val connectionConfig = connectionConfigProvider()
return try {
connect(
session = transport.createSession().logging(frameLogger, bufferPool),
maxFragmentSize = maxFragmentSize,
bufferPool = bufferPool,
handler = SetupConnection(connectionConfig),
acceptor = acceptor,
interceptors = interceptors
)
} catch (cause: Throwable) {
connection.cancel("Connection config provider failed", cause)
connectionConfig.setupPayload.close()
throw cause
}
val setupFrame = SetupFrame(
}
}

private class SetupConnection(private val connectionConfig: ConnectionConfig) : ConnectionEstablishmentHandler {
override val isClient: Boolean get() = true

override suspend fun establishConnection(context: ConnectionEstablishmentContext): ConnectionConfig {
context.sendSetup(
version = Version.Current,
honorLease = false,
keepAlive = connectionConfig.keepAlive,
resumeToken = null,
payloadMimeType = connectionConfig.payloadMimeType,
payload = connectionConfig.setupPayload.copy() //copy needed, as it can be used in acceptor
// copy needed, as it can be used in acceptor
payload = connectionConfig.setupPayload.copy()
)
try {
val requester = connect(
connection = connection,
isServer = false,
maxFragmentSize = maxFragmentSize,
interceptors = interceptors,
connectionConfig = connectionConfig,
acceptor = acceptor,
bufferPool = bufferPool
)
connection.sendFrame(bufferPool, setupFrame)
return requester
} catch (cause: Throwable) {
connectionConfig.setupPayload.close()
setupFrame.close()
connection.cancel("Connection establishment failed", cause)
throw cause
}
return connectionConfig
}
}
Loading

0 comments on commit bc2c384

Please sign in to comment.