Skip to content

Commit d95d9ff

Browse files
committed
wip TaskScheduler
1 parent c66b84e commit d95d9ff

File tree

11 files changed

+298
-39
lines changed

11 files changed

+298
-39
lines changed

WireDomain/Sources/WireDomain/Components/ClientSessionComponent.swift

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,16 @@ public final class ClientSessionComponent {
110110

111111
// MARK: - Monitor
112112

113-
public lazy var conversationMonitor: ConversationsMonitorProtocol = {
114-
ConversationsMonitor(repository: conversationRepository, context: syncContext)
113+
public lazy var conversationMonitor: ConversationUpdatesGeneratorProtocol = {
114+
ConversationUpdatesGenerator(repository: conversationRepository, context: syncContext, onConversationUpdated: { [weak self] task in
115+
116+
self?.taskScheduler.enqueue(work: task)
117+
})
118+
}()
119+
120+
public lazy var taskScheduler: TaskScheduler = {
121+
let worker = ConversationUpdatesWorker(repository: conversationRepository)
122+
return TaskScheduler(workers: [worker])
115123
}()
116124

117125
// MARK: - Network API clients

WireDomain/Sources/WireDomain/Event Processing/ConversationEventProcessor/ConversationEventProcessor.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ struct ConversationEventProcessor: ConversationEventProcessorProtocol {
3939
let mlsResetEventProcessor: any ConversationMLSResetEventProcessorProtocol
4040

4141
func processEvent(_ event: ConversationEvent) async throws {
42-
WireLogger.eventProcessing.debug("✅ process event \(String(describing: event))", attributes: [.conversationId: event.conversationID.debugDescription])
42+
WireLogger.eventProcessing.debug("✅ process event \(event.name)", attributes: [.conversationId: event.conversationID.debugDescription])
4343

4444
switch event {
4545
case let .accessUpdate(event):

WireDomain/Sources/WireDomain/Synchronization/ConversationsMonitor.swift renamed to WireDomain/Sources/WireDomain/Synchronization/Scheduler/ConversationUpdates/ConversationUpdatesGenerator.swift

Lines changed: 19 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,24 @@
1818

1919
import WireDataModel
2020
import WireLogging
21-
public protocol ConversationsMonitorProtocol {
21+
22+
public protocol ConversationUpdatesGeneratorProtocol {
2223
func start() async
2324
func stop()
24-
2525
}
26-
public final class ConversationsMonitor: NSObject, ConversationsMonitorProtocol {
26+
27+
public final class ConversationUpdatesGenerator: NSObject, ConversationUpdatesGeneratorProtocol {
2728

2829
private let context: NSManagedObjectContext
2930
private let fetchedResultsController: NSFetchedResultsController<ZMConversation>
30-
private let repository: ConversationRepositoryProtocol
3131
/// Prevent duplicate pulls per conversation while one is in flight.
3232
private var inFlightTracker = ConversationIdsTracker()
33-
33+
private var onConversationUpdated: (ScheduledTaskType) -> Void
34+
35+
3436
public init(repository: ConversationRepositoryProtocol,
35-
context: NSManagedObjectContext) {
37+
context: NSManagedObjectContext,
38+
onConversationUpdated: @escaping (ScheduledTaskType) -> Void) {
3639
let request = NSFetchRequest<ZMConversation>(entityName: ZMConversation.entityName())
3740
request.predicate = ZMConversation.predicateForNeedingToBeUpdatedFromBackend()
3841
request.sortDescriptors = [NSSortDescriptor(key: ZMConversationLastServerTimeStampKey, ascending: true)]
@@ -42,8 +45,8 @@ public final class ConversationsMonitor: NSObject, ConversationsMonitorProtocol
4245
sectionNameKeyPath: nil,
4346
cacheName: nil
4447
)
45-
self.repository = repository
4648
self.context = context
49+
self.onConversationUpdated = onConversationUpdated
4750
super.init()
4851
}
4952

@@ -57,39 +60,22 @@ public final class ConversationsMonitor: NSObject, ConversationsMonitorProtocol
5760
}
5861

5962
let conversations = fetchedResultsController.fetchedObjects ?? []
60-
6163
for conversation in conversations {
62-
await triggerPullIfNeeded(for: conversation)
64+
await context.perform {
65+
if let id = conversation.qualifiedID {
66+
self.onConversationUpdated(.conversationNeedToBeUpdated(id: id.toAPIModel()))
67+
}
68+
}
6369
}
6470
}
6571

6672
public func stop() {
6773
fetchedResultsController.delegate = nil
6874
}
69-
70-
// MARK: - Private
71-
72-
private func triggerPullIfNeeded(for conversation: ZMConversation) async {
73-
74-
guard let qualifiedID = await context.perform({ conversation.qualifiedID }), await !inFlightTracker.contains(qualifiedID) else {
75-
return
76-
}
77-
await inFlightTracker.add(qualifiedID)
78-
79-
do {
80-
WireLogger.conversation.debug("Pulling conversation from backend", attributes: [.conversationId: qualifiedID.uuid.uuidString])
81-
try await repository.pullConversation(id: qualifiedID.uuid, domain: qualifiedID.domain)
82-
await inFlightTracker.remove(qualifiedID)
83-
} catch {
84-
await inFlightTracker.remove(qualifiedID)
85-
WireLogger.conversation.error("error updating conversation from the backend: \(String(describing: error))", attributes: [.conversationId: qualifiedID.uuid.uuidString])
86-
}
87-
88-
}
8975
}
9076

9177
// MARK: - NSFetchedResultsControllerDelegate
92-
extension ConversationsMonitor: NSFetchedResultsControllerDelegate {
78+
extension ConversationUpdatesGenerator: NSFetchedResultsControllerDelegate {
9379

9480
public func controller(
9581
_ controller: NSFetchedResultsController<NSFetchRequestResult>,
@@ -103,10 +89,10 @@ extension ConversationsMonitor: NSFetchedResultsControllerDelegate {
10389
switch type {
10490
case .insert:
10591
// Insert == flag flipped to true (matches predicate now)
106-
Task {
107-
await triggerPullIfNeeded(for: conversation)
92+
if let qualifiedID = conversation.qualifiedID {
93+
self.onConversationUpdated(.conversationNeedToBeUpdated(id: qualifiedID.toAPIModel()))
10894
}
109-
95+
11096
case .update:
11197
// Already in the "true" set; we only act on the transition handled by `.insert`.
11298
break
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
//
2+
// Wire
3+
// Copyright (C) 2025 Wire Swiss GmbH
4+
//
5+
// This program is free software: you can redistribute it and/or modify
6+
// it under the terms of the GNU General Public License as published by
7+
// the Free Software Foundation, either version 3 of the License, or
8+
// (at your option) any later version.
9+
//
10+
// This program is distributed in the hope that it will be useful,
11+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
// GNU General Public License for more details.
14+
//
15+
// You should have received a copy of the GNU General Public License
16+
// along with this program. If not, see http://www.gnu.org/licenses/.
17+
//
18+
import WireLogging
19+
20+
class ConversationUpdatesWorker: Worker {
21+
enum Failure: Error {
22+
case unsupportedTask
23+
}
24+
private let repository: ConversationRepositoryProtocol
25+
26+
public init(repository: ConversationRepositoryProtocol) {
27+
self.repository = repository
28+
}
29+
30+
func canPerform(task: ScheduledTaskType) -> Bool {
31+
if case .conversationNeedToBeUpdated = task {
32+
return true
33+
}
34+
return false
35+
}
36+
37+
func perform(task: ScheduledTaskType) async throws {
38+
guard case let .conversationNeedToBeUpdated(qualifiedID) = task else {
39+
throw Failure.unsupportedTask
40+
}
41+
42+
do {
43+
try await repository.pullConversation(id: qualifiedID.id, domain: qualifiedID.domain)
44+
} catch {
45+
// giving more context to the error
46+
WireLogger.conversation.error("error updating conversation from the backend: \(String(describing: error))", attributes: [.conversationId: qualifiedID.id.uuidString])
47+
throw error
48+
}
49+
}
50+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
//
2+
// Wire
3+
// Copyright (C) 2025 Wire Swiss GmbH
4+
//
5+
// This program is free software: you can redistribute it and/or modify
6+
// it under the terms of the GNU General Public License as published by
7+
// the Free Software Foundation, either version 3 of the License, or
8+
// (at your option) any later version.
9+
//
10+
// This program is distributed in the hope that it will be useful,
11+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
// GNU General Public License for more details.
14+
//
15+
// You should have received a copy of the GNU General Public License
16+
// along with this program. If not, see http://www.gnu.org/licenses/.
17+
//
18+
19+
public struct ScheduledTask: Equatable, Hashable {
20+
21+
public enum Priority: Int {
22+
case low
23+
case medium
24+
case high
25+
}
26+
27+
// removed the work from the type in order to use Set of tasks
28+
var type: ScheduledTaskType = .other
29+
var priority: Priority = .medium
30+
31+
public static func == (lhs: ScheduledTask, rhs: ScheduledTask) -> Bool {
32+
lhs.type == rhs.type && lhs.priority == rhs.priority
33+
}
34+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
//
2+
// Wire
3+
// Copyright (C) 2025 Wire Swiss GmbH
4+
//
5+
// This program is free software: you can redistribute it and/or modify
6+
// it under the terms of the GNU General Public License as published by
7+
// the Free Software Foundation, either version 3 of the License, or
8+
// (at your option) any later version.
9+
//
10+
// This program is distributed in the hope that it will be useful,
11+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
// GNU General Public License for more details.
14+
//
15+
// You should have received a copy of the GNU General Public License
16+
// along with this program. If not, see http://www.gnu.org/licenses/.
17+
//
18+
import WireFoundation
19+
20+
public enum ScheduledTaskType: Equatable, Hashable {
21+
case conversationNeedToBeUpdated(id: QualifiedID)
22+
case other
23+
24+
public static func == (lhs: ScheduledTaskType, rhs: ScheduledTaskType) -> Bool {
25+
switch (lhs, rhs) {
26+
case (.conversationNeedToBeUpdated(let lhsId), .conversationNeedToBeUpdated(let rhsId)):
27+
return lhsId == rhsId
28+
case (.other, .other):
29+
return true
30+
default:
31+
return false
32+
}
33+
}
34+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
//
2+
// Wire
3+
// Copyright (C) 2025 Wire Swiss GmbH
4+
//
5+
// This program is free software: you can redistribute it and/or modify
6+
// it under the terms of the GNU General Public License as published by
7+
// the Free Software Foundation, either version 3 of the License, or
8+
// (at your option) any later version.
9+
//
10+
// This program is distributed in the hope that it will be useful,
11+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
// GNU General Public License for more details.
14+
//
15+
// You should have received a copy of the GNU General Public License
16+
// along with this program. If not, see http://www.gnu.org/licenses/.
17+
//
18+
import WireLogging
19+
20+
/// Schedule tasks to be performed
21+
/// Ensures that a task is not performed twice
22+
public actor TaskScheduler {
23+
enum State {
24+
case idle
25+
case running(Task<Void, any Error>)
26+
}
27+
28+
private var tasks: Set<ScheduledTask> = .init()
29+
private var state: State = .idle
30+
private var workers = [Worker]()
31+
32+
public init(workers: [Worker]) {
33+
self.workers = workers
34+
}
35+
36+
public func start() {
37+
let runningTask = Task<Void, any Error> {
38+
while(true) {
39+
if let task = sortedTasks().first {
40+
await execute(task)
41+
}
42+
try await Task.sleep(for: .seconds(0.5))
43+
}
44+
}
45+
state = .running(runningTask)
46+
}
47+
48+
public func stop() {
49+
if case .running(let task) = state {
50+
task.cancel()
51+
}
52+
state = .idle
53+
}
54+
55+
public func enqueue(work: ScheduledTaskType, priority: ScheduledTask.Priority = .medium) {
56+
let task = ScheduledTask(type: work, priority: priority)
57+
tasks.insert(task)
58+
}
59+
60+
private func execute(_ task: ScheduledTask) async {
61+
for worker in workers where worker.canPerform(task: task.type) {
62+
do {
63+
try await worker.perform(task: task.type)
64+
tasks.remove(task)
65+
} catch {
66+
WireLogger.system.error("task \(task.type) failed: \(error)")
67+
}
68+
}
69+
}
70+
71+
private func removeTask(_ task: ScheduledTask) {
72+
tasks.remove(task)
73+
}
74+
75+
private func sortedTasks() -> [ScheduledTask] {
76+
Array(tasks).sorted { lhs, rhs in
77+
lhs.priority.rawValue > rhs.priority.rawValue
78+
}
79+
}
80+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
//
2+
// Wire
3+
// Copyright (C) 2025 Wire Swiss GmbH
4+
//
5+
// This program is free software: you can redistribute it and/or modify
6+
// it under the terms of the GNU General Public License as published by
7+
// the Free Software Foundation, either version 3 of the License, or
8+
// (at your option) any later version.
9+
//
10+
// This program is distributed in the hope that it will be useful,
11+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
// GNU General Public License for more details.
14+
//
15+
// You should have received a copy of the GNU General Public License
16+
// along with this program. If not, see http://www.gnu.org/licenses/.
17+
//
18+
19+
20+
public protocol Worker {
21+
func canPerform(task: ScheduledTaskType) -> Bool
22+
func perform(task: ScheduledTaskType) async throws
23+
}

0 commit comments

Comments
 (0)