From 6ad179ed4d744b7984fee57c275da23b6c81445f Mon Sep 17 00:00:00 2001 From: ThibaultBee <37510686+ThibaultBee@users.noreply.github.com> Date: Wed, 22 Jan 2025 23:25:15 +0100 Subject: [PATCH] refactor(*): backport streamer interfaces from next version --- .../core/streamer/file/StreamerUtils.kt | 2 +- .../elements/endpoints/CombineEndpoint.kt | 12 +- .../elements/endpoints/DynamicEndpoint.kt | 8 +- .../core/elements/endpoints/IEndpoint.kt | 2 +- .../elements/endpoints/MediaMuxerEndpoint.kt | 10 +- .../endpoints/composites/CompositeEndpoint.kt | 4 +- .../composites/sinks/AbstractSink.kt | 2 +- .../endpoints/composites/sinks/FakeSink.kt | 8 +- .../endpoints/composites/sinks/FileSink.kt | 8 +- .../endpoints/composites/sinks/ISink.kt | 2 +- .../composites/sinks/OutputStreamSink.kt | 8 +- .../DefaultBitrateRegulatorController.kt | 29 ++- .../core/streamers/interfaces/IStreamer.kt | 218 ++++++++++++++++++ .../StreamerActivityLifeCycleObserver.kt | 16 +- .../StreamerViewModelLifeCycleObserver.kt | 19 +- .../streamers/single/CameraSingleStreamer.kt | 6 +- .../core/streamers/single/ISingleStreamer.kt | 207 +++-------------- .../core/streamers/single/SingleStreamer.kt | 34 +-- .../callbacks/CallbackSingleStreamer.kt | 52 ++++- .../elements/endpoints/DynamicEndpointTest.kt | 18 +- .../app/ui/main/PreviewViewModel.kt | 20 +- .../ui/main/usecases/BuildStreamerUseCase.kt | 6 +- .../streampack/screenrecorder/MainActivity.kt | 10 +- .../endpoints/composites/sinks/RtmpSink.kt | 10 +- .../endpoints/composites/sinks/SrtSink.kt | 8 +- .../DefaultSrtBitrateRegulatorController.kt | 19 +- .../services/DefaultScreenRecorderService.kt | 6 +- 27 files changed, 451 insertions(+), 293 deletions(-) create mode 100644 core/src/main/java/io/github/thibaultbee/streampack/core/streamers/interfaces/IStreamer.kt rename core/src/main/java/io/github/thibaultbee/streampack/core/streamers/{observers => lifecycle}/StreamerActivityLifeCycleObserver.kt (63%) rename core/src/main/java/io/github/thibaultbee/streampack/core/streamers/{observers => lifecycle}/StreamerViewModelLifeCycleObserver.kt (77%) diff --git a/core/src/androidTest/java/io/github/thibaultbee/streampack/core/streamer/file/StreamerUtils.kt b/core/src/androidTest/java/io/github/thibaultbee/streampack/core/streamer/file/StreamerUtils.kt index d94388777..64a36cd5c 100644 --- a/core/src/androidTest/java/io/github/thibaultbee/streampack/core/streamer/file/StreamerUtils.kt +++ b/core/src/androidTest/java/io/github/thibaultbee/streampack/core/streamer/file/StreamerUtils.kt @@ -24,7 +24,7 @@ object StreamerUtils { while (i < numOfLoop) { i++ delay(pollDuration) - assertTrue(streamer.isStreaming.value) + assertTrue(streamer.isStreamingFlow.value) } } streamer.stopStream() diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/CombineEndpoint.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/CombineEndpoint.kt index d6dbae59d..2e76622ca 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/CombineEndpoint.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/CombineEndpoint.kt @@ -56,13 +56,13 @@ open class CombineEndpoint(protected val endpointInternals: List = - combineStates(*endpointInternals.map { it.isOpen }.toTypedArray()) { _ -> - endpointInternals.any { it.isOpen.value } + override val isOpenFlow: StateFlow = + combineStates(*endpointInternals.map { it.isOpenFlow }.toTypedArray()) { _ -> + endpointInternals.any { it.isOpenFlow.value } } /** @@ -190,7 +190,7 @@ open class CombineEndpoint(protected val endpointInternals: List try { - if (endpoint.isOpen.value) { + if (endpoint.isOpenFlow.value) { val endpointStreamId = endpointsToStreamIdsMap[Pair(endpoint, streamPid)]!! endpoint.write(frame, endpointStreamId) diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt index 7a4781060..b43875f0a 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt @@ -63,9 +63,9 @@ open class DynamicEndpoint( private var rtmpEndpoint: IEndpointInternal? = null @OptIn(ExperimentalCoroutinesApi::class) - override val isOpen: StateFlow = DerivedStateFlow( - getValue = { _endpoint?.isOpen?.value ?: false }, - flow = endpointFlow.flatMapLatest { it?.isOpen ?: MutableStateFlow(false) } + override val isOpenFlow: StateFlow = DerivedStateFlow( + getValue = { _endpoint?.isOpenFlow?.value ?: false }, + flow = endpointFlow.flatMapLatest { it?.isOpenFlow ?: MutableStateFlow(false) } ) /** @@ -80,7 +80,7 @@ open class DynamicEndpoint( get() = endpoint.metrics override suspend fun open(descriptor: MediaDescriptor) { - if (isOpen.value) { + if (isOpenFlow.value) { Logger.w(TAG, "DynamicEndpoint is already opened") return } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/IEndpoint.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/IEndpoint.kt index 0f9bb5474..fd209198f 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/IEndpoint.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/IEndpoint.kt @@ -77,7 +77,7 @@ interface IEndpoint { * Whether if the endpoint is opened. * For example, if the file is opened for [FileSink]. */ - val isOpen: StateFlow + val isOpenFlow: StateFlow /** * A info to verify supported formats. diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/MediaMuxerEndpoint.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/MediaMuxerEndpoint.kt index a88719afb..b956cabaa 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/MediaMuxerEndpoint.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/MediaMuxerEndpoint.kt @@ -61,11 +61,11 @@ class MediaMuxerEndpoint( override val metrics: Any get() = TODO("Not yet implemented") - private val _isOpen = MutableStateFlow(false) - override val isOpen: StateFlow = _isOpen + private val _isOpenFlow = MutableStateFlow(false) + override val isOpenFlow: StateFlow = _isOpenFlow override suspend fun open(descriptor: MediaDescriptor) { - if (isOpen.value) { + if (isOpenFlow.value) { Logger.w(TAG, "MediaMuxerEndpoint is already opened") return } @@ -121,7 +121,7 @@ class MediaMuxerEndpoint( throw t } - _isOpen.emit(true) + _isOpenFlow.emit(true) } override suspend fun write( @@ -209,7 +209,7 @@ class MediaMuxerEndpoint( numOfStreams = 0 streamIdToTrackId.clear() mediaMuxer = null - _isOpen.emit(false) + _isOpenFlow.emit(false) } } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/CompositeEndpoint.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/CompositeEndpoint.kt index d95eb9607..8754895d7 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/CompositeEndpoint.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/CompositeEndpoint.kt @@ -59,8 +59,8 @@ open class CompositeEndpoint( } } - override val isOpen: StateFlow - get() = sink.isOpen + override val isOpenFlow: StateFlow + get() = sink.isOpenFlow override suspend fun open(descriptor: MediaDescriptor) { sink.open(descriptor) diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/AbstractSink.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/AbstractSink.kt index 9967f9f8e..91ac63a37 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/AbstractSink.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/AbstractSink.kt @@ -17,7 +17,7 @@ abstract class AbstractSink : ISinkInternal { get() = TODO("Not yet implemented") override suspend fun open(mediaDescriptor: MediaDescriptor) { - if (isOpen.value) { + if (isOpenFlow.value) { Logger.w(TAG, "Sink is already opened") return } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/FakeSink.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/FakeSink.kt index 143173b0c..d178c6c7f 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/FakeSink.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/FakeSink.kt @@ -27,12 +27,12 @@ import kotlinx.coroutines.flow.StateFlow */ class FakeSink(override val supportedSinkTypes: List = MediaSinkType.entries) : AbstractSink() { - private val _isOpen = MutableStateFlow(false) - override val isOpen: StateFlow = _isOpen + private val _isOpenFlow = MutableStateFlow(false) + override val isOpenFlow: StateFlow = _isOpenFlow override suspend fun openImpl(mediaDescriptor: MediaDescriptor) { Logger.d(TAG, "open called: $mediaDescriptor") - _isOpen.emit(true) + _isOpenFlow.emit(true) } override fun configure(config: SinkConfiguration) { @@ -54,7 +54,7 @@ class FakeSink(override val supportedSinkTypes: List = MediaSinkT override suspend fun close() { Logger.d(TAG, "close called") - _isOpen.emit(false) + _isOpenFlow.emit(false) } companion object { diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/FileSink.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/FileSink.kt index eb83a1465..afea7fb4b 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/FileSink.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/FileSink.kt @@ -32,12 +32,12 @@ class FileSink(private val coroutineContext: CoroutineContext = Dispatchers.IO) private var file: RandomAccessFile? = null - private val _isOpen = MutableStateFlow(false) - override val isOpen: StateFlow = _isOpen + private val _isOpenFlow = MutableStateFlow(false) + override val isOpenFlow: StateFlow = _isOpenFlow override suspend fun openImpl(mediaDescriptor: MediaDescriptor) { file = openLocalFile(mediaDescriptor.uri) - _isOpen.emit(true) + _isOpenFlow.emit(true) } override fun configure(config: SinkConfiguration) {} // Nothing to configure @@ -69,7 +69,7 @@ class FileSink(private val coroutineContext: CoroutineContext = Dispatchers.IO) // Ignore } finally { file = null - _isOpen.emit(false) + _isOpenFlow.emit(false) } } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/ISink.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/ISink.kt index bcb151201..a4acdabc4 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/ISink.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/ISink.kt @@ -44,7 +44,7 @@ interface ISink { * Whether if the endpoint is opened. * For example, if the file is opened for [FileSink]. */ - val isOpen: StateFlow + val isOpenFlow: StateFlow /** * Metrics of the sink. diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/OutputStreamSink.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/OutputStreamSink.kt index d25c142e9..86563aff2 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/OutputStreamSink.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/OutputStreamSink.kt @@ -32,8 +32,8 @@ abstract class OutputStreamSink(private val coroutineContext: CoroutineContext = AbstractSink() { protected var outputStream: OutputStream? = null - private val _isOpen = MutableStateFlow(false) - override val isOpen: StateFlow = _isOpen + private val _isOpenFlow = MutableStateFlow(false) + override val isOpenFlow: StateFlow = _isOpenFlow /** * Open an [OutputStream] to write data @@ -42,7 +42,7 @@ abstract class OutputStreamSink(private val coroutineContext: CoroutineContext = override suspend fun openImpl(mediaDescriptor: MediaDescriptor) { outputStream = openOutputStream(mediaDescriptor) - _isOpen.emit(true) + _isOpenFlow.emit(true) } override fun configure(config: SinkConfiguration) {} // Nothing to configure @@ -77,7 +77,7 @@ abstract class OutputStreamSink(private val coroutineContext: CoroutineContext = // Ignore } finally { outputStream = null - _isOpen.emit(false) + _isOpenFlow.emit(false) } } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/DefaultBitrateRegulatorController.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/DefaultBitrateRegulatorController.kt index a4a6b7846..47d846962 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/DefaultBitrateRegulatorController.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/DefaultBitrateRegulatorController.kt @@ -20,7 +20,9 @@ import io.github.thibaultbee.streampack.core.elements.encoders.IEncoder import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpoint import io.github.thibaultbee.streampack.core.elements.utils.Scheduler import io.github.thibaultbee.streampack.core.regulator.IBitrateRegulator +import io.github.thibaultbee.streampack.core.streamers.single.IAudioSingleStreamer import io.github.thibaultbee.streampack.core.streamers.single.ICoroutineSingleStreamer +import io.github.thibaultbee.streampack.core.streamers.single.IVideoSingleStreamer /** * A [BitrateRegulatorController] implementation that triggers [IBitrateRegulator.update] every [delayTimeInMs]. @@ -34,7 +36,7 @@ import io.github.thibaultbee.streampack.core.streamers.single.ICoroutineSingleSt */ open class DefaultBitrateRegulatorController( audioEncoder: IEncoder?, - videoEncoder: IEncoder?, + videoEncoder: IEncoder, endpoint: IEndpoint, bitrateRegulatorFactory: IBitrateRegulator.Factory, bitrateRegulatorConfig: BitrateRegulatorConfig = BitrateRegulatorConfig(), @@ -46,17 +48,13 @@ open class DefaultBitrateRegulatorController( bitrateRegulatorFactory, bitrateRegulatorConfig ) { - init { - requireNotNull(videoEncoder) { "Video encoder is required" } - } - /** * Bitrate regulator. Calls regularly by [scheduler]. Don't call it otherwise or you might break regulation. */ private val bitrateRegulator = bitrateRegulatorFactory.newBitrateRegulator( bitrateRegulatorConfig, { - videoEncoder!!.bitrate = it + videoEncoder.bitrate = it }, { /* Do nothing for audio */ } ) @@ -67,7 +65,7 @@ open class DefaultBitrateRegulatorController( private val scheduler = Scheduler(delayTimeInMs) { bitrateRegulator.update( endpoint.metrics, - videoEncoder?.bitrate ?: 0, + videoEncoder.bitrate, audioEncoder?.bitrate ?: 0 ) } @@ -86,9 +84,22 @@ open class DefaultBitrateRegulatorController( private val delayTimeInMs: Long = 500 ) : BitrateRegulatorController.Factory() { override fun newBitrateRegulatorController(streamer: ICoroutineSingleStreamer): BitrateRegulatorController { + val audioEncoder = if (streamer is IAudioSingleStreamer) { + streamer.audioEncoder + } else { + null + } + + require(streamer is IVideoSingleStreamer) { + "Streamer must be a video single streamer" + } + val videoEncoder = requireNotNull(streamer.videoEncoder) { + "Video encoder must not be null" + } + return DefaultBitrateRegulatorController( - streamer.audioEncoder, - streamer.videoEncoder, + audioEncoder, + videoEncoder, streamer.endpoint, bitrateRegulatorFactory, bitrateRegulatorConfig, diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/interfaces/IStreamer.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/interfaces/IStreamer.kt new file mode 100644 index 000000000..61e6dc96e --- /dev/null +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/interfaces/IStreamer.kt @@ -0,0 +1,218 @@ +/* + * Copyright (C) 2024 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.core.streamers.interfaces + +import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor +import io.github.thibaultbee.streampack.core.elements.sources.audio.IAudioSource +import io.github.thibaultbee.streampack.core.elements.sources.video.IVideoSource +import io.github.thibaultbee.streampack.core.streamers.single.open +import io.github.thibaultbee.streampack.core.streamers.single.startStream +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.runBlocking + +interface IStreamer + +/** + * A single Streamer based on coroutines. + */ +interface ICoroutineStreamer : IStreamer { + /** + * Returns the last throwable that occurred. + */ + val throwableFlow: StateFlow + + /** + * Returns true if the streamer is opened. + * For example, if the streamer is connected to a server if the endpoint is SRT or RTMP. + */ + val isOpenFlow: StateFlow + + /** + * Closes the streamer. + */ + suspend fun close() + + /** + * Returns true if stream is running. + */ + val isStreamingFlow: StateFlow + + /** + * Starts audio/video stream. + * + * @see [stopStream] + */ + suspend fun startStream() + + /** + * Stops audio/video stream. + * + * @see [startStream] + */ + suspend fun stopStream() + + /** + * Clean and reset the streamer. + */ + suspend fun release() +} + +/** + * Clean and reset the streamer synchronously. + * + * @see [ICoroutineStreamer.release] + */ +fun ICoroutineStreamer.releaseBlocking() = runBlocking { + release() +} + + +interface ICoroutineAudioStreamer { + /** + * Configures only audio settings. + * + * @param audioConfig Audio configuration to set + * + * @throws [Throwable] if configuration can not be applied. + */ + suspend fun setAudioConfig(audioConfig: T) +} + +interface ICoroutineVideoStreamer { + /** + * Configures only video settings. + * + * @param videoConfig Video configuration to set + * + * @throws [Throwable] if configuration can not be applied. + */ + suspend fun setVideoConfig(videoConfig: T) +} + +/** + * An audio single Streamer + */ +interface IAudioStreamer { + + /** + * Advanced settings for the audio source. + */ + val audioSource: IAudioSource? +} + +/** + * A video single streamer. + */ +interface IVideoStreamer { + /** + * Advanced settings for the video source. + */ + val videoSource: IVideoSource? +} + + +interface ICallbackAudioStreamer { + /** + * Configures only audio settings. + * + * @param audioConfig Audio configuration to set + * + * @throws [Throwable] if configuration can not be applied. + */ + fun setAudioConfig(audioConfig: T) +} + +interface ICallbackVideoStreamer { + /** + * Configures only video settings. + * + * @param videoConfig Video configuration to set + * + * @throws [Throwable] if configuration can not be applied. + */ + fun setVideoConfig(videoConfig: T) +} + +interface ICallbackStreamer : IStreamer { + /** + * Returns true if streamer is opened. + * For example, if the streamer is connected to a server if the endpoint is SRT or RTMP. + */ + val isOpen: Boolean + + /** + * Closes the streamer. + */ + fun close() + + /** + * Returns true if stream is running. + */ + val isStreaming: Boolean + + /** + * Starts audio/video stream asynchronously. + * + * You must call [open] before calling this method. + * The streamer must be opened before starting the stream. You can use [Listener.onIsOpenChanged]. + * + * @see [stopStream] + */ + fun startStream() + + /** + * Starts audio/video stream asynchronously. + * + * Same as doing [open] and [startStream]. + * + * @see [stopStream] + */ + fun startStream(descriptor: MediaDescriptor) + + /** + * Stops audio/video stream asynchronously. + * + * @see [startStream] + */ + fun stopStream() + + /** + * Clean and reset the streamer. + */ + fun release() + + /** + * Listener for the callback streamer. + */ + interface Listener { + /** + * Called when the streamer is opened or closed. + */ + fun onIsOpenChanged(isOpen: Boolean) = Unit + + /** + * Called when the stream is started or stopped. + */ + fun onIsStreamingChanged(isStarted: Boolean) = Unit + + /** + * Called when an error occurs. + * + * @param throwable The throwable that occurred + */ + fun onError(throwable: Throwable) = Unit + } +} \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/observers/StreamerActivityLifeCycleObserver.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/lifecycle/StreamerActivityLifeCycleObserver.kt similarity index 63% rename from core/src/main/java/io/github/thibaultbee/streampack/core/streamers/observers/StreamerActivityLifeCycleObserver.kt rename to core/src/main/java/io/github/thibaultbee/streampack/core/streamers/lifecycle/StreamerActivityLifeCycleObserver.kt index 50ea40f45..71fdef6fb 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/observers/StreamerActivityLifeCycleObserver.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/lifecycle/StreamerActivityLifeCycleObserver.kt @@ -13,11 +13,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.github.thibaultbee.streampack.core.streamers.observers +package io.github.thibaultbee.streampack.core.streamers.lifecycle import androidx.lifecycle.DefaultLifecycleObserver import androidx.lifecycle.LifecycleOwner -import io.github.thibaultbee.streampack.core.streamers.single.ISingleStreamer +import io.github.thibaultbee.streampack.core.streamers.interfaces.ICallbackStreamer +import io.github.thibaultbee.streampack.core.streamers.interfaces.ICoroutineStreamer +import io.github.thibaultbee.streampack.core.streamers.interfaces.IStreamer +import io.github.thibaultbee.streampack.core.streamers.interfaces.releaseBlocking /** * A [DefaultLifecycleObserver] to control a streamer on [Activity] lifecycle. @@ -29,9 +32,14 @@ import io.github.thibaultbee.streampack.core.streamers.single.ISingleStreamer * * @param streamer The streamer to control */ -open class StreamerActivityLifeCycleObserver(streamer: ISingleStreamer) : +open class StreamerActivityLifeCycleObserver(streamer: IStreamer) : StreamerViewModelLifeCycleObserver(streamer) { + override fun onDestroy(owner: LifecycleOwner) { - streamer.release() + if (streamer is ICoroutineStreamer) { + streamer.releaseBlocking() + } else if (streamer is ICallbackStreamer) { + streamer.release() + } } } \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/observers/StreamerViewModelLifeCycleObserver.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/lifecycle/StreamerViewModelLifeCycleObserver.kt similarity index 77% rename from core/src/main/java/io/github/thibaultbee/streampack/core/streamers/observers/StreamerViewModelLifeCycleObserver.kt rename to core/src/main/java/io/github/thibaultbee/streampack/core/streamers/lifecycle/StreamerViewModelLifeCycleObserver.kt index ba7e1e3d6..930ec6050 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/observers/StreamerViewModelLifeCycleObserver.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/lifecycle/StreamerViewModelLifeCycleObserver.kt @@ -13,15 +13,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.github.thibaultbee.streampack.core.streamers.observers +package io.github.thibaultbee.streampack.core.streamers.lifecycle import androidx.lifecycle.DefaultLifecycleObserver import androidx.lifecycle.LifecycleOwner +import io.github.thibaultbee.streampack.core.streamers.interfaces.ICallbackStreamer import io.github.thibaultbee.streampack.core.streamers.interfaces.ICameraCallbackStreamer import io.github.thibaultbee.streampack.core.streamers.interfaces.ICameraCoroutineStreamer -import io.github.thibaultbee.streampack.core.streamers.single.ICallbackSingleStreamer -import io.github.thibaultbee.streampack.core.streamers.single.ICoroutineSingleStreamer -import io.github.thibaultbee.streampack.core.streamers.single.ISingleStreamer +import io.github.thibaultbee.streampack.core.streamers.interfaces.ICoroutineStreamer +import io.github.thibaultbee.streampack.core.streamers.interfaces.IStreamer import kotlinx.coroutines.runBlocking /** @@ -34,25 +34,26 @@ import kotlinx.coroutines.runBlocking * * @param streamer The streamer to control */ -open class StreamerViewModelLifeCycleObserver(protected val streamer: ISingleStreamer) : +open class StreamerViewModelLifeCycleObserver(protected val streamer: IStreamer) : DefaultLifecycleObserver { + override fun onPause(owner: LifecycleOwner) { - if (streamer is ICoroutineSingleStreamer) { + if (streamer is ICoroutineStreamer) { if (streamer is ICameraCoroutineStreamer) { runBlocking { streamer.stopPreview() } } runBlocking { streamer.stopStream() - if (streamer.endpoint.isOpen.value) { + if (streamer.isOpenFlow.value) { streamer.close() } } - } else if (streamer is ICallbackSingleStreamer) { + } else if (streamer is ICallbackStreamer) { if (streamer is ICameraCallbackStreamer) { streamer.stopPreview() } streamer.stopStream() - if (streamer.endpoint.isOpen.value) { + if (streamer.isOpen) { streamer.close() } } else { diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/CameraSingleStreamer.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/CameraSingleStreamer.kt index 7eafac237..041e0649e 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/CameraSingleStreamer.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/CameraSingleStreamer.kt @@ -211,10 +211,8 @@ open class CameraSingleStreamer( /** * Same as [SingleStreamer.release] but it also calls [stopPreview]. */ - override fun release() { - runBlocking { - stopPreview() - } + override suspend fun release() { + stopPreview() super.release() } } \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/ISingleStreamer.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/ISingleStreamer.kt index f7b004435..01cfd80b9 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/ISingleStreamer.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/ISingleStreamer.kt @@ -15,23 +15,26 @@ */ package io.github.thibaultbee.streampack.core.streamers.single -import android.Manifest import android.net.Uri import androidx.annotation.IntRange -import androidx.annotation.RequiresPermission import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.UriMediaDescriptor import io.github.thibaultbee.streampack.core.elements.encoders.AudioCodecConfig import io.github.thibaultbee.streampack.core.elements.encoders.IEncoder import io.github.thibaultbee.streampack.core.elements.encoders.VideoCodecConfig import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpoint -import io.github.thibaultbee.streampack.core.elements.sources.audio.IAudioSource -import io.github.thibaultbee.streampack.core.elements.sources.video.IVideoSource import io.github.thibaultbee.streampack.core.elements.utils.RotationValue import io.github.thibaultbee.streampack.core.elements.utils.extensions.rotationToDegrees import io.github.thibaultbee.streampack.core.regulator.controllers.IBitrateRegulatorController import io.github.thibaultbee.streampack.core.streamers.infos.IConfigurationInfo -import kotlinx.coroutines.flow.StateFlow +import io.github.thibaultbee.streampack.core.streamers.interfaces.IAudioStreamer +import io.github.thibaultbee.streampack.core.streamers.interfaces.ICallbackAudioStreamer +import io.github.thibaultbee.streampack.core.streamers.interfaces.ICallbackStreamer +import io.github.thibaultbee.streampack.core.streamers.interfaces.ICallbackVideoStreamer +import io.github.thibaultbee.streampack.core.streamers.interfaces.ICoroutineAudioStreamer +import io.github.thibaultbee.streampack.core.streamers.interfaces.ICoroutineStreamer +import io.github.thibaultbee.streampack.core.streamers.interfaces.ICoroutineVideoStreamer +import io.github.thibaultbee.streampack.core.streamers.interfaces.IVideoStreamer /** * The single streamer audio configuration. @@ -47,36 +50,6 @@ typealias VideoConfig = VideoCodecConfig * A single Streamer that is agnostic to the underlying implementation (either with coroutines or callbacks). */ interface ISingleStreamer { - /** - * Gets the audio configuration. - */ - val audioConfig: AudioConfig? - - /** - * Gets the video configuration. - */ - val videoConfig: VideoConfig? - - /** - * Advanced settings for the audio source. - */ - val audioSource: IAudioSource? - - /** - * Advanced settings for the audio encoder. - */ - val audioEncoder: IEncoder? - - /** - * Advanced settings for the video source. - */ - val videoSource: IVideoSource? - - /** - * Advanced settings for the video encoder. - */ - val videoEncoder: IEncoder? - /** * Advanced settings for the endpoint. */ @@ -98,31 +71,6 @@ interface ISingleStreamer { */ fun getInfo(descriptor: MediaDescriptor): IConfigurationInfo - /** - * Configures only audio settings. - * - * @param audioConfig Audio configuration to set - * - * @throws [Throwable] if configuration can not be applied. - * @see [release] - */ - fun setAudioConfig(audioConfig: AudioConfig) - - /** - * Configures only video settings. - * - * @param videoConfig Video configuration to set - * - * @throws [Throwable] if configuration can not be applied. - * @see [release] - */ - fun setVideoConfig(videoConfig: VideoConfig) - - /** - * Clean and reset the streamer. - */ - fun release() - /** * Adds a bitrate regulator controller to the streamer. */ @@ -141,74 +89,50 @@ val ISingleStreamer.targetRotationDegrees: Int @IntRange(from = 0, to = 359) get() = targetRotation.rotationToDegrees - /** - * Configures both video and audio settings. - * It is the first method to call after a [SingleStreamer] instantiation. - * It must be call when both stream and audio and video capture are not running. - * - * Use [IConfigurationInfo] to get value limits. - * - * If video encoder does not support [VideoConfig.level] or [VideoConfig.profile], it fallbacks - * to video encoder default level and default profile. - * - * @param audioConfig Audio configuration to set - * @param videoConfig Video configuration to set - * - * @throws [Throwable] if configuration can not be applied. - * @see [ISingleStreamer.release] - */ -@RequiresPermission(Manifest.permission.RECORD_AUDIO) -fun ISingleStreamer.setConfig(audioConfig: AudioConfig, videoConfig: VideoConfig) { - setAudioConfig(audioConfig) - setVideoConfig(videoConfig) -} - -/** - * A single Streamer based on coroutines. + * An audio single Streamer */ -interface ICoroutineSingleStreamer : ISingleStreamer { +interface IAudioSingleStreamer : IAudioStreamer { /** - * Returns the last throwable that occurred. + * Gets the audio configuration. */ - val throwable: StateFlow + val audioConfig: AudioConfig? /** - * Returns true if endpoint is opened. - * For example, if the streamer is connected to a server if the endpoint is SRT or RTMP. + * Advanced settings for the audio encoder. */ - val isOpen: StateFlow + val audioEncoder: IEncoder? +} +/** + * A video single streamer. + */ +interface IVideoSingleStreamer : IVideoStreamer { /** - * Returns true if stream is running. + * Gets the video configuration. */ - val isStreaming: StateFlow + val videoConfig: VideoConfig? /** - * Opens the streamer endpoint. - * - * @param descriptor Media descriptor to open + * Advanced settings for the video encoder. */ - suspend fun open(descriptor: MediaDescriptor) + val videoEncoder: IEncoder? +} - /** - * Closes the streamer endpoint. - */ - suspend fun close() +interface ICoroutineAudioSingleStreamer : ICoroutineAudioStreamer, IAudioSingleStreamer - /** - * Starts audio/video stream. - * - * @see [stopStream] - */ - suspend fun startStream() +interface ICoroutineVideoSingleStreamer : ICoroutineVideoStreamer, IVideoSingleStreamer +/** + * A single Streamer based on coroutines. + */ +interface ICoroutineSingleStreamer : ICoroutineStreamer, ISingleStreamer { /** - * Stops audio/video stream. + * Opens the streamer endpoint. * - * @see [startStream] + * @param descriptor Media descriptor to open */ - suspend fun stopStream() + suspend fun open(descriptor: MediaDescriptor) } /** @@ -266,18 +190,12 @@ suspend fun ICoroutineSingleStreamer.startStream(uriString: String) { startStream() } -interface ICallbackSingleStreamer : ISingleStreamer { - /** - * Returns true if endpoint is opened. - * For example, if the streamer is connected to a server if the endpoint is SRT or RTMP. - */ - val isOpen: Boolean - /** - * Returns true if stream is running. - */ - val isStreaming: Boolean +interface ICallbackAudioSingleStreamer : ICallbackAudioStreamer, IAudioSingleStreamer + +interface ICallbackVideoSingleStreamer : ICallbackVideoStreamer, IVideoSingleStreamer +interface ICallbackSingleStreamer : ICallbackStreamer, ISingleStreamer { /** * Opens the streamer endpoint asynchronously. * @@ -288,37 +206,6 @@ interface ICallbackSingleStreamer : ISingleStreamer { */ fun open(descriptor: MediaDescriptor) - /** - * Closes the streamer endpoint. - */ - fun close() - - /** - * Starts audio/video stream asynchronously. - * - * You must call [open] before calling this method. - * The streamer must be opened before starting the stream. You can use [Listener.onIsOpenChanged]. - * - * @see [stopStream] - */ - fun startStream() - - /** - * Starts audio/video stream asynchronously. - * - * Same as doing [open] and [startStream]. - * - * @see [stopStream] - */ - fun startStream(descriptor: MediaDescriptor) - - /** - * Stops audio/video stream asynchronously. - * - * @see [startStream] - */ - fun stopStream() - /** * Adds a listener to the streamer. */ @@ -332,7 +219,7 @@ interface ICallbackSingleStreamer : ISingleStreamer { /** * Listener for the callback streamer. */ - interface Listener { + interface Listener : ICallbackStreamer.Listener { /** * Called when the streamer opening failed. * @@ -340,33 +227,15 @@ interface ICallbackSingleStreamer : ISingleStreamer { */ fun onOpenFailed(t: Throwable) = Unit - /** - * Called when the streamer is opened or closed. - */ - fun onIsOpenChanged(isOpen: Boolean) = Unit - /** * Called when the streamer was closed by an error. * * @param t The reason why the streamer was closed */ fun onClose(t: Throwable) = Unit - - /** - * Called when the stream is started or stopped. - */ - fun onIsStreamingChanged(isStarted: Boolean) = Unit - - /** - * Called when an error occurs. - * - * @param throwable The throwable that occurred - */ - fun onError(throwable: Throwable) = Unit } } - /** * Opens the streamer endpoint. * diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/SingleStreamer.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/SingleStreamer.kt index 1efb8d959..7da73620e 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/SingleStreamer.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/SingleStreamer.kt @@ -75,12 +75,12 @@ open class SingleStreamer( protected val videoSourceInternal: IVideoSourceInternal?, protected val endpointInternal: IEndpointInternal = DynamicEndpoint(context), @RotationValue defaultRotation: Int = context.displayRotation -) : ICoroutineSingleStreamer { +) : ICoroutineSingleStreamer, ICoroutineAudioSingleStreamer, ICoroutineVideoSingleStreamer { private val dispatcher: CoroutineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() - private val _throwable = MutableStateFlow(null) - override val throwable: StateFlow = _throwable + private val _throwableFlow = MutableStateFlow(null) + override val throwableFlow: StateFlow = _throwableFlow private var audioStreamId: Int? = null private var videoStreamId: Int? = null @@ -155,7 +155,7 @@ open class SingleStreamer( Logger.e(TAG, "onStreamError: Can't stop stream", t) } finally { Logger.e(TAG, "onStreamError: ${t.message}", t) - _throwable.tryEmit(t) + _throwableFlow.tryEmit(t) } } @@ -202,12 +202,12 @@ open class SingleStreamer( override val endpoint: IEndpoint get() = endpointInternal - override val isOpen: StateFlow - get() = endpointInternal.isOpen + override val isOpenFlow: StateFlow + get() = endpointInternal.isOpenFlow - private val _isStreaming = MutableStateFlow(false) - override val isStreaming: StateFlow = _isStreaming + private val _isStreamingFlow = MutableStateFlow(false) + override val isStreamingFlow: StateFlow = _isStreamingFlow /** * Whether the streamer has audio. @@ -248,7 +248,7 @@ open class SingleStreamer( override var targetRotation: Int @RotationValue get() = _targetRotation set(@RotationValue newTargetRotation) { - if (isStreaming.value) { + if (isStreamingFlow.value) { Logger.w(TAG, "Can't change rotation while streaming") pendingTargetRotation = newTargetRotation return @@ -286,7 +286,7 @@ open class SingleStreamer( * @throws [Throwable] if configuration can not be applied. */ @RequiresPermission(Manifest.permission.RECORD_AUDIO) - override fun setAudioConfig(audioConfig: AudioConfig) { + override suspend fun setAudioConfig(audioConfig: AudioConfig) { require(hasAudio) { "Do not need to set audio as it is a video only streamer" } requireNotNull(audioSourceInternal) { "Audio source must not be null" } @@ -505,7 +505,7 @@ open class SingleStreamer( * * @throws [Throwable] if configuration can not be applied. */ - override fun setVideoConfig(videoConfig: VideoConfig) { + override suspend fun setVideoConfig(videoConfig: VideoConfig) { require(hasVideo) { "Do not need to set video as it is a audio only streamer" } requireNotNull(videoSourceInternal) { "Video source must not be null" } @@ -554,8 +554,8 @@ open class SingleStreamer( * @see [stopStream] */ override suspend fun startStream() = withContext(dispatcher) { - require(isOpen.value) { "Endpoint must be opened before starting stream" } - require(!isStreaming.value) { "Stream is already running" } + require(isOpenFlow.value) { "Endpoint must be opened before starting stream" } + require(!isStreamingFlow.value) { "Stream is already running" } try { val streams = mutableListOf() @@ -592,7 +592,7 @@ open class SingleStreamer( bitrateRegulatorController?.start() - _isStreaming.emit(true) + _isStreamingFlow.emit(true) } catch (t: Throwable) { stopStreamInternal() throw t @@ -635,7 +635,7 @@ open class SingleStreamer( audioEncoderInternal?.reset() resetVideoEncoder() - _isStreaming.emit(false) + _isStreamingFlow.emit(false) } /** @@ -672,7 +672,7 @@ open class SingleStreamer( * * @see [setAudioConfig] */ - override fun release() { + override suspend fun release() { // Sources audioSourceInternal?.release() val videoSource = videoSourceInternal @@ -708,7 +708,7 @@ open class SingleStreamer( override fun addBitrateRegulatorController(controllerFactory: IBitrateRegulatorController.Factory) { bitrateRegulatorController?.stop() bitrateRegulatorController = controllerFactory.newBitrateRegulatorController(this).apply { - if (isStreaming.value) { + if (isStreamingFlow.value) { this.start() } Logger.d( diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/callbacks/CallbackSingleStreamer.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/callbacks/CallbackSingleStreamer.kt index cb32da6fb..d3aa4a9a6 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/callbacks/CallbackSingleStreamer.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/streamers/single/callbacks/CallbackSingleStreamer.kt @@ -10,9 +10,13 @@ import io.github.thibaultbee.streampack.core.elements.sources.video.IVideoSource import io.github.thibaultbee.streampack.core.elements.utils.RotationValue import io.github.thibaultbee.streampack.core.regulator.controllers.IBitrateRegulatorController import io.github.thibaultbee.streampack.core.streamers.infos.IConfigurationInfo +import io.github.thibaultbee.streampack.core.streamers.interfaces.releaseBlocking import io.github.thibaultbee.streampack.core.streamers.single.AudioConfig +import io.github.thibaultbee.streampack.core.streamers.single.ICallbackAudioSingleStreamer import io.github.thibaultbee.streampack.core.streamers.single.ICallbackSingleStreamer +import io.github.thibaultbee.streampack.core.streamers.single.ICallbackVideoSingleStreamer import io.github.thibaultbee.streampack.core.streamers.single.ICoroutineSingleStreamer +import io.github.thibaultbee.streampack.core.streamers.single.SingleStreamer import io.github.thibaultbee.streampack.core.streamers.single.VideoConfig import io.github.thibaultbee.streampack.core.utils.extensions.isClosedException import kotlinx.coroutines.CoroutineScope @@ -30,8 +34,8 @@ import kotlinx.coroutines.launch * * @param streamer the [ICoroutineSingleStreamer] to use */ -open class CallbackSingleStreamer(val streamer: ICoroutineSingleStreamer) : - ICallbackSingleStreamer { +open class CallbackSingleStreamer(val streamer: SingleStreamer) : + ICallbackSingleStreamer, ICallbackAudioSingleStreamer, ICallbackVideoSingleStreamer { protected val coroutineScope: CoroutineScope = CoroutineScope( SupervisorJob() + Dispatchers.Default ) @@ -59,9 +63,9 @@ open class CallbackSingleStreamer(val streamer: ICoroutineSingleStreamer) : get() = streamer.info override val isOpen: Boolean - get() = streamer.isOpen.value + get() = streamer.isOpenFlow.value override val isStreaming: Boolean - get() = streamer.isStreaming.value + get() = streamer.isStreamingFlow.value override var targetRotation: Int @RotationValue @@ -72,23 +76,23 @@ open class CallbackSingleStreamer(val streamer: ICoroutineSingleStreamer) : init { coroutineScope.launch { - streamer.throwable.filterNotNull().filter { !it.isClosedException }.collect { e -> + streamer.throwableFlow.filterNotNull().filter { !it.isClosedException }.collect { e -> listeners.forEach { it.onError(e) } } } coroutineScope.launch { - streamer.throwable.filterNotNull().filter { it.isClosedException }.collect { e -> + streamer.throwableFlow.filterNotNull().filter { it.isClosedException }.collect { e -> listeners.forEach { it.onClose(e) } } } coroutineScope.launch { // Skip first value to avoid duplicate event - streamer.isOpen.drop(1).collect { isOpen -> + streamer.isOpenFlow.drop(1).collect { isOpen -> listeners.forEach { it.onIsOpenChanged(isOpen) } } } coroutineScope.launch { - streamer.isStreaming.collect { isStreaming -> + streamer.isStreamingFlow.collect { isStreaming -> listeners.forEach { it.onIsStreamingChanged(isStreaming) } } } @@ -100,11 +104,37 @@ open class CallbackSingleStreamer(val streamer: ICoroutineSingleStreamer) : @RequiresPermission(Manifest.permission.RECORD_AUDIO) override fun setAudioConfig(audioConfig: AudioConfig) { - streamer.setAudioConfig(audioConfig) + coroutineScope.launch { + streamer.setAudioConfig(audioConfig) + } } override fun setVideoConfig(videoConfig: VideoConfig) { - streamer.setVideoConfig(videoConfig) + coroutineScope.launch { + streamer.setVideoConfig(videoConfig) + } + } + + + /** + * Configures both video and audio settings. + * It is the first method to call after a [SingleStreamer] instantiation. + * It must be call when both stream and audio and video capture are not running. + * + * Use [IConfigurationInfo] to get value limits. + * + * If video encoder does not support [VideoConfig.level] or [VideoConfig.profile], it fallbacks + * to video encoder default level and default profile. + * + * @param audioConfig Audio configuration to set + * @param videoConfig Video configuration to set + * + * @throws [Throwable] if configuration can not be applied. + */ + @RequiresPermission(Manifest.permission.RECORD_AUDIO) + fun setConfig(audioConfig: AudioConfig, videoConfig: VideoConfig) { + setAudioConfig(audioConfig) + setVideoConfig(videoConfig) } override fun open(descriptor: MediaDescriptor) { @@ -173,7 +203,7 @@ open class CallbackSingleStreamer(val streamer: ICoroutineSingleStreamer) : override fun release() { - streamer.release() + streamer.releaseBlocking() listeners.clear() coroutineScope.coroutineContext.cancelChildren() } diff --git a/core/src/test/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpointTest.kt b/core/src/test/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpointTest.kt index b804ecda0..05fc88ea7 100644 --- a/core/src/test/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpointTest.kt +++ b/core/src/test/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpointTest.kt @@ -19,11 +19,11 @@ class DynamicEndpointTest { private val context: Context = ApplicationProvider.getApplicationContext() @Test - fun `isOpen test`() = runTest { + fun `isOpenFlow test`() = runTest { val dynamicEndpoint = DynamicEndpoint(context) - assertFalse(dynamicEndpoint.isOpen.value) + assertFalse(dynamicEndpoint.isOpenFlow.value) dynamicEndpoint.open(DescriptorUtils.createFileDescriptor("dynamic.ts")) - assertTrue(dynamicEndpoint.isOpen.value) + assertTrue(dynamicEndpoint.isOpenFlow.value) dynamicEndpoint.close() } @@ -31,7 +31,7 @@ class DynamicEndpointTest { fun `test open mp4 file descriptor`() = runTest { val dynamicEndpoint = DynamicEndpoint(context) dynamicEndpoint.open(DescriptorUtils.createFileDescriptor("dynamic.mp4")) - assertTrue(dynamicEndpoint.isOpen.value) + assertTrue(dynamicEndpoint.isOpenFlow.value) dynamicEndpoint.close() } @@ -39,7 +39,7 @@ class DynamicEndpointTest { fun `test open ts file descriptor`() = runTest { val dynamicEndpoint = DynamicEndpoint(context) dynamicEndpoint.open(DescriptorUtils.createFileDescriptor("dynamic.ts")) - assertTrue(dynamicEndpoint.isOpen.value) + assertTrue(dynamicEndpoint.isOpenFlow.value) dynamicEndpoint.close() } @@ -47,7 +47,7 @@ class DynamicEndpointTest { fun `test open flv file descriptor`() = runTest { val dynamicEndpoint = DynamicEndpoint(context) dynamicEndpoint.open(DescriptorUtils.createFileDescriptor("dynamic.flv")) - assertTrue(dynamicEndpoint.isOpen.value) + assertTrue(dynamicEndpoint.isOpenFlow.value) dynamicEndpoint.close() } @@ -58,7 +58,7 @@ class DynamicEndpointTest { dynamicEndpoint.open(DescriptorUtils.createFileDescriptor("dynamic.unknown")) fail("IllegalArgumentException expected") } catch (e: Throwable) { - assertFalse(dynamicEndpoint.isOpen.value) + assertFalse(dynamicEndpoint.isOpenFlow.value) } finally { dynamicEndpoint.close() } @@ -75,7 +75,7 @@ class DynamicEndpointTest { ), containerType = MediaContainerType.FLV ) ) - assertTrue(dynamicEndpoint.isOpen.value) + assertTrue(dynamicEndpoint.isOpenFlow.value) dynamicEndpoint.close() } @@ -86,7 +86,7 @@ class DynamicEndpointTest { dynamicEndpoint.write(FakeFrames.generate(MediaFormat.MIMETYPE_AUDIO_AAC), 0) fail("Throwable expected") } catch (e: Throwable) { - assertFalse(dynamicEndpoint.isOpen.value) + assertFalse(dynamicEndpoint.isOpenFlow.value) } finally { dynamicEndpoint.close() } diff --git a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/main/PreviewViewModel.kt b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/main/PreviewViewModel.kt index 17c9840b1..22e394653 100644 --- a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/main/PreviewViewModel.kt +++ b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/main/PreviewViewModel.kt @@ -42,12 +42,14 @@ import io.github.thibaultbee.streampack.app.utils.switchBackToFront import io.github.thibaultbee.streampack.app.utils.toggleCamera import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.UriMediaDescriptor import io.github.thibaultbee.streampack.core.elements.endpoints.MediaSinkType +import io.github.thibaultbee.streampack.core.elements.sources.audio.audiorecord.AudioRecordSource import io.github.thibaultbee.streampack.core.elements.sources.video.camera.CameraSettings +import io.github.thibaultbee.streampack.core.elements.sources.video.camera.isFrameRateSupported import io.github.thibaultbee.streampack.core.streamers.interfaces.ICameraStreamer -import io.github.thibaultbee.streampack.core.streamers.observers.StreamerViewModelLifeCycleObserver +import io.github.thibaultbee.streampack.core.streamers.interfaces.releaseBlocking +import io.github.thibaultbee.streampack.core.streamers.lifecycle.StreamerViewModelLifeCycleObserver import io.github.thibaultbee.streampack.core.streamers.single.startStream import io.github.thibaultbee.streampack.core.utils.extensions.isClosedException -import io.github.thibaultbee.streampack.core.elements.sources.video.camera.isFrameRateSupported import io.github.thibaultbee.streampack.ext.srt.regulator.controllers.DefaultSrtBitrateRegulatorController import io.github.thibaultbee.streampack.ui.views.CameraPreviewView import kotlinx.coroutines.flow.combine @@ -75,7 +77,7 @@ class PreviewViewModel(private val application: Application) : ObservableViewMod if (streamer is ICameraStreamer) { permissions.add(Manifest.permission.CAMERA) } - if (streamer.audioSource != null) { + if (streamer.audioSource is AudioRecordSource) { permissions.add(Manifest.permission.RECORD_AUDIO) } storageRepository.endpointDescriptorFlow.asLiveData().value?.let { @@ -95,31 +97,31 @@ class PreviewViewModel(private val application: Application) : ObservableViewMod // Streamer states val isStreaming: LiveData - get() = streamer.isStreaming.asLiveData() + get() = streamer.isStreamingFlow.asLiveData() private val _isTryingConnection = MutableLiveData() val isTryingConnection: LiveData = _isTryingConnection init { viewModelScope.launch { - streamer.throwable.filterNotNull().filter { !it.isClosedException } + streamer.throwableFlow.filterNotNull().filter { !it.isClosedException } .map { "${it.javaClass.simpleName}: ${it.message}" }.collect { _streamerError.postValue(it) } } viewModelScope.launch { - streamer.throwable.filterNotNull().filter { it.isClosedException } + streamer.throwableFlow.filterNotNull().filter { it.isClosedException } .map { "Connection lost: ${it.message}" }.collect { _endpointError.postValue(it) } } viewModelScope.launch { - streamer.isOpen + streamer.isOpenFlow .collect { Log.i(TAG, "Streamer is opened: $it") } } viewModelScope.launch { - streamer.isStreaming + streamer.isStreamingFlow .collect { Log.i(TAG, "Streamer is streaming: $it") } @@ -401,7 +403,7 @@ class PreviewViewModel(private val application: Application) : ObservableViewMod override fun onCleared() { super.onCleared() try { - streamer.release() + streamer.releaseBlocking() } catch (t: Throwable) { Log.e(TAG, "Streamer release failed", t) } diff --git a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/main/usecases/BuildStreamerUseCase.kt b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/main/usecases/BuildStreamerUseCase.kt index 68bedd529..37ec20702 100644 --- a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/main/usecases/BuildStreamerUseCase.kt +++ b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/main/usecases/BuildStreamerUseCase.kt @@ -4,7 +4,7 @@ import android.content.Context import io.github.thibaultbee.streampack.app.data.storage.DataStoreRepository import io.github.thibaultbee.streampack.core.streamers.single.AudioOnlySingleStreamer import io.github.thibaultbee.streampack.core.streamers.single.CameraSingleStreamer -import io.github.thibaultbee.streampack.core.streamers.single.ICoroutineSingleStreamer +import io.github.thibaultbee.streampack.core.streamers.single.SingleStreamer import kotlinx.coroutines.flow.first import kotlinx.coroutines.runBlocking @@ -13,13 +13,13 @@ class BuildStreamerUseCase( private val dataStoreRepository: DataStoreRepository ) { /** - * Build a new [ICoroutineStreamer] based on audio and video preferences. + * Build a new [SingleStreamer] based on audio and video preferences. * * Only create a new streamer if the previous one is not the same type. * * @param previousStreamer Previous streamer to check if we need to create a new one. */ - operator fun invoke(previousStreamer: ICoroutineSingleStreamer? = null): ICoroutineSingleStreamer { + operator fun invoke(previousStreamer: SingleStreamer? = null): SingleStreamer { val isAudioEnable = runBlocking { dataStoreRepository.isAudioEnableFlow.first() } diff --git a/demos/screenrecorder/src/main/java/io/github/thibaultbee/streampack/screenrecorder/MainActivity.kt b/demos/screenrecorder/src/main/java/io/github/thibaultbee/streampack/screenrecorder/MainActivity.kt index ae75fa789..e2ccee2fa 100644 --- a/demos/screenrecorder/src/main/java/io/github/thibaultbee/streampack/screenrecorder/MainActivity.kt +++ b/demos/screenrecorder/src/main/java/io/github/thibaultbee/streampack/screenrecorder/MainActivity.kt @@ -33,6 +33,7 @@ import androidx.activity.result.contract.ActivityResultContracts import androidx.appcompat.app.AlertDialog import androidx.appcompat.app.AppCompatActivity import androidx.core.app.ActivityCompat +import androidx.lifecycle.lifecycleScope import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.UriMediaDescriptor import io.github.thibaultbee.streampack.core.elements.encoders.mediacodec.MediaCodecHelper import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.data.TSServiceInfo @@ -46,6 +47,7 @@ import io.github.thibaultbee.streampack.screenrecorder.models.EndpointType import io.github.thibaultbee.streampack.screenrecorder.services.DemoScreenRecorderService import io.github.thibaultbee.streampack.screenrecorder.settings.SettingsActivity import io.github.thibaultbee.streampack.services.DefaultScreenRecorderService +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking class MainActivity : AppCompatActivity() { @@ -165,7 +167,9 @@ class MainActivity : AppCompatActivity() { resolution = configuration.video.resolution, fps = fps ) - streamer.setVideoConfig(videoConfig) + lifecycleScope.launch { + streamer.setVideoConfig(videoConfig) + } if (configuration.audio.enable) { val audioConfig = AudioConfig( @@ -181,7 +185,9 @@ class MainActivity : AppCompatActivity() { Manifest.permission.RECORD_AUDIO ) == PackageManager.PERMISSION_GRANTED ) { - streamer.setAudioConfig(audioConfig) + lifecycleScope.launch { + streamer.setAudioConfig(audioConfig) + } } else { throw SecurityException("Permission RECORD_AUDIO must have been granted!") } diff --git a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/internal/endpoints/composites/sinks/RtmpSink.kt b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/internal/endpoints/composites/sinks/RtmpSink.kt index 54c8f7e3e..90fe3a338 100644 --- a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/internal/endpoints/composites/sinks/RtmpSink.kt +++ b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/internal/endpoints/composites/sinks/RtmpSink.kt @@ -42,8 +42,8 @@ class RtmpSink( private val supportedVideoCodecs = mutableListOf() - private val _isOpen = MutableStateFlow(false) - override val isOpen: StateFlow = _isOpen + private val _isOpenFlow = MutableStateFlow(false) + override val isOpenFlow: StateFlow = _isOpenFlow override fun configure(config: SinkConfiguration) { val videoConfig = config.streamConfigs.firstOrNull { it is VideoCodecConfig } @@ -66,7 +66,7 @@ class RtmpSink( // supportedVideoCodecs = this@RtmpSink.supportedVideoCodecs connect("${mediaDescriptor.uri} live=1 flashver=FMLE/3.0\\20(compatible;\\20FMSc/1.0)") } - _isOpen.emit(true) + _isOpenFlow.emit(true) } } @@ -76,7 +76,7 @@ class RtmpSink( return@withContext -1 } - if (!(isOpen.value)) { + if (!(isOpenFlow.value)) { Logger.w(TAG, "Socket is not connected, dropping packet") return@withContext -1 } @@ -107,7 +107,7 @@ class RtmpSink( override suspend fun close() { withContext(dispatcher) { socket?.close() - _isOpen.emit(false) + _isOpenFlow.emit(false) } } diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/internal/endpoints/composites/sinks/SrtSink.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/internal/endpoints/composites/sinks/SrtSink.kt index 96caaffc6..c3d81c05e 100644 --- a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/internal/endpoints/composites/sinks/SrtSink.kt +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/internal/endpoints/composites/sinks/SrtSink.kt @@ -51,8 +51,8 @@ class SrtSink : AbstractSink() { get() = socket?.bistats(clear = true, instantaneous = true) ?: throw IllegalStateException("Socket is not initialized") - private val _isOpen = MutableStateFlow(false) - override val isOpen: StateFlow = _isOpen + private val _isOpenFlow = MutableStateFlow(false) + override val isOpenFlow: StateFlow = _isOpenFlow override fun configure(config: SinkConfiguration) { bitrate = config.streamConfigs.sumOf { it.startBitrate.toLong() } @@ -86,7 +86,7 @@ class SrtSink : AbstractSink() { } connect(mediaDescriptor.srtUrl) } - _isOpen.emit(true) + _isOpenFlow.emit(true) } private fun buildMsgCtrl(packet: Packet): MsgCtrl { @@ -154,7 +154,7 @@ class SrtSink : AbstractSink() { override suspend fun close() { socket?.close() - _isOpen.emit(false) + _isOpenFlow.emit(false) } diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/DefaultSrtBitrateRegulatorController.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/DefaultSrtBitrateRegulatorController.kt index a28d18743..fddd6f4f3 100644 --- a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/DefaultSrtBitrateRegulatorController.kt +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/DefaultSrtBitrateRegulatorController.kt @@ -18,7 +18,9 @@ package io.github.thibaultbee.streampack.ext.srt.regulator.controllers import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig import io.github.thibaultbee.streampack.core.regulator.controllers.BitrateRegulatorController import io.github.thibaultbee.streampack.core.regulator.controllers.DefaultBitrateRegulatorController +import io.github.thibaultbee.streampack.core.streamers.single.IAudioSingleStreamer import io.github.thibaultbee.streampack.core.streamers.single.ICoroutineSingleStreamer +import io.github.thibaultbee.streampack.core.streamers.single.IVideoSingleStreamer import io.github.thibaultbee.streampack.ext.srt.regulator.DefaultSrtBitrateRegulator import io.github.thibaultbee.streampack.ext.srt.regulator.SrtBitrateRegulator @@ -32,9 +34,22 @@ class DefaultSrtBitrateRegulatorController { private val delayTimeInMs: Long = 500 ) : BitrateRegulatorController.Factory() { override fun newBitrateRegulatorController(streamer: ICoroutineSingleStreamer): DefaultBitrateRegulatorController { + val audioEncoder = if (streamer is IAudioSingleStreamer) { + streamer.audioEncoder + } else { + null + } + + require(streamer is IVideoSingleStreamer) { + "Streamer must be a video single streamer" + } + val videoEncoder = requireNotNull(streamer.videoEncoder) { + "Video encoder must not be null" + } + return DefaultBitrateRegulatorController( - streamer.audioEncoder, - streamer.videoEncoder, + audioEncoder, + videoEncoder, streamer.endpoint, bitrateRegulatorFactory, bitrateRegulatorConfig, diff --git a/services/src/main/java/io/github/thibaultbee/streampack/services/DefaultScreenRecorderService.kt b/services/src/main/java/io/github/thibaultbee/streampack/services/DefaultScreenRecorderService.kt index 32c9c3740..f79f133ea 100644 --- a/services/src/main/java/io/github/thibaultbee/streampack/services/DefaultScreenRecorderService.kt +++ b/services/src/main/java/io/github/thibaultbee/streampack/services/DefaultScreenRecorderService.kt @@ -121,14 +121,14 @@ abstract class DefaultScreenRecorderService( streamer = createStreamer(customBundle).apply { lifecycleScope.launch { - throwable.filterNotNull().collect { t -> + throwableFlow.filterNotNull().collect { t -> Logger.e(TAG, "An error occurred", t) onErrorNotification(t)?.let { notify(it) } stopSelf() } } lifecycleScope.launch { - isOpen.collect { isOpen -> + isOpenFlow.collect { isOpen -> if (isOpen) { Logger.i(TAG, "Open succeeded") onOpenNotification()?.let { notify(it) } @@ -185,8 +185,8 @@ abstract class DefaultScreenRecorderService( runBlocking { streamer?.stopStream() streamer?.close() + streamer?.release() } - streamer?.release() streamer = null Log.i(TAG, "Service destroyed") }