99// by the Apache License, Version 2.0.
1010
1111use std:: ops:: RangeInclusive ;
12- use std:: sync:: Arc ;
1312use std:: time:: { Duration , SystemTime } ;
1413
1514use anyhow:: Context ;
16- use futures:: StreamExt ;
15+ use futures:: { Stream , StreamExt } ;
16+ use tokio:: sync:: mpsc:: { self , Sender } ;
1717use tokio:: time:: { Instant , MissedTickBehavior } ;
18+ use tokio_stream:: wrappers:: ReceiverStream ;
1819use tracing:: { debug, instrument, warn} ;
1920
20- use restate_bifrost:: Bifrost ;
21- use restate_core:: cancellation_watcher;
21+ use restate_core:: { ShutdownError , TaskCenter , TaskHandle , TaskId , TaskKind , cancellation_watcher} ;
2222use restate_storage_api:: invocation_status_table:: { InvocationStatus , ScanInvocationStatusTable } ;
23- use restate_types:: identifiers:: WithPartitionKey ;
24- use restate_types:: identifiers:: { LeaderEpoch , PartitionKey } ;
25- use restate_types:: invocation:: PurgeInvocationRequest ;
23+ use restate_types:: identifiers:: PartitionKey ;
24+ use restate_types:: identifiers:: { InvocationId , PartitionId } ;
2625use restate_types:: retries:: with_jitter;
27- use restate_wal_protocol:: { Command , Destination , Envelope , Header , Source } ;
26+
/// Capacity passed to `mpsc::channel` in `Cleaner::start`; it bounds how many
/// `CleanerEffect`s can be queued between the cleaner task and the handle's receiver.
27+ const CLEANER_EFFECT_QUEUE_SIZE : usize = 10 ;
28+
/// Effect produced by the cleaner's scan and sent over the effects channel.
///
/// The cleaner only emits these values; how they are applied is up to whoever reads
/// `CleanerHandle::effects` (not visible in this file section).
29+ #[ derive( Debug , Clone ) ]
30+ pub enum CleanerEffect {
// Sent by `do_cleanup` once the invocation's status expiration time has been reached.
31+ PurgeInvocation ( InvocationId ) ,
// Sent by `do_cleanup` once the invocation's journal expiration time has been reached.
32+ PurgeJournal ( InvocationId ) ,
33+ }
34+
/// Handle returned by `Cleaner::start` for a spawned cleaner task.
35+ pub ( super ) struct CleanerHandle {
// Id of the task spawned via `TaskCenter::spawn_child`; used by `stop` to cancel it.
36+ task_id : TaskId ,
// Receiving half of the effects channel, wrapped so it can be consumed as a `Stream`.
37+ rx : ReceiverStream < CleanerEffect > ,
38+ }
39+
40+ impl CleanerHandle {
/// Cancels the cleaner task identified by this handle, consuming the handle
/// (and with it the receiving half of the effects channel).
///
/// Returns whatever `TaskCenter::cancel_task` returns for the stored task id.
/// NOTE(review): presumably `None` means the task is no longer registered — confirm
/// against `TaskCenter::cancel_task`.
41+ pub fn stop ( self ) -> Option < TaskHandle < ( ) > > {
42+ TaskCenter :: cancel_task ( self . task_id )
43+ }
44+
/// Returns the stream of `CleanerEffect`s emitted by the cleaner task.
///
/// The stream is a mutable borrow of the handle's receiver, so it can be polled
/// repeatedly across calls without losing queued effects.
45+ pub fn effects ( & mut self ) -> impl Stream < Item = CleanerEffect > {
46+ & mut self . rx
47+ }
48+ }
2849
/// Periodic scanner over the invocation status table that emits `CleanerEffect`s for
/// expired invocations and journals.
2950pub ( super ) struct Cleaner < Storage > {
30- leader_epoch : LeaderEpoch ,
// Partition this cleaner belongs to; in the visible code it is only used as a log field.
51+ partition_id : PartitionId ,
// Key range handed to `scan_invocation_statuses` on every cleanup pass.
3152 partition_key_range : RangeInclusive < PartitionKey > ,
// Storage implementing `ScanInvocationStatusTable` that is scanned for expired entries.
3253 storage : Storage ,
33- bifrost : Bifrost ,
// Period between cleanup passes; the first pass is delayed by a jittered fraction of it.
3454 cleanup_interval : Duration ,
3555}
3656
@@ -39,53 +59,54 @@ where
3959 Storage : ScanInvocationStatusTable + Send + Sync + ' static ,
4060{
/// Builds a cleaner from its parts without starting it; call `start` to spawn the task.
///
/// The arguments are stored as-is in the corresponding fields. Note that `storage`
/// comes first and `partition_id` second.
4161 pub ( super ) fn new (
42- leader_epoch : LeaderEpoch ,
4362 storage : Storage ,
44- bifrost : Bifrost ,
63+ partition_id : PartitionId ,
4564 partition_key_range : RangeInclusive < PartitionKey > ,
4665 cleanup_interval : Duration ,
4766 ) -> Self {
4867 Self {
49- leader_epoch ,
68+ partition_id ,
5069 partition_key_range,
5170 storage,
52- bifrost,
5371 cleanup_interval,
5472 }
5573 }
5674
57- #[ instrument( skip_all) ]
58- pub ( super ) async fn run ( self ) -> anyhow:: Result < ( ) > {
59- let Self {
60- leader_epoch,
61- partition_key_range,
62- storage,
63- bifrost,
64- cleanup_interval,
65- } = self ;
/// Spawns the cleaner as a child task of kind `TaskKind::Cleaner` and returns a handle to it.
///
/// A channel with capacity `CLEANER_EFFECT_QUEUE_SIZE` is created; the sending half is
/// moved into `run`, and the receiving half is wrapped in a `ReceiverStream` and stored in
/// the returned `CleanerHandle` together with the spawned task's id.
///
/// # Errors
/// Propagates the `ShutdownError` returned by `TaskCenter::spawn_child`.
75+ pub ( super ) fn start ( self ) -> Result < CleanerHandle , ShutdownError > {
76+ let ( tx, rx) = mpsc:: channel ( CLEANER_EFFECT_QUEUE_SIZE ) ;
77+ let task_id = TaskCenter :: spawn_child ( TaskKind :: Cleaner , "cleaner" , self . run ( tx) ) ?;
6678
67- debug ! ( ?cleanup_interval, "Running cleaner" ) ;
79+ Ok ( CleanerHandle {
80+ task_id,
81+ rx : ReceiverStream :: new ( rx) ,
82+ } )
83+ }
6884
69- let bifrost_envelope_source = Source :: Processor {
70- partition_id : None ,
71- partition_key : None ,
72- leader_epoch,
73- } ;
85+ #[ instrument( skip_all) ]
86+ async fn run ( self , tx : Sender < CleanerEffect > ) -> anyhow:: Result < ( ) > {
87+ debug ! (
88+ partition_id=%self . partition_id,
89+ cleanup_interval=?self . cleanup_interval,
90+ "Running cleaner"
91+ ) ;
7492
7593 // the cleaner is currently quite an expensive scan and we don't strictly need to do it on startup, so we will wait
7694 // for 20-40% of the interval (so, 12-24 minutes by default) before doing the first one
77- let initial_wait = with_jitter ( cleanup_interval. mul_f32 ( 0.2 ) , 1.0 ) ;
95+ let initial_wait = with_jitter ( self . cleanup_interval . mul_f32 ( 0.2 ) , 1.0 ) ;
7896
7997 // the first tick will fire after initial_wait
8098 let mut interval =
81- tokio:: time:: interval_at ( Instant :: now ( ) + initial_wait, cleanup_interval) ;
99+ tokio:: time:: interval_at ( Instant :: now ( ) + initial_wait, self . cleanup_interval ) ;
82100 interval. set_missed_tick_behavior ( MissedTickBehavior :: Delay ) ;
83101
84102 loop {
85103 tokio:: select! {
86104 _ = interval. tick( ) => {
87- if let Err ( e) = Self :: do_cleanup( & storage, & bifrost, partition_key_range. clone( ) , & bifrost_envelope_source) . await {
88- warn!( "Error when trying to cleanup completed invocations: {e:?}" ) ;
105+ if let Err ( e) = self . do_cleanup( & tx) . await {
106+ warn!(
107+ partition_id=%self . partition_id,
108+ "Error when trying to cleanup completed invocations: {e:?}"
109+ ) ;
89110 }
90111 } ,
91112 _ = cancellation_watcher( ) => {
@@ -99,15 +120,12 @@ where
99120 Ok ( ( ) )
100121 }
101122
102- pub ( super ) async fn do_cleanup (
103- storage : & Storage ,
104- bifrost : & Bifrost ,
105- partition_key_range : RangeInclusive < PartitionKey > ,
106- bifrost_envelope_source : & Source ,
107- ) -> anyhow:: Result < ( ) > {
108- debug ! ( "Executing completed invocations cleanup" ) ;
123+ pub ( super ) async fn do_cleanup ( & self , tx : & Sender < CleanerEffect > ) -> anyhow:: Result < ( ) > {
124+ debug ! ( partition_id=%self . partition_id, "Starting invocation cleanup" ) ;
109125
110- let invocations_stream = storage. scan_invocation_statuses ( partition_key_range) ?;
126+ let invocations_stream = self
127+ . storage
128+ . scan_invocation_statuses ( self . partition_key_range . clone ( ) ) ?;
111129 tokio:: pin!( invocations_stream) ;
112130
113131 while let Some ( ( invocation_id, invocation_status) ) = invocations_stream
@@ -132,24 +150,9 @@ where
132150 . checked_add ( completed_invocation. completion_retention_duration )
133151 && now >= status_expiration_time
134152 {
135- restate_bifrost:: append_to_bifrost (
136- bifrost,
137- Arc :: new ( Envelope {
138- header : Header {
139- source : bifrost_envelope_source. clone ( ) ,
140- dest : Destination :: Processor {
141- partition_key : invocation_id. partition_key ( ) ,
142- dedup : None ,
143- } ,
144- } ,
145- command : Command :: PurgeInvocation ( PurgeInvocationRequest {
146- invocation_id,
147- response_sink : None ,
148- } ) ,
149- } ) ,
150- )
151- . await
152- . context ( "Cannot append to bifrost purge invocation" ) ?;
153+ tx. send ( CleanerEffect :: PurgeInvocation ( invocation_id) )
154+ . await
155+ . context ( "Cannot append to bifrost purge invocation" ) ?;
153156 continue ;
154157 }
155158
@@ -165,24 +168,9 @@ where
165168 } ;
166169
167170 if now >= journal_expiration_time {
168- restate_bifrost:: append_to_bifrost (
169- bifrost,
170- Arc :: new ( Envelope {
171- header : Header {
172- source : bifrost_envelope_source. clone ( ) ,
173- dest : Destination :: Processor {
174- partition_key : invocation_id. partition_key ( ) ,
175- dedup : None ,
176- } ,
177- } ,
178- command : Command :: PurgeJournal ( PurgeInvocationRequest {
179- invocation_id,
180- response_sink : None ,
181- } ) ,
182- } ) ,
183- )
184- . await
185- . context ( "Cannot append to bifrost purge journal" ) ?;
171+ tx. send ( CleanerEffect :: PurgeJournal ( invocation_id) )
172+ . await
173+ . context ( "Cannot append to bifrost purge journal" ) ?;
186174 continue ;
187175 }
188176 }
@@ -198,16 +186,13 @@ mod tests {
198186
199187 use futures:: { Stream , stream} ;
200188 use googletest:: prelude:: * ;
201- use restate_core:: { Metadata , TaskCenter , TaskKind , TestCoreEnvBuilder } ;
202189 use restate_storage_api:: StorageError ;
203190 use restate_storage_api:: invocation_status_table:: {
204191 CompletedInvocation , InFlightInvocationMetadata , InvocationStatus ,
205192 InvokedInvocationStatusLite , JournalMetadata , ScanInvocationStatusTable ,
206193 } ;
207194 use restate_storage_api:: protobuf_types:: v1:: lazy:: InvocationStatusV2Lazy ;
208- use restate_types:: Version ;
209195 use restate_types:: identifiers:: { InvocationId , InvocationUuid } ;
210- use restate_types:: partition_table:: { FindPartition , PartitionTable } ;
211196 use test_log:: test;
212197
213198 #[ allow( dead_code) ]
@@ -256,15 +241,6 @@ mod tests {
256241 // Start paused makes sure the timer is immediately fired
257242 #[ test( restate_core:: test( start_paused = true ) ) ]
258243 pub async fn cleanup_works ( ) {
259- let env = TestCoreEnvBuilder :: with_incoming_only_connector ( )
260- . set_partition_table ( PartitionTable :: with_equally_sized_partitions (
261- Version :: MIN ,
262- 1 ,
263- ) )
264- . build ( )
265- . await ;
266- let bifrost = Bifrost :: init_in_memory ( env. metadata_writer ) . await ;
267-
268244 let expired_invocation =
269245 InvocationId :: from_parts ( PartitionKey :: MIN , InvocationUuid :: mock_random ( ) ) ;
270246 let expired_journal =
@@ -315,50 +291,26 @@ mod tests {
315291 ) ,
316292 ] ) ;
317293
318- TaskCenter :: spawn (
319- TaskKind :: Cleaner ,
320- "cleaner" ,
321- Cleaner :: new (
322- LeaderEpoch :: INITIAL ,
323- mock_storage,
324- bifrost. clone ( ) ,
325- RangeInclusive :: new ( PartitionKey :: MIN , PartitionKey :: MAX ) ,
326- Duration :: from_secs ( 1 ) ,
327- )
328- . run ( ) ,
294+ let mut handle = Cleaner :: new (
295+ mock_storage,
296+ 0 . into ( ) ,
297+ RangeInclusive :: new ( PartitionKey :: MIN , PartitionKey :: MAX ) ,
298+ Duration :: from_secs ( 1 ) ,
329299 )
300+ . start ( )
330301 . unwrap ( ) ;
331302
332303 // cleanup will run after around 200ms
333- tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
304+ tokio:: time:: advance ( Duration :: from_secs ( 1 ) ) . await ;
334305
335- // All the invocation ids were created with same partition keys, hence same partition id.
336- let partition_id = Metadata :: with_current ( |m| {
337- m. partition_table_snapshot ( )
338- . find_partition_id ( expired_invocation. partition_key ( ) )
339- } )
340- . unwrap ( ) ;
341-
342- let log_entries: Vec < _ > = bifrost
343- . read_all ( partition_id. into ( ) )
344- . await
345- . unwrap ( )
346- . into_iter ( )
347- . map ( |e| e. try_decode :: < Envelope > ( ) . unwrap ( ) . unwrap ( ) . command )
348- . collect ( ) ;
306+ let received: Vec < _ > = handle. effects ( ) . ready_chunks ( 10 ) . next ( ) . await . unwrap ( ) ;
349307
350308 assert_that ! (
351- log_entries ,
309+ received ,
352310 all!(
353311 len( eq( 2 ) ) ,
354- contains( pat!( Command :: PurgeInvocation ( pat!(
355- PurgeInvocationRequest {
356- invocation_id: eq( expired_invocation) ,
357- }
358- ) ) ) ) ,
359- contains( pat!( Command :: PurgeJournal ( pat!( PurgeInvocationRequest {
360- invocation_id: eq( expired_journal) ,
361- } ) ) ) ) ,
312+ contains( pat!( CleanerEffect :: PurgeInvocation ( eq( expired_invocation) ) ) ) ,
313+ contains( pat!( CleanerEffect :: PurgeJournal ( eq( expired_journal) ) ) )
362314 )
363315 ) ;
364316 }
0 commit comments