Skip to content

Commit

Permalink
fix: Fix the issue of actor migration panic caused by the in-place scale-down (#20316)
Browse files Browse the repository at this point in the history

Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky committed Feb 5, 2025
1 parent 8c94a3b commit c1da990
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 9 deletions.
8 changes: 5 additions & 3 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,13 +431,13 @@ impl GlobalBarrierManagerContext {
.collect();

if expired_worker_slots.is_empty() {
debug!("no expired worker slots, skipping.");
info!("no expired worker slots, skipping.");
return self.resolve_graph_info().await;
}

debug!("start migrate actors.");
info!("start migrate actors.");
let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
debug!("got to migrate worker slots {:#?}", to_migrate_worker_slots);
info!("got to migrate worker slots {:#?}", to_migrate_worker_slots);

let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
.intersection(&active_worker_slots)
Expand Down Expand Up @@ -538,6 +538,8 @@ impl GlobalBarrierManagerContext {
warn!(?changed, "get worker changed or timed out. Retry migrate");
}

info!("migration plan {:?}", plan);

mgr.catalog_controller.migrate_actors(plan).await?;

info!("migrate actors succeed.");
Expand Down
18 changes: 12 additions & 6 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use sea_orm::{
ColumnTrait, DbErr, EntityTrait, JoinType, ModelTrait, PaginatorTrait, QueryFilter,
QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value,
};

use tracing::debug;
use crate::controller::catalog::{CatalogController, CatalogControllerInner};
use crate::controller::utils::{
get_actor_dispatchers, get_fragment_mappings, rebuild_fragment_mapping_from_actors,
Expand Down Expand Up @@ -1028,12 +1028,17 @@ impl CatalogController {
.insert(*actor_id);
}

let expired_workers: HashSet<_> = plan.keys().map(|k| k.worker_id() as WorkerId).collect();
let expired_or_changed_workers: HashSet<_> =
plan.keys().map(|k| k.worker_id() as WorkerId).collect();

let mut actor_migration_plan = HashMap::new();
for (worker, fragment) in actor_locations {
if expired_workers.contains(&worker) {
for (_, actors) in fragment {
if expired_or_changed_workers.contains(&worker) {
for (fragment_id, actors) in fragment {
debug!(
"worker {} expired or changed, migrating fragment {}",
worker, fragment_id
);
let worker_slot_to_actor: HashMap<_, _> = actors
.iter()
.enumerate()
Expand All @@ -1043,8 +1048,9 @@ impl CatalogController {
.collect();

for (worker_slot, actor) in worker_slot_to_actor {
actor_migration_plan
.insert(actor, plan[&worker_slot].worker_id() as WorkerId);
if let Some(target) = plan.get(&worker_slot) {
actor_migration_plan.insert(actor, target.worker_id() as WorkerId);
}
}
}
}
Expand Down

0 comments on commit c1da990

Please sign in to comment.