diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 2a4ee417df6ec..1f2095e3987ac 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -431,13 +431,13 @@ impl GlobalBarrierManagerContext { .collect(); if expired_worker_slots.is_empty() { - debug!("no expired worker slots, skipping."); + info!("no expired worker slots, skipping."); return self.resolve_graph_info().await; } - debug!("start migrate actors."); + info!("start migrate actors."); let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec(); - debug!("got to migrate worker slots {:#?}", to_migrate_worker_slots); + info!("got to migrate worker slots {:#?}", to_migrate_worker_slots); let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots .intersection(&active_worker_slots) @@ -538,6 +538,8 @@ impl GlobalBarrierManagerContext { warn!(?changed, "get worker changed or timed out. Retry migrate"); } + info!("migration plan {:?}", plan); + mgr.catalog_controller.migrate_actors(plan).await?; info!("migrate actors succeed."); diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 267db45e75672..d6efcf932039f 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -54,7 +54,7 @@ use sea_orm::{ ColumnTrait, DbErr, EntityTrait, JoinType, ModelTrait, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value, }; - +use tracing::debug; use crate::controller::catalog::{CatalogController, CatalogControllerInner}; use crate::controller::utils::{ get_actor_dispatchers, get_fragment_mappings, rebuild_fragment_mapping_from_actors, @@ -1028,12 +1028,17 @@ impl CatalogController { .insert(*actor_id); } - let expired_workers: HashSet<_> = plan.keys().map(|k| k.worker_id() as WorkerId).collect(); + let expired_or_changed_workers: HashSet<_> = + plan.keys().map(|k| k.worker_id() as WorkerId).collect(); let mut actor_migration_plan = HashMap::new(); for (worker, fragment) in actor_locations { - if expired_workers.contains(&worker) { - for (_, actors) in fragment { + if expired_or_changed_workers.contains(&worker) { + for (fragment_id, actors) in fragment { + debug!( + "worker {} expired or changed, migrating fragment {}", + worker, fragment_id + ); let worker_slot_to_actor: HashMap<_, _> = actors .iter() .enumerate() @@ -1043,8 +1048,9 @@ impl CatalogController { .collect(); for (worker_slot, actor) in worker_slot_to_actor { - actor_migration_plan - .insert(actor, plan[&worker_slot].worker_id() as WorkerId); + if let Some(target) = plan.get(&worker_slot) { + actor_migration_plan.insert(actor, target.worker_id() as WorkerId); + } } } }