From 07c3d45d55debaef4cfafc58d13ba9e67ed5bd86 Mon Sep 17 00:00:00 2001 From: Soumyajit Saha Date: Mon, 24 Jul 2023 14:41:42 -0600 Subject: [PATCH] Fix: Added removal of subscriptions on complete Previously there was only a single disposal for all the subsriptions. So, all the subscriptions were getting cancelled when the disposal bag was going out of scope and not when a completion request was received on a particular subscription. So, I added a dictionary of disposal bags with subscription 'id' as key and the disposal bag for that subscription as value. On subscription, a new disposal bag corresponding to the subscription is created and inserted in the dictionary. On completion, the disposal bag is removed from the dictionary. Now, the test for consecutive subscription of same operation is passing. Link: https://app.clickup.com/t/866amdey9 --- Sources/GraphQLTransportWS-DataSync/Server.swift | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/Sources/GraphQLTransportWS-DataSync/Server.swift b/Sources/GraphQLTransportWS-DataSync/Server.swift index fbe60e7..2cc960a 100644 --- a/Sources/GraphQLTransportWS-DataSync/Server.swift +++ b/Sources/GraphQLTransportWS-DataSync/Server.swift @@ -25,9 +25,13 @@ public class Server { var initialized = false - let disposeBag = DisposeBag() + var disposeBags = [String: DisposeBag]() let encoder = GraphQLJSONEncoder() let decoder = JSONDecoder() + + enum DuplicateKeyError: Error { + case duplicateKey + } /// Create a new server /// @@ -87,6 +91,7 @@ public class Server { self.error(.invalidRequestFormat(messageType: .complete)) return } + self.disposeBags.removeValue(forKey: completeRequest.id) self.onOperationComplete(completeRequest.id) case .unknown: self.error(.invalidType()) @@ -207,6 +212,12 @@ public class Server { // swiftlint:disable:next force_cast let stream = streamOpt as! ObservableSubscriptionEventStream let observable = stream.observable + guard disposeBags[id] == nil else { + self.sendError(DuplicateKeyError.duplicateKey, id: id) + return + } + let disposeBag = DisposeBag() + disposeBags[id] = disposeBag observable.subscribe( onNext: { [weak self] resultFuture in @@ -226,7 +237,7 @@ public class Server { guard let self = self else { return } self.sendComplete(id: id) } - ).disposed(by: self.disposeBag) + ).disposed(by: disposeBag) } subscribeFuture.whenFailure { error in self.sendError(error, id: id) @@ -274,6 +285,7 @@ public class Server { id: id ).toJSON(encoder) ) + self.disposeBags.removeValue(forKey: id) self.onOperationComplete(id) }