diff --git a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IRemoteAniTorrentEngine.aidl b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IRemoteAniTorrentEngine.aidl index 5ffe0d1afa..b04e0f3484 100644 --- a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IRemoteAniTorrentEngine.aidl +++ b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IRemoteAniTorrentEngine.aidl @@ -2,9 +2,9 @@ package me.him188.ani.app.domain.torrent; import me.him188.ani.app.domain.torrent.IRemoteTorrentDownloader; -import me.him188.ani.app.domain.torrent.IAnitorrentConfigCollector; -import me.him188.ani.app.domain.torrent.IProxySettingsCollector; -import me.him188.ani.app.domain.torrent.ITorrentPeerConfigCollector; +import me.him188.ani.app.domain.torrent.collector.IAnitorrentConfigCollector; +import me.him188.ani.app.domain.torrent.collector.IProxySettingsCollector; +import me.him188.ani.app.domain.torrent.collector.ITorrentPeerConfigCollector; // Declare any non-default types here with import statements diff --git a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IRemoteTorrentDownloader.aidl b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IRemoteTorrentDownloader.aidl index bf40f17fae..546939dd4b 100644 --- a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IRemoteTorrentDownloader.aidl +++ b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IRemoteTorrentDownloader.aidl @@ -1,7 +1,9 @@ // IRemoteTorrentDownloader.aidl package me.him188.ani.app.domain.torrent; -import me.him188.ani.app.domain.torrent.ITorrentDownloaderStatsCallback; +import me.him188.ani.app.domain.torrent.callback.ITorrentDownloaderStatsCallback; +import me.him188.ani.app.domain.torrent.cont.ContTorrentDownloaderFetchTorrent; +import me.him188.ani.app.domain.torrent.cont.ContTorrentDownloaderStartDownload; import me.him188.ani.app.domain.torrent.IRemoteTorrentSession; import me.him188.ani.app.domain.torrent.IDisposableHandle; import me.him188.ani.app.domain.torrent.parcel.PTorrentLibInfo; @@ -14,9 +16,9 @@ interface IRemoteTorrentDownloader { PTorrentLibInfo getVendor(); - PEncodedTorrentInfo fetchTorrent(in String uri, int timeoutSeconds); + IDisposableHandle fetchTorrent(in String uri, int timeoutSeconds, in ContTorrentDownloaderFetchTorrent cont); - IRemoteTorrentSession startDownload(in PEncodedTorrentInfo data, in String overrideSaveDir); + IDisposableHandle startDownload(in PEncodedTorrentInfo data, in String overrideSaveDir, in ContTorrentDownloaderStartDownload cont); String getSaveDirForTorrent(in PEncodedTorrentInfo data); diff --git a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IRemoteTorrentFileEntry.aidl b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IRemoteTorrentFileEntry.aidl index 9ca622f653..55bbf70102 100644 --- a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IRemoteTorrentFileEntry.aidl +++ b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IRemoteTorrentFileEntry.aidl @@ -1,7 +1,9 @@ // IRemoteTorrentFileEntry.aidl package me.him188.ani.app.domain.torrent; -import me.him188.ani.app.domain.torrent.ITorrentFileEntryStatsCallback; +import me.him188.ani.app.domain.torrent.callback.ITorrentFileEntryStatsCallback; +import me.him188.ani.app.domain.torrent.cont.ContTorrentFileEntryGetInputParams; +import me.him188.ani.app.domain.torrent.cont.ContTorrentFileEntryResolveFile; import me.him188.ani.app.domain.torrent.IRemotePieceList; import me.him188.ani.app.domain.torrent.IRemoteTorrentFileHandle; import me.him188.ani.app.domain.torrent.IDisposableHandle; @@ -22,11 +24,11 @@ interface IRemoteTorrentFileEntry { IRemoteTorrentFileHandle createHandle(); - String resolveFile(); + IDisposableHandle resolveFile(in ContTorrentFileEntryResolveFile cont); String resolveFileMaybeEmptyOrNull(); - PTorrentInputParameter getTorrentInputParams(); + IDisposableHandle getTorrentInputParams(in ContTorrentFileEntryGetInputParams cont); void torrentInputOnWait(int pieceIndex); } \ No newline at end of file diff --git a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IRemoteTorrentSession.aidl b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IRemoteTorrentSession.aidl index d42bd2b8a3..2387a5781b 100644 --- a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IRemoteTorrentSession.aidl +++ b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IRemoteTorrentSession.aidl @@ -1,7 +1,8 @@ // IRemoteTorrentSession.aidl package me.him188.ani.app.domain.torrent; -import me.him188.ani.app.domain.torrent.ITorrentSessionStatsCallback; +import me.him188.ani.app.domain.torrent.callback.ITorrentSessionStatsCallback; +import me.him188.ani.app.domain.torrent.cont.ContTorrentSessionGetFiles; import me.him188.ani.app.domain.torrent.IRemoteTorrentFileEntryList; import me.him188.ani.app.domain.torrent.parcel.PPeerInfo; import me.him188.ani.app.domain.torrent.IDisposableHandle; @@ -13,7 +14,7 @@ interface IRemoteTorrentSession { String getName(); - IRemoteTorrentFileEntryList getFiles(); + IDisposableHandle getFiles(in ContTorrentSessionGetFiles cont); PPeerInfo[] getPeers(); diff --git a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/ITorrentDownloaderStatsCallback.aidl b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/callback/ITorrentDownloaderStatsCallback.aidl similarity index 84% rename from app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/ITorrentDownloaderStatsCallback.aidl rename to app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/callback/ITorrentDownloaderStatsCallback.aidl index ef0841088f..4601e2c530 100644 --- a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/ITorrentDownloaderStatsCallback.aidl +++ b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/callback/ITorrentDownloaderStatsCallback.aidl @@ -1,5 +1,5 @@ // ITorrentDownloaderStatsCallback.aidl -package me.him188.ani.app.domain.torrent; +package me.him188.ani.app.domain.torrent.callback; import me.him188.ani.app.domain.torrent.parcel.PTorrentDownloaderStats; diff --git a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/ITorrentFileEntryStatsCallback.aidl b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/callback/ITorrentFileEntryStatsCallback.aidl similarity index 83% rename from app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/ITorrentFileEntryStatsCallback.aidl rename to app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/callback/ITorrentFileEntryStatsCallback.aidl index ef1c50daeb..387771b59d 100644 --- a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/ITorrentFileEntryStatsCallback.aidl +++ b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/callback/ITorrentFileEntryStatsCallback.aidl @@ -1,5 +1,5 @@ // ITorrentFileEntryStatsCallback.aidl -package me.him188.ani.app.domain.torrent; +package me.him188.ani.app.domain.torrent.callback; import me.him188.ani.app.domain.torrent.parcel.PTorrentFileEntryStats; diff --git a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/ITorrentSessionStatsCallback.aidl b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/callback/ITorrentSessionStatsCallback.aidl similarity index 83% rename from app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/ITorrentSessionStatsCallback.aidl rename to app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/callback/ITorrentSessionStatsCallback.aidl index 7b3b7f9249..700df02918 100644 --- a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/ITorrentSessionStatsCallback.aidl +++ b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/callback/ITorrentSessionStatsCallback.aidl @@ -1,5 +1,5 @@ // ITorrentSessionStatsCallback.aidl -package me.him188.ani.app.domain.torrent; +package me.him188.ani.app.domain.torrent.callback; import me.him188.ani.app.domain.torrent.parcel.PTorrentSessionStats; diff --git a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IAnitorrentConfigCollector.aidl b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/collector/IAnitorrentConfigCollector.aidl similarity index 80% rename from app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IAnitorrentConfigCollector.aidl rename to app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/collector/IAnitorrentConfigCollector.aidl index ffc691775e..bcde8fd288 100644 --- a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IAnitorrentConfigCollector.aidl +++ b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/collector/IAnitorrentConfigCollector.aidl @@ -1,5 +1,5 @@ // IAnitorrentConfigCollector.aidl -package me.him188.ani.app.domain.torrent; +package me.him188.ani.app.domain.torrent.collector; import me.him188.ani.app.domain.torrent.parcel.PAnitorrentConfig; diff --git a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IProxySettingsCollector.aidl b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/collector/IProxySettingsCollector.aidl similarity index 79% rename from app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IProxySettingsCollector.aidl rename to app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/collector/IProxySettingsCollector.aidl index 7c2487b690..188cd4775a 100644 --- a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/IProxySettingsCollector.aidl +++ b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/collector/IProxySettingsCollector.aidl @@ -1,5 +1,5 @@ // IProxySettingsCollector.aidl -package me.him188.ani.app.domain.torrent; +package me.him188.ani.app.domain.torrent.collector; import me.him188.ani.app.domain.torrent.parcel.PProxySettings; diff --git a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/ITorrentPeerConfigCollector.aidl b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/collector/ITorrentPeerConfigCollector.aidl similarity index 80% rename from app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/ITorrentPeerConfigCollector.aidl rename to app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/collector/ITorrentPeerConfigCollector.aidl index dcee53990e..c1b18a41ff 100644 --- a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/ITorrentPeerConfigCollector.aidl +++ b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/collector/ITorrentPeerConfigCollector.aidl @@ -1,5 +1,5 @@ // ITorrentPeerConfigCollector.aidl -package me.him188.ani.app.domain.torrent; +package me.him188.ani.app.domain.torrent.collector; import me.him188.ani.app.domain.torrent.parcel.PTorrentPeerConfig; diff --git a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/cont/ContTorrentDownloaderFetchTorrent.aidl b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/cont/ContTorrentDownloaderFetchTorrent.aidl new file mode 100644 index 0000000000..a78fb20304 --- /dev/null +++ b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/cont/ContTorrentDownloaderFetchTorrent.aidl @@ -0,0 +1,12 @@ +// ContTorrentDownloaderFetchTorrent.aidl +package me.him188.ani.app.domain.torrent.cont; + +import me.him188.ani.app.domain.torrent.parcel.PEncodedTorrentInfo; +import me.him188.ani.app.domain.torrent.parcel.RemoteContinuationException; + +// Declare any non-default types here with import statements + +interface ContTorrentDownloaderFetchTorrent { + void resume(in PEncodedTorrentInfo value); + void resumeWithException(in RemoteContinuationException exception); +} \ No newline at end of file diff --git a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/cont/ContTorrentDownloaderStartDownload.aidl b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/cont/ContTorrentDownloaderStartDownload.aidl new file mode 100644 index 0000000000..bb14c57d3b --- /dev/null +++ b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/cont/ContTorrentDownloaderStartDownload.aidl @@ -0,0 +1,12 @@ +// ContTorrentDownloaderStartDownload.aidl +package me.him188.ani.app.domain.torrent.cont; + +import me.him188.ani.app.domain.torrent.IRemoteTorrentSession; +import me.him188.ani.app.domain.torrent.parcel.RemoteContinuationException; + +// Declare any non-default types here with import statements + +interface ContTorrentDownloaderStartDownload { + void resume(in IRemoteTorrentSession value); + void resumeWithException(in RemoteContinuationException exception); +} \ No newline at end of file diff --git a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/cont/ContTorrentFileEntryGetInputParams.aidl b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/cont/ContTorrentFileEntryGetInputParams.aidl new file mode 100644 index 0000000000..6134f51fa8 --- /dev/null +++ b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/cont/ContTorrentFileEntryGetInputParams.aidl @@ -0,0 +1,12 @@ +// ContTorrentFileEntryGetInputParams.aidl +package me.him188.ani.app.domain.torrent.cont; + +import me.him188.ani.app.domain.torrent.parcel.PTorrentInputParameter; +import me.him188.ani.app.domain.torrent.parcel.RemoteContinuationException; + +// Declare any non-default types here with import statements + +interface ContTorrentFileEntryGetInputParams { + void resume(in PTorrentInputParameter value); + void resumeWithException(in RemoteContinuationException exception); +} \ No newline at end of file diff --git a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/cont/ContTorrentFileEntryResolveFile.aidl b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/cont/ContTorrentFileEntryResolveFile.aidl new file mode 100644 index 0000000000..111e6108b1 --- /dev/null +++ b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/cont/ContTorrentFileEntryResolveFile.aidl @@ -0,0 +1,12 @@ +// ContTorrentFileEntryResolveFile.aidl +package me.him188.ani.app.domain.torrent.cont; + +import me.him188.ani.app.domain.torrent.parcel.PTorrentInputParameter; +import me.him188.ani.app.domain.torrent.parcel.RemoteContinuationException; + +// Declare any non-default types here with import statements + +interface ContTorrentFileEntryResolveFile { + void resume(in String value); + void resumeWithException(in RemoteContinuationException exception); +} \ No newline at end of file diff --git a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/cont/ContTorrentSessionGetFiles.aidl b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/cont/ContTorrentSessionGetFiles.aidl new file mode 100644 index 0000000000..d9f47b4973 --- /dev/null +++ b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/cont/ContTorrentSessionGetFiles.aidl @@ -0,0 +1,12 @@ +// ContTorrentSessionGetFiles.aidl +package me.him188.ani.app.domain.torrent.cont; + +import me.him188.ani.app.domain.torrent.IRemoteTorrentFileEntryList; +import me.him188.ani.app.domain.torrent.parcel.RemoteContinuationException; + +// Declare any non-default types here with import statements + +interface ContTorrentSessionGetFiles { + void resume(in IRemoteTorrentFileEntryList value); + void resumeWithException(in RemoteContinuationException exception); +} \ No newline at end of file diff --git a/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/parcel/RemoteContinuationException.aidl b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/parcel/RemoteContinuationException.aidl new file mode 100644 index 0000000000..6f585f24a1 --- /dev/null +++ b/app/shared/app-data/src/androidMain/aidl/me/him188/ani/app/domain/torrent/parcel/RemoteContinuationException.aidl @@ -0,0 +1,4 @@ +// RemoteContinuationException.aidl +package me.him188.ani.app.domain.torrent.parcel; + +parcelable RemoteContinuationException; \ No newline at end of file diff --git a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteAnitorrentEngine.kt b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteAnitorrentEngine.kt index d20fe35b59..b21b319e68 100644 --- a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteAnitorrentEngine.kt +++ b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteAnitorrentEngine.kt @@ -12,6 +12,8 @@ package me.him188.ani.app.domain.torrent.client import android.os.Build import android.os.IInterface import androidx.annotation.RequiresApi +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.collect @@ -20,7 +22,6 @@ import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.stateIn import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking import kotlinx.serialization.json.Json import me.him188.ani.app.data.models.preference.AnitorrentConfig import me.him188.ani.app.data.models.preference.ProxySettings @@ -34,6 +35,7 @@ import me.him188.ani.app.domain.torrent.parcel.PTorrentPeerConfig import me.him188.ani.app.domain.torrent.service.TorrentServiceConnection import me.him188.ani.app.torrent.api.TorrentDownloader import me.him188.ani.datasources.api.source.MediaSourceLocation +import me.him188.ani.utils.coroutines.IO_ import me.him188.ani.utils.coroutines.childScope import me.him188.ani.utils.coroutines.onReplacement import me.him188.ani.utils.io.SystemPath @@ -50,8 +52,13 @@ class RemoteAnitorrentEngine( saveDir: SystemPath, parentCoroutineContext: CoroutineContext, ) : TorrentEngine { - private val childScope = parentCoroutineContext.childScope() private val logger = logger() + + private val scope = parentCoroutineContext.childScope() + private val fetchRemoteScope = parentCoroutineContext.childScope( + CoroutineName("RemoteAnitorrentEngineFetchRemote") + Dispatchers.IO_, + ) + private val connectivityAware = DefaultConnectivityAware( parentCoroutineContext.childScope(), connection.connected, @@ -98,9 +105,11 @@ class RemoteAnitorrentEngine( } override suspend fun getDownloader(): TorrentDownloader { - return RemoteTorrentDownloader(connectivityAware) { - runBlocking { getBinderOrFail() }.downlaoder - } + return RemoteTorrentDownloader( + fetchRemoteScope, + RetryRemoteObject(fetchRemoteScope) { getBinderOrFail().downlaoder }, + connectivityAware, + ) } private suspend fun getBinderOrFail(): IRemoteAniTorrentEngine { @@ -108,16 +117,17 @@ class RemoteAnitorrentEngine( } override fun close() { - childScope.cancel() + scope.cancel() + fetchRemoteScope.cancel() } private inline fun collectSettingsToRemote( settingsFlow: Flow, noinline getBinder: suspend () -> I, crossinline transact: I.(String) -> Unit - ) = childScope.launch { + ) = scope.launch { val stateFlow = settingsFlow.stateIn(this) - val remoteCall = RetryRemoteCall { runBlocking { getBinder() } } + val remoteCall = RetryRemoteObject(fetchRemoteScope) { getBinder() } connection.connected .filter { it } diff --git a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteCall.kt b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteCall.kt deleted file mode 100644 index 612d233ab1..0000000000 --- a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteCall.kt +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (C) 2024 OpenAni and contributors. - * - * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. - * Use of this source code is governed by the GNU AGPLv3 license, which can be found at the following link. - * - * https://github.com/open-ani/ani/blob/main/LICENSE - */ - -package me.him188.ani.app.domain.torrent.client - -import android.os.DeadObjectException -import android.os.IInterface -import kotlinx.atomicfu.locks.SynchronizedObject -import me.him188.ani.utils.logging.logger -import me.him188.ani.utils.logging.warn - -/** - * Wrapper for remote call - */ -interface RemoteCall { - fun call(block: I.() -> R): R - - fun T.callOnceOrNull(block: T.() -> R): R? -} - -/** - * Impl for remote call safely with retry mechanism. - */ -class RetryRemoteCall( - private val getRemote: () -> I -) : RemoteCall { - private val logger = logger(this::class) - - private var remote: I? = null - private val lock = SynchronizedObject() - - private fun setRemote(): I = synchronized(lock) { - val currentRemote = remote - if (currentRemote != null) return@synchronized currentRemote - - val newRemote = getRemote() - remote = newRemote - - newRemote - } - - override fun call(block: I.() -> R): R { - var retryCount = 0 - - while (true) { - val currentRemote = remote.let { it ?: setRemote() } - - try { - return block(currentRemote) - } catch (doe: DeadObjectException) { - if (retryCount > 2) throw doe - - retryCount += 1 - logger.warn(Exception("Show stacktrace")) { - "Remote interface $currentRemote is dead, attempt to fetch new remote. retryCount = $retryCount" - } - remote = null - } - } - } - - override fun T.callOnceOrNull(block: T.() -> R): R? { - return try { - block(this) - } catch (doe: DeadObjectException) { - null - } - } -} \ No newline at end of file diff --git a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteObject.kt b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteObject.kt new file mode 100644 index 0000000000..806586b8aa --- /dev/null +++ b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteObject.kt @@ -0,0 +1,135 @@ +/* + * Copyright (C) 2024 OpenAni and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license, which can be found at the following link. + * + * https://github.com/open-ani/ani/blob/main/LICENSE + */ + +package me.him188.ani.app.domain.torrent.client + +import android.os.DeadObjectException +import android.os.IInterface +import android.os.RemoteException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.async +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import me.him188.ani.app.domain.torrent.IDisposableHandle +import me.him188.ani.app.domain.torrent.parcel.RemoteContinuationException +import me.him188.ani.utils.coroutines.update +import me.him188.ani.utils.logging.logger +import me.him188.ani.utils.logging.warn +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException + +/** + * Wrapper for remote call + */ +interface RemoteObject { + fun call(block: I.() -> R): R +} + +/** + * Impl for remote call safely with retry mechanism. + */ +class RetryRemoteObject( + private val scope: CoroutineScope, + private val getRemote: suspend () -> I +) : RemoteObject { + private val logger = logger(this::class) + + private val remote: MutableStateFlow = MutableStateFlow(null) + private val lock = Mutex() + + private fun setRemote(): Deferred = scope.async { + remote.value?.let { return@async it } + + lock.withLock { + remote.value?.let { return@async it } + + val newRemote = getRemote() + remote.update { newRemote } + newRemote + } + } + + override fun call(block: I.() -> R): R { + var retryCount = 0 + + while (true) { + // remote 为 null 时所有的 call 都要阻塞, 直到第一个 setRemote + // 其他在等待的 call 返回第一个 setRemote 的值 + val currentRemote = remote.value ?: runBlocking { + setRemote().await() + } + + try { + return block(currentRemote) + } catch (doe: DeadObjectException) { + if (retryCount > 2) throw doe + + retryCount += 1 + logger.warn(Exception("Show stacktrace")) { + "Remote interface $currentRemote is dead, attempt to fetch new remote. retryCount = $retryCount" + } + + if (!remote.compareAndSet(currentRemote, null)) { + logger.warn(IllegalStateException("Failed to invalidate current remote interface because it is changed. Before: $currentRemote, After: ${remote.value}.")) + remote.value = null + } + } + } + } +} + +/** + * Wrapper for call which takes a continuation-like argument and returns [IDisposableHandle], + * which means this is a asynchronous RPC call. + * + * [IDisposableHandle] takes responsibility to pass cancellation to server. + */ +suspend inline fun RemoteObject.callSuspendCancellable( + crossinline transact: I.(RemoteContinuation) -> IDisposableHandle?, +): T = suspendCancellableCoroutine { cont -> + val disposable = call { + transact( + object : RemoteContinuation { + override fun resume(value: T?) { + if (value == null) { + cont.resumeWithException(RemoteException("Remote resume a null value.")) + } else { + cont.resume(value) + } + } + + override fun resumeWithException(e: RemoteContinuationException?) { + cont.resumeWithException( + e?.smartCast() ?: RemoteException("Remote resume a null exception."), + ) + } + }, + ) + } + + if (disposable != null) { + cont.invokeOnCancellation { + try { + disposable.dispose() + } catch (_: DeadObjectException) { + } + } + } else { + cont.resumeWithException(RemoteException("Remote disposable is null.")) + } +} + +interface RemoteContinuation { + fun resume(value: T?) + fun resumeWithException(e: RemoteContinuationException?) +} \ No newline at end of file diff --git a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteTorrentDownloader.kt b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteTorrentDownloader.kt index f7ae2ddb18..8f13c33fca 100644 --- a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteTorrentDownloader.kt +++ b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteTorrentDownloader.kt @@ -10,23 +10,27 @@ package me.him188.ani.app.domain.torrent.client import android.os.Build +import android.os.DeadObjectException import androidx.annotation.RequiresApi -import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.callbackFlow -import kotlinx.coroutines.withContext import kotlinx.io.files.Path import me.him188.ani.app.domain.torrent.IDisposableHandle import me.him188.ani.app.domain.torrent.IRemoteTorrentDownloader -import me.him188.ani.app.domain.torrent.ITorrentDownloaderStatsCallback +import me.him188.ani.app.domain.torrent.IRemoteTorrentSession +import me.him188.ani.app.domain.torrent.callback.ITorrentDownloaderStatsCallback +import me.him188.ani.app.domain.torrent.cont.ContTorrentDownloaderFetchTorrent +import me.him188.ani.app.domain.torrent.cont.ContTorrentDownloaderStartDownload +import me.him188.ani.app.domain.torrent.parcel.PEncodedTorrentInfo import me.him188.ani.app.domain.torrent.parcel.PTorrentDownloaderStats +import me.him188.ani.app.domain.torrent.parcel.RemoteContinuationException import me.him188.ani.app.domain.torrent.parcel.toParceled import me.him188.ani.app.torrent.api.TorrentDownloader import me.him188.ani.app.torrent.api.TorrentLibInfo import me.him188.ani.app.torrent.api.TorrentSession import me.him188.ani.app.torrent.api.files.EncodedTorrentInfo -import me.him188.ani.utils.coroutines.IO_ import me.him188.ani.utils.io.SystemPath import me.him188.ani.utils.io.absolutePath import me.him188.ani.utils.io.inSystem @@ -34,13 +38,12 @@ import kotlin.coroutines.CoroutineContext @RequiresApi(Build.VERSION_CODES.O_MR1) class RemoteTorrentDownloader( - connectivityAware: ConnectivityAware, - getRemote: () -> IRemoteTorrentDownloader -) : TorrentDownloader, - RemoteCall by RetryRemoteCall(getRemote), - ConnectivityAware by connectivityAware { + private val fetchRemoteScope: CoroutineScope, + private val remote: RemoteObject, + private val connectivityAware: ConnectivityAware +) : TorrentDownloader { override val totalStats: Flow = callbackFlow { - var disposable: IDisposableHandle? = null + var disposable: IDisposableHandle? val callback = object : ITorrentDownloaderStatsCallback.Stub() { override fun onEmit(stat: PTorrentDownloaderStats?) { if (stat != null) trySend(stat.toStats()) @@ -48,49 +51,80 @@ class RemoteTorrentDownloader( } // todo: not thread-safe - disposable = call { getTotalStatus(callback) } - val transform = registerStateTransform(false, true) { - disposable?.callOnceOrNull { dispose() } - disposable = call { getTotalStatus(callback) } + disposable = remote.call { getTotalStatus(callback) } + val transform = connectivityAware.registerStateTransform(false, true) { + try { + disposable?.dispose() + } catch (_: DeadObjectException) { + } + disposable = remote.call { getTotalStatus(callback) } } awaitClose { - disposable?.callOnceOrNull { dispose() } - unregister(transform) + try { + disposable?.dispose() + } catch (_: DeadObjectException) { + } + connectivityAware.unregister(transform) } } - override val vendor: TorrentLibInfo get() = call { vendor.toTorrentLibInfo() } + override val vendor: TorrentLibInfo get() = remote.call { vendor.toTorrentLibInfo() } - override suspend fun fetchTorrent(uri: String, timeoutSeconds: Int): EncodedTorrentInfo { - return withContext(Dispatchers.IO_) { - val result = call { fetchTorrent(uri, timeoutSeconds) } - result.toEncodedTorrentInfo() + override suspend fun fetchTorrent(uri: String, timeoutSeconds: Int): EncodedTorrentInfo = + remote.callSuspendCancellable { cont -> + fetchTorrent( + uri, timeoutSeconds, + object : ContTorrentDownloaderFetchTorrent.Stub() { + override fun resume(value: PEncodedTorrentInfo?) { + cont.resume(value?.toEncodedTorrentInfo()) + } + + override fun resumeWithException(exception: RemoteContinuationException?) { + cont.resumeWithException(exception) + } + }, + ) } - } override suspend fun startDownload( data: EncodedTorrentInfo, parentCoroutineContext: CoroutineContext, overrideSaveDir: SystemPath? ): TorrentSession { - return withContext(Dispatchers.IO_) { - RemoteTorrentSession(this@RemoteTorrentDownloader) { - call { startDownload(data.toParceled(), overrideSaveDir?.absolutePath) } - } - } + return RemoteTorrentSession( + fetchRemoteScope, + RetryRemoteObject(fetchRemoteScope) { + remote.callSuspendCancellable { cont -> + startDownload( + data.toParceled(), + overrideSaveDir?.absolutePath, + object : ContTorrentDownloaderStartDownload.Stub() { + override fun resume(value: IRemoteTorrentSession?) { + cont.resume(value) + } + + override fun resumeWithException(exception: RemoteContinuationException?) { + cont.resumeWithException(exception) + } + }, + ) + } + }, + connectivityAware, + ) } override fun getSaveDirForTorrent(data: EncodedTorrentInfo): SystemPath { - val remotePath = call { getSaveDirForTorrent(data.toParceled()) } + val remotePath = remote.call { getSaveDirForTorrent(data.toParceled()) } return Path(remotePath).inSystem } override fun listSaves(): List { - return call { listSaves() }.map { Path(it).inSystem } + return remote.call { listSaves() }.map { Path(it).inSystem } } override fun close() { - return call { close() } + return remote.call { close() } } } \ No newline at end of file diff --git a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteTorrentFileEntry.kt b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteTorrentFileEntry.kt index dfbe58cb59..a0f4ea88fa 100644 --- a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteTorrentFileEntry.kt +++ b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteTorrentFileEntry.kt @@ -10,7 +10,9 @@ package me.him188.ani.app.domain.torrent.client import android.os.Build +import android.os.DeadObjectException import androidx.annotation.RequiresApi +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.flow.Flow @@ -19,8 +21,12 @@ import kotlinx.coroutines.withContext import kotlinx.io.files.Path import me.him188.ani.app.domain.torrent.IDisposableHandle import me.him188.ani.app.domain.torrent.IRemoteTorrentFileEntry -import me.him188.ani.app.domain.torrent.ITorrentFileEntryStatsCallback +import me.him188.ani.app.domain.torrent.callback.ITorrentFileEntryStatsCallback +import me.him188.ani.app.domain.torrent.cont.ContTorrentFileEntryGetInputParams +import me.him188.ani.app.domain.torrent.cont.ContTorrentFileEntryResolveFile import me.him188.ani.app.domain.torrent.parcel.PTorrentFileEntryStats +import me.him188.ani.app.domain.torrent.parcel.PTorrentInputParameter +import me.him188.ani.app.domain.torrent.parcel.RemoteContinuationException import me.him188.ani.app.torrent.api.files.TorrentFileEntry import me.him188.ani.app.torrent.api.files.TorrentFileHandle import me.him188.ani.app.torrent.api.pieces.PieceList @@ -34,11 +40,10 @@ import java.io.RandomAccessFile @RequiresApi(Build.VERSION_CODES.O_MR1) class RemoteTorrentFileEntry( - connectivityAware: ConnectivityAware, - getRemote: () -> IRemoteTorrentFileEntry -) : TorrentFileEntry, - RemoteCall by RetryRemoteCall(getRemote), - ConnectivityAware by connectivityAware { + private val fetchRemoteScope: CoroutineScope, + private val remote: RemoteObject, + private val connectivityAware: ConnectivityAware +) : TorrentFileEntry { override val fileStats: Flow = callbackFlow { var disposable: IDisposableHandle? = null val callback = object : ITorrentFileEntryStatsCallback.Stub() { @@ -48,60 +53,88 @@ class RemoteTorrentFileEntry( } // todo: not thread-safe - disposable = call { getFileStats(callback) } - val transform = registerStateTransform(false, true) { - disposable?.callOnceOrNull { dispose() } - disposable = call { getFileStats(callback) } + disposable = remote.call { getFileStats(callback) } + val transform = connectivityAware.registerStateTransform(false, true) { + try { + disposable?.dispose() + } catch (_: DeadObjectException) { + } + disposable = remote.call { getFileStats(callback) } } awaitClose { - disposable?.callOnceOrNull { dispose() } - unregister(transform) + try { + disposable?.dispose() + } catch (_: DeadObjectException) { + } + connectivityAware.unregister(transform) } } - override val length: Long get() = call { length } + override val length: Long get() = remote.call { length } - override val pathInTorrent: String get() = call { pathInTorrent } + override val pathInTorrent: String get() = remote.call { pathInTorrent } - override val pieces: PieceList get() = RemotePieceList(this, call { pieces }) + override val pieces: PieceList get() = RemotePieceList(connectivityAware, remote.call { pieces }) - override val supportsStreaming: Boolean get() = call { supportsStreaming } + override val supportsStreaming: Boolean get() = remote.call { supportsStreaming } override fun createHandle(): TorrentFileHandle { - return RemoteTorrentFileHandle(this) { call { createHandle() } } + return RemoteTorrentFileHandle( + fetchRemoteScope, + RetryRemoteObject(fetchRemoteScope) { remote.call { createHandle() } }, + connectivityAware, + ) } - override suspend fun resolveFile(): SystemPath { - return withContext(Dispatchers.IO_) { - val result = call { resolveFile() } - Path(result).inSystem + override suspend fun resolveFile(): SystemPath = + remote.callSuspendCancellable { cont -> + resolveFile( + object : ContTorrentFileEntryResolveFile.Stub() { + override fun resume(value: String?) { + cont.resume(value?.let { Path(it).inSystem }) + } + + override fun resumeWithException(exception: RemoteContinuationException?) { + cont.resumeWithException(exception) + } + }, + ) } - } override fun resolveFileMaybeEmptyOrNull(): SystemPath? { - val result = call { resolveFileMaybeEmptyOrNull() } + val result = remote.call { resolveFileMaybeEmptyOrNull() } return if (result != null) Path(result).inSystem else null } - override suspend fun createInput(): SeekableInput { - val remoteInput = call { torrentInputParams } - - val file = Path(remoteInput.file).inSystem - - return TorrentInput( - file = withContext(Dispatchers.IO_) { - RandomAccessFile(file.toFile(), "r") - }, - pieces = pieces, - logicalStartOffset = remoteInput.logicalStartOffset, - onWait = { - withContext(Dispatchers.IO_) { - call { torrentInputOnWait(it.pieceIndex) } - } - }, - bufferSize = remoteInput.bufferSize, - size = remoteInput.size, - ) - } + override suspend fun createInput(): SeekableInput = + remote.callSuspendCancellable { cont -> + getTorrentInputParams( + object : ContTorrentFileEntryGetInputParams.Stub() { + override fun resume(value: PTorrentInputParameter?) { + if (value == null) { + cont.resume(null) + return + } + + TorrentInput( + file = RandomAccessFile(Path(value.file).inSystem.toFile(), "r"), + pieces = this@RemoteTorrentFileEntry.pieces, + logicalStartOffset = value.logicalStartOffset, + onWait = { + withContext(Dispatchers.IO_) { + remote.call { torrentInputOnWait(it.pieceIndex) } + } + }, + bufferSize = value.bufferSize, + size = value.size, + ).also { cont.resume(it) } + } + + override fun resumeWithException(exception: RemoteContinuationException?) { + cont.resumeWithException(exception) + } + }, + ) + } } \ No newline at end of file diff --git a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteTorrentFileEntryList.kt b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteTorrentFileEntryList.kt index aa7657e29f..3c1abb3ed1 100644 --- a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteTorrentFileEntryList.kt +++ b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteTorrentFileEntryList.kt @@ -11,19 +11,23 @@ package me.him188.ani.app.domain.torrent.client import android.os.Build import androidx.annotation.RequiresApi +import kotlinx.coroutines.CoroutineScope import me.him188.ani.app.domain.torrent.IRemoteTorrentFileEntryList import me.him188.ani.app.torrent.api.files.TorrentFileEntry @RequiresApi(Build.VERSION_CODES.O_MR1) class RemoteTorrentFileEntryList( - connectivityAware: ConnectivityAware, - getRemote: () -> IRemoteTorrentFileEntryList -) : AbstractList(), - RemoteCall by RetryRemoteCall(getRemote), - ConnectivityAware by connectivityAware { - override val size: Int get() = call { size } + private val fetchRemoteScope: CoroutineScope, + private val remote: RemoteObject, + private val connectivityAware: ConnectivityAware +) : AbstractList() { + override val size: Int get() = remote.call { size } override fun get(index: Int): TorrentFileEntry { - return RemoteTorrentFileEntry(this) { call { get(index) } } + return RemoteTorrentFileEntry( + fetchRemoteScope, + RetryRemoteObject(fetchRemoteScope) { remote.call { get(index) } }, + connectivityAware, + ) } } \ No newline at end of file diff --git a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteTorrentFileHandle.kt b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteTorrentFileHandle.kt index 4f035cff77..16e1537492 100644 --- a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteTorrentFileHandle.kt +++ b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteTorrentFileHandle.kt @@ -11,6 +11,7 @@ package me.him188.ani.app.domain.torrent.client import android.os.Build import androidx.annotation.RequiresApi +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext import me.him188.ani.app.domain.torrent.IRemoteTorrentFileHandle @@ -21,31 +22,34 @@ import me.him188.ani.utils.coroutines.IO_ @RequiresApi(Build.VERSION_CODES.O_MR1) class RemoteTorrentFileHandle( - connectivityAware: ConnectivityAware, - getRemote: () -> IRemoteTorrentFileHandle -) : TorrentFileHandle, - RemoteCall by RetryRemoteCall(getRemote), - ConnectivityAware by connectivityAware { + private val fetchRemoteScope: CoroutineScope, + private val remote: RemoteObject, + private val connectivityAware: ConnectivityAware +) : TorrentFileHandle { override val entry: TorrentFileEntry - get() = RemoteTorrentFileEntry(this) { call { torrentFileEntry } } + get() = RemoteTorrentFileEntry( + fetchRemoteScope, + RetryRemoteObject(fetchRemoteScope) { remote.call { torrentFileEntry } }, + connectivityAware, + ) override fun resume(priority: FilePriority) { - call { resume(priority.ordinal) } + remote.call { resume(priority.ordinal) } } override fun pause() { - call { pause() } + remote.call { pause() } } override suspend fun close() { withContext(Dispatchers.IO_) { - call { close() } + remote.call { close() } } } override suspend fun closeAndDelete() { withContext(Dispatchers.IO_) { - call { closeAndDelete() } + remote.call { closeAndDelete() } } } } \ No newline at end of file diff --git a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteTorrentSession.kt b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteTorrentSession.kt index f4e0a36546..e9d976eb15 100644 --- a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteTorrentSession.kt +++ b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/client/RemoteTorrentSession.kt @@ -10,16 +10,21 @@ package me.him188.ani.app.domain.torrent.client import android.os.Build +import android.os.DeadObjectException import androidx.annotation.RequiresApi +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.withContext import me.him188.ani.app.domain.torrent.IDisposableHandle +import me.him188.ani.app.domain.torrent.IRemoteTorrentFileEntryList import me.him188.ani.app.domain.torrent.IRemoteTorrentSession -import me.him188.ani.app.domain.torrent.ITorrentSessionStatsCallback +import me.him188.ani.app.domain.torrent.callback.ITorrentSessionStatsCallback +import me.him188.ani.app.domain.torrent.cont.ContTorrentSessionGetFiles import me.him188.ani.app.domain.torrent.parcel.PTorrentSessionStats +import me.him188.ani.app.domain.torrent.parcel.RemoteContinuationException import me.him188.ani.app.torrent.api.TorrentSession import me.him188.ani.app.torrent.api.files.TorrentFileEntry import me.him188.ani.app.torrent.api.peer.PeerInfo @@ -27,11 +32,10 @@ import me.him188.ani.utils.coroutines.IO_ @RequiresApi(Build.VERSION_CODES.O_MR1) class RemoteTorrentSession( - connectivityAware: ConnectivityAware, - getRemote: () -> IRemoteTorrentSession -) : TorrentSession, - RemoteCall by RetryRemoteCall(getRemote), - ConnectivityAware by connectivityAware { + private val fetchRemoteScope: CoroutineScope, + private val remote: RemoteObject, + private val connectivityAware: ConnectivityAware +) : TorrentSession { override val sessionStats: Flow = callbackFlow { var disposable: IDisposableHandle? = null val callback = object : ITorrentSessionStatsCallback.Stub() { @@ -41,41 +45,63 @@ class RemoteTorrentSession( } // todo: not thread-safe - disposable = call { getSessionStats(callback) } - val transform = registerStateTransform(false, true) { - disposable?.callOnceOrNull { dispose() } - disposable = call { getSessionStats(callback) } + disposable = remote.call { getSessionStats(callback) } + val transform = connectivityAware.registerStateTransform(false, true) { + try { + disposable?.dispose() + } catch (_: DeadObjectException) { + } + disposable = remote.call { getSessionStats(callback) } } awaitClose { - disposable?.callOnceOrNull { dispose() } - unregister(transform) + try { + disposable?.dispose() + } catch (_: DeadObjectException) { + } + connectivityAware.unregister(transform) } } override suspend fun getName(): String { - return withContext(Dispatchers.IO_) { call { name } } + return withContext(Dispatchers.IO_) { remote.call { name } } } override suspend fun getFiles(): List { - return withContext(Dispatchers.IO_) { - RemoteTorrentFileEntryList(this@RemoteTorrentSession) { call { files } } - } + return RemoteTorrentFileEntryList( + fetchRemoteScope, + RetryRemoteObject(fetchRemoteScope) { + remote.callSuspendCancellable { cont -> + getFiles( + object : ContTorrentSessionGetFiles.Stub() { + override fun resume(value: IRemoteTorrentFileEntryList?) { + cont.resume(value) + } + + override fun resumeWithException(exception: RemoteContinuationException?) { + cont.resumeWithException(exception) + } + }, + ) + } + }, + connectivityAware, + ) } override fun getPeers(): List { - return call { peers }.asList() + return remote.call { peers }.asList() } override suspend fun close() { withContext(Dispatchers.IO_) { - call { close() } + remote.call { close() } } } override suspend fun closeIfNotInUse() { withContext(Dispatchers.IO_) { - call { closeIfNotInUse() } + remote.call { closeIfNotInUse() } } } } \ No newline at end of file diff --git a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/parcel/RemoteContinuationException.kt b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/parcel/RemoteContinuationException.kt new file mode 100644 index 0000000000..26a6fb9de4 --- /dev/null +++ b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/parcel/RemoteContinuationException.kt @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2024 OpenAni and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license, which can be found at the following link. + * + * https://github.com/open-ani/ani/blob/main/LICENSE + */ + +package me.him188.ani.app.domain.torrent.parcel + +import android.os.Parcelable +import kotlinx.parcelize.Parcelize +import me.him188.ani.utils.coroutines.CancellationException +import java.io.OutputStream +import java.io.PrintStream + +/** + * Exception which can be parceled and transact in async remote call. + */ +@Suppress("MemberVisibilityCanBePrivate", "CanBeParameter") +@Parcelize +class RemoteContinuationException( + val throwableName: String, + override val message: String?, + val serializedCause: String?, +) : Parcelable, Exception( + "$throwableName: $message", + serializedCause?.let { RemoteContinuationException("Exception", it, null) }, +) { + fun smartCast(): Exception { + return if (throwableName.contains("CancellationException")) { + CancellationException(message, cause) + } else { + this + } + } +} + +private fun Throwable.toFullString(): String = buildString { + printStackTrace( + PrintStream( + object : OutputStream() { + override fun write(b: Int) { + append(b.toByte()) + } + }, + ), + ) +} + +internal fun Throwable.toRemoteContinuationException() = + RemoteContinuationException(javaClass.name, message, cause?.toFullString()) \ No newline at end of file diff --git a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/AniTorrentService.kt b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/AniTorrentService.kt index ce23c486c6..385f7083ed 100644 --- a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/AniTorrentService.kt +++ b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/AniTorrentService.kt @@ -51,9 +51,8 @@ import kotlin.coroutines.CoroutineContext class AniTorrentService : LifecycleService(), CoroutineScope { private val logger = logger(this::class) - override val coroutineContext: CoroutineContext - get() = lifecycleScope.coroutineContext + - CoroutineName("AniTorrentService") + + override val coroutineContext: CoroutineContext = + Dispatchers.Default + CoroutineName("AniTorrentService") + SupervisorJob(lifecycleScope.coroutineContext[Job]) // config flow for constructing torrent engine. diff --git a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentDownloaderProxy.kt b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentDownloaderProxy.kt index d48e0342de..8aecbd7e26 100644 --- a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentDownloaderProxy.kt +++ b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentDownloaderProxy.kt @@ -12,20 +12,22 @@ package me.him188.ani.app.domain.torrent.service.proxy import android.os.Build import android.os.DeadObjectException import androidx.annotation.RequiresApi +import kotlinx.coroutines.CoroutineExceptionHandler import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext import kotlinx.io.files.Path import me.him188.ani.app.domain.torrent.IDisposableHandle import me.him188.ani.app.domain.torrent.IRemoteTorrentDownloader -import me.him188.ani.app.domain.torrent.IRemoteTorrentSession -import me.him188.ani.app.domain.torrent.ITorrentDownloaderStatsCallback +import me.him188.ani.app.domain.torrent.callback.ITorrentDownloaderStatsCallback import me.him188.ani.app.domain.torrent.client.ConnectivityAware +import me.him188.ani.app.domain.torrent.cont.ContTorrentDownloaderFetchTorrent +import me.him188.ani.app.domain.torrent.cont.ContTorrentDownloaderStartDownload import me.him188.ani.app.domain.torrent.parcel.PEncodedTorrentInfo import me.him188.ani.app.domain.torrent.parcel.PTorrentDownloaderStats import me.him188.ani.app.domain.torrent.parcel.PTorrentLibInfo import me.him188.ani.app.domain.torrent.parcel.toParceled +import me.him188.ani.app.domain.torrent.parcel.toRemoteContinuationException import me.him188.ani.app.torrent.api.TorrentDownloader import me.him188.ani.utils.coroutines.CancellationException import me.him188.ani.utils.coroutines.IO_ @@ -37,16 +39,16 @@ import kotlin.coroutines.CoroutineContext class TorrentDownloaderProxy( private val delegate: TorrentDownloader, - connectivityAware: ConnectivityAware, + private val connectivityAware: ConnectivityAware, context: CoroutineContext, -) : IRemoteTorrentDownloader.Stub(), ConnectivityAware by connectivityAware { +) : IRemoteTorrentDownloader.Stub() { private val logger = logger() private val scope = context.childScope() override fun getTotalStatus(flow: ITorrentDownloaderStatsCallback?): IDisposableHandle { val job = scope.launch(Dispatchers.IO_) { delegate.totalStats.collect { - if (!isConnected) return@collect + if (!connectivityAware.isConnected) return@collect try { flow?.onEmit( @@ -79,24 +81,52 @@ class TorrentDownloaderProxy( } @RequiresApi(Build.VERSION_CODES.O_MR1) - override fun fetchTorrent(uri: String?, timeoutSeconds: Int): PEncodedTorrentInfo? { + override fun fetchTorrent( + uri: String?, + timeoutSeconds: Int, + cont: ContTorrentDownloaderFetchTorrent? + ): IDisposableHandle? { if (uri == null) return null - - val fetched = runBlocking { delegate.fetchTorrent(uri, timeoutSeconds) } - return fetched.toParceled() + if (cont == null) return null + + val job = scope.launch( + CoroutineExceptionHandler { _, throwable -> + if (!connectivityAware.isConnected) return@CoroutineExceptionHandler + cont.resumeWithException(throwable.toRemoteContinuationException()) + } + Dispatchers.IO_, + ) { + val result = delegate.fetchTorrent(uri, timeoutSeconds) + if (!connectivityAware.isConnected) return@launch + cont.resume(result.toParceled()) + } + + return DisposableHandleProxy { job.cancel() } } @RequiresApi(Build.VERSION_CODES.O_MR1) - override fun startDownload(data: PEncodedTorrentInfo?, overrideSaveDir: String?): IRemoteTorrentSession? { + override fun startDownload( + data: PEncodedTorrentInfo?, + overrideSaveDir: String?, + cont: ContTorrentDownloaderStartDownload? + ): IDisposableHandle? { if (data == null) return null - - val session = runBlocking { - delegate.startDownload( + if (cont == null) return null + + val job = scope.launch( + CoroutineExceptionHandler { _, throwable -> + if (!connectivityAware.isConnected) return@CoroutineExceptionHandler + cont.resumeWithException(throwable.toRemoteContinuationException()) + } + Dispatchers.IO_, + ) { + val result = delegate.startDownload( withContext(Dispatchers.IO_) { data.toEncodedTorrentInfo() }, overrideSaveDir = overrideSaveDir?.run { Path(this).inSystem }, - ) + ) + if (!connectivityAware.isConnected) return@launch + cont.resume(TorrentSessionProxy(result, connectivityAware, scope.coroutineContext)) } - return TorrentSessionProxy(session, this, scope.coroutineContext) + + return DisposableHandleProxy { job.cancel() } } @RequiresApi(Build.VERSION_CODES.O_MR1) diff --git a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentEngineProxy.kt b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentEngineProxy.kt index 61e29dbfbb..d609ae0896 100644 --- a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentEngineProxy.kt +++ b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentEngineProxy.kt @@ -24,12 +24,12 @@ import kotlinx.serialization.json.Json import me.him188.ani.app.data.models.preference.AnitorrentConfig import me.him188.ani.app.data.models.preference.ProxySettings import me.him188.ani.app.data.models.preference.TorrentPeerConfig -import me.him188.ani.app.domain.torrent.IAnitorrentConfigCollector -import me.him188.ani.app.domain.torrent.IProxySettingsCollector import me.him188.ani.app.domain.torrent.IRemoteAniTorrentEngine import me.him188.ani.app.domain.torrent.IRemoteTorrentDownloader -import me.him188.ani.app.domain.torrent.ITorrentPeerConfigCollector import me.him188.ani.app.domain.torrent.client.DefaultConnectivityAware +import me.him188.ani.app.domain.torrent.collector.IAnitorrentConfigCollector +import me.him188.ani.app.domain.torrent.collector.IProxySettingsCollector +import me.him188.ani.app.domain.torrent.collector.ITorrentPeerConfigCollector import me.him188.ani.app.domain.torrent.engines.AnitorrentEngine import me.him188.ani.app.domain.torrent.parcel.PAnitorrentConfig import me.him188.ani.app.domain.torrent.parcel.PProxySettings diff --git a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentFileEntryListProxy.kt b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentFileEntryListProxy.kt index 4c961ccc87..b90c5c2f6c 100644 --- a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentFileEntryListProxy.kt +++ b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentFileEntryListProxy.kt @@ -18,13 +18,13 @@ import kotlin.coroutines.CoroutineContext class TorrentFileEntryListProxy( val delegate: List, - connectivityAware: ConnectivityAware, + private val connectivityAware: ConnectivityAware, context: CoroutineContext -) : IRemoteTorrentFileEntryList.Stub(), ConnectivityAware by connectivityAware { +) : IRemoteTorrentFileEntryList.Stub() { private val scope = context.childScope() override fun get(index: Int): IRemoteTorrentFileEntry { - return TorrentFileEntryProxy(delegate[index], this, scope.coroutineContext) + return TorrentFileEntryProxy(delegate[index], connectivityAware, scope.coroutineContext) } override fun getSize(): Int { diff --git a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentFileEntryProxy.kt b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentFileEntryProxy.kt index 2dedfbf61b..5f750a2ba7 100644 --- a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentFileEntryProxy.kt +++ b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentFileEntryProxy.kt @@ -12,17 +12,20 @@ package me.him188.ani.app.domain.torrent.service.proxy import android.os.Build import android.os.DeadObjectException import androidx.annotation.RequiresApi +import kotlinx.coroutines.CoroutineExceptionHandler import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking import me.him188.ani.app.domain.torrent.IDisposableHandle import me.him188.ani.app.domain.torrent.IRemotePieceList import me.him188.ani.app.domain.torrent.IRemoteTorrentFileEntry import me.him188.ani.app.domain.torrent.IRemoteTorrentFileHandle -import me.him188.ani.app.domain.torrent.ITorrentFileEntryStatsCallback +import me.him188.ani.app.domain.torrent.callback.ITorrentFileEntryStatsCallback import me.him188.ani.app.domain.torrent.client.ConnectivityAware +import me.him188.ani.app.domain.torrent.cont.ContTorrentFileEntryGetInputParams +import me.him188.ani.app.domain.torrent.cont.ContTorrentFileEntryResolveFile import me.him188.ani.app.domain.torrent.parcel.PTorrentFileEntryStats import me.him188.ani.app.domain.torrent.parcel.PTorrentInputParameter +import me.him188.ani.app.domain.torrent.parcel.toRemoteContinuationException import me.him188.ani.app.torrent.anitorrent.session.AnitorrentDownloadSession import me.him188.ani.app.torrent.api.files.TorrentFileEntry import me.him188.ani.utils.coroutines.CancellationException @@ -33,15 +36,15 @@ import kotlin.coroutines.CoroutineContext class TorrentFileEntryProxy( private val delegate: TorrentFileEntry, - connectivityAware: ConnectivityAware, + private val connectivityAware: ConnectivityAware, context: CoroutineContext -) : IRemoteTorrentFileEntry.Stub(), ConnectivityAware by connectivityAware { +) : IRemoteTorrentFileEntry.Stub() { private val scope = context.childScope() override fun getFileStats(flow: ITorrentFileEntryStatsCallback?): IDisposableHandle { val job = scope.launch(Dispatchers.IO_) { delegate.fileStats.collect { - if (!isConnected) return@collect + if (!connectivityAware.isConnected) return@collect try { flow?.onEmit(PTorrentFileEntryStats(it.downloadedBytes, it.downloadProgress)) @@ -72,29 +75,57 @@ class TorrentFileEntryProxy( } override fun createHandle(): IRemoteTorrentFileHandle { - return TorrentFileHandleProxy(delegate.createHandle(), this, scope.coroutineContext) + return TorrentFileHandleProxy(delegate.createHandle(), connectivityAware, scope.coroutineContext) } - override fun resolveFile(): String { - return runBlocking { delegate.resolveFile().absolutePath } + override fun resolveFile(cont: ContTorrentFileEntryResolveFile?): IDisposableHandle? { + if (cont == null) return null + + val job = scope.launch( + CoroutineExceptionHandler { _, throwable -> + if (!connectivityAware.isConnected) return@CoroutineExceptionHandler + cont.resumeWithException(throwable.toRemoteContinuationException()) + } + Dispatchers.IO_, + ) { + val result = delegate.resolveFile().absolutePath + if (!connectivityAware.isConnected) return@launch + cont.resume(result) + } + + return DisposableHandleProxy { job.cancel() } } override fun resolveFileMaybeEmptyOrNull(): String? { return delegate.resolveFileMaybeEmptyOrNull()?.absolutePath } - override fun getTorrentInputParams(): PTorrentInputParameter? { - check(delegate is AnitorrentDownloadSession.AnitorrentEntry) { - "Expected delegate instance is AnitorrentEntry, actual $delegate" + override fun getTorrentInputParams(cont: ContTorrentFileEntryGetInputParams?): IDisposableHandle? { + if (cont == null) return null + if (delegate !is AnitorrentDownloadSession.AnitorrentEntry) { + val exception = IllegalStateException("Expected delegate instance is AnitorrentEntry, actual $delegate") + cont.resumeWithException(exception.toRemoteContinuationException()) + throw exception + } + + val job = scope.launch( + CoroutineExceptionHandler { _, throwable -> + if (!connectivityAware.isConnected) return@CoroutineExceptionHandler + cont.resumeWithException(throwable.toRemoteContinuationException()) + } + Dispatchers.IO_, + ) { + val result = delegate.createTorrentInputParameters() + if (!connectivityAware.isConnected) return@launch + cont.resume( + PTorrentInputParameter( + file = result.file.absolutePath, + logicalStartOffset = result.logicalStartOffset, + bufferSize = result.bufferSize, + size = result.size, + ), + ) } - val torrentInputParameters = runBlocking { delegate.createTorrentInputParameters() } - - return PTorrentInputParameter( - file = torrentInputParameters.file.absolutePath, - logicalStartOffset = torrentInputParameters.logicalStartOffset, - bufferSize = torrentInputParameters.bufferSize, - size = torrentInputParameters.size, - ) + + return DisposableHandleProxy { job.cancel() } } override fun torrentInputOnWait(pieceIndex: Int) { diff --git a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentFileHandleProxy.kt b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentFileHandleProxy.kt index 04be5ae0ad..0ed7b71880 100644 --- a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentFileHandleProxy.kt +++ b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentFileHandleProxy.kt @@ -9,7 +9,7 @@ package me.him188.ani.app.domain.torrent.service.proxy -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.launch import me.him188.ani.app.domain.torrent.IRemoteTorrentFileEntry import me.him188.ani.app.domain.torrent.IRemoteTorrentFileHandle import me.him188.ani.app.domain.torrent.client.ConnectivityAware @@ -20,13 +20,13 @@ import kotlin.coroutines.CoroutineContext class TorrentFileHandleProxy( private val delegate: TorrentFileHandle, - connectivityAware: ConnectivityAware, + private val connectivityAware: ConnectivityAware, context: CoroutineContext -) : IRemoteTorrentFileHandle.Stub(), ConnectivityAware by connectivityAware { +) : IRemoteTorrentFileHandle.Stub() { private val scope = context.childScope() override fun getTorrentFileEntry(): IRemoteTorrentFileEntry { - return TorrentFileEntryProxy(delegate.entry, this, scope.coroutineContext) + return TorrentFileEntryProxy(delegate.entry, connectivityAware, scope.coroutineContext) } override fun resume(priorityEnum: Int) { @@ -38,10 +38,14 @@ class TorrentFileHandleProxy( } override fun close() { - runBlocking { delegate.close() } + scope.launch { + delegate.close() + } } override fun closeAndDelete() { - runBlocking { delegate.closeAndDelete() } + scope.launch { + delegate.closeAndDelete() + } } } \ No newline at end of file diff --git a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentSessionProxy.kt b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentSessionProxy.kt index 506ec64177..82da412758 100644 --- a/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentSessionProxy.kt +++ b/app/shared/app-data/src/androidMain/kotlin/domain/torrent/service/proxy/TorrentSessionProxy.kt @@ -10,16 +10,18 @@ package me.him188.ani.app.domain.torrent.service.proxy import android.os.DeadObjectException +import kotlinx.coroutines.CoroutineExceptionHandler import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import me.him188.ani.app.domain.torrent.IDisposableHandle -import me.him188.ani.app.domain.torrent.IRemoteTorrentFileEntryList import me.him188.ani.app.domain.torrent.IRemoteTorrentSession -import me.him188.ani.app.domain.torrent.ITorrentSessionStatsCallback +import me.him188.ani.app.domain.torrent.callback.ITorrentSessionStatsCallback import me.him188.ani.app.domain.torrent.client.ConnectivityAware +import me.him188.ani.app.domain.torrent.cont.ContTorrentSessionGetFiles import me.him188.ani.app.domain.torrent.parcel.PPeerInfo import me.him188.ani.app.domain.torrent.parcel.PTorrentSessionStats +import me.him188.ani.app.domain.torrent.parcel.toRemoteContinuationException import me.him188.ani.app.torrent.api.TorrentSession import me.him188.ani.utils.coroutines.CancellationException import me.him188.ani.utils.coroutines.IO_ @@ -29,16 +31,16 @@ import kotlin.coroutines.CoroutineContext class TorrentSessionProxy( private val delegate: TorrentSession, - connectivityAware: ConnectivityAware, + private val connectivityAware: ConnectivityAware, context: CoroutineContext -) : IRemoteTorrentSession.Stub(), ConnectivityAware by connectivityAware { +) : IRemoteTorrentSession.Stub() { private val scope = context.childScope() private val logger = logger() override fun getSessionStats(flow: ITorrentSessionStatsCallback?): IDisposableHandle { val job = scope.launch(Dispatchers.IO_) { delegate.sessionStats.collect { - if (!isConnected) return@collect + if (!connectivityAware.isConnected) return@collect if (it == null) return@collect try { @@ -65,35 +67,50 @@ class TorrentSessionProxy( return runBlocking { delegate.getName() } } - override fun getFiles(): IRemoteTorrentFileEntryList { - val list = runBlocking { delegate.getFiles() } + override fun getFiles(cont: ContTorrentSessionGetFiles?): IDisposableHandle? { + if (cont == null) return null - return TorrentFileEntryListProxy(list, this, scope.coroutineContext) + val job = scope.launch( + CoroutineExceptionHandler { _, throwable -> + if (!connectivityAware.isConnected) return@CoroutineExceptionHandler + cont.resumeWithException(throwable.toRemoteContinuationException()) + } + Dispatchers.IO_, + ) { + val result = delegate.getFiles() + if (!connectivityAware.isConnected) return@launch + cont.resume( + TorrentFileEntryListProxy(result, connectivityAware, scope.coroutineContext), + ) + } + + return DisposableHandleProxy { job.cancel() } } override fun getPeers(): Array { - return runBlocking { - delegate.getPeers().map { - PPeerInfo( - it.handle, - it.id, - it.client, - it.ipAddr, - it.ipPort, - it.progress, - it.totalDownload.inBytes, - it.totalUpload.inBytes, - it.flags - ) - }.toTypedArray() - } + return delegate.getPeers().map { + PPeerInfo( + it.handle, + it.id, + it.client, + it.ipAddr, + it.ipPort, + it.progress, + it.totalDownload.inBytes, + it.totalUpload.inBytes, + it.flags, + ) + }.toTypedArray() } override fun close() { - return runBlocking { delegate.close() } + scope.launch { + delegate.close() + } } override fun closeIfNotInUse() { - return runBlocking { delegate.closeIfNotInUse() } + scope.launch { + delegate.closeIfNotInUse() + } } } \ No newline at end of file