Skip to content

Commit

Permalink
actor RTMPConnection and RTMPStream.
Browse files Browse the repository at this point in the history
  • Loading branch information
shogo4405 committed Aug 7, 2024
1 parent b150467 commit 2861e5b
Show file tree
Hide file tree
Showing 71 changed files with 2,716 additions and 3,792 deletions.
69 changes: 27 additions & 42 deletions Examples/iOS/IngestViewController.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ final class IngestViewController: UIViewController {
private var retryCount: Int = 0
private var preferedStereo = false
private let netStreamSwitcher: NetStreamSwitcher = .init()
private var stream: (any IOStreamConvertible)? {
return netStreamSwitcher.stream
}
private var mixer = IOMixer()
private lazy var audioCapture: AudioCapture = {
let audioCapture = AudioCapture()
Expand All @@ -37,7 +34,15 @@ final class IngestViewController: UIViewController {
override func viewDidLoad() {
super.viewDidLoad()

netStreamSwitcher.uri = Preference.default.uri ?? ""
Task {
await netStreamSwitcher.setPreference(Preference.default)
if let stream = await netStreamSwitcher.stream {
mixer.addStream(stream)
if let view = view as? (any IOStreamObserver) {
await stream.addObserver(view)
}
}
}

mixer.screen.size = .init(width: 720, height: 1280)
mixer.screen.backgroundColor = UIColor.white.cgColor
Expand All @@ -55,15 +60,14 @@ final class IngestViewController: UIViewController {
mixer.videoOrientation = orientation
}
mixer.isMonitoringEnabled = DeviceUtil.isHeadphoneConnected()
// stream?.audioSettings.bitRate = 64 * 1000
// stream?.bitrateStrategy = IOStreamVideoAdaptiveBitRateStrategy(mamimumVideoBitrate: VideoCodecSettings.default.bitRate)
videoBitrateSlider?.value = Float(VideoCodecSettings.default.bitRate) / 1000
audioBitrateSlider?.value = Float(AudioCodecSettings.default.bitRate) / 1000
}

override func viewWillAppear(_ animated: Bool) {
logger.info("viewWillAppear")
super.viewWillAppear(animated)

mixer.videoMixerSettings.mode = .offscreen
mixer.screen.startRunning()

Expand All @@ -76,19 +80,15 @@ final class IngestViewController: UIViewController {
videoUnit?.isVideoMirrored = true
}
}
stream.map {
(view as? (any IOStreamView))?.attachStream($0)
}
NotificationCenter.default.addObserver(self, selector: #selector(on(_:)), name: UIDevice.orientationDidChangeNotification, object: nil)
NotificationCenter.default.addObserver(self, selector: #selector(didInterruptionNotification(_:)), name: AVAudioSession.interruptionNotification, object: nil)
NotificationCenter.default.addObserver(self, selector: #selector(didRouteChangeNotification(_:)), name: AVAudioSession.routeChangeNotification, object: nil)
}

override func viewWillDisappear(_ animated: Bool) {
logger.info("viewWillDisappear")
super.viewWillDisappear(animated)
(stream as? RTMPStream)?.close()
Task {
await netStreamSwitcher.close()
try? await mixer.attachAudio(nil)
try? await mixer.attachCamera(nil, track: 0)
try? await mixer.attachCamera(nil, track: 1)
Expand Down Expand Up @@ -156,24 +156,30 @@ final class IngestViewController: UIViewController {
}

@IBAction func on(pause: UIButton) {
(stream as? RTMPStream)?.paused.toggle()
Task {
if let stream = await netStreamSwitcher.stream as? RTMPStream {
_ = try? await stream.pause(true)
}
}
}

@IBAction func on(close: UIButton) {
self.dismiss(animated: true, completion: nil)
}

@IBAction func on(publish: UIButton) {
if publish.isSelected {
UIApplication.shared.isIdleTimerDisabled = false
netStreamSwitcher.close()
publish.setTitle("", for: [])
} else {
UIApplication.shared.isIdleTimerDisabled = true
netStreamSwitcher.open(.ingest)
publish.setTitle("", for: [])
Task {
if publish.isSelected {
UIApplication.shared.isIdleTimerDisabled = false
await netStreamSwitcher.close()
publish.setTitle("", for: [])
} else {
UIApplication.shared.isIdleTimerDisabled = true
await netStreamSwitcher.open(.ingest)
publish.setTitle("", for: [])
}
publish.isSelected.toggle()
}
publish.isSelected.toggle()
}

func tapScreen(_ gesture: UIGestureRecognizer) {
Expand Down Expand Up @@ -287,27 +293,6 @@ final class IngestViewController: UIViewController {
}
}

/*
extension IngestViewController: IOStreamRecorderDelegate {
// MARK: IOStreamRecorderDelegate
func recorder(_ recorder: IOStreamRecorder, errorOccured error: IOStreamRecorder.Error) {
logger.error(error)
}
func recorder(_ recorder: IOStreamRecorder, finishWriting writer: AVAssetWriter) {
PHPhotoLibrary.shared().performChanges({() -> Void in
PHAssetChangeRequest.creationRequestForAssetFromVideo(atFileURL: writer.outputURL)
}, completionHandler: { _, error -> Void in
do {
try FileManager.default.removeItem(at: writer.outputURL)
} catch {
logger.warn(error)
}
})
}
}
*/

extension IngestViewController: AudioCaptureDelegate {
// MARK: AudioCaptureDelegate
func audioCapture(_ audioCapture: AudioCapture, buffer: AVAudioBuffer, time: AVAudioTime) {
Expand Down
154 changes: 56 additions & 98 deletions Examples/iOS/NetStreamSwitcher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,144 +3,102 @@ import Foundation
import HaishinKit
import SRTHaishinKit

final class NetStreamSwitcher {
private static let maxRetryCount: Int = 5
actor NetStreamSwitcher {
static let maxRetryCount: Int = 5

enum Mode {
case rtmp
case srt

func makeStream(_ swithcer: NetStreamSwitcher) async -> any IOStreamConvertible {
switch self {
case .rtmp:
let connection = RTMPConnection()
swithcer.connection = connection
return RTMPStream(connection: connection)
case .srt:
let connection = SRTConnection()
swithcer.connection = connection
return await SRTStream(connection: connection)
}
}
}

enum Method {
case ingest
case playback
}

var uri = "" {
didSet {
if uri.contains("srt://") {
mode = .srt
return
}
mode = .rtmp
}
}
private(set) var mode: Mode = .rtmp {
didSet {
Task {
stream = await mode.makeStream(self)
}
}
}
private var retryCount = 0
private var preference: Preference?
private(set) var mode: Mode = .rtmp
private var connection: Any?
private var method: Method = .ingest
private(set) var stream: (any IOStreamConvertible)? {
didSet {
// stream?.delegate = self
private(set) var stream: (any IOStream)?

func setPreference(_ preference: Preference) async {
self.preference = preference
if preference.uri?.contains("srt://") == true {
let connection = SRTConnection()
self.connection = connection
stream = await SRTStream(connection: connection)
mode = .srt
} else {
let connection = RTMPConnection()
self.connection = connection
stream = RTMPStream(connection: connection)
mode = .rtmp
}
}

func open(_ method: Method) {
func open(_ method: Method) async {
guard let preference else {
return
}
self.method = method
switch mode {
case .rtmp:
guard let connection = connection as? RTMPConnection else {
guard
let connection = connection as? RTMPConnection,
let stream = stream as? RTMPStream else {
return
}
switch method {
case .ingest:
// Performing operations for FMLE compatibility purposes.
(stream as? RTMPStream)?.fcPublishName = Preference.default.streamName
case .playback:
break
do {
let response = try await connection.connect(preference.uri ?? "")
logger.info(response)
switch method {
case .ingest:
let response = try await stream.publish(Preference.default.streamName)
logger.info(response)
case .playback:
let response = try await stream.play(Preference.default.streamName)
logger.info(response)
}
} catch RTMPConnection.Error.requestFailed(let response) {
logger.warn(response)
} catch RTMPStream.Error.requestFailed(let response) {
logger.warn(response)
} catch {
logger.warn(error)
}
connection.addEventListener(.rtmpStatus, selector: #selector(rtmpStatusHandler), observer: self)
connection.addEventListener(.ioError, selector: #selector(rtmpErrorHandler), observer: self)
connection.connect(uri)
case .srt:
guard let connection = connection as? SRTConnection, let stream = stream as? SRTStream else {
return
}
Task {
do {
try await connection.open(URL(string: uri))
switch method {
case .playback:
await stream.play()
case .ingest:
await stream.publish()
}
} catch {
logger.warn(error)
do {
try await connection.open(URL(string: preference.uri ?? ""))
switch method {
case .playback:
await stream.play()
case .ingest:
await stream.publish()
}
} catch {
logger.warn(error)
}
}
}

func close() {
func close() async {
switch mode {
case .rtmp:
guard let connection = connection as? RTMPConnection else {
return
}
connection.close()
connection.removeEventListener(.rtmpStatus, selector: #selector(rtmpStatusHandler), observer: self)
connection.removeEventListener(.ioError, selector: #selector(rtmpErrorHandler), observer: self)
try? await connection.close()
logger.info("conneciton.close")
case .srt:
guard let connection = connection as? SRTConnection else {
return
}
Task {
await connection.close()
}
}
}

@objc
private func rtmpStatusHandler(_ notification: Notification) {
let e = Event.from(notification)
guard let data: ASObject = e.data as? ASObject, let code: String = data["code"] as? String else {
return
}
logger.info(code)
switch code {
case RTMPConnection.Code.connectSuccess.rawValue:
retryCount = 0
switch method {
case .playback:
(stream as? RTMPStream)?.play(Preference.default.streamName!)
case .ingest:
(stream as? RTMPStream)?.publish(Preference.default.streamName!)
}
case RTMPConnection.Code.connectFailed.rawValue, RTMPConnection.Code.connectClosed.rawValue:
guard retryCount <= NetStreamSwitcher.maxRetryCount else {
return
}
Thread.sleep(forTimeInterval: pow(2.0, Double(retryCount)))
(connection as? RTMPConnection)?.connect(uri)
retryCount += 1
default:
break
await connection.close()
logger.info("conneciton.close")
}
}

@objc
private func rtmpErrorHandler(_ notification: Notification) {
logger.error(notification)
(connection as? RTMPConnection)?.connect(Preference.default.uri!)
}
}
Loading

0 comments on commit 2861e5b

Please sign in to comment.