Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Structured Concurrency #255

Open
wants to merge 52 commits into
base: main
Choose a base branch
from
Open

Use Structured Concurrency #255

wants to merge 52 commits into from

Conversation

MahdiBM
Copy link
Collaborator

@MahdiBM MahdiBM commented Oct 13, 2024

  • Use ServiceLifecycle
  • Remove all occurrences of unstructured Task {}s

@MahdiBM MahdiBM changed the base branch from main to mmbm-swift-releases-checker October 13, 2024 08:48
@MahdiBM MahdiBM marked this pull request as ready for review October 13, 2024 12:10
@MahdiBM MahdiBM marked this pull request as draft October 13, 2024 13:58
Base automatically changed from mmbm-swift-releases-checker to main October 13, 2024 18:22
@MahdiBM MahdiBM marked this pull request as ready for review October 20, 2024 08:50
Comment on lines +49 to +59
switch Constants.deploymentEnvironment {
case .local:
break
case .prod:
self.context.backgroundProcessor.process {
await self.cancelIfCachePopulationTakesTooLong()
}
self.context.backgroundProcessor.process {
await self.send(.shutdown)
}
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some env checking to make sure i can manually run Penny without worrying about interrupting the prod penny.
Other than that the file changes just move stuff to structured concurrency.

Comment on lines +178 to 195
private func setUpResetItemsTask() async {
self.resetItemsTask?.cancel()
self.resetItemsTask = Task {
let task = Task<Void, Never> {
/// Force-refresh cache after 6 hours of no activity
if (try? await Task.sleep(for: .seconds(60 * 60 * 6))) != nil {
self._cachedItems = nil
self.getFreshItemsForCache()
self.setUpResetItemsTask()
await self.getFreshItemsForCache()
await self.setUpResetItemsTask()
} else {
/// If canceled, set up the task again.
/// This way, the functions above can cancel this when they've got fresh items
/// and this will just reschedule itself for a later time.
self.setUpResetItemsTask()
await self.setUpResetItemsTask()
}
}
self.resetItemsTask = task
await task.value
}
Copy link
Collaborator Author

@MahdiBM MahdiBM Oct 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is Task, but not unstructured.

Comment on lines +11 to +22
/// - Parameters:
/// - underlyingService: The underlying service to be run after the continuation is resolved.
/// - logger: A logger to log with.
/// - backgroundProcessor: To process the continuation with.
/// - passContinuation: Passes continuation using this closure to any other service you'd like.
/// Then the other service is responsible for correctly and timely resolving the continuation.
init(
underlyingService: UnderlyingService,
logger: Logger = Logger(label: _typeName(Self.self)),
processingOn backgroundProcessor: BackgroundProcessor,
passingContinuationWith passContinuation: @Sendable @escaping (CheckedContinuation<Void, Never>) async -> Void
) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some explanation.

@@ -23,7 +25,7 @@ actor FakeMainService: MainService {
cacheStorage.guilds[TestData.vaporGuild.id] = TestData.vaporGuild
self.cache = await DiscordCache(
gatewayManager: manager,
intents: [.guilds, .guildMembers],
intents: [.guilds, .guildMembers, .messageContent, .guildMessages],
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make it the same as the real app.

Comment on lines +32 to +40
mainServiceTask = Task<Void, any Error> {
try await Penny.start(mainService: fakeMainService)
}
await fakeMainService.waitForStateManagerShutdownAndDidShutdownSignals()
}

deinit {
mainServiceTask.cancel()
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cancel the task so it's not really unstructured.

Comment on lines +176 to +206
/// Services that need to wait for bot connection
let botStateManagerWrappedService = WaiterService(
underlyingService: context.botStateManager,
processingOn: context.backgroundProcessor,
passingContinuationWith: {
await context.discordEventListener.addConnectionWaiterContinuation($0)
}
)

/// Services that need to wait for caches population
let evolutionCheckerWrappedService = WaiterService(
underlyingService: context.evolutionChecker,
processingOn: context.backgroundProcessor,
passingContinuationWith: {
await context.botStateManager.addCachesPopulationContinuation($0)
}
)
let soCheckerWrappedService = WaiterService(
underlyingService: context.soChecker,
processingOn: context.backgroundProcessor,
passingContinuationWith: {
await context.botStateManager.addCachesPopulationContinuation($0)
}
)
let swiftReleasesCheckerWrappedService = WaiterService(
underlyingService: context.swiftReleasesChecker,
processingOn: context.backgroundProcessor,
passingContinuationWith: {
await context.botStateManager.addCachesPopulationContinuation($0)
}
)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comments as to why we're using "wrapped" services.
The WaiterService type also has some doc comments.

Comment on lines 45 to 93
@Test
func waiterServiceRunsUnderlyingService() async throws {
actor SampleService: Service {
var didRun = false
func run() async throws {
self.didRun = true
}
}

let sampleService = SampleService()

let wrappedService = WaiterService(
underlyingService: sampleService,
processingOn: context.backgroundProcessor,
passingContinuationWith: { await self.context.botStateManager.addCachesPopulationContinuation($0) }
)

try await wrappedService.run()

#expect(await sampleService.didRun == true)
}

@Test
func waiterServiceWaitsForUnderlyingService() async throws {
actor SampleService: Service {
var didRun = false
func run() async throws {
self.didRun = true
}
}

let sampleService = SampleService()

let wrappedService = WaiterService(
underlyingService: sampleService,
processingOn: context.backgroundProcessor,
passingContinuationWith: { _ in /* Do nothing */ }
)

let runningService = Task<Void, Never> {
try! await wrappedService.run()
}

try await Task.sleep(for: .seconds(5))

runningService.cancel()

#expect(await sampleService.didRun == false)
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some tests for the waiter service

Comment on lines +462 to +464
let serviceTask = Task<Void, any Error> { [self] in
try await self.context.swiftReleasesChecker.run()
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Task isn't really "unstructured" since we cancel it in the end.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant