Skip to content

Commit

Permalink
fix race conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
nerzh committed Mar 27, 2020
1 parent 629fc14 commit cbc49d1
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 109 deletions.
4 changes: 2 additions & 2 deletions ActionCableSwift.podspec
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
Pod::Spec.new do |s|
s.name = 'ActionCableSwift'
s.module_name = 'ActionCableSwift'
s.version = '0.2.1'
s.version = '0.3.0'
s.summary = '🏰 Action Cable Swift is a client library being released for Action Cable Rails 5 which makes it easy to add real-time features to your app.'

s.swift_version = '5.1'
Expand All @@ -32,4 +32,4 @@ Pod::Spec.new do |s|
s.source_files = 'Sources/**/*'
s.frameworks = 'Foundation'
s.dependency 'SwiftExtensionsPack', '~> 0.2.9'
end
end
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ let package = Package(
.target(
name: "ActionCableSwift",
dependencies: [
"SwiftExtensionsPack"
.product(name: "SwiftExtensionsPack", package: "SwiftExtensionsPack")
]),
.testTarget(
name: "ActionCableSwiftTests",
Expand Down
90 changes: 41 additions & 49 deletions Sources/ActionCableSwift/ACChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public class ACChannel {
self.options = options ?? ACChannelOptions()
setupAutoSubscribe()
setupOnTextCallbacks()
setupOnCancelledCallbacks()
setupOnDisconnectCallbacks()
}

public func subscribe() throws {
Expand Down Expand Up @@ -108,87 +110,77 @@ public class ACChannel {
private func setupAutoSubscribe() {
if options.autoSubscribe {
if client?.isConnected ?? false { try? subscribe() }
client?.addOnConnected { [weak self] (headers) in
self.client?.addOnConnected { [weak self] (headers) in
guard let self = self else { return }
try? self.subscribe()
self.channelSerialQueue.async {
try? self.subscribe()
}
}
}
}

private func setupOnDisconnectCallbacks() {
client?.addOnDisconnected { [weak self] (reason) in
client?.addOnDisconnected { [weak self] (reason) in
guard let self = self else { return }
self.isSubscribed = false
self.channelSerialQueue.async {
self.isSubscribed = false
self.executeCallback(callbacks: self.onUnsubscribe)
}
}
}

private func setupOnCancelledCallbacks() {
client?.addOnCancelled { [weak self] in
guard let self = self else { return }
self.isSubscribed = false
self.channelSerialQueue.async {
self.isSubscribed = false
self.executeCallback(callbacks: self.onUnsubscribe)
}
}
}

private func setupOnTextCallbacks() {
client?.addOnText { [weak self] (text) in
guard let self = self else { return }
let message = ACSerializer.responseFrom(stringData: text)
switch message.type {
case .confirmSubscription:
self.isSubscribed = true
self.executeCallback(callbacks: self.onSubscribe, message: message)
self.flushBuffer()
case .rejectSubscription:
self.isSubscribed = false
self.executeCallback(callbacks: self.onRejectSubscription, message: message)
case .cancelSubscription:
self.isSubscribed = false
self.executeCallback(callbacks: self.onUnsubscribe, message: message)
case .message:
self.executeCallback(callbacks: self.onMessage, message: message)
case .ping:
self.executeCallback(callbacks: self.onPing)
default: break
self.channelSerialQueue.async {
let message = ACSerializer.responseFrom(stringData: text)
switch message.type {
case .confirmSubscription:
self.isSubscribed = true
self.executeCallback(callbacks: self.onSubscribe, message: message)
self.flushBuffer()
case .rejectSubscription:
self.isSubscribed = false
self.executeCallback(callbacks: self.onRejectSubscription, message: message)
case .cancelSubscription:
self.isSubscribed = false
self.executeCallback(callbacks: self.onUnsubscribe, message: message)
case .message:
self.executeCallback(callbacks: self.onMessage, message: message)
case .ping:
self.client?.pingRoundWatcher.ping()
self.executeCallback(callbacks: self.onPing)
default: break
}
}
}

client?.addOnDisconnected { [weak self] (reason) in
guard let self = self else { return }
self.isSubscribed = false
self.executeCallback(callbacks: self.onUnsubscribe)
}

client?.addOnCancelled { [weak self] in
guard let self = self else { return }
self.isSubscribed = false
self.executeCallback(callbacks: self.onUnsubscribe)
}
}

private func executeCallback(callbacks: [ACResponseCallback], message: ACMessage) {
channelSerialQueue.async { [weak self] in
guard let self = self else { return }
for closure in callbacks {
closure(self, message)
}
for closure in callbacks {
closure(self, message)
}
}

private func executeCallback(callbacks: [ACResponseCallbackWithOptionalMessage]) {
channelSerialQueue.async { [weak self] in
guard let self = self else { return }
for closure in callbacks {
closure(self, nil)
}
for closure in callbacks {
closure(self, nil)
}
}

private func flushBuffer() {
channelSerialQueue.async { [weak self] in
guard let self = self else { return }
while let closure = self.actionsBuffer.popLast() {
closure()
}
while let closure = self.actionsBuffer.popLast() {
closure()
}
}
}
85 changes: 60 additions & 25 deletions Sources/ActionCableSwift/ActionCableSwift.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ public final class ACClient {
public var headers: [String: String]?
public let pingRoundWatcher = PingRoundWatcher()
public var options: ACClientOptions

private var channels: [String: ACChannel] = [:]
private let clientConcurrentQueue = DispatchQueue(label: "com.ACClient.Conccurent", attributes: .concurrent)
private let isConnectedLock: NSLock = .init()
private let sendLock: NSLock = .init()

/// callbacks
private var onConnected: [((_ headers: [String: String]?) -> Void)] = []
Expand All @@ -21,6 +23,20 @@ public final class ACClient {
private var onPing: [(() -> Void)] = []
private var onPong: [(() -> Void)] = []

public init(ws: ACWebSocketProtocol,
headers: [String: String]? = nil,
options: ACClientOptions? = nil
) {
self.ws = ws
self.headers = headers
self.options = options ?? ACClientOptions()
setupWSCallbacks()
pingRoundWatcher.client = self
if self.options.reconnect {
self.pingRoundWatcher.start()
}
}

public func addOnConnected(_ handler: @escaping (_ headers: [String: String]?) -> Void) {
onConnected.append(handler)
}
Expand Down Expand Up @@ -49,42 +65,36 @@ public final class ACClient {
onPong.append(handler)
}

public init(ws: ACWebSocketProtocol,
headers: [String: String]? = nil,
options: ACClientOptions? = nil
) {
self.ws = ws
self.headers = headers
self.options = options ?? ACClientOptions()
setupWSCallbacks()
}

subscript(name: String) -> ACChannel? {
channels[name]
}

public func connect() {
isConnectedLock.lock()
ws.connect(headers: headers)
if self.options.reconnect {
self.pingRoundWatcher.client = self
self.pingRoundWatcher.start()
}
isConnectedLock.unlock()
}

public func disconnect() {
isConnectedLock.lock()
ws.disconnect()
isConnectedLock.unlock()
}

public func send(text: String, _ completion: (() -> Void)? = nil) {
sendLock.lock()
ws.send(text: text) {
completion?()
}
sendLock.unlock()
}

public func send(data: Data, _ completion: (() -> Void)? = nil) {
sendLock.lock()
ws.send(data: data) {
completion?()
}
sendLock.unlock()
}

@discardableResult
Expand All @@ -96,64 +106,89 @@ public final class ACClient {
private func setupWSCallbacks() {
ws.onConnected = { [weak self] headers in
guard let self = self else { return }
self.isConnected = true
self.setIsConnected(to: true)
self.clientConcurrentQueue.async { [headers] in
for closure in self.onConnected {
let closures = self.onConnected
for closure in closures {
closure(headers)
}
}
}
ws.onDisconnected = { [weak self] reason in
guard let self = self else { return }
self.isConnected = false
self.setIsConnected(to: false)
self.clientConcurrentQueue.async { [reason] in
for closure in self.onDisconnected {
let closures = self.onDisconnected
for closure in closures {
closure(reason)
}
}
}
ws.onCancelled = { [weak self] in
guard let self = self else { return }
self.isConnected = false
self.setIsConnected(to: false)
self.clientConcurrentQueue.async {
for closure in self.onCancelled {
let closures = self.onCancelled
for closure in closures {
closure()
}
}
}
ws.onText = { [weak self] text in
guard let self = self else { return }
self.clientConcurrentQueue.async { [text] in
for closure in self.onText {
let closures = self.onText
for closure in closures {
closure(text)
}
}
}
ws.onBinary = { [weak self] data in
guard let self = self else { return }
self.clientConcurrentQueue.async { [data] in
for closure in self.onBinary {
let closures = self.onBinary
for closure in closures {
closure(data)
}
}
}
ws.onPing = { [weak self] in
guard let self = self else { return }
self.clientConcurrentQueue.async {
for closure in self.onPing {
let closures = self.onPing
for closure in closures {
closure()
}
}
}
ws.onPong = { [weak self] in
guard let self = self else { return }
let closures = self.onPong
self.clientConcurrentQueue.async {
for closure in self.onPong {
for closure in closures {
closure()
}
}
}
}

func setIsConnected(to: Bool) {
isConnectedLock.lock()
isConnected = to
isConnectedLock.unlock()
}

func getIsConnected() -> Bool {
isConnectedLock.lock()
let result = isConnected
isConnectedLock.unlock()

return result
}

deinit {
pingRoundWatcher.setFinish(to: true)
}
}


Expand Down
Loading

0 comments on commit cbc49d1

Please sign in to comment.