Skip to content

Commit

Permalink
mostly done!
Browse files Browse the repository at this point in the history
some memory leaks to be found
  • Loading branch information
whyoleg committed Mar 11, 2024
1 parent 2e0fa06 commit 5e5620b
Show file tree
Hide file tree
Showing 13 changed files with 190 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private class MultiplexedConnection(
MultiplexedRequesterOperationFactory(maxFragmentSize, bufferPool, storage, session)

init {
session.coroutineContext.job.invokeOnCompletion { cause ->
session.coroutineContext.job.invokeOnCompletion { _ ->
config.setupPayload.close()
storage.clear()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private class SequentialOperationOutbound(
private val storage: OperationStateStorage<OperationFrameHandler>,
private val prioritizer: SequentialFramePrioritizer,
) : AbstractOperationOutbound(streamId, maxFragmentSize, bufferPool) {
override suspend fun sendFrame(frame: ByteReadPacket): Unit = prioritizer.sendPriority(frame)
override suspend fun sendFrame(frame: ByteReadPacket): Unit = prioritizer.sendCommon(frame)
override fun close() {
storage.removeStream(streamId)?.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ internal suspend fun connect(
session.coroutineContext.job.invokeOnCompletion {
requester.cancel("Connection closed", it)
responder.cancel("Connection closed", it)
metadataFrames.cancelWithCause(it)
}

val keepAliveHandler = KeepAliveHandler(connection.config.keepAlive)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.rsocket.kotlin.internal
package io.rsocket.kotlin.operation

import io.rsocket.kotlin.*
import io.rsocket.kotlin.payload.*
Expand All @@ -27,28 +27,32 @@ import kotlinx.coroutines.flow.*
internal inline fun requestFlow(
crossinline block: suspend FlowCollector<Payload>.(strategy: RequestStrategy.Element, initialRequest: Int) -> Unit,
): Flow<Payload> = object : RequestFlow() {
override suspend fun collect(
collector: FlowCollector<Payload>,
strategy: RequestStrategy.Element,
initialRequest: Int,
) {
collector.block(strategy, initialRequest)
override suspend fun FlowCollector<Payload>.collect(strategy: RequestStrategy.Element, initialRequest: Int) {
return block(strategy, initialRequest)
}
}

@ExperimentalStreamsApi
internal suspend inline fun FlowCollector<Payload>.emitAllWithRequestN(
channel: ReceiveChannel<Payload>,
internal suspend fun FlowCollector<Payload>.emitAllWithRequestN(
payloads: ReceiveChannel<Payload>,
requestNs: SendChannel<Int>,
strategy: RequestStrategy.Element,
crossinline onRequest: suspend (n: Int) -> Unit,
) {
val collector = object : RequestFlowCollector(this, strategy) {
override suspend fun onRequest(n: Int) {
@OptIn(DelicateCoroutinesApi::class)
if (!channel.isClosedForReceive) onRequest(n)
}
): Throwable? {
while (true) {
val result = payloads.receiveCatching()
if (result.isClosed) return result.exceptionOrNull()
emit(result.getOrThrow()) // will never throw

@OptIn(DelicateCoroutinesApi::class)
if (requestNs.isClosedForSend) continue

val next = strategy.nextRequest()
if (next <= 0) continue

// if this fails, it's means that requests no longer possible;
// next payloads.receiveCatching() should return a closed state
requestNs.trySend(next)
}
collector.emitAll(channel)
}

@ExperimentalStreamsApi
Expand All @@ -59,32 +63,9 @@ internal abstract class RequestFlow : Flow<Payload> {
check(!consumed.getAndSet(true)) { "RequestFlow can be collected just once" }

val strategy = currentCoroutineContext().requestStrategy()
val initial = strategy.firstRequest()
collect(collector, strategy, initial)
}

abstract suspend fun collect(
collector: FlowCollector<Payload>,
strategy: RequestStrategy.Element,
initialRequest: Int,
)
}

@ExperimentalStreamsApi
internal abstract class RequestFlowCollector(
private val collector: FlowCollector<Payload>,
private val strategy: RequestStrategy.Element,
) : FlowCollector<Payload> {
override suspend fun emit(value: Payload) {
try {
collector.emit(value)
val next = strategy.nextRequest()
if (next > 0) onRequest(next)
} catch (cause: Throwable) {
value.close()
throw cause
}
val initialRequest = strategy.firstRequest()
collector.collect(strategy, initialRequest)
}

abstract suspend fun onRequest(n: Int)
abstract suspend fun FlowCollector<Payload>.collect(strategy: RequestStrategy.Element, initialRequest: Int)
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ internal abstract class RequesterOperation : OperationInbound {

// after `execute` is completed, no other interactions with the operation are possible
abstract suspend fun execute(outbound: OperationOutbound)

open val needCancelling: Boolean get() = true
}

internal interface RequesterOperationFactory {
Expand Down Expand Up @@ -64,9 +66,8 @@ internal class RequesterOperationExecutor(
} catch (cause: Throwable) {
operation.receiveProcessingError(cause)

// TODO: we don't need to send cancel if we already sent error for RC
// TODO: we don't need to send cancel if we have sent no frames
if (requestsScope.isActive) outbound.sendCancel()
if (requestsScope.isActive && operation.needCancelling) outbound.sendCancel()
throw cause // TODO: this exception will be lost and so most likely should
} finally {
outbound.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package io.rsocket.kotlin.operation

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.payload.*
import kotlinx.coroutines.*
Expand All @@ -37,84 +36,86 @@ internal class RequesterRSocket(
}

override suspend fun fireAndForget(payload: Payload) {
val deferred = CompletableDeferred<Unit>()
val requestSentDeferred = CompletableDeferred<Unit>()
val requestJob = executor.executeRequest(
payload = payload,
complete = false,
initialRequest = 0,
operation = RequesterFireAndForgetOperation(deferred)
operation = RequesterFireAndForgetOperation(requestSentDeferred)
)
try {
deferred.join()
requestSentDeferred.join()
} catch (cause: Throwable) {
requestJob.cancel("Request cancelled", cause)
deferred.cancel()
requestSentDeferred.cancel()
throw cause
}
return deferred.await()
return requestSentDeferred.await()
}

override suspend fun requestResponse(payload: Payload): Payload {
val deferred = CompletableDeferred<Payload>()
val responseDeferred = CompletableDeferred<Payload>()
val requestJob = executor.executeRequest(
payload = payload,
complete = false,
initialRequest = 0,
operation = RequesterRequestResponseOperation(deferred)
operation = RequesterRequestResponseOperation(responseDeferred)
)
try {
deferred.join()
responseDeferred.join()
} catch (cause: Throwable) {
requestJob.cancel("Request cancelled", cause)
deferred.cancel()
responseDeferred.cancel()
throw cause
}

return deferred.await()
return responseDeferred.await()
}

@OptIn(ExperimentalStreamsApi::class)
override fun requestStream(payload: Payload): Flow<Payload> = requestFlow { strategy, initialRequest ->
val payloads = channelForCloseable<Payload>(Channel.UNLIMITED) // TODO: should be configurable
val requests = Channel<Int>(Channel.UNLIMITED)
val responsePayloads = channelForCloseable<Payload>(Channel.UNLIMITED) // TODO: should be configurable
val requestNs = Channel<Int>(Channel.UNLIMITED)

val requestJob = executor.executeRequest(
payload = payload,
complete = false,
initialRequest = initialRequest,
operation = RequesterRequestStreamOperation(requests, payloads)
operation = RequesterRequestStreamOperation(requestNs, responsePayloads)
)

val error: Throwable?
try {
while (true) {
val result = payloads.receiveCatching()
if (result.isClosed) {
error = result.exceptionOrNull()
break
}
emit(result.getOrThrow()) // will never throw

@OptIn(DelicateCoroutinesApi::class)
if (!requests.isClosedForSend) {
val next = strategy.nextRequest()
if (next > 0) {
// if this fails, it's means that requests no longer possible;
// next payloads.receiveCatching() should return a closed state
requests.trySend(next)
}
}
}
val error = try {
emitAllWithRequestN(responsePayloads, requestNs, strategy)
} catch (cause: Throwable) {
requestJob.cancel("Request cancelled", cause)
throw cause
} finally {
requests.cancel() // no more requestN can be sent
responsePayloads.cancel() // no more payloads can be received
requestNs.cancel() // no more requestN can be sent
}
throw error ?: return@requestFlow
}

override fun requestChannel(initPayload: Payload, payloads: Flow<Payload>): Flow<Payload> {
TODO()
@OptIn(ExperimentalStreamsApi::class)
override fun requestChannel(initPayload: Payload, payloads: Flow<Payload>): Flow<Payload> = requestFlow { strategy, initialRequest ->
val responsePayloads = channelForCloseable<Payload>(Channel.UNLIMITED) // TODO: should be configurable
val requestNs = Channel<Int>(Channel.UNLIMITED)

val requestJob = executor.executeRequest(
payload = initPayload,
complete = false,
initialRequest = initialRequest,
operation = RequesterRequestChannelOperation(requestNs, responsePayloads, payloads)
)

val error = try {
emitAllWithRequestN(responsePayloads, requestNs, strategy)
} catch (cause: Throwable) {
requestJob.cancel("Request cancelled", cause)
throw cause
} finally {
responsePayloads.cancel() // no more payloads can be received
requestNs.cancel() // no more requestN can be sent
}
throw error ?: return@requestFlow
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ import kotlinx.coroutines.flow.*

internal class RequesterRequestChannelOperation(
private val requestNs: ReceiveChannel<Int>,
private val responsePayloadChannel: SendChannel<Payload>,
private val responsePayloads: SendChannel<Payload>,
private val requestPayloads: Flow<Payload>,
) : RequesterOperation() {
override val type: RSocketOperationType get() = RSocketOperationType.RequestChannel

private val limiter = Limiter(0)
private var senderJob: Job? = null
override var needCancelling = true
private set

override suspend fun execute(outbound: OperationOutbound): Unit = coroutineScope {
senderJob = launch {
Expand All @@ -43,64 +45,54 @@ internal class RequesterRequestChannelOperation(
outbound.sendComplete()
} catch (cause: Throwable) {
// senderJob could be cancelled
if (isActive) outbound.sendError(cause)
if (isActive) {
outbound.sendError(cause)
needCancelling = false
}
throw cause // failing senderJob here will fail request
}
}
while (true) {
outbound.sendRequestN(requestNs.receiveCatching().getOrNull() ?: break)
}
senderJob?.join()
}

override fun isFrameExpected(frameType: FrameType): Boolean {
TODO("Not yet implemented")
@OptIn(DelicateCoroutinesApi::class)
override fun isFrameExpected(frameType: FrameType): Boolean = when {
!responsePayloads.isClosedForSend -> frameType == FrameType.Payload || frameType == FrameType.Error
else -> false
} || when {
senderJob?.isActive == true -> frameType == FrameType.RequestN || frameType == FrameType.Cancel
else -> false
}

override fun receiveRequestN(requestN: Int) {
limiter.updateRequests(requestN)
}

// TODO: handle half-closed state
// override fun receiveNext(payload: Payload, complete: Boolean) {
// if (responsePayloadChannel.trySend(payload).isFailure) {
// if (senderJob?.isActive == true) {
// // payload received, but in the meanwhile request was canceled ???
// payload.close()
// requestNs.cancel() // no more requestN can be sent
// return ReceiveResult.Continue
// } else {
//
// }
// }
// if (complete) return receiveComplete()
// return ReceiveResult.Continue
// }
//
// override fun receiveComplete(): ReceiveResult {
// responsePayloadChannel.close()
// requestNs.cancel() // no more requestN can be sent
// return ReceiveResult.Done
// }
//
// override fun receiveError(cause: Throwable): ReceiveResult {
// responsePayloadChannel.close(cause)
// requestNs.cancel() // no more requestN can be sent
// return ReceiveResult.Done
// }
//
// override fun receiveRequestN(requestN: Int): ReceiveResult {
// limiter.updateRequests(requestN)
// return ReceiveResult.Continue
// }
//
// override fun receiveCancel(): ReceiveResult {
// // we should stop sending, but not receiving
// senderJob?.cancel("Something cancelled")
//
// // if receiving stopped - we should stop
// return ReceiveResult.Continue
// }
override fun receiveNext(payload: Payload?, complete: Boolean) {
if (payload != null) {
if (responsePayloads.trySend(payload).isFailure) payload.close()
}
if (complete) closeChannels(null)
}

override fun receiveCancel() {
senderJob?.cancel("Request payloads cancelled")
}

override fun receiveError(cause: Throwable) {
closeChannels(cause)
senderJob?.cancel("Error received from remote", cause)
}

override fun receiveProcessingError(cause: Throwable) {
// cancel sender job
// cancel channels
TODO("Not yet implemented")
closeChannels(cause)
senderJob?.cancel("Request processing failure", cause)
}

private fun closeChannels(cause: Throwable?) {
responsePayloads.close(cause)
requestNs.cancel() // no more requestN can be sent
}
}
Loading

0 comments on commit 5e5620b

Please sign in to comment.