Skip to content

Commit

Permalink
Merge pull request #5 from Soumyajit-Saha/feature/remove_subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
NeedleInAJayStack authored Jul 25, 2023
2 parents 237d290 + 07c3d45 commit 8c2f5ad
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions Sources/GraphQLTransportWS-DataSync/Server.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@ public class Server<InitPayload: Equatable & Codable> {

var initialized = false

let disposeBag = DisposeBag()
var disposeBags = [String: DisposeBag]()
let encoder = GraphQLJSONEncoder()
let decoder = JSONDecoder()

enum DuplicateKeyError: Error {
case duplicateKey
}

/// Create a new server
///
Expand Down Expand Up @@ -87,6 +91,7 @@ public class Server<InitPayload: Equatable & Codable> {
self.error(.invalidRequestFormat(messageType: .complete))
return
}
self.disposeBags.removeValue(forKey: completeRequest.id)
self.onOperationComplete(completeRequest.id)
case .unknown:
self.error(.invalidType())
Expand Down Expand Up @@ -207,6 +212,12 @@ public class Server<InitPayload: Equatable & Codable> {
// swiftlint:disable:next force_cast
let stream = streamOpt as! ObservableSubscriptionEventStream
let observable = stream.observable
guard disposeBags[id] == nil else {
self.sendError(DuplicateKeyError.duplicateKey, id: id)
return
}
let disposeBag = DisposeBag()
disposeBags[id] = disposeBag

observable.subscribe(
onNext: { [weak self] resultFuture in
Expand All @@ -226,7 +237,7 @@ public class Server<InitPayload: Equatable & Codable> {
guard let self = self else { return }
self.sendComplete(id: id)
}
).disposed(by: self.disposeBag)
).disposed(by: disposeBag)
}
subscribeFuture.whenFailure { error in
self.sendError(error, id: id)
Expand Down Expand Up @@ -274,6 +285,7 @@ public class Server<InitPayload: Equatable & Codable> {
id: id
).toJSON(encoder)
)
self.disposeBags.removeValue(forKey: id)
self.onOperationComplete(id)
}

Expand Down

0 comments on commit 8c2f5ad

Please sign in to comment.