From e5714a10e83fdec460a9bc13aa741c4e2925c18a Mon Sep 17 00:00:00 2001
From: shogo4405
Date: Thu, 15 Aug 2024 16:39:20 +0900
Subject: [PATCH] Refactoring RTMPStream, SRTStream.

---
 SRTHaishinKit/SRTConnection.swift    |  8 +--
 SRTHaishinKit/SRTStream.swift        | 62 ++++++++++++++--------
 Sources/Mixer/MediaMixerOutput.swift |  3 ++
 Sources/RTMP/FLVFoundation.swift     |  2 +-
 Sources/RTMP/RTMPConnection.swift    |  6 ++-
 Sources/RTMP/RTMPMessage.swift       | 13 +++--
 Sources/RTMP/RTMPStream.swift        | 79 +++++++++++++++++-----------
 7 files changed, 111 insertions(+), 62 deletions(-)

diff --git a/SRTHaishinKit/SRTConnection.swift b/SRTHaishinKit/SRTConnection.swift
index a6638b375..48665454c 100644
--- a/SRTHaishinKit/SRTConnection.swift
+++ b/SRTHaishinKit/SRTConnection.swift
@@ -6,9 +6,9 @@ import libsrt
 public actor SRTConnection {
     /// The error comain codes.
     public enum Error: Swift.Error {
-        // The uri isn’t supported.
-        case notSupportedUri(_ uri: URL?)
-        // The fail to connect.
+        /// The URI isn’t supported.
+        case unsupportedUri(_ uri: URL?)
+        /// Failed to connect.
         case failedToConnect(_ message: String, reson: Int32)
     }
 
@@ -47,7 +47,7 @@ public actor SRTConnection {
     /// Open a two-way connection to an application on SRT Server.
     public func open(_ uri: URL?, mode: SRTMode = .caller) async throws {
         guard let uri = uri, let scheme = uri.scheme, let host = uri.host, let port = uri.port, scheme == "srt" else {
-            throw Error.notSupportedUri(uri)
+            throw Error.unsupportedUri(uri)
         }
         do {
             try await withCheckedThrowingContinuation { continuation in
diff --git a/SRTHaishinKit/SRTStream.swift b/SRTHaishinKit/SRTStream.swift
index 39a50a245..93d5b748c 100644
--- a/SRTHaishinKit/SRTStream.swift
+++ b/SRTHaishinKit/SRTStream.swift
@@ -14,8 +14,8 @@ public actor SRTStream {
     private lazy var mediaLink = MediaLink()
     private lazy var mediaCodec = MediaCodec()
     private var outputs: [any HKStreamOutput] = []
-    private var bitrateStorategy: (any HKStreamBitRateStrategy)?
     private var audioPlayerNode: AudioPlayerNode?
+    private var bitrateStorategy: (any HKStreamBitRateStrategy)?
 
     /// Creates a new stream object.
     public init(connection: SRTConnection) {
@@ -52,12 +52,12 @@ public actor SRTStream {
         mediaCodec.startRunning()
         Task {
             for try await buffer in mediaCodec.video where mediaCodec.isRunning {
-                writer.append(buffer)
+                append(buffer)
             }
         }
         Task {
             for await buffer in mediaCodec.audio where mediaCodec.isRunning {
-                writer.append(buffer.0, when: buffer.1)
+                append(buffer.0, when: buffer.1)
             }
         }
         Task {
@@ -96,22 +96,19 @@ public actor SRTStream {
             }
         }
         Task {
-            guard let audioPlayerNode else {
-                return
-            }
-            await audioPlayerNode.startRunning()
+            await audioPlayerNode?.startRunning()
             for await audio in mediaCodec.audio where mediaCodec.isRunning {
-                await audioPlayerNode.enqueue(audio.0, when: audio.1)
+                append(audio.0, when: audio.1)
             }
         }
         Task {
             for try await buffer in reader.output where mediaCodec.isRunning {
-                mediaCodec.append(buffer.1)
+                append(buffer.1)
             }
         }
         Task {
             for await video in await mediaLink.dequeue where mediaCodec.isRunning {
-                outputs.forEach { $0.stream(self, didOutput: video) }
+                append(video)
             }
         }
         await connection?.listen()
@@ -158,12 +155,43 @@ extension SRTStream: HKStream {
     }
 
     public func append(_ sampleBuffer: CMSampleBuffer) {
-        switch sampleBuffer.formatDescription?.mediaType {
-        case .video?:
+        guard sampleBuffer.formatDescription?.mediaType == .video else {
+            return
+        }
+        switch readyState {
+        case .publishing:
             if sampleBuffer.formatDescription?.isCompressed == true {
+                writer.append(sampleBuffer)
             } else {
                 mediaCodec.append(sampleBuffer)
-                outputs.forEach { $0.stream(self, didOutput: sampleBuffer) }
+            }
+        default:
+            break
+        }
+        if sampleBuffer.formatDescription?.isCompressed == false {
+            outputs.forEach { $0.stream(self, didOutput: sampleBuffer) }
+        }
+    }
+
+    public func append(_ audioBuffer: AVAudioBuffer, when: AVAudioTime) {
+        switch readyState {
+        case .playing:
+            switch audioBuffer {
+            case let audioBuffer as AVAudioPCMBuffer:
+                Task { await audioPlayerNode?.enqueue(audioBuffer, when: when) }
+                outputs.forEach { $0.stream(self, didOutput: audioBuffer, when: when) }
+            default:
+                break
+            }
+        case .publishing:
+            switch audioBuffer {
+            case let audioBuffer as AVAudioPCMBuffer:
+                mediaCodec.append(audioBuffer, when: when)
+                outputs.forEach { $0.stream(self, didOutput: audioBuffer, when: when) }
+            case let audioBuffer as AVAudioCompressedBuffer:
+                writer.append(audioBuffer, when: when)
+            default:
+                break
             }
         default:
             break
@@ -177,14 +205,6 @@ extension SRTStream: HKStream {
         }
     }
 
-    public func append(_ buffer: AVAudioBuffer, when: AVAudioTime) {
-        guard buffer is AVAudioPCMBuffer else {
-            return
-        }
-        mediaCodec.append(buffer, when: when)
-        outputs.forEach { $0.stream(self, didOutput: buffer, when: when) }
-    }
-
     public func addOutput(_ observer: some HKStreamOutput) {
         guard !outputs.contains(where: { $0 === observer }) else {
             return
diff --git a/Sources/Mixer/MediaMixerOutput.swift b/Sources/Mixer/MediaMixerOutput.swift
index 42d09b1a4..c43078d9e 100644
--- a/Sources/Mixer/MediaMixerOutput.swift
+++ b/Sources/Mixer/MediaMixerOutput.swift
@@ -1,6 +1,9 @@
 import AVFoundation
 
+/// A delegate protocol for receiving stream output events.
 public protocol MediaMixerOutput: AnyObject, Sendable {
+    /// Tells the receiver that a video sample buffer arrived.
     func mixer(_ mixer: MediaMixer, track: UInt8, didOutput sampleBuffer: CMSampleBuffer)
+    /// Tells the receiver that an audio buffer arrived.
     func mixer(_ mixer: MediaMixer, track: UInt8, didOutput buffer: AVAudioPCMBuffer, when: AVAudioTime)
 }
diff --git a/Sources/RTMP/FLVFoundation.swift b/Sources/RTMP/FLVFoundation.swift
index 299f91187..4c1482b14 100644
--- a/Sources/RTMP/FLVFoundation.swift
+++ b/Sources/RTMP/FLVFoundation.swift
@@ -93,7 +93,7 @@ enum FLVAudioCodec: UInt8 {
         }
     }
 
-    func audioStreamBasicDescription(_ payload: inout Data) -> AudioStreamBasicDescription? {
+    func audioStreamBasicDescription(_ payload: Data) -> AudioStreamBasicDescription? {
         guard isSupported, !payload.isEmpty else {
             return nil
         }
diff --git a/Sources/RTMP/RTMPConnection.swift b/Sources/RTMP/RTMPConnection.swift
index b6ce6e8bd..3fee526c6 100644
--- a/Sources/RTMP/RTMPConnection.swift
+++ b/Sources/RTMP/RTMPConnection.swift
@@ -4,12 +4,16 @@ import Foundation
 // MARK: -
 /// The RTMPConneciton class create a two-way RTMP connection.
 public actor RTMPConnection {
-    /// The RTMPConnection error domain code.
+    /// The error domain code.
     public enum Error: Swift.Error {
+        /// An invalid internal state.
         case invalidState
+        /// The command isn’t supported.
         case unsupportedCommand(_ command: String)
         case socketErrorOccurred(_ error: any Swift.Error)
+        /// The requested operation timed out.
         case requestTimedOut
+        /// The request failed.
         case requestFailed(response: RTMPResponse)
     }
 
diff --git a/Sources/RTMP/RTMPMessage.swift b/Sources/RTMP/RTMPMessage.swift
index bd343401e..ab4226aa6 100644
--- a/Sources/RTMP/RTMPMessage.swift
+++ b/Sources/RTMP/RTMPMessage.swift
@@ -418,11 +418,18 @@ struct RTMPAudioMessage: RTMPMessage {
     }
 
     func makeAudioFormat() -> AVAudioFormat? {
-        var payload = self.payload
-        guard var audioStreamBasicDescription = codec.audioStreamBasicDescription(&payload) else {
+        switch payload[1] {
+        case FLVAACPacketType.seq.rawValue:
+            let config = AudioSpecificConfig(bytes: [UInt8](payload[codec.headerSize..
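
A minimal usage sketch for the renamed SRTConnection.Error.unsupportedUri case and the SRTStream initializer shown above. The module name SRTHaishinKit, the parameterless SRTConnection initializer, and the makeStream helper are assumptions for illustration; only open(_:mode:), SRTStream.init(connection:), and the error case come from this patch.

import Foundation
import SRTHaishinKit // assumed module name, matching the SRTHaishinKit/ directory in the diff

// Sketch only: SRTConnection() is assumed to exist; it is not shown in this patch.
func makeStream() async -> SRTStream? {
    let connection = SRTConnection()
    let stream = SRTStream(connection: connection)
    do {
        // The guard in open(_:mode:) requires an "srt" scheme with host and port.
        try await connection.open(URL(string: "srt://192.168.1.1:9998"), mode: .caller)
        return stream
    } catch SRTConnection.Error.unsupportedUri(let uri) {
        print("unsupported uri: \(String(describing: uri))")
        return nil
    } catch {
        print("failed to connect: \(error)")
        return nil
    }
}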
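
And a conformance sketch for the MediaMixerOutput protocol documented in this patch. The class name MixerMonitor and the logging are illustrative, and the HaishinKit module name is assumed; the two mixer(_:track:didOutput:) requirements and the AnyObject, Sendable constraints are taken from the diff.

import AVFoundation
import HaishinKit // assumed module name for the Sources/ targets above

// Sketch only: a stateless observer that logs what the mixer emits per track.
final class MixerMonitor: MediaMixerOutput {
    // Video path: one sample buffer per mixed frame on the given track.
    func mixer(_ mixer: MediaMixer, track: UInt8, didOutput sampleBuffer: CMSampleBuffer) {
        print("video track \(track): pts \(sampleBuffer.presentationTimeStamp.seconds)")
    }

    // Audio path: PCM buffers with their presentation time.
    func mixer(_ mixer: MediaMixer, track: UInt8, didOutput buffer: AVAudioPCMBuffer, when: AVAudioTime) {
        print("audio track \(track): \(buffer.frameLength) frames at host time \(when.hostTime)")
    }
}

How an observer is registered with MediaMixer is not part of the hunks shown here, so registration is omitted from the sketch.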