Skip to content

Commit

Permalink
Refactoring RTMPStream, SRTStream.
Browse files Browse the repository at this point in the history
  • Loading branch information
shogo4405 committed Aug 15, 2024
1 parent 51cc759 commit e5714a1
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 62 deletions.
8 changes: 4 additions & 4 deletions SRTHaishinKit/SRTConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import libsrt
public actor SRTConnection {
/// The error domain codes.
public enum Error: Swift.Error {
// The uri isn’t supported.
case notSupportedUri(_ uri: URL?)
// The fail to connect.
/// The uri isn’t supported.
case unsupportedUri(_ uri: URL?)
/// Failed to connect.
case failedToConnect(_ message: String, reson: Int32)
}

Expand Down Expand Up @@ -47,7 +47,7 @@ public actor SRTConnection {
/// Open a two-way connection to an application on SRT Server.
public func open(_ uri: URL?, mode: SRTMode = .caller) async throws {
guard let uri = uri, let scheme = uri.scheme, let host = uri.host, let port = uri.port, scheme == "srt" else {
throw Error.notSupportedUri(uri)
throw Error.unsupportedUri(uri)
}
do {
try await withCheckedThrowingContinuation { continuation in
Expand Down
62 changes: 41 additions & 21 deletions SRTHaishinKit/SRTStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ public actor SRTStream {
private lazy var mediaLink = MediaLink()
private lazy var mediaCodec = MediaCodec()
private var outputs: [any HKStreamOutput] = []
private var bitrateStorategy: (any HKStreamBitRateStrategy)?
private var audioPlayerNode: AudioPlayerNode?
private var bitrateStorategy: (any HKStreamBitRateStrategy)?

/// Creates a new stream object.
public init(connection: SRTConnection) {
Expand Down Expand Up @@ -52,12 +52,12 @@ public actor SRTStream {
mediaCodec.startRunning()
Task {
for try await buffer in mediaCodec.video where mediaCodec.isRunning {
writer.append(buffer)
append(buffer)
}
}
Task {
for await buffer in mediaCodec.audio where mediaCodec.isRunning {
writer.append(buffer.0, when: buffer.1)
append(buffer.0, when: buffer.1)
}
}
Task {
Expand Down Expand Up @@ -96,22 +96,19 @@ public actor SRTStream {
}
}
Task {
guard let audioPlayerNode else {
return
}
await audioPlayerNode.startRunning()
await audioPlayerNode?.startRunning()
for await audio in mediaCodec.audio where mediaCodec.isRunning {
await audioPlayerNode.enqueue(audio.0, when: audio.1)
append(audio.0, when: audio.1)
}
}
Task {
for try await buffer in reader.output where mediaCodec.isRunning {
mediaCodec.append(buffer.1)
append(buffer.1)
}
}
Task {
for await video in await mediaLink.dequeue where mediaCodec.isRunning {
outputs.forEach { $0.stream(self, didOutput: video) }
append(video)
}
}
await connection?.listen()
Expand Down Expand Up @@ -158,12 +155,43 @@ extension SRTStream: HKStream {
}

public func append(_ sampleBuffer: CMSampleBuffer) {
switch sampleBuffer.formatDescription?.mediaType {
case .video?:
guard sampleBuffer.formatDescription?.mediaType == .video else {
return
}
switch readyState {
case .publishing:
if sampleBuffer.formatDescription?.isCompressed == true {
writer.append(sampleBuffer)
} else {
mediaCodec.append(sampleBuffer)
outputs.forEach { $0.stream(self, didOutput: sampleBuffer) }
}
default:
break
}
if sampleBuffer.formatDescription?.isCompressed == false {
outputs.forEach { $0.stream(self, didOutput: sampleBuffer) }
}
}

public func append(_ audioBuffer: AVAudioBuffer, when: AVAudioTime) {
switch readyState {
case .playing:
switch audioBuffer {
case let audioBuffer as AVAudioPCMBuffer:
Task { await audioPlayerNode?.enqueue(audioBuffer, when: when) }
outputs.forEach { $0.stream(self, didOutput: audioBuffer, when: when) }
default:
break
}
case .publishing:
switch audioBuffer {
case let audioBuffer as AVAudioPCMBuffer:
mediaCodec.append(audioBuffer, when: when)
outputs.forEach { $0.stream(self, didOutput: audioBuffer, when: when) }
case let audioBuffer as AVAudioCompressedBuffer:
writer.append(audioBuffer, when: when)
default:
break
}
default:
break
Expand All @@ -177,14 +205,6 @@ extension SRTStream: HKStream {
}
}

public func append(_ buffer: AVAudioBuffer, when: AVAudioTime) {
guard buffer is AVAudioPCMBuffer else {
return
}
mediaCodec.append(buffer, when: when)
outputs.forEach { $0.stream(self, didOutput: buffer, when: when) }
}

public func addOutput(_ observer: some HKStreamOutput) {
guard !outputs.contains(where: { $0 === observer }) else {
return
Expand Down
3 changes: 3 additions & 0 deletions Sources/Mixer/MediaMixerOutput.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import AVFoundation

/// A delegate protocol implemented to receive stream output events.
public protocol MediaMixerOutput: AnyObject, Sendable {
/// Tells the receiver that a video buffer is incoming.
func mixer(_ mixer: MediaMixer, track: UInt8, didOutput sampleBuffer: CMSampleBuffer)
/// Tells the receiver that an audio buffer is incoming.
func mixer(_ mixer: MediaMixer, track: UInt8, didOutput buffer: AVAudioPCMBuffer, when: AVAudioTime)
}
2 changes: 1 addition & 1 deletion Sources/RTMP/FLVFoundation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ enum FLVAudioCodec: UInt8 {
}
}

func audioStreamBasicDescription(_ payload: inout Data) -> AudioStreamBasicDescription? {
func audioStreamBasicDescription(_ payload: Data) -> AudioStreamBasicDescription? {
guard isSupported, !payload.isEmpty else {
return nil
}
Expand Down
6 changes: 5 additions & 1 deletion Sources/RTMP/RTMPConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ import Foundation
// MARK: -
/// The RTMPConnection class creates a two-way RTMP connection.
public actor RTMPConnection {
/// The RTMPConnection error domain code.
/// The error domain code.
public enum Error: Swift.Error {
/// An invalid internal state.
case invalidState
/// The command isn’t supported.
case unsupportedCommand(_ command: String)
case socketErrorOccurred(_ error: any Swift.Error)
/// The requested operation timed out.
case requestTimedOut
/// A request fails.
case requestFailed(response: RTMPResponse)
}

Expand Down
13 changes: 10 additions & 3 deletions Sources/RTMP/RTMPMessage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -418,11 +418,18 @@ struct RTMPAudioMessage: RTMPMessage {
}

func makeAudioFormat() -> AVAudioFormat? {
var payload = self.payload
guard var audioStreamBasicDescription = codec.audioStreamBasicDescription(&payload) else {
switch payload[1] {
case FLVAACPacketType.seq.rawValue:
let config = AudioSpecificConfig(bytes: [UInt8](payload[codec.headerSize..<payload.count]))
return config?.makeAudioFormat()
case FLVAACPacketType.raw.rawValue:
guard var audioStreamBasicDescription = codec.audioStreamBasicDescription(payload) else {
return nil
}
return AVAudioFormat(streamDescription: &audioStreamBasicDescription)
default:
return nil
}
return AVAudioFormat(streamDescription: &audioStreamBasicDescription)
}
}

Expand Down
79 changes: 47 additions & 32 deletions Sources/RTMP/RTMPStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ import AVFoundation

/// An object that provides the interface to control a one-way channel over an RTMPConnection.
public actor RTMPStream {
// The RTMPStream error domain code.
/// The error domain code.
public enum Error: Swift.Error {
/// An invalid internal state.
case invalidState
/// The requested operation timed out.
case requestTimedOut
/// A request fails.
case requestFailed(response: RTMPResponse)
}

Expand Down Expand Up @@ -289,28 +292,23 @@ public actor RTMPStream {
mediaCodec.startRunning()
Task {
await mediaLink.startRunning()
while mediaCodec.isRunning {
do {
for try await video in mediaCodec.video where mediaCodec.isRunning {
await mediaLink.enqueue(video)
}
} catch {
logger.error(error)
}
for await video in await mediaLink.dequeue where await mediaLink.isRunning {
append(video)
}
}
Task {
guard let audioPlayerNode else {
return
}
await audioPlayerNode.startRunning()
for await audio in mediaCodec.audio where mediaCodec.isRunning {
await audioPlayerNode.enqueue(audio.0, when: audio.1)
do {
for try await video in mediaCodec.video where mediaCodec.isRunning {
await mediaLink.enqueue(video)
}
} catch {
logger.error(error)
}
}
Task {
for await video in await mediaLink.dequeue where mediaCodec.isRunning {
outputs.forEach { $0.stream(self, didOutput: video) }
await audioPlayerNode?.startRunning()
for await audio in mediaCodec.audio where mediaCodec.isRunning {
append(audio.0, when: audio.1)
}
}
doOutput(.zero, chunkStreamId: .command, message: RTMPCommandMessage(
Expand Down Expand Up @@ -630,16 +628,13 @@ public actor RTMPStream {
}

private func append(_ message: RTMPAudioMessage, type: RTMPChunkType) {
let payload = message.payload
let codec = message.codec
audioTimestamp.update(message, chunkType: type)
guard message.codec.isSupported else {
return
}
switch payload[1] {
switch message.payload[1] {
case FLVAACPacketType.seq.rawValue:
let config = AudioSpecificConfig(bytes: [UInt8](payload[codec.headerSize..<payload.count]))
audioFormat = config?.makeAudioFormat()
audioFormat = message.makeAudioFormat()
case FLVAACPacketType.raw.rawValue:
if audioFormat == nil {
audioFormat = message.makeAudioFormat()
Expand Down Expand Up @@ -735,8 +730,11 @@ extension RTMPStream: HKStream {
}

public func append(_ sampleBuffer: CMSampleBuffer) {
switch sampleBuffer.formatDescription?.mediaType {
case .video:
guard sampleBuffer.formatDescription?.mediaType == .video else {
return
}
switch readyState {
case .publishing:
if sampleBuffer.formatDescription?.isCompressed == true {
let decodeTimeStamp = sampleBuffer.decodeTimeStamp.isValid ? sampleBuffer.decodeTimeStamp : sampleBuffer.presentationTimeStamp
let compositionTime = videoTimestamp.getCompositionTime(sampleBuffer)
Expand All @@ -749,23 +747,40 @@ extension RTMPStream: HKStream {
doOutput(.one, chunkStreamId: .video, message: message)
} else {
mediaCodec.append(sampleBuffer)
outputs.forEach { $0.stream(self, didOutput: sampleBuffer) }
}
default:
break
}
if sampleBuffer.formatDescription?.isCompressed == false {
outputs.forEach { $0.stream(self, didOutput: sampleBuffer) }
}
}

public func append(_ audioBuffer: AVAudioBuffer, when: AVAudioTime) {
switch audioBuffer {
case let audioBuffer as AVAudioCompressedBuffer:
let timedelta = audioTimestamp.update(when)
guard let message = RTMPAudioMessage(streamId: id, timestamp: timedelta, audioBuffer: audioBuffer) else {
return
switch readyState {
case .playing:
switch audioBuffer {
case let audioBuffer as AVAudioPCMBuffer:
Task { await audioPlayerNode?.enqueue(audioBuffer, when: when) }
default:
break
}
case .publishing:
switch audioBuffer {
case let audioBuffer as AVAudioCompressedBuffer:
let timedelta = audioTimestamp.update(when)
guard let message = RTMPAudioMessage(streamId: id, timestamp: timedelta, audioBuffer: audioBuffer) else {
return
}
doOutput(.one, chunkStreamId: .audio, message: message)
default:
mediaCodec.append(audioBuffer, when: when)
}
doOutput(.one, chunkStreamId: .audio, message: message)
default:
mediaCodec.append(audioBuffer, when: when)
break
}
if audioBuffer is AVAudioPCMBuffer {
outputs.forEach { $0.stream(self, didOutput: audioBuffer, when: when) }
}
}

Expand Down

0 comments on commit e5714a1

Please sign in to comment.