Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -806,4 +806,15 @@
conversationEventProcessor: conversationEventProcessor
)

public lazy var workAgent: WorkAgent = .init(scheduler: PriorityOrderWorkItemScheduler())

Check warning on line 810 in WireDomain/Sources/WireDomain/Components/ClientSessionComponent.swift

View workflow job for this annotation

GitHub Actions / Test Results

Actor-isolated default value in a nonisolated context; this is an error in the Swift 6 language mode

Actor-isolated default value in a nonisolated context; this is an error in the Swift 6 language mode
public lazy var conversationUpdatesGenerator: ConversationUpdatesGeneratorProtocol = ConversationUpdatesGenerator(
repository: conversationRepository,
context: syncContext,
onConversationUpdated: { [weak self] workItem in

self?.workAgent.submitItem(workItem)
}
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// along with this program. If not, see http://www.gnu.org/licenses/.
//

import WireLogging
import WireNetwork

struct ConversationEventProcessor: ConversationEventProcessorProtocol {
Expand All @@ -38,6 +39,12 @@ struct ConversationEventProcessor: ConversationEventProcessorProtocol {
let mlsResetEventProcessor: any ConversationMLSResetEventProcessorProtocol

func processEvent(_ event: ConversationEvent) async throws {
WireLogger.eventProcessing.info(
"process conversation event: \(event.name)",
attributes: [.conversationId: event.conversationID.id.safeForLoggingDescription],
.safePublic
)

switch event {
case let .accessUpdate(event):
await accessUpdateEventProcessor.processEvent(event)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,36 +36,18 @@ struct ConversationMLSWelcomeEventProcessor: ConversationMLSWelcomeEventProcesso
let welcomeMessage = event.welcomeMessage
let conversationID = event.conversationID

WireLogger.mls.info("MLS event processor is processing welcome message")

// Decrypts the welcome message which returns the group ID of the conversation we were added to.
let groupID = try await mlsDecryptionService.processWelcomeMessage(
welcomeMessage: welcomeMessage,
context: nil
)

var conversation = await conversationRepository.fetchConversation(
// create conversation if needed and it will be sync by worker
let conversation = await conversationRepository.fetchOrCreateConversation(
id: conversationID.id,
domain: conversationID.domain
)

if conversation == nil {
// sync conversation with backend
try await conversationRepository.pullConversation(
id: conversationID.id,
domain: conversationID.domain
)

conversation = await conversationRepository.fetchConversation(
id: conversationID.id,
domain: conversationID.domain
)
}

guard let conversation else {
throw Failure.conversationNotFound
}

// This conversation is now a MLS one so we need to update its group ID and set MLS status to ready..
await conversationLocalStore.storeMLSConversationEstablished(
mlsGroupID: groupID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ struct UpdateEventProcessor: UpdateEventProcessorProtocol {
let teamEventProcessor: any TeamEventProcessorProtocol

func processEvent(_ event: UpdateEvent) async throws {
WireLogger.eventProcessing.info("process event", attributes: [.eventType: event.name], .safePublic)

switch event {
case let .conversation(event):
try await conversationEventProcessor.processEvent(event)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import WireNetwork

// sourcery: AutoMockable
/// Facilitate access to conversations related domain objects.
public protocol ConversationRepositoryProtocol {
public protocol ConversationRepositoryProtocol: Sendable {

/// Fetches and persists a conversation with a given ID.
/// - Parameters:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
//
// Wire
// Copyright (C) 2025 Wire Swiss GmbH
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see http://www.gnu.org/licenses/.
//
import Foundation
import WireDataModel
import WireLogging

/// sourcery: AutoMockable
public protocol ConversationUpdatesGeneratorProtocol {
func start() async
func stop()
}

public final class ConversationUpdatesGenerator: NSObject, ConversationUpdatesGeneratorProtocol {

private let context: NSManagedObjectContext
private let fetchedResultsController: NSFetchedResultsController<ZMConversation>
private let repository: ConversationRepositoryProtocol
private var onConversationUpdated: (UpdateConversationItem) -> Void

init(
repository: ConversationRepositoryProtocol,
context: NSManagedObjectContext,
onConversationUpdated: @escaping (UpdateConversationItem) -> Void
) {
let request = NSFetchRequest<ZMConversation>(entityName: ZMConversation.entityName())
request.predicate = ZMConversation.predicateForNeedingToBeUpdatedFromBackend()
request.sortDescriptors = [NSSortDescriptor(key: ZMConversationLastServerTimeStampKey, ascending: true)]
self.fetchedResultsController = NSFetchedResultsController(
fetchRequest: request,
managedObjectContext: context,
sectionNameKeyPath: nil,
cacheName: nil
)
self.context = context
self.onConversationUpdated = onConversationUpdated
self.repository = repository
super.init()
}

/// Starts monitoring and triggers pulls for any needingToBeUpdatedFromBackend conversations.
public func start() async {
fetchedResultsController.delegate = self
do {
try fetchedResultsController.performFetch()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this class monitors the coredata change, and create ticket for scheduler

} catch {
WireLogger.conversation.error("error fetching conversations: \(String(describing: error))")
}

let conversations = fetchedResultsController.fetchedObjects ?? []
for conversation in conversations {
await context.perform {

Check warning on line 66 in WireDomain/Sources/WireDomain/Synchronization/ConversationUpdatesGenerator.swift

View workflow job for this annotation

GitHub Actions / Test Results

Capture of 'conversation' with non-Sendable type 'ZMConversation' in a '@Sendable' closure

Capture of 'conversation' with non-Sendable type 'ZMConversation' in a '@sendable' closure
if let id = conversation.qualifiedID {

Check warning on line 67 in WireDomain/Sources/WireDomain/Synchronization/ConversationUpdatesGenerator.swift

View workflow job for this annotation

GitHub Actions / Test Results

Capture of 'self' with non-Sendable type 'ConversationUpdatesGenerator' in a '@Sendable' closure

Capture of 'self' with non-Sendable type 'ConversationUpdatesGenerator' in a '@sendable' closure
self.onConversationUpdated(UpdateConversationItem(
repository: self.repository,
conversationID: id.toAPIModel()
))
}
}
}
}

public func stop() {
fetchedResultsController.delegate = nil
}
}

// MARK: - NSFetchedResultsControllerDelegate

extension ConversationUpdatesGenerator: NSFetchedResultsControllerDelegate {

public func controller(
_ controller: NSFetchedResultsController<NSFetchRequestResult>,
didChange anObject: Any,
at indexPath: IndexPath?,
for type: NSFetchedResultsChangeType,
newIndexPath: IndexPath?
) {
guard let conversation = anObject as? ZMConversation else { return }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Crash if this fails. The fetched results controller is configured in this file so we know that anObject right?


switch type {
case .insert:
// Insert == flag flipped to true (matches predicate now)
if let qualifiedID = conversation.qualifiedID {
onConversationUpdated(UpdateConversationItem(
repository: repository,
conversationID: qualifiedID.toAPIModel()
))
}

case .update:
// Already in the "true" set; we only act on the transition handled by `.insert`.
break

case .move, .delete:
break

@unknown default:
break
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,18 @@

let processedEnvelopeIDs: Set<UUID>
do {
logger.debug("pulling pending update events", attributes: .incrementalSyncV2)
logger.info("pulling pending update events", attributes: .incrementalSyncV2, .safePublic)
syncStateSubject.send(.incrementalSyncing(.pullPendingEvents))
try await updateEventsSync.pull()

logger.debug("processing stored update events", attributes: .incrementalSyncV2)
logger.info("processing stored update events", attributes: .incrementalSyncV2, .safePublic)
syncStateSubject.send(.incrementalSyncing(.processPendingEvents))
processedEnvelopeIDs = try await processStoredEvents()
} catch {
func tearDown() async {
logger.debug(
logger.info(
"incremental sync interrupted, tearing down...",
attributes: .incrementalSyncV2
attributes: .incrementalSyncV2, .safePublic
)
await pushChannel.close()
}
Expand All @@ -120,14 +120,14 @@

await mlsGroupRepairAgent.repairConversations()

let liveEventTask = Task { @Sendable [self] in

Check warning on line 123 in WireDomain/Sources/WireDomain/Synchronization/IncrementalSync.swift

View workflow job for this annotation

GitHub Actions / Test Results

Capture of 'self' with non-Sendable type 'IncrementalSync' in a '@Sendable' closure; this is an error in the Swift 6 language mode

Capture of 'self' with non-Sendable type 'IncrementalSync' in a '@sendable' closure; this is an error in the Swift 6 language mode
logger.debug("handling live event stream", attributes: .incrementalSyncV2)
logger.info("handling live event stream", attributes: .incrementalSyncV2, .safePublic)
syncStateSubject.send(.liveSyncing(.ongoing))

do {
// because we might be interrupted when in background, we wrap the sync in an expiringActivity that
// will cancel the task - not keeping any db operation (sqlite file opened) in suspend mode
try await withExpiringActivity(reason: "processLiveStream IncrementalSync") {

Check warning on line 130 in WireDomain/Sources/WireDomain/Synchronization/IncrementalSync.swift

View workflow job for this annotation

GitHub Actions / Test Results

Capture of 'self' with non-Sendable type 'IncrementalSync' in an isolated closure; this is an error in the Swift 6 language mode

Capture of 'self' with non-Sendable type 'IncrementalSync' in an isolated closure; this is an error in the Swift 6 language mode
await processLiveEvents(
liveEventStream: liveEventStream,
processedEnvelopeIDs: processedEnvelopeIDs
Expand Down Expand Up @@ -217,13 +217,13 @@
do {
logger.debug(
"processing live event: \(event.name)",
attributes: .incrementalSyncV2 + [.eventEnvelopeID: envelope.id]
attributes: .incrementalSyncV2 + [.eventEnvelopeID: envelope.id, .eventType: event.name]
)
try await processor.processEvent(event)
} catch {
logger.error(
"failed to process live event: \(String(describing: error))",
attributes: .incrementalSyncV2 + [.eventEnvelopeID: envelope.id]
attributes: .incrementalSyncV2 + [.eventEnvelopeID: envelope.id, .eventType: event.name]
)
}
}
Expand Down
10 changes: 5 additions & 5 deletions WireDomain/Sources/WireDomain/WorkAgent/WorkAgent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import Foundation
import WireLogging

actor WorkAgent {
public actor WorkAgent {

// MARK: - Properties

Expand All @@ -32,7 +32,7 @@ actor WorkAgent {
/// Whether dequeuing should begin after ticket submission.

private var shouldAutoStart = false
func setAutoStartEnabled(_ enabled: Bool) async {
public func setAutoStartEnabled(_ enabled: Bool) async {
shouldAutoStart = enabled
}

Expand Down Expand Up @@ -63,7 +63,7 @@ actor WorkAgent {
}
}

func start() async {
public func start() async {
try? await nonReentrantTaskManager.performIfNeeded { [weak self] in
await self?.internalStart()
}
Expand Down Expand Up @@ -127,7 +127,7 @@ actor WorkAgent {
task = nil
}

func stop() {
public func stop() {
WireLogger.workAgent.info(
"stopping",
attributes: .safePublic
Expand All @@ -138,7 +138,7 @@ actor WorkAgent {

}

private extension LogAttributes {
extension LogAttributes {

init(_ item: any WorkItem) {
self = [
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//
// Wire
// Copyright (C) 2025 Wire Swiss GmbH
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see http://www.gnu.org/licenses/.
//

import Foundation
import WireDataModel
import WireLogging
import WireNetwork

struct UpdateConversationItem: WorkItem {
private let repository: ConversationRepositoryProtocol

var id = UUID()
var priority: WorkItemPriority {
.medium
}

var conversationID: WireNetwork.QualifiedID
Comment on lines +27 to +32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Why are id and conversationID publicly mutable?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no reason, true I'll make it a let


public init(
repository: ConversationRepositoryProtocol,
conversationID: WireNetwork.QualifiedID,
) {
self.repository = repository
self.conversationID = conversationID
}

func start() async throws {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question (out of scope): @netbe @johnxnguyen Am I right that the way a WorkAgent has changed since we discussed it last Friday? The work now belongs to a WorkItem and not some other actor?

There were a few things that were nice about the old approach:

  1. Less need to pass dependencies to the place that the work item is created.
  2. Possibility to persist work item across app runs.

Anyway this is out of scope for this PR.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes that's basically what change

do {
WireLogger.conversation.debug(
"updating conversation",
attributes: [.conversationId: conversationID.id.uuidString],
.init(self)
)
try await repository.pullConversation(id: conversationID.id, domain: conversationID.domain)

} catch ConversationRepositoryError.conversationNotFound {
WireLogger.conversation.warn(
"conversation does not on backend, delete locally",
attributes: [.conversationId: conversationID.id.uuidString],
.init(self)
)
try await repository.deleteConversation(id: conversationID.id, domain: conversationID.domain)

} catch {
// giving more context to the error
WireLogger.conversation.error(
"error updating conversation from the backend: \(String(describing: error))",
attributes: [.conversationId: conversationID.id.uuidString],
.init(self)
)
throw error
}

}

func cancel() async {
// do nothing
}
}
Loading
Loading