-
-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use Structured Concurrency #255
base: main
Are you sure you want to change the base?
Conversation
1f4e208
to
bfee243
Compare
switch Constants.deploymentEnvironment { | ||
case .local: | ||
break | ||
case .prod: | ||
self.context.backgroundProcessor.process { | ||
await self.cancelIfCachePopulationTakesTooLong() | ||
} | ||
self.context.backgroundProcessor.process { | ||
await self.send(.shutdown) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some env checking to make sure i can manually run Penny without worrying about interrupting the prod penny.
Other than that the file changes just move stuff to structured concurrency.
private func setUpResetItemsTask() async { | ||
self.resetItemsTask?.cancel() | ||
self.resetItemsTask = Task { | ||
let task = Task<Void, Never> { | ||
/// Force-refresh cache after 6 hours of no activity | ||
if (try? await Task.sleep(for: .seconds(60 * 60 * 6))) != nil { | ||
self._cachedItems = nil | ||
self.getFreshItemsForCache() | ||
self.setUpResetItemsTask() | ||
await self.getFreshItemsForCache() | ||
await self.setUpResetItemsTask() | ||
} else { | ||
/// If canceled, set up the task again. | ||
/// This way, the functions above can cancel this when they've got fresh items | ||
/// and this will just reschedule itself for a later time. | ||
self.setUpResetItemsTask() | ||
await self.setUpResetItemsTask() | ||
} | ||
} | ||
self.resetItemsTask = task | ||
await task.value | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is Task
, but not unstructured.
/// - Parameters: | ||
/// - underlyingService: The underlying service to be run after the continuation is resolved. | ||
/// - logger: A logger to log with. | ||
/// - backgroundProcessor: To process the continuation with. | ||
/// - passContinuation: Passes continuation using this closure to any other service you'd like. | ||
/// Then the other service is responsible for correctly and timely resolving the continuation. | ||
init( | ||
underlyingService: UnderlyingService, | ||
logger: Logger = Logger(label: _typeName(Self.self)), | ||
processingOn backgroundProcessor: BackgroundProcessor, | ||
passingContinuationWith passContinuation: @Sendable @escaping (CheckedContinuation<Void, Never>) async -> Void | ||
) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some explanation.
@@ -23,7 +25,7 @@ actor FakeMainService: MainService { | |||
cacheStorage.guilds[TestData.vaporGuild.id] = TestData.vaporGuild | |||
self.cache = await DiscordCache( | |||
gatewayManager: manager, | |||
intents: [.guilds, .guildMembers], | |||
intents: [.guilds, .guildMembers, .messageContent, .guildMessages], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make it the same as the real app.
mainServiceTask = Task<Void, any Error> { | ||
try await Penny.start(mainService: fakeMainService) | ||
} | ||
await fakeMainService.waitForStateManagerShutdownAndDidShutdownSignals() | ||
} | ||
|
||
deinit { | ||
mainServiceTask.cancel() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We cancel the task so it's not really unstructured.
/// Services that need to wait for bot connection | ||
let botStateManagerWrappedService = WaiterService( | ||
underlyingService: context.botStateManager, | ||
processingOn: context.backgroundProcessor, | ||
passingContinuationWith: { | ||
await context.discordEventListener.addConnectionWaiterContinuation($0) | ||
} | ||
) | ||
|
||
/// Services that need to wait for caches population | ||
let evolutionCheckerWrappedService = WaiterService( | ||
underlyingService: context.evolutionChecker, | ||
processingOn: context.backgroundProcessor, | ||
passingContinuationWith: { | ||
await context.botStateManager.addCachesPopulationContinuation($0) | ||
} | ||
) | ||
let soCheckerWrappedService = WaiterService( | ||
underlyingService: context.soChecker, | ||
processingOn: context.backgroundProcessor, | ||
passingContinuationWith: { | ||
await context.botStateManager.addCachesPopulationContinuation($0) | ||
} | ||
) | ||
let swiftReleasesCheckerWrappedService = WaiterService( | ||
underlyingService: context.swiftReleasesChecker, | ||
processingOn: context.backgroundProcessor, | ||
passingContinuationWith: { | ||
await context.botStateManager.addCachesPopulationContinuation($0) | ||
} | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comments as to why we're using "wrapped" services.
The WaiterService
type also has some doc comments.
@Test | ||
func waiterServiceRunsUnderlyingService() async throws { | ||
actor SampleService: Service { | ||
var didRun = false | ||
func run() async throws { | ||
self.didRun = true | ||
} | ||
} | ||
|
||
let sampleService = SampleService() | ||
|
||
let wrappedService = WaiterService( | ||
underlyingService: sampleService, | ||
processingOn: context.backgroundProcessor, | ||
passingContinuationWith: { await self.context.botStateManager.addCachesPopulationContinuation($0) } | ||
) | ||
|
||
try await wrappedService.run() | ||
|
||
#expect(await sampleService.didRun == true) | ||
} | ||
|
||
@Test | ||
func waiterServiceWaitsForUnderlyingService() async throws { | ||
actor SampleService: Service { | ||
var didRun = false | ||
func run() async throws { | ||
self.didRun = true | ||
} | ||
} | ||
|
||
let sampleService = SampleService() | ||
|
||
let wrappedService = WaiterService( | ||
underlyingService: sampleService, | ||
processingOn: context.backgroundProcessor, | ||
passingContinuationWith: { _ in /* Do nothing */ } | ||
) | ||
|
||
let runningService = Task<Void, Never> { | ||
try! await wrappedService.run() | ||
} | ||
|
||
try await Task.sleep(for: .seconds(5)) | ||
|
||
runningService.cancel() | ||
|
||
#expect(await sampleService.didRun == false) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some tests for the waiter service
let serviceTask = Task<Void, any Error> { [self] in | ||
try await self.context.swiftReleasesChecker.run() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This Task
isn't really "unstructured" since we cancel it in the end.
ServiceLifecycle
Task {}
s