Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 150 additions & 0 deletions WireDomain/Sources/WireDomain/WorkAgent/WorkAgent.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
//
// Wire
// Copyright (C) 2025 Wire Swiss GmbH
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see http://www.gnu.org/licenses/.
//

import Foundation
import WireLogging

actor WorkAgent {

// MARK: - Properties

/// Whether the agent is currently dequeuing work tickets.

var isRunning: Bool {
task != nil
}

/// Whether dequeuing should begin after ticket submission.

private var shouldAutoStart = false
func setAutoStartEnabled(_ enabled: Bool) async {
shouldAutoStart = enabled
}

// MARK: - Life cycle

private var task: Task<Void, Never>?
private let scheduler: any WorkItemScheduler
private let nonReentrantTaskManager = NonReentrantTaskManager()

init(scheduler: any WorkItemScheduler) {
self.scheduler = scheduler
}

// MARK: - Operation

func submitItem(_ item: any WorkItem) {
WireLogger.workAgent.debug(
"item submitted: \(item)",
attributes: .init(item)
)

Task {
await scheduler.enqueueItem(item)

if shouldAutoStart, !isRunning {
await start()
}
}
}

func start() async {
try? await nonReentrantTaskManager.performIfNeeded { [weak self] in
await self?.internalStart()
}
}

private func internalStart() async {
guard task == nil else {
return
}

WireLogger.workAgent.info(
"starting",
attributes: .safePublic
)

task = Task {
let startTime = Date()
var completedItems = 0

while let item = await scheduler.dequeueNextItem() {
do {
try Task.checkCancellation()
} catch {
WireLogger.workAgent.debug(
"task has been cancelled, aborting...",
attributes: .init(item)
)
break
}

WireLogger.workAgent.debug(
"dequeued item",
attributes: .init(item)
)

do {
try await item.start()
completedItems += 1
WireLogger.workAgent.debug(
"item complete",
attributes: .init(item)
)
} catch {
WireLogger.workAgent.error(
"item failed, dropping",
attributes: .init(item)
)
continue
}
}

let duration = Date().timeIntervalSince(startTime)
let durationString = String(format: "%.2f seconds", duration)
WireLogger.workAgent.info(
"completed \(completedItems) tickets in \(durationString)",
attributes: .safePublic
)
}

await task?.value
task = nil
}

func stop() {
WireLogger.workAgent.info(
"stopping",
attributes: .safePublic
)
task?.cancel()
task = nil
}

}

private extension LogAttributes {

init(_ item: any WorkItem) {
self = [
.public: true,
.workItemID: "\(item.id)"
]
}

}
44 changes: 44 additions & 0 deletions WireDomain/Sources/WireDomain/WorkAgent/WorkItem/WorkItem.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//
// Wire
// Copyright (C) 2025 Wire Swiss GmbH
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see http://www.gnu.org/licenses/.
//

import Foundation

/// An item of work that can be performed.
///
/// A `Workitem` is a pending operation that can be performed when instructed
/// to. Instances are created and scheduled via the `WorkAgent`.

protocol WorkItem: Sendable {

/// A unique identifier for this item.

var id: UUID { get }

/// The urgency or importance of this ticket.

var priority: WorkItemPriority { get }

/// Start the work for this item.

func start() async throws

/// Cancel the work item.

func cancel() async

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//
// Wire
// Copyright (C) 2025 Wire Swiss GmbH
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see http://www.gnu.org/licenses/.
//

import Foundation

/// Describes the urgency of a piece of work.
///
/// Use this type to communicate how quickly a task should be addressed relative
/// to others. Higher priorities indicate greater urgency or impact, and should
/// generally be reserved for exceptional situations. When in doubt, prefer a
/// lower priority to avoid overload and allow important issues to be peformed
/// quickly.

enum WorkItemPriority: Sendable {

/// Low priority.
///
/// Use for **non-essential or deferrable background work**.
///
/// Examples include cache cleanup, telemetry uploads, or periodic
/// maintenance. These tasks can be delayed or suspended without
/// noticeable impact on the user experience.

case low

/// Medium priority.
///
/// The **default priority** for most background work.
///
/// Suitable for tasks that should run reliably but not urgently,
/// such as data synchronization, scheduled refreshes, or prefetching.
/// The scheduler may delay these tasks when the app is inactive or
/// system resources are constrained.

case medium

/// High priority.
///
/// For **user-visible or time-sensitive work** that should complete
/// promptly, but does not require immediate execution.
///
/// Use this for operations that support active user actions, like
/// uploading a photo or sending a message.

case high

/// Blocker priority.
///
/// Indicates a task that **must complete before other dependent work
/// can proceed**.
///
/// Typically used for setup or critical dependency tasks—such as
/// refreshing authentication tokens, migrating local storage. The
/// scheduler should prioritize these before normal background jobs.

case blocker

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
//
// Wire
// Copyright (C) 2025 Wire Swiss GmbH
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see http://www.gnu.org/licenses/.
//

import Foundation

/// Defines the strategy to recover from a failed or unprocessable work item.
///
/// When a `WorkItem` cannot be executed successfully, the worker may throw
/// a `WorkItemRecoveryStrategy` to indicate what should be done with the
/// failed item. This allows the system to decide whether to retry, drop,
/// or escalate the ticket based on the chosen strategy.

enum WorkItemRecoveryStrategy: Error {

/// The item should be discarded and it will not be retried.
///
/// Use this for tasks that are no longer relevant, redundant, or
/// when retrying would not be meaningful or safe.

case drop
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//
// Wire
// Copyright (C) 2025 Wire Swiss GmbH
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see http://www.gnu.org/licenses/.
//

import Foundation

/// A scheduler that dequeues items in priority order.

actor PriorityOrderWorkItemScheduler: WorkItemScheduler {

private var blockerQueue: [any WorkItem] = []
private var highQueue: [any WorkItem] = []
private var mediumQueue: [any WorkItem] = []
private var lowQueue: [any WorkItem] = []

func enqueueItem(_ item: any WorkItem) async {
switch item.priority {
case .low:
lowQueue.append(item)
case .medium:
mediumQueue.append(item)
case .high:
highQueue.append(item)
case .blocker:
blockerQueue.append(item)
}
}

func dequeueNextItem() async -> (any WorkItem)? {
if !blockerQueue.isEmpty {
blockerQueue.removeFirst()
} else if !highQueue.isEmpty {
highQueue.removeFirst()
} else if !mediumQueue.isEmpty {
mediumQueue.removeFirst()
} else if !lowQueue.isEmpty {
lowQueue.removeFirst()
} else {
nil
}
}

}
Loading
Loading