1
1
use std:: {
2
2
collections:: HashSet ,
3
+ ops:: Deref ,
3
4
sync:: Arc ,
4
5
time:: Duration ,
5
6
} ;
@@ -43,6 +44,7 @@ use database::{
43
44
} ;
44
45
use errors:: ErrorMetadataAnyhowExt ;
45
46
use futures:: {
47
+ channel:: oneshot,
46
48
future:: Either ,
47
49
select_biased,
48
50
stream:: FuturesUnordered ,
@@ -132,6 +134,19 @@ const INITIAL_BACKOFF: Duration = Duration::from_millis(10);
132
134
const MAX_BACKOFF : Duration = Duration :: from_secs ( 5 ) ;
133
135
134
136
pub struct ScheduledJobExecutor < RT : Runtime > {
137
+ context : ScheduledJobContext < RT > ,
138
+ }
139
+
140
+ impl < RT : Runtime > Deref for ScheduledJobExecutor < RT > {
141
+ type Target = ScheduledJobContext < RT > ;
142
+
143
+ fn deref ( & self ) -> & Self :: Target {
144
+ & self . context
145
+ }
146
+ }
147
+
148
+ #[ derive( Clone ) ]
149
+ pub struct ScheduledJobContext < RT : Runtime > {
135
150
rt : RT ,
136
151
database : Database < RT > ,
137
152
runner : Arc < ApplicationFunctionRunner < RT > > ,
@@ -146,10 +161,12 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
146
161
function_log : FunctionExecutionLog < RT > ,
147
162
) -> impl Future < Output = ( ) > + Send {
148
163
let executor = Self {
149
- rt,
150
- database,
151
- runner,
152
- function_log,
164
+ context : ScheduledJobContext {
165
+ rt,
166
+ database,
167
+ runner,
168
+ function_log,
169
+ } ,
153
170
} ;
154
171
async move {
155
172
let mut backoff = Backoff :: new ( INITIAL_BACKOFF , MAX_BACKOFF ) ;
@@ -170,10 +187,12 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
170
187
function_log : FunctionExecutionLog < RT > ,
171
188
) -> Self {
172
189
Self {
173
- rt,
174
- database,
175
- runner,
176
- function_log,
190
+ context : ScheduledJobContext {
191
+ rt,
192
+ database,
193
+ runner,
194
+ function_log,
195
+ } ,
177
196
}
178
197
}
179
198
@@ -231,7 +250,22 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
231
250
next_job_wait = Some ( Duration :: from_secs ( 5 ) ) ;
232
251
break ;
233
252
}
234
- futures. push ( self . execute_job ( job, job_id) ) ;
253
+ let ( tx, rx) = oneshot:: channel ( ) ;
254
+ let context = self . context . clone ( ) ;
255
+ self . rt . spawn ( "spawn_scheduled_job" , async move {
256
+ let result = context. execute_job ( job, job_id) . await ;
257
+ let _ = tx. send ( result) ;
258
+ } ) ;
259
+
260
+ futures. push ( async move {
261
+ let Ok ( result) = rx. await else {
262
+ // This should never happen, but if it does, it's the same scenario as if
263
+ // backend crashed during execution which we have to handle anyway.
264
+ report_error ( & mut anyhow:: anyhow!( "Cancelled job!" ) ) ;
265
+ return job_id;
266
+ } ;
267
+ result
268
+ } ) ;
235
269
running_job_ids. insert ( job_id) ;
236
270
}
237
271
@@ -243,6 +277,7 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
243
277
244
278
let token = tx. into_token ( ) ?;
245
279
let subscription = self . database . subscribe ( token) . await ?;
280
+
246
281
select_biased ! {
247
282
job_id = futures. select_next_some( ) => {
248
283
running_job_ids. remove( & job_id) ;
@@ -251,11 +286,13 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
251
286
}
252
287
_ = subscription. wait_for_invalidation( ) . fuse( ) => {
253
288
} ,
254
- } ;
289
+ }
255
290
backoff. reset ( ) ;
256
291
}
257
292
}
293
+ }
258
294
295
+ impl < RT : Runtime > ScheduledJobContext < RT > {
259
296
// This handles re-running the scheduled function on transient errors. It
260
297
// guarantees that the job was successfully run or the job state changed.
261
298
pub async fn execute_job (
@@ -265,8 +302,7 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
265
302
) -> ResolvedDocumentId {
266
303
let mut backoff = Backoff :: new ( INITIAL_BACKOFF , MAX_BACKOFF ) ;
267
304
loop {
268
- let result = self . run_function ( job. clone ( ) , job_id) . await ;
269
- match result {
305
+ match self . run_function ( job. clone ( ) , job_id) . await {
270
306
Ok ( result) => {
271
307
metrics:: log_scheduled_job_success ( backoff. failures ( ) ) ;
272
308
return result;
@@ -711,7 +747,7 @@ impl<RT: Runtime> ScheduledJobGarbageCollector<RT> {
711
747
}
712
748
_ = subscription. wait_for_invalidation( ) . fuse( ) => {
713
749
} ,
714
- } ;
750
+ }
715
751
backoff. reset ( ) ;
716
752
}
717
753
}
0 commit comments