Skip to content

Commit ae2835e

Browse files
johnxnguyennetbe
andauthored
feat: introduce WorkAgent - WPB-20123 (#3800)
Co-authored-by: François Benaiteau <[email protected]>
1 parent 768150b commit ae2835e

File tree

10 files changed

+672
-0
lines changed

10 files changed

+672
-0
lines changed
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
//
2+
// Wire
3+
// Copyright (C) 2025 Wire Swiss GmbH
4+
//
5+
// This program is free software: you can redistribute it and/or modify
6+
// it under the terms of the GNU General Public License as published by
7+
// the Free Software Foundation, either version 3 of the License, or
8+
// (at your option) any later version.
9+
//
10+
// This program is distributed in the hope that it will be useful,
11+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
// GNU General Public License for more details.
14+
//
15+
// You should have received a copy of the GNU General Public License
16+
// along with this program. If not, see http://www.gnu.org/licenses/.
17+
//
18+
19+
import Foundation
20+
import WireLogging
21+
22+
actor WorkAgent {
23+
24+
// MARK: - Properties
25+
26+
/// Whether the agent is currently dequeuing work tickets.
27+
28+
var isRunning: Bool {
29+
task != nil
30+
}
31+
32+
/// Whether dequeuing should begin after ticket submission.
33+
34+
private var shouldAutoStart = false
35+
func setAutoStartEnabled(_ enabled: Bool) async {
36+
shouldAutoStart = enabled
37+
}
38+
39+
// MARK: - Life cycle
40+
41+
private var task: Task<Void, Never>?
42+
private let scheduler: any WorkItemScheduler
43+
private let nonReentrantTaskManager = NonReentrantTaskManager()
44+
45+
init(scheduler: any WorkItemScheduler) {
46+
self.scheduler = scheduler
47+
}
48+
49+
// MARK: - Operation
50+
51+
func submitItem(_ item: any WorkItem) {
52+
WireLogger.workAgent.debug(
53+
"item submitted: \(item)",
54+
attributes: .init(item)
55+
)
56+
57+
Task {
58+
await scheduler.enqueueItem(item)
59+
60+
if shouldAutoStart, !isRunning {
61+
await start()
62+
}
63+
}
64+
}
65+
66+
func start() async {
67+
try? await nonReentrantTaskManager.performIfNeeded { [weak self] in
68+
await self?.internalStart()
69+
}
70+
}
71+
72+
private func internalStart() async {
73+
guard task == nil else {
74+
return
75+
}
76+
77+
WireLogger.workAgent.info(
78+
"starting",
79+
attributes: .safePublic
80+
)
81+
82+
task = Task {
83+
let startTime = Date()
84+
var completedItems = 0
85+
86+
while let item = await scheduler.dequeueNextItem() {
87+
do {
88+
try Task.checkCancellation()
89+
} catch {
90+
WireLogger.workAgent.debug(
91+
"task has been cancelled, aborting...",
92+
attributes: .init(item)
93+
)
94+
break
95+
}
96+
97+
WireLogger.workAgent.debug(
98+
"dequeued item",
99+
attributes: .init(item)
100+
)
101+
102+
do {
103+
try await item.start()
104+
completedItems += 1
105+
WireLogger.workAgent.debug(
106+
"item complete",
107+
attributes: .init(item)
108+
)
109+
} catch {
110+
WireLogger.workAgent.error(
111+
"item failed, dropping",
112+
attributes: .init(item)
113+
)
114+
continue
115+
}
116+
}
117+
118+
let duration = Date().timeIntervalSince(startTime)
119+
let durationString = String(format: "%.2f seconds", duration)
120+
WireLogger.workAgent.info(
121+
"completed \(completedItems) tickets in \(durationString)",
122+
attributes: .safePublic
123+
)
124+
}
125+
126+
await task?.value
127+
task = nil
128+
}
129+
130+
func stop() {
131+
WireLogger.workAgent.info(
132+
"stopping",
133+
attributes: .safePublic
134+
)
135+
task?.cancel()
136+
task = nil
137+
}
138+
139+
}
140+
141+
private extension LogAttributes {
142+
143+
init(_ item: any WorkItem) {
144+
self = [
145+
.public: true,
146+
.workItemID: "\(item.id)"
147+
]
148+
}
149+
150+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
//
2+
// Wire
3+
// Copyright (C) 2025 Wire Swiss GmbH
4+
//
5+
// This program is free software: you can redistribute it and/or modify
6+
// it under the terms of the GNU General Public License as published by
7+
// the Free Software Foundation, either version 3 of the License, or
8+
// (at your option) any later version.
9+
//
10+
// This program is distributed in the hope that it will be useful,
11+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
// GNU General Public License for more details.
14+
//
15+
// You should have received a copy of the GNU General Public License
16+
// along with this program. If not, see http://www.gnu.org/licenses/.
17+
//
18+
19+
import Foundation
20+
21+
/// An item of work that can be performed.
22+
///
23+
/// A `Workitem` is a pending operation that can be performed when instructed
24+
/// to. Instances are created and scheduled via the `WorkAgent`.
25+
26+
protocol WorkItem: Sendable {
27+
28+
/// A unique identifier for this item.
29+
30+
var id: UUID { get }
31+
32+
/// The urgency or importance of this ticket.
33+
34+
var priority: WorkItemPriority { get }
35+
36+
/// Start the work for this item.
37+
38+
func start() async throws
39+
40+
/// Cancel the work item.
41+
42+
func cancel() async
43+
44+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
//
2+
// Wire
3+
// Copyright (C) 2025 Wire Swiss GmbH
4+
//
5+
// This program is free software: you can redistribute it and/or modify
6+
// it under the terms of the GNU General Public License as published by
7+
// the Free Software Foundation, either version 3 of the License, or
8+
// (at your option) any later version.
9+
//
10+
// This program is distributed in the hope that it will be useful,
11+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
// GNU General Public License for more details.
14+
//
15+
// You should have received a copy of the GNU General Public License
16+
// along with this program. If not, see http://www.gnu.org/licenses/.
17+
//
18+
19+
import Foundation
20+
21+
/// Describes the urgency of a piece of work.
22+
///
23+
/// Use this type to communicate how quickly a task should be addressed relative
24+
/// to others. Higher priorities indicate greater urgency or impact, and should
25+
/// generally be reserved for exceptional situations. When in doubt, prefer a
26+
/// lower priority to avoid overload and allow important issues to be peformed
27+
/// quickly.
28+
29+
enum WorkItemPriority: Sendable {
30+
31+
/// Low priority.
32+
///
33+
/// Use for **non-essential or deferrable background work**.
34+
///
35+
/// Examples include cache cleanup, telemetry uploads, or periodic
36+
/// maintenance. These tasks can be delayed or suspended without
37+
/// noticeable impact on the user experience.
38+
39+
case low
40+
41+
/// Medium priority.
42+
///
43+
/// The **default priority** for most background work.
44+
///
45+
/// Suitable for tasks that should run reliably but not urgently,
46+
/// such as data synchronization, scheduled refreshes, or prefetching.
47+
/// The scheduler may delay these tasks when the app is inactive or
48+
/// system resources are constrained.
49+
50+
case medium
51+
52+
/// High priority.
53+
///
54+
/// For **user-visible or time-sensitive work** that should complete
55+
/// promptly, but does not require immediate execution.
56+
///
57+
/// Use this for operations that support active user actions, like
58+
/// uploading a photo or sending a message.
59+
60+
case high
61+
62+
/// Blocker priority.
63+
///
64+
/// Indicates a task that **must complete before other dependent work
65+
/// can proceed**.
66+
///
67+
/// Typically used for setup or critical dependency tasks—such as
68+
/// refreshing authentication tokens, migrating local storage. The
69+
/// scheduler should prioritize these before normal background jobs.
70+
71+
case blocker
72+
73+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
//
2+
// Wire
3+
// Copyright (C) 2025 Wire Swiss GmbH
4+
//
5+
// This program is free software: you can redistribute it and/or modify
6+
// it under the terms of the GNU General Public License as published by
7+
// the Free Software Foundation, either version 3 of the License, or
8+
// (at your option) any later version.
9+
//
10+
// This program is distributed in the hope that it will be useful,
11+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
// GNU General Public License for more details.
14+
//
15+
// You should have received a copy of the GNU General Public License
16+
// along with this program. If not, see http://www.gnu.org/licenses/.
17+
//
18+
19+
import Foundation
20+
21+
/// Defines the strategy to recover from a failed or unprocessable work item.
22+
///
23+
/// When a `WorkItem` cannot be executed successfully, the worker may throw
24+
/// a `WorkItemRecoveryStrategy` to indicate what should be done with the
25+
/// failed item. This allows the system to decide whether to retry, drop,
26+
/// or escalate the ticket based on the chosen strategy.
27+
28+
enum WorkItemRecoveryStrategy: Error {
29+
30+
/// The item should be discarded and it will not be retried.
31+
///
32+
/// Use this for tasks that are no longer relevant, redundant, or
33+
/// when retrying would not be meaningful or safe.
34+
35+
case drop
36+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
//
2+
// Wire
3+
// Copyright (C) 2025 Wire Swiss GmbH
4+
//
5+
// This program is free software: you can redistribute it and/or modify
6+
// it under the terms of the GNU General Public License as published by
7+
// the Free Software Foundation, either version 3 of the License, or
8+
// (at your option) any later version.
9+
//
10+
// This program is distributed in the hope that it will be useful,
11+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
// GNU General Public License for more details.
14+
//
15+
// You should have received a copy of the GNU General Public License
16+
// along with this program. If not, see http://www.gnu.org/licenses/.
17+
//
18+
19+
import Foundation
20+
21+
/// A scheduler that dequeues items in priority order.
22+
23+
actor PriorityOrderWorkItemScheduler: WorkItemScheduler {
24+
25+
private var blockerQueue: [any WorkItem] = []
26+
private var highQueue: [any WorkItem] = []
27+
private var mediumQueue: [any WorkItem] = []
28+
private var lowQueue: [any WorkItem] = []
29+
30+
func enqueueItem(_ item: any WorkItem) async {
31+
switch item.priority {
32+
case .low:
33+
lowQueue.append(item)
34+
case .medium:
35+
mediumQueue.append(item)
36+
case .high:
37+
highQueue.append(item)
38+
case .blocker:
39+
blockerQueue.append(item)
40+
}
41+
}
42+
43+
func dequeueNextItem() async -> (any WorkItem)? {
44+
if !blockerQueue.isEmpty {
45+
blockerQueue.removeFirst()
46+
} else if !highQueue.isEmpty {
47+
highQueue.removeFirst()
48+
} else if !mediumQueue.isEmpty {
49+
mediumQueue.removeFirst()
50+
} else if !lowQueue.isEmpty {
51+
lowQueue.removeFirst()
52+
} else {
53+
nil
54+
}
55+
}
56+
57+
}

0 commit comments

Comments
 (0)