-
Notifications
You must be signed in to change notification settings - Fork 24
feat: introduce WorkAgent - WPB-20123 #3800
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
d7a0794
create WorkAgent
johnxnguyen 6e47508
add tests
johnxnguyen d8ed75a
add more tests
johnxnguyen 6b75875
format
johnxnguyen 153e5d5
undo package changes
johnxnguyen b7d5f5f
move to WireDomain
johnxnguyen 63ea16f
improve tests
johnxnguyen 2d14d1d
move logger with other definitions
johnxnguyen 43c0d63
include ticket id in attributes
johnxnguyen bcafcbf
remove critical priority
johnxnguyen 87a55b8
revert: disable critical flows temporarily - WPB-21303 (#3801)
netbe 6270831
make work agent an actor
johnxnguyen 616f81b
check cancellation in loop
johnxnguyen 0eef071
store workers in a dictionary
johnxnguyen d23f887
refactor
johnxnguyen 9e3f3f5
reset unrelated files
johnxnguyen a856182
reset unrelated file
johnxnguyen File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
150 changes: 150 additions & 0 deletions
150
WireDomain/Sources/WireDomain/WorkAgent/WorkAgent.swift
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,150 @@ | ||
| // | ||
| // Wire | ||
| // Copyright (C) 2025 Wire Swiss GmbH | ||
| // | ||
| // This program is free software: you can redistribute it and/or modify | ||
| // it under the terms of the GNU General Public License as published by | ||
| // the Free Software Foundation, either version 3 of the License, or | ||
| // (at your option) any later version. | ||
| // | ||
| // This program is distributed in the hope that it will be useful, | ||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| // GNU General Public License for more details. | ||
| // | ||
| // You should have received a copy of the GNU General Public License | ||
| // along with this program. If not, see http://www.gnu.org/licenses/. | ||
| // | ||
|
|
||
| import Foundation | ||
| import WireLogging | ||
|
|
||
| actor WorkAgent { | ||
|
|
||
| // MARK: - Properties | ||
|
|
||
| /// Whether the agent is currently dequeuing work tickets. | ||
|
|
||
| var isRunning: Bool { | ||
| task != nil | ||
| } | ||
|
|
||
| /// Whether dequeuing should begin after ticket submission. | ||
|
|
||
| private var shouldAutoStart = false | ||
| func setAutoStartEnabled(_ enabled: Bool) async { | ||
| shouldAutoStart = enabled | ||
| } | ||
|
|
||
| // MARK: - Life cycle | ||
|
|
||
| private var task: Task<Void, Never>? | ||
| private let scheduler: any WorkItemScheduler | ||
| private let nonReentrantTaskManager = NonReentrantTaskManager() | ||
|
|
||
| init(scheduler: any WorkItemScheduler) { | ||
| self.scheduler = scheduler | ||
| } | ||
|
|
||
| // MARK: - Operation | ||
|
|
||
| func submitItem(_ item: any WorkItem) { | ||
| WireLogger.workAgent.debug( | ||
| "item submitted: \(item)", | ||
| attributes: .init(item) | ||
| ) | ||
|
|
||
| Task { | ||
| await scheduler.enqueueItem(item) | ||
|
|
||
| if shouldAutoStart, !isRunning { | ||
| await start() | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func start() async { | ||
| try? await nonReentrantTaskManager.performIfNeeded { [weak self] in | ||
| await self?.internalStart() | ||
| } | ||
| } | ||
|
|
||
| private func internalStart() async { | ||
| guard task == nil else { | ||
| return | ||
| } | ||
|
|
||
| WireLogger.workAgent.info( | ||
| "starting", | ||
| attributes: .safePublic | ||
| ) | ||
|
|
||
| task = Task { | ||
| let startTime = Date() | ||
| var completedItems = 0 | ||
|
|
||
| while let item = await scheduler.dequeueNextItem() { | ||
| do { | ||
| try Task.checkCancellation() | ||
| } catch { | ||
| WireLogger.workAgent.debug( | ||
| "task has been cancelled, aborting...", | ||
| attributes: .init(item) | ||
| ) | ||
| break | ||
| } | ||
|
|
||
| WireLogger.workAgent.debug( | ||
| "dequeued item", | ||
| attributes: .init(item) | ||
| ) | ||
|
|
||
| do { | ||
| try await item.start() | ||
| completedItems += 1 | ||
| WireLogger.workAgent.debug( | ||
| "item complete", | ||
| attributes: .init(item) | ||
| ) | ||
| } catch { | ||
| WireLogger.workAgent.error( | ||
| "item failed, dropping", | ||
| attributes: .init(item) | ||
| ) | ||
| continue | ||
johnxnguyen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
|
|
||
| let duration = Date().timeIntervalSince(startTime) | ||
| let durationString = String(format: "%.2f seconds", duration) | ||
| WireLogger.workAgent.info( | ||
| "completed \(completedItems) tickets in \(durationString)", | ||
| attributes: .safePublic | ||
| ) | ||
| } | ||
|
|
||
| await task?.value | ||
| task = nil | ||
| } | ||
|
|
||
| func stop() { | ||
| WireLogger.workAgent.info( | ||
| "stopping", | ||
| attributes: .safePublic | ||
| ) | ||
| task?.cancel() | ||
johnxnguyen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| task = nil | ||
| } | ||
|
|
||
| } | ||
|
|
||
| private extension LogAttributes { | ||
|
|
||
| init(_ item: any WorkItem) { | ||
| self = [ | ||
| .public: true, | ||
| .workItemID: "\(item.id)" | ||
| ] | ||
| } | ||
|
|
||
| } | ||
44 changes: 44 additions & 0 deletions
44
WireDomain/Sources/WireDomain/WorkAgent/WorkItem/WorkItem.swift
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| // | ||
| // Wire | ||
| // Copyright (C) 2025 Wire Swiss GmbH | ||
| // | ||
| // This program is free software: you can redistribute it and/or modify | ||
| // it under the terms of the GNU General Public License as published by | ||
| // the Free Software Foundation, either version 3 of the License, or | ||
| // (at your option) any later version. | ||
| // | ||
| // This program is distributed in the hope that it will be useful, | ||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| // GNU General Public License for more details. | ||
| // | ||
| // You should have received a copy of the GNU General Public License | ||
| // along with this program. If not, see http://www.gnu.org/licenses/. | ||
| // | ||
|
|
||
| import Foundation | ||
|
|
||
| /// An item of work that can be performed. | ||
| /// | ||
| /// A `Workitem` is a pending operation that can be performed when instructed | ||
| /// to. Instances are created and scheduled via the `WorkAgent`. | ||
|
|
||
| protocol WorkItem: Sendable { | ||
|
|
||
| /// A unique identifier for this item. | ||
|
|
||
| var id: UUID { get } | ||
|
|
||
| /// The urgency or importance of this ticket. | ||
|
|
||
| var priority: WorkItemPriority { get } | ||
|
|
||
| /// Start the work for this item. | ||
|
|
||
| func start() async throws | ||
|
|
||
| /// Cancel the work item. | ||
|
|
||
| func cancel() async | ||
|
|
||
| } |
73 changes: 73 additions & 0 deletions
73
WireDomain/Sources/WireDomain/WorkAgent/WorkItem/WorkItemPriority.swift
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| // | ||
| // Wire | ||
| // Copyright (C) 2025 Wire Swiss GmbH | ||
| // | ||
| // This program is free software: you can redistribute it and/or modify | ||
| // it under the terms of the GNU General Public License as published by | ||
| // the Free Software Foundation, either version 3 of the License, or | ||
| // (at your option) any later version. | ||
| // | ||
| // This program is distributed in the hope that it will be useful, | ||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| // GNU General Public License for more details. | ||
| // | ||
| // You should have received a copy of the GNU General Public License | ||
| // along with this program. If not, see http://www.gnu.org/licenses/. | ||
| // | ||
|
|
||
| import Foundation | ||
|
|
||
| /// Describes the urgency of a piece of work. | ||
| /// | ||
| /// Use this type to communicate how quickly a task should be addressed relative | ||
| /// to others. Higher priorities indicate greater urgency or impact, and should | ||
| /// generally be reserved for exceptional situations. When in doubt, prefer a | ||
| /// lower priority to avoid overload and allow important issues to be peformed | ||
| /// quickly. | ||
|
|
||
| enum WorkItemPriority: Sendable { | ||
|
|
||
| /// Low priority. | ||
| /// | ||
| /// Use for **non-essential or deferrable background work**. | ||
| /// | ||
| /// Examples include cache cleanup, telemetry uploads, or periodic | ||
| /// maintenance. These tasks can be delayed or suspended without | ||
| /// noticeable impact on the user experience. | ||
|
|
||
| case low | ||
|
|
||
| /// Medium priority. | ||
| /// | ||
| /// The **default priority** for most background work. | ||
| /// | ||
| /// Suitable for tasks that should run reliably but not urgently, | ||
| /// such as data synchronization, scheduled refreshes, or prefetching. | ||
| /// The scheduler may delay these tasks when the app is inactive or | ||
| /// system resources are constrained. | ||
|
|
||
| case medium | ||
|
|
||
| /// High priority. | ||
| /// | ||
| /// For **user-visible or time-sensitive work** that should complete | ||
| /// promptly, but does not require immediate execution. | ||
| /// | ||
| /// Use this for operations that support active user actions, like | ||
| /// uploading a photo or sending a message. | ||
|
|
||
| case high | ||
|
|
||
| /// Blocker priority. | ||
| /// | ||
| /// Indicates a task that **must complete before other dependent work | ||
| /// can proceed**. | ||
| /// | ||
| /// Typically used for setup or critical dependency tasks—such as | ||
| /// refreshing authentication tokens, migrating local storage. The | ||
| /// scheduler should prioritize these before normal background jobs. | ||
|
|
||
| case blocker | ||
|
|
||
| } |
36 changes: 36 additions & 0 deletions
36
WireDomain/Sources/WireDomain/WorkAgent/WorkItem/WorkItemRecoveryStrategy.swift
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| // | ||
| // Wire | ||
| // Copyright (C) 2025 Wire Swiss GmbH | ||
| // | ||
| // This program is free software: you can redistribute it and/or modify | ||
| // it under the terms of the GNU General Public License as published by | ||
| // the Free Software Foundation, either version 3 of the License, or | ||
| // (at your option) any later version. | ||
| // | ||
| // This program is distributed in the hope that it will be useful, | ||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| // GNU General Public License for more details. | ||
| // | ||
| // You should have received a copy of the GNU General Public License | ||
| // along with this program. If not, see http://www.gnu.org/licenses/. | ||
| // | ||
|
|
||
| import Foundation | ||
|
|
||
| /// Defines the strategy to recover from a failed or unprocessable work item. | ||
| /// | ||
| /// When a `WorkItem` cannot be executed successfully, the worker may throw | ||
| /// a `WorkItemRecoveryStrategy` to indicate what should be done with the | ||
| /// failed item. This allows the system to decide whether to retry, drop, | ||
| /// or escalate the ticket based on the chosen strategy. | ||
|
|
||
| enum WorkItemRecoveryStrategy: Error { | ||
|
|
||
| /// The item should be discarded and it will not be retried. | ||
| /// | ||
| /// Use this for tasks that are no longer relevant, redundant, or | ||
| /// when retrying would not be meaningful or safe. | ||
|
|
||
| case drop | ||
| } |
57 changes: 57 additions & 0 deletions
57
...omain/Sources/WireDomain/WorkAgent/WorkItemScheduler/PriorityOrderWorkItemScheduler.swift
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| // | ||
| // Wire | ||
| // Copyright (C) 2025 Wire Swiss GmbH | ||
| // | ||
| // This program is free software: you can redistribute it and/or modify | ||
| // it under the terms of the GNU General Public License as published by | ||
| // the Free Software Foundation, either version 3 of the License, or | ||
| // (at your option) any later version. | ||
| // | ||
| // This program is distributed in the hope that it will be useful, | ||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| // GNU General Public License for more details. | ||
| // | ||
| // You should have received a copy of the GNU General Public License | ||
| // along with this program. If not, see http://www.gnu.org/licenses/. | ||
| // | ||
|
|
||
| import Foundation | ||
|
|
||
| /// A scheduler that dequeues items in priority order. | ||
|
|
||
| actor PriorityOrderWorkItemScheduler: WorkItemScheduler { | ||
|
|
||
| private var blockerQueue: [any WorkItem] = [] | ||
| private var highQueue: [any WorkItem] = [] | ||
| private var mediumQueue: [any WorkItem] = [] | ||
| private var lowQueue: [any WorkItem] = [] | ||
|
|
||
| func enqueueItem(_ item: any WorkItem) async { | ||
| switch item.priority { | ||
| case .low: | ||
| lowQueue.append(item) | ||
| case .medium: | ||
| mediumQueue.append(item) | ||
| case .high: | ||
| highQueue.append(item) | ||
| case .blocker: | ||
| blockerQueue.append(item) | ||
| } | ||
| } | ||
|
|
||
| func dequeueNextItem() async -> (any WorkItem)? { | ||
| if !blockerQueue.isEmpty { | ||
| blockerQueue.removeFirst() | ||
| } else if !highQueue.isEmpty { | ||
| highQueue.removeFirst() | ||
| } else if !mediumQueue.isEmpty { | ||
| mediumQueue.removeFirst() | ||
| } else if !lowQueue.isEmpty { | ||
| lowQueue.removeFirst() | ||
| } else { | ||
| nil | ||
| } | ||
| } | ||
|
|
||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.