diff --git a/Sources/LiveKit/Core/Room+Engine.swift b/Sources/LiveKit/Core/Room+Engine.swift index 619c48445..2ee2d4286 100644 --- a/Sources/LiveKit/Core/Room+Engine.swift +++ b/Sources/LiveKit/Core/Room+Engine.swift @@ -264,7 +264,7 @@ extension Room { throw LiveKitError(.invalidState) } - guard let url = _state.url, let token = _state.token else { + guard let url = _state.providedUrl, let token = _state.token else { log("[Connect] Url or token is nil", .error) throw LiveKitError(.invalidState) } @@ -332,16 +332,46 @@ extension Room { $0.connectionState = .reconnecting } - await cleanUp(isFullReconnect: true) + let (providedUrl, connectedUrl, token) = _state.read { ($0.providedUrl, $0.connectedUrl, $0.token) } - guard let url = _state.url, - let token = _state.token - else { + guard let providedUrl, let connectedUrl, let token else { log("[Connect] Url or token is nil") throw LiveKitError(.invalidState) } - try await fullConnectSequence(url, token) + var nextUrl = connectedUrl + var nextRegion: RegionInfo? + + while true { + do { + // Prepare for next connect attempt. + await cleanUp(isFullReconnect: true) + + try await fullConnectSequence(nextUrl, token) + _state.mutate { $0.connectedUrl = nextUrl } + // Exit loop on successful connection + break + } catch { + // Re-throw if is cancel. + if error is CancellationError { + throw error + } + + if let region = nextRegion { + nextRegion = nil + log("Connect failed with region: \(region)") + regionManager(addFailedRegion: region) + } + + try Task.checkCancellation() + + if providedUrl.isCloud { + let region = try await regionManagerResolveBest() + nextUrl = region.url + nextRegion = region + } + } + } } do { diff --git a/Sources/LiveKit/Core/Room+Region.swift b/Sources/LiveKit/Core/Room+Region.swift new file mode 100644 index 000000000..a0747ebd9 --- /dev/null +++ b/Sources/LiveKit/Core/Room+Region.swift @@ -0,0 +1,150 @@ +/* + * Copyright 2024 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +// MARK: - Room+Region + +extension Room { + static let regionManagerCacheInterval: TimeInterval = 3000 + + // MARK: - Public + + // prepareConnection should be called as soon as the page is loaded, in order + // to speed up the connection attempt. + // + // With LiveKit Cloud, it will also determine the best edge data center for + // the current client to connect to if a token is provided. + public func prepareConnection(url providedUrlString: String, token: String) { + // Must be in disconnected state. + guard _state.connectionState == .disconnected else { + log("Room is not in disconnected state", .info) + return + } + + guard let providedUrl = URL(string: providedUrlString), providedUrl.isValidForConnect else { + log("URL parse failed", .error) + return + } + + guard providedUrl.isCloud else { + log("Provided url is not a livekit cloud url", .warning) + return + } + + _state.mutate { + $0.providedUrl = providedUrl + $0.token = token + } + + regionManagerPrepareRegionSettings() + } + + // MARK: - Internal + + func regionManagerResolveBest() async throws -> RegionInfo { + try await regionManagerRequestSettings() + + guard let selectedRegion = _regionState.remaining.first else { + throw LiveKitError(.regionUrlProvider, message: "No more remaining regions.") + } + + log("[Region] Resolved region: \(String(describing: selectedRegion))") + + return selectedRegion + } + + func regionManager(addFailedRegion region: RegionInfo) { + _regionState.mutate { + $0.remaining.removeAll { $0 == region } + } + } + + func regionManagerPrepareRegionSettings() { + Task.detached { + try await self.regionManagerRequestSettings() + } + } + + func regionManager(shouldRequestSettingsForUrl providedUrl: URL) -> Bool { + guard providedUrl.isCloud else { return false } + return _regionState.read { + guard providedUrl == $0.url, let regionSettingsUpdated = $0.lastRequested else { return true } + let interval = Date().timeIntervalSince(regionSettingsUpdated) + return interval > Self.regionManagerCacheInterval + } + } + + // MARK: - Private + + private func regionManagerRequestSettings() async throws { + let (providedUrl, token) = _state.read { ($0.providedUrl, $0.token) } + + guard let providedUrl, let token else { + throw LiveKitError(.invalidState) + } + + // Ensure url is for cloud. + guard providedUrl.isCloud else { + throw LiveKitError(.onlyForCloud) + } + + guard regionManager(shouldRequestSettingsForUrl: providedUrl) else { + return + } + + // Make a request which ignores cache. + var request = URLRequest(url: providedUrl.regionSettingsUrl(), + cachePolicy: .reloadIgnoringLocalAndRemoteCacheData) + + request.addValue("Bearer \(token)", forHTTPHeaderField: "authorization") + + log("[Region] Requesting region settings...") + + let (data, response) = try await URLSession.shared.data(for: request) + // Response must be a HTTPURLResponse. + guard let httpResponse = response as? HTTPURLResponse else { + throw LiveKitError(.regionUrlProvider, message: "Failed to fetch region settings") + } + + // Check the status code. + guard httpResponse.isStatusCodeOK else { + log("[Region] Failed to fetch region settings, error: \(String(describing: httpResponse))", .error) + throw LiveKitError(.regionUrlProvider, message: "Failed to fetch region settings with status code: \(httpResponse.statusCode)") + } + + do { + // Try to parse the JSON data. + let regionSettings = try Livekit_RegionSettings(jsonUTF8Data: data) + let allRegions = regionSettings.regions.compactMap { $0.toLKType() } + + if allRegions.isEmpty { + throw LiveKitError(.regionUrlProvider, message: "Fetched region data is empty.") + } + + log("[Region] all regions: \(String(describing: allRegions))") + + _regionState.mutate { + $0.url = providedUrl + $0.all = allRegions + $0.remaining = allRegions + $0.lastRequested = Date() + } + } catch { + throw LiveKitError(.regionUrlProvider, message: "Failed to parse region settings with error: \(error)") + } + } +} diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index c44122611..78856835d 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -77,7 +77,7 @@ public class Room: NSObject, ObservableObject, Loggable { // expose engine's vars @objc - public var url: String? { _state.url?.absoluteString } + public var url: String? { _state.providedUrl?.absoluteString } @objc public var token: String? { _state.token } @@ -134,8 +134,10 @@ public class Room: NSObject, ObservableObject, Loggable { var serverInfo: Livekit_ServerInfo? // Engine - var url: URL? + var providedUrl: URL? + var connectedUrl: URL? var token: String? + // preferred reconnect mode which will be used only for next attempt var nextReconnectMode: ReconnectMode? var isReconnectingWithMode: ReconnectMode? @@ -168,7 +170,16 @@ public class Room: NSObject, ObservableObject, Loggable { } } + struct RegionState { + // Region + var url: URL? + var lastRequested: Date? + var all: [RegionInfo] = [] + var remaining: [RegionInfo] = [] + } + let _state: StateSync + let _regionState = StateSync(RegionState()) private let _sidCompleter = AsyncCompleter(label: "sid", defaultTimeout: .resolveSid) @@ -278,12 +289,12 @@ public class Room: NSObject, ObservableObject, Loggable { } @objc - public func connect(url: String, + public func connect(url urlString: String, token: String, connectOptions: ConnectOptions? = nil, roomOptions: RoomOptions? = nil) async throws { - guard let url = URL(string: url), url.isValidForConnect else { + guard let providedUrl = URL(string: urlString), providedUrl.isValidForConnect else { log("URL parse failed", .error) throw LiveKitError(.failedToParseUrl) } @@ -315,28 +326,68 @@ public class Room: NSObject, ObservableObject, Loggable { try Task.checkCancellation() - _state.mutate { $0.connectionState = .connecting } + _state.mutate { + $0.providedUrl = providedUrl + $0.token = token + $0.connectionState = .connecting + } + + var nextUrl = providedUrl + var nextRegion: RegionInfo? + + if providedUrl.isCloud { + if regionManager(shouldRequestSettingsForUrl: providedUrl) { + regionManagerPrepareRegionSettings() + } else { + // If region info already available, use it instead of provided url. + let region = try await regionManagerResolveBest() + nextUrl = region.url + nextRegion = region + } + } do { - try await fullConnectSequence(url, token) + while true { + do { + try await fullConnectSequence(nextUrl, token) + // Connect sequence successful + log("Connect sequence completed") + // Final check if cancelled, don't fire connected events + try Task.checkCancellation() + + _state.mutate { + $0.connectedUrl = nextUrl + $0.connectionState = .connected + } + // Exit loop on successful connection + break + } catch { + // Re-throw if is cancel. + if error is CancellationError { + throw error + } - // Connect sequence successful - log("Connect sequence completed") + if let region = nextRegion { + nextRegion = nil + log("Connect failed with region: \(region)") + regionManager(addFailedRegion: region) + } - // Final check if cancelled, don't fire connected events - try Task.checkCancellation() + try Task.checkCancellation() + // Prepare for next connect attempt. + await cleanUp(isFullReconnect: true) - // update internal vars (only if connect succeeded) - _state.mutate { - $0.url = url - $0.token = token - $0.connectionState = .connected + if providedUrl.isCloud { + let region = try await regionManagerResolveBest() + nextUrl = region.url + nextRegion = region + } + } } - } catch { + log("Failed to resolve a region or connect: \(error)") await cleanUp(withError: error) - // Re-throw error - throw error + throw error // Re-throw the original error } log("Connected to \(String(describing: self))", .info) @@ -386,7 +437,8 @@ extension Room { $0 = isFullReconnect ? State( connectOptions: $0.connectOptions, roomOptions: $0.roomOptions, - url: $0.url, + providedUrl: $0.providedUrl, + connectedUrl: $0.connectedUrl, token: $0.token, nextReconnectMode: $0.nextReconnectMode, isReconnectingWithMode: $0.isReconnectingWithMode, diff --git a/Sources/LiveKit/Errors.swift b/Sources/LiveKit/Errors.swift index 40f18fe0e..cc04c2a54 100644 --- a/Sources/LiveKit/Errors.swift +++ b/Sources/LiveKit/Errors.swift @@ -51,6 +51,10 @@ public enum LiveKitErrorType: Int, Sendable { case unableToResolveFPSRange = 703 case capturerDimensionsNotResolved = 704 case deviceAccessDenied = 705 + + // LiveKit Cloud + case onlyForCloud = 901 + case regionUrlProvider = 902 } extension LiveKitErrorType: CustomStringConvertible { diff --git a/Sources/LiveKit/Extensions/URL.swift b/Sources/LiveKit/Extensions/URL.swift index 18873b440..a72596dae 100644 --- a/Sources/LiveKit/Extensions/URL.swift +++ b/Sources/LiveKit/Extensions/URL.swift @@ -28,4 +28,33 @@ extension URL { var isSecure: Bool { scheme == "https" || scheme == "wss" } + + /// Checks whether the URL is a LiveKit Cloud URL. + var isCloud: Bool { + guard let host else { return false } + return host.hasSuffix(".livekit.cloud") || host.hasSuffix(".livekit.run") + } + + func cloudConfigUrl() -> URL { + var components = URLComponents(url: self, resolvingAgainstBaseURL: false)! + components.scheme = scheme?.replacingOccurrences(of: "ws", with: "http") + components.path = "/settings" + return components.url! + } + + func regionSettingsUrl() -> URL { + cloudConfigUrl().appendingPathComponent("/regions") + } + + func toSocketUrl() -> URL { + var components = URLComponents(url: self, resolvingAgainstBaseURL: false)! + components.scheme = scheme?.replacingOccurrences(of: "http", with: "ws") + return components.url! + } + + func toHTTPUrl() -> URL { + var components = URLComponents(url: self, resolvingAgainstBaseURL: false)! + components.scheme = scheme?.replacingOccurrences(of: "ws", with: "http") + return components.url! + } } diff --git a/Sources/LiveKit/Support/Utils.swift b/Sources/LiveKit/Support/Utils.swift index ef31754c5..6d1a54abe 100644 --- a/Sources/LiveKit/Support/Utils.swift +++ b/Sources/LiveKit/Support/Utils.swift @@ -283,6 +283,12 @@ extension MutableCollection { } } +extension HTTPURLResponse { + var isStatusCodeOK: Bool { + (200 ... 299).contains(statusCode) + } +} + func computeAttributesDiff(oldValues: [String: String], newValues: [String: String]) -> [String: String] { let allKeys = Set(oldValues.keys).union(newValues.keys) var diff = [String: String]() diff --git a/Sources/LiveKit/Types/RegionInfo.swift b/Sources/LiveKit/Types/RegionInfo.swift new file mode 100644 index 000000000..a983bf18d --- /dev/null +++ b/Sources/LiveKit/Types/RegionInfo.swift @@ -0,0 +1,58 @@ +/* + * Copyright 2024 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +@objc +public class RegionInfo: NSObject { + let regionId: String + let url: URL + let distance: Int64 + + init?(region: String, url: String, distance: Int64) { + guard let url = URL(string: url) else { return nil } + regionId = region + self.url = url + self.distance = distance + } + + // MARK: - Equal + + override public func isEqual(_ object: Any?) -> Bool { + guard let other = object as? Self else { return false } + return regionId == other.regionId + } + + override public var hash: Int { + var hasher = Hasher() + hasher.combine(regionId) + return hasher.finalize() + } + + // + + override public var description: String { + "RegionInfo(id: \(regionId), url: \(url), distance: \(distance))" + } +} + +extension Livekit_RegionInfo { + func toLKType() -> RegionInfo? { + RegionInfo(region: region, + url: url, + distance: distance) + } +} diff --git a/Tests/LiveKitTests/RegionUrlProviderTests.swift b/Tests/LiveKitTests/RegionUrlProviderTests.swift new file mode 100644 index 000000000..24749beb9 --- /dev/null +++ b/Tests/LiveKitTests/RegionUrlProviderTests.swift @@ -0,0 +1,91 @@ +/* + * Copyright 2024 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@testable import LiveKit +import XCTest + +class RegionUrlProviderTests: XCTestCase { + func testResolveUrl() async throws { + let room = Room() + + let testCacheInterval: TimeInterval = 3 + // Test data. + let testRegionSettings = [Livekit_RegionInfo.with { + $0.region = "otokyo1a" + $0.url = "https://example.otokyo1a.production.livekit.cloud" + $0.distance = 32838 + }, + Livekit_RegionInfo.with { + $0.region = "dblr1a" + $0.url = "https://example.dblr1a.production.livekit.cloud" + $0.distance = 6_660_301 + }, + Livekit_RegionInfo.with { + $0.region = "dsyd1a" + $0.url = "https://example.dsyd1a.production.livekit.cloud" + $0.distance = 7_823_582 + }].map { $0.toLKType() }.compactMap { $0 } + + let providedUrl = URL(string: "https://example.livekit.cloud")! + + // See if request should be initiated. + XCTAssert(room.regionManager(shouldRequestSettingsForUrl: providedUrl), "Should require to request region settings") + + // Set test data. + room._state.mutate { + $0.providedUrl = providedUrl + $0.token = "" + } + + room._regionState.mutate { + $0.url = providedUrl + $0.all = testRegionSettings + $0.remaining = testRegionSettings + $0.lastRequested = Date() + } + + // See if request is not required to be initiated. + XCTAssert(!room.regionManager(shouldRequestSettingsForUrl: providedUrl), "Should require to request region settings") + + let attempt1 = try await room.regionManagerResolveBest() + print("Next url: \(String(describing: attempt1))") + XCTAssert(attempt1.url == testRegionSettings[0].url) + room.regionManager(addFailedRegion: attempt1) + + let attempt2 = try await room.regionManagerResolveBest() + print("Next url: \(String(describing: attempt2))") + XCTAssert(attempt2.url == testRegionSettings[1].url) + room.regionManager(addFailedRegion: attempt2) + + let attempt3 = try await room.regionManagerResolveBest() + print("Next url: \(String(describing: attempt3))") + XCTAssert(attempt3.url == testRegionSettings[2].url) + room.regionManager(addFailedRegion: attempt3) + + // No more regions + let attempt4 = try? await room.regionManagerResolveBest() + XCTAssert(attempt4 == nil) + + // Simulate cache time elapse. + room._regionState.mutate { + // Roll back time. + $0.lastRequested = Date().addingTimeInterval(-Room.regionManagerCacheInterval) + } + + // After cache time elapsed, should require to request region settings again. + XCTAssert(room.regionManager(shouldRequestSettingsForUrl: providedUrl), "Should require to request region settings") + } +} diff --git a/Tests/LiveKitTests/Support/Utils.swift b/Tests/LiveKitTests/Support/Utils.swift new file mode 100644 index 000000000..b25cffbdc --- /dev/null +++ b/Tests/LiveKitTests/Support/Utils.swift @@ -0,0 +1,22 @@ +/* + * Copyright 2024 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +func asyncSleep(for duration: TimeInterval) async { + let nanoseconds = UInt64(duration * Double(NSEC_PER_SEC)) + try? await Task.sleep(nanoseconds: nanoseconds) +}