@@ -24,6 +24,7 @@ pub(crate) mod open_candidate_region;
2424pub mod test_util;
2525pub ( crate ) mod update_metadata;
2626pub ( crate ) mod upgrade_candidate_region;
27+ pub ( crate ) mod utils;
2728
2829use std:: any:: Any ;
2930use std:: collections:: { HashMap , HashSet } ;
@@ -100,9 +101,14 @@ where
100101#[ derive( Debug , Clone , Serialize , Deserialize , PartialEq ) ]
101102pub struct PersistentContext {
102103 /// The table catalog.
103- pub ( crate ) catalog : String ,
104+ #[ serde( default , skip_serializing_if = "Option::is_none" ) ]
105+ pub ( crate ) catalog : Option < String > ,
104106 /// The table schema.
105- pub ( crate ) schema : String ,
107+ #[ serde( default , skip_serializing_if = "Option::is_none" ) ]
108+ pub ( crate ) schema : Option < String > ,
109+ /// The catalog and schema of the regions.
110+ #[ serde( default ) ]
111+ pub ( crate ) catalog_and_schema : Vec < ( String , String ) > ,
106112 /// The [Peer] of migration source.
107113 pub ( crate ) from_peer : Peer ,
108114 /// The [Peer] of migration destination.
@@ -124,9 +130,17 @@ fn default_timeout() -> Duration {
124130
125131impl PersistentContext {
126132 pub fn lock_key ( & self ) -> Vec < StringKey > {
127- let mut lock_keys = Vec :: with_capacity ( self . region_ids . len ( ) + 2 ) ;
128- lock_keys. push ( CatalogLock :: Read ( & self . catalog ) . into ( ) ) ;
129- lock_keys. push ( SchemaLock :: read ( & self . catalog , & self . schema ) . into ( ) ) ;
133+ let mut lock_keys =
134+ Vec :: with_capacity ( self . region_ids . len ( ) + 2 + self . catalog_and_schema . len ( ) * 2 ) ;
135+ if let ( Some ( catalog) , Some ( schema) ) = ( & self . catalog , & self . schema ) {
136+ lock_keys. push ( CatalogLock :: Read ( catalog) . into ( ) ) ;
137+ lock_keys. push ( SchemaLock :: read ( catalog, schema) . into ( ) ) ;
138+ }
139+ for ( catalog, schema) in self . catalog_and_schema . iter ( ) {
140+ lock_keys. push ( CatalogLock :: Read ( catalog) . into ( ) ) ;
141+ lock_keys. push ( SchemaLock :: read ( catalog, schema) . into ( ) ) ;
142+ }
143+
130144 // Sort the region ids to ensure the same order of region ids.
131145 let mut region_ids = self . region_ids . clone ( ) ;
132146 region_ids. sort_unstable ( ) ;
0 commit comments