Skip to content

Commit

Permalink
refactor(*): backport streamer interfaces from next version
Browse files Browse the repository at this point in the history
  • Loading branch information
ThibaultBee committed Jan 22, 2025
1 parent 7b857ce commit 6ad179e
Show file tree
Hide file tree
Showing 27 changed files with 451 additions and 293 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ object StreamerUtils {
while (i < numOfLoop) {
i++
delay(pollDuration)
assertTrue(streamer.isStreaming.value)
assertTrue(streamer.isStreamingFlow.value)
}
}
streamer.stopStream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ open class CombineEndpoint(protected val endpointInternals: List<IEndpointIntern

/**
* Whether at least one endpoint is open.
* This is a combination of all endpoints' [IEndpoint.isOpen].
* This is a combination of all endpoints' [IEndpoint.isOpenFlow].
*
* To verify if a specific endpoint is open, use [IEndpoint.isOpen] of the endpoint.
* To verify if a specific endpoint is open, use [IEndpoint.isOpenFlow] of the endpoint.
*/
override val isOpen: StateFlow<Boolean> =
combineStates(*endpointInternals.map { it.isOpen }.toTypedArray()) { _ ->
endpointInternals.any { it.isOpen.value }
override val isOpenFlow: StateFlow<Boolean> =
combineStates(*endpointInternals.map { it.isOpenFlow }.toTypedArray()) { _ ->
endpointInternals.any { it.isOpenFlow.value }
}

/**
Expand Down Expand Up @@ -190,7 +190,7 @@ open class CombineEndpoint(protected val endpointInternals: List<IEndpointIntern

endpointInternals.forEach { endpoint ->
try {
if (endpoint.isOpen.value) {
if (endpoint.isOpenFlow.value) {
val endpointStreamId = endpointsToStreamIdsMap[Pair(endpoint, streamPid)]!!
endpoint.write(frame, endpointStreamId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ open class DynamicEndpoint(
private var rtmpEndpoint: IEndpointInternal? = null

@OptIn(ExperimentalCoroutinesApi::class)
override val isOpen: StateFlow<Boolean> = DerivedStateFlow(
getValue = { _endpoint?.isOpen?.value ?: false },
flow = endpointFlow.flatMapLatest { it?.isOpen ?: MutableStateFlow(false) }
override val isOpenFlow: StateFlow<Boolean> = DerivedStateFlow(
getValue = { _endpoint?.isOpenFlow?.value ?: false },
flow = endpointFlow.flatMapLatest { it?.isOpenFlow ?: MutableStateFlow(false) }
)

/**
Expand All @@ -80,7 +80,7 @@ open class DynamicEndpoint(
get() = endpoint.metrics

override suspend fun open(descriptor: MediaDescriptor) {
if (isOpen.value) {
if (isOpenFlow.value) {
Logger.w(TAG, "DynamicEndpoint is already opened")
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ interface IEndpoint {
* Whether if the endpoint is opened.
* For example, if the file is opened for [FileSink].
*/
val isOpen: StateFlow<Boolean>
val isOpenFlow: StateFlow<Boolean>

/**
* A info to verify supported formats.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ class MediaMuxerEndpoint(
override val metrics: Any
get() = TODO("Not yet implemented")

private val _isOpen = MutableStateFlow(false)
override val isOpen: StateFlow<Boolean> = _isOpen
private val _isOpenFlow = MutableStateFlow(false)
override val isOpenFlow: StateFlow<Boolean> = _isOpenFlow

override suspend fun open(descriptor: MediaDescriptor) {
if (isOpen.value) {
if (isOpenFlow.value) {
Logger.w(TAG, "MediaMuxerEndpoint is already opened")
return
}
Expand Down Expand Up @@ -121,7 +121,7 @@ class MediaMuxerEndpoint(
throw t
}

_isOpen.emit(true)
_isOpenFlow.emit(true)
}

override suspend fun write(
Expand Down Expand Up @@ -209,7 +209,7 @@ class MediaMuxerEndpoint(
numOfStreams = 0
streamIdToTrackId.clear()
mediaMuxer = null
_isOpen.emit(false)
_isOpenFlow.emit(false)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ open class CompositeEndpoint(
}
}

override val isOpen: StateFlow<Boolean>
get() = sink.isOpen
override val isOpenFlow: StateFlow<Boolean>
get() = sink.isOpenFlow

override suspend fun open(descriptor: MediaDescriptor) {
sink.open(descriptor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ abstract class AbstractSink : ISinkInternal {
get() = TODO("Not yet implemented")

override suspend fun open(mediaDescriptor: MediaDescriptor) {
if (isOpen.value) {
if (isOpenFlow.value) {
Logger.w(TAG, "Sink is already opened")
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ import kotlinx.coroutines.flow.StateFlow
*/
class FakeSink(override val supportedSinkTypes: List<MediaSinkType> = MediaSinkType.entries) :
AbstractSink() {
private val _isOpen = MutableStateFlow(false)
override val isOpen: StateFlow<Boolean> = _isOpen
private val _isOpenFlow = MutableStateFlow(false)
override val isOpenFlow: StateFlow<Boolean> = _isOpenFlow

override suspend fun openImpl(mediaDescriptor: MediaDescriptor) {
Logger.d(TAG, "open called: $mediaDescriptor")
_isOpen.emit(true)
_isOpenFlow.emit(true)
}

override fun configure(config: SinkConfiguration) {
Expand All @@ -54,7 +54,7 @@ class FakeSink(override val supportedSinkTypes: List<MediaSinkType> = MediaSinkT

override suspend fun close() {
Logger.d(TAG, "close called")
_isOpen.emit(false)
_isOpenFlow.emit(false)
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ class FileSink(private val coroutineContext: CoroutineContext = Dispatchers.IO)

private var file: RandomAccessFile? = null

private val _isOpen = MutableStateFlow(false)
override val isOpen: StateFlow<Boolean> = _isOpen
private val _isOpenFlow = MutableStateFlow(false)
override val isOpenFlow: StateFlow<Boolean> = _isOpenFlow

override suspend fun openImpl(mediaDescriptor: MediaDescriptor) {
file = openLocalFile(mediaDescriptor.uri)
_isOpen.emit(true)
_isOpenFlow.emit(true)
}

override fun configure(config: SinkConfiguration) {} // Nothing to configure
Expand Down Expand Up @@ -69,7 +69,7 @@ class FileSink(private val coroutineContext: CoroutineContext = Dispatchers.IO)
// Ignore
} finally {
file = null
_isOpen.emit(false)
_isOpenFlow.emit(false)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ interface ISink {
* Whether if the endpoint is opened.
* For example, if the file is opened for [FileSink].
*/
val isOpen: StateFlow<Boolean>
val isOpenFlow: StateFlow<Boolean>

/**
* Metrics of the sink.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ abstract class OutputStreamSink(private val coroutineContext: CoroutineContext =
AbstractSink() {
protected var outputStream: OutputStream? = null

private val _isOpen = MutableStateFlow(false)
override val isOpen: StateFlow<Boolean> = _isOpen
private val _isOpenFlow = MutableStateFlow(false)
override val isOpenFlow: StateFlow<Boolean> = _isOpenFlow

/**
* Open an [OutputStream] to write data
Expand All @@ -42,7 +42,7 @@ abstract class OutputStreamSink(private val coroutineContext: CoroutineContext =

override suspend fun openImpl(mediaDescriptor: MediaDescriptor) {
outputStream = openOutputStream(mediaDescriptor)
_isOpen.emit(true)
_isOpenFlow.emit(true)
}

override fun configure(config: SinkConfiguration) {} // Nothing to configure
Expand Down Expand Up @@ -77,7 +77,7 @@ abstract class OutputStreamSink(private val coroutineContext: CoroutineContext =
// Ignore
} finally {
outputStream = null
_isOpen.emit(false)
_isOpenFlow.emit(false)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import io.github.thibaultbee.streampack.core.elements.encoders.IEncoder
import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpoint
import io.github.thibaultbee.streampack.core.elements.utils.Scheduler
import io.github.thibaultbee.streampack.core.regulator.IBitrateRegulator
import io.github.thibaultbee.streampack.core.streamers.single.IAudioSingleStreamer
import io.github.thibaultbee.streampack.core.streamers.single.ICoroutineSingleStreamer
import io.github.thibaultbee.streampack.core.streamers.single.IVideoSingleStreamer

/**
* A [BitrateRegulatorController] implementation that triggers [IBitrateRegulator.update] every [delayTimeInMs].
Expand All @@ -34,7 +36,7 @@ import io.github.thibaultbee.streampack.core.streamers.single.ICoroutineSingleSt
*/
open class DefaultBitrateRegulatorController(
audioEncoder: IEncoder?,
videoEncoder: IEncoder?,
videoEncoder: IEncoder,
endpoint: IEndpoint,
bitrateRegulatorFactory: IBitrateRegulator.Factory,
bitrateRegulatorConfig: BitrateRegulatorConfig = BitrateRegulatorConfig(),
Expand All @@ -46,17 +48,13 @@ open class DefaultBitrateRegulatorController(
bitrateRegulatorFactory,
bitrateRegulatorConfig
) {
init {
requireNotNull(videoEncoder) { "Video encoder is required" }
}

/**
* Bitrate regulator. Calls regularly by [scheduler]. Don't call it otherwise or you might break regulation.
*/
private val bitrateRegulator = bitrateRegulatorFactory.newBitrateRegulator(
bitrateRegulatorConfig,
{
videoEncoder!!.bitrate = it
videoEncoder.bitrate = it
},
{ /* Do nothing for audio */ }
)
Expand All @@ -67,7 +65,7 @@ open class DefaultBitrateRegulatorController(
private val scheduler = Scheduler(delayTimeInMs) {
bitrateRegulator.update(
endpoint.metrics,
videoEncoder?.bitrate ?: 0,
videoEncoder.bitrate,
audioEncoder?.bitrate ?: 0
)
}
Expand All @@ -86,9 +84,22 @@ open class DefaultBitrateRegulatorController(
private val delayTimeInMs: Long = 500
) : BitrateRegulatorController.Factory() {
override fun newBitrateRegulatorController(streamer: ICoroutineSingleStreamer): BitrateRegulatorController {
val audioEncoder = if (streamer is IAudioSingleStreamer) {
streamer.audioEncoder
} else {
null
}

require(streamer is IVideoSingleStreamer) {
"Streamer must be a video single streamer"
}
val videoEncoder = requireNotNull(streamer.videoEncoder) {
"Video encoder must not be null"
}

return DefaultBitrateRegulatorController(
streamer.audioEncoder,
streamer.videoEncoder,
audioEncoder,
videoEncoder,
streamer.endpoint,
bitrateRegulatorFactory,
bitrateRegulatorConfig,
Expand Down
Loading

0 comments on commit 6ad179e

Please sign in to comment.