Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions nexus/src/app/background/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ use super::tasks::alert_dispatcher::AlertDispatcher;
use super::tasks::bfd;
use super::tasks::blueprint_execution;
use super::tasks::blueprint_load;
use super::tasks::blueprint_load::LoadedTargetBlueprint;
use super::tasks::blueprint_planner;
use super::tasks::blueprint_rendezvous;
use super::tasks::crdb_node_id_collector;
Expand Down Expand Up @@ -146,8 +147,6 @@ use nexus_config::DnsTasksConfig;
use nexus_db_model::DnsGroup;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db::DataStore;
use nexus_types::deployment::Blueprint;
use nexus_types::deployment::BlueprintTarget;
use nexus_types::deployment::PendingMgsUpdates;
use nexus_types::fm;
use nexus_types::inventory::Collection;
Expand Down Expand Up @@ -1179,8 +1178,7 @@ pub struct BackgroundTasksData {
/// Channel for TUF repository artifacts to be replicated out to sleds
pub tuf_artifact_replication_rx: mpsc::Receiver<ArtifactsWithPlan>,
/// Channel for exposing the latest loaded blueprint
pub blueprint_load_tx:
watch::Sender<Option<(BlueprintTarget, Arc<Blueprint>)>>,
pub blueprint_load_tx: watch::Sender<Option<LoadedTargetBlueprint>>,
/// `reqwest::Client` for webhook delivery requests.
///
/// This is shared with the external API as it's also used when sending
Expand Down
1 change: 1 addition & 0 deletions nexus/src/app/background/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ pub use init::BackgroundTasksData;
pub use init::BackgroundTasksInitializer;
pub(crate) use init::BackgroundTasksInternal;
pub use nexus_background_task_interface::Activator;
pub(crate) use tasks::blueprint_load::LoadedTargetBlueprint;
pub use tasks::saga_recovery::SagaRecoveryHelpers;

use futures::future::BoxFuture;
Expand Down
88 changes: 39 additions & 49 deletions nexus/src/app/background/tasks/blueprint_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! Background task for realizing a plan blueprint

use crate::app::{
background::{Activator, BackgroundTask},
background::{Activator, BackgroundTask, LoadedTargetBlueprint},
quiesce::NexusQuiesceHandle,
};
use futures::FutureExt;
Expand All @@ -16,22 +16,20 @@ use nexus_db_queries::db::DataStore;
use nexus_reconfigurator_execution::{
RealizeBlueprintOutput, RequiredRealizeArgs,
};
use nexus_types::deployment::{
Blueprint, BlueprintTarget, PendingMgsUpdates, execution::EventBuffer,
};
use nexus_types::deployment::{PendingMgsUpdates, execution::EventBuffer};
use omicron_uuid_kinds::OmicronZoneUuid;
use serde_json::json;
use slog_error_chain::InlineErrorChain;
use std::sync::Arc;
use tokio::sync::watch;
use update_engine::NestedError;

/// Background task that takes a [`Blueprint`] and realizes the change to
/// Background task that takes a `Blueprint` and realizes the change to
/// the state of the system based on the `Blueprint`.
pub struct BlueprintExecutor {
datastore: Arc<DataStore>,
resolver: Resolver,
rx_blueprint: watch::Receiver<Option<(BlueprintTarget, Arc<Blueprint>)>>,
rx_blueprint: watch::Receiver<Option<LoadedTargetBlueprint>>,
nexus_id: OmicronZoneUuid,
tx: watch::Sender<usize>,
saga_recovery: Activator,
Expand All @@ -43,9 +41,7 @@ impl BlueprintExecutor {
pub fn new(
datastore: Arc<DataStore>,
resolver: Resolver,
rx_blueprint: watch::Receiver<
Option<(BlueprintTarget, Arc<Blueprint>)>,
>,
rx_blueprint: watch::Receiver<Option<LoadedTargetBlueprint>>,
nexus_id: OmicronZoneUuid,
saga_recovery: Activator,
mgs_update_tx: watch::Sender<PendingMgsUpdates>,
Expand Down Expand Up @@ -79,7 +75,7 @@ impl BlueprintExecutor {
// on the watch.
let update = self.rx_blueprint.borrow_and_update().clone();

let Some((bp_target, blueprint)) = update else {
let Some(LoadedTargetBlueprint { target, blueprint }) = update else {
warn!(
&opctx.log, "Blueprint execution: skipped";
"reason" => "no blueprint",
Expand Down Expand Up @@ -127,11 +123,13 @@ impl BlueprintExecutor {
}
};

if !bp_target.enabled {
warn!(&opctx.log,
"Blueprint execution: skipped";
"reason" => "blueprint disabled",
"target_id" => %blueprint.id);
if !target.enabled {
warn!(
&opctx.log,
"Blueprint execution: skipped";
"reason" => "blueprint disabled",
"target_id" => %blueprint.id,
);
return json!({
"target_id": blueprint.id.to_string(),
"enabled": false,
Expand Down Expand Up @@ -220,7 +218,9 @@ impl BackgroundTask for BlueprintExecutor {
#[cfg(test)]
mod test {
use super::BlueprintExecutor;
use crate::app::background::{Activator, BackgroundTask};
use crate::app::background::{
Activator, BackgroundTask, LoadedTargetBlueprint,
};
use crate::app::quiesce::NexusQuiesceHandle;
use httptest::Expectation;
use httptest::matchers::{not, request};
Expand Down Expand Up @@ -274,7 +274,7 @@ mod test {
opctx: &OpContext,
blueprint_zones: BTreeMap<SledUuid, IdOrdMap<BlueprintZoneConfig>>,
dns_version: Generation,
) -> (BlueprintTarget, Blueprint) {
) -> LoadedTargetBlueprint {
let id = BlueprintUuid::new_v4();
// Assume all sleds are active with no disks or datasets.
let blueprint_sleds = blueprint_zones
Expand Down Expand Up @@ -310,7 +310,7 @@ mod test {
enabled: true,
time_made_target: chrono::Utc::now(),
};
let blueprint = Blueprint {
let blueprint = Arc::new(Blueprint {
id,
sleds: blueprint_sleds,
pending_mgs_updates: PendingMgsUpdates::new(),
Expand All @@ -329,7 +329,7 @@ mod test {
creator: "test".to_string(),
comment: "test blueprint".to_string(),
source: BlueprintSource::Test,
};
});

datastore
.blueprint_insert(opctx, &blueprint)
Expand All @@ -340,7 +340,7 @@ mod test {
.await
.expect("set new blueprint as current target");

(target, blueprint)
LoadedTargetBlueprint { target, blueprint }
}

#[nexus_test(server = crate::Server)]
Expand Down Expand Up @@ -450,18 +450,11 @@ mod test {
// With a target blueprint having no zones, the task should trivially
// complete and report a successful (empty) summary.
let generation = Generation::new();
let blueprint = {
let (target, blueprint) = create_blueprint(
&datastore,
&opctx,
BTreeMap::new(),
generation,
)
.await;
(target, Arc::new(blueprint))
};
let blueprint_id = blueprint.1.id;
blueprint_tx.send(Some(blueprint)).unwrap();
let loaded =
create_blueprint(&datastore, &opctx, BTreeMap::new(), generation)
.await;
let blueprint_id = loaded.blueprint.id;
blueprint_tx.send(Some(loaded)).unwrap();
let mut value = task.activate(&opctx).await;

let event_buffer = extract_event_buffer(&mut value);
Expand Down Expand Up @@ -515,7 +508,7 @@ mod test {
// In-service zones should be deployed.
//
// TODO: add expunged zones to the test (should not be deployed).
let mut blueprint = create_blueprint(
let mut loaded = create_blueprint(
&datastore,
&opctx,
BTreeMap::from([
Expand All @@ -528,7 +521,7 @@ mod test {

// Insert records for the zpools backing the datasets in these zones.
for (sled_id, config) in
blueprint.1.all_omicron_zones(BlueprintZoneDisposition::any)
loaded.blueprint.all_omicron_zones(BlueprintZoneDisposition::any)
{
let Some(dataset) = config.zone_type.durable_dataset() else {
continue;
Expand All @@ -547,9 +540,7 @@ mod test {
.expect("failed to upsert zpool");
}

blueprint_tx
.send(Some((blueprint.0, Arc::new(blueprint.1.clone()))))
.unwrap();
blueprint_tx.send(Some(loaded.clone())).unwrap();

// Make sure that requests get made to the sled agent.
for s in [&mut s1, &mut s2] {
Expand All @@ -568,7 +559,7 @@ mod test {
assert_eq!(
value,
json!({
"target_id": blueprint.1.id.to_string(),
"target_id": loaded.blueprint.id.to_string(),
"execution_error": null,
"enabled": true,
"needs_saga_recovery": false,
Expand All @@ -589,19 +580,20 @@ mod test {
// Now, disable the target and make sure that we _don't_ invoke the sled
// agent. It's enough to just not set expectations on
// match_put_omicron_config().
blueprint.1.internal_dns_version =
blueprint.1.internal_dns_version.next();
blueprint.0.enabled = false;
blueprint_tx
.send(Some((blueprint.0, Arc::new(blueprint.1.clone()))))
.unwrap();
{
let blueprint = Arc::make_mut(&mut loaded.blueprint);
blueprint.internal_dns_version =
blueprint.internal_dns_version.next();
}
loaded.target.enabled = false;
blueprint_tx.send(Some(loaded.clone())).unwrap();
let value = task.activate(&opctx).await;
println!("when disabled: {:?}", value);
assert_eq!(
value,
json!({
"enabled": false,
"target_id": blueprint.1.id.to_string()
"target_id": loaded.blueprint.id.to_string()
})
);
s1.verify_and_clear();
Expand All @@ -616,10 +608,8 @@ mod test {

// Do it all again, but configure one of the servers to fail so we can
// verify the task's returned summary of what happened.
blueprint.0.enabled = true;
blueprint_tx
.send(Some((blueprint.0, Arc::new(blueprint.1.clone()))))
.unwrap();
loaded.target.enabled = true;
blueprint_tx.send(Some(loaded.clone())).unwrap();
s1.expect(
Expectation::matching(match_put_omicron_config())
.respond_with(status_code(204)),
Expand Down
59 changes: 39 additions & 20 deletions nexus/src/app/background/tasks/blueprint_load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,28 @@ use serde_json::json;
use std::sync::Arc;
use tokio::sync::watch;

#[derive(Debug, Clone)]
pub struct LoadedTargetBlueprint {
pub target: BlueprintTarget,
pub blueprint: Arc<Blueprint>,
}

pub struct TargetBlueprintLoader {
datastore: Arc<DataStore>,
last: Option<(BlueprintTarget, Arc<Blueprint>)>,
tx: watch::Sender<Option<(BlueprintTarget, Arc<Blueprint>)>>,
last: Option<LoadedTargetBlueprint>,
tx: watch::Sender<Option<LoadedTargetBlueprint>>,
}

impl TargetBlueprintLoader {
pub fn new(
datastore: Arc<DataStore>,
tx: watch::Sender<Option<(BlueprintTarget, Arc<Blueprint>)>>,
tx: watch::Sender<Option<LoadedTargetBlueprint>>,
) -> TargetBlueprintLoader {
TargetBlueprintLoader { datastore, last: None, tx }
}

/// Expose the target blueprint
pub fn watcher(
&self,
) -> watch::Receiver<Option<(BlueprintTarget, Arc<Blueprint>)>> {
pub fn watcher(&self) -> watch::Receiver<Option<LoadedTargetBlueprint>> {
self.tx.subscribe()
}
}
Expand All @@ -49,10 +53,13 @@ impl BackgroundTask for TargetBlueprintLoader {
// the current target.
let log = match &self.last {
None => opctx.log.clone(),
Some(old) => opctx.log.new(o!(
"original_target_id" => old.1.id.to_string(),
"original_time_created" => old.1.time_created.to_string(),
)),
Some(LoadedTargetBlueprint { blueprint, .. }) => {
opctx.log.new(o!(
"original_target_id" => blueprint.id.to_string(),
"original_time_created" =>
blueprint.time_created.to_string(),
))
}
};

// Retrieve the latest target blueprint
Expand Down Expand Up @@ -81,7 +88,10 @@ impl BackgroundTask for TargetBlueprintLoader {

// Decide what to do with the new blueprint
let enabled = new_bp_target.enabled;
let Some((old_bp_target, old_blueprint)) = self.last.as_ref()
let Some(LoadedTargetBlueprint {
target: old_bp_target,
blueprint: old_blueprint,
}) = self.last.as_ref()
else {
// We've found a target blueprint for the first time.
// Save it and notify any watchers.
Expand All @@ -93,7 +103,10 @@ impl BackgroundTask for TargetBlueprintLoader {
"target_id" => %target_id,
"time_created" => %time_created
);
self.last = Some((new_bp_target, Arc::new(new_blueprint)));
self.last = Some(LoadedTargetBlueprint {
target: new_bp_target,
blueprint: Arc::new(new_blueprint),
});
self.tx.send_replace(self.last.clone());
return json!({
"target_id": target_id,
Expand All @@ -114,7 +127,10 @@ impl BackgroundTask for TargetBlueprintLoader {
"target_id" => %target_id,
"time_created" => %time_created
);
self.last = Some((new_bp_target, Arc::new(new_blueprint)));
self.last = Some(LoadedTargetBlueprint {
target: new_bp_target,
blueprint: Arc::new(new_blueprint),
});
self.tx.send_replace(self.last.clone());
json!({
"target_id": target_id,
Expand Down Expand Up @@ -157,7 +173,10 @@ impl BackgroundTask for TargetBlueprintLoader {
"time_created" => %time_created,
"state" => status,
);
self.last = Some((new_bp_target, Arc::new(new_blueprint)));
self.last = Some(LoadedTargetBlueprint {
target: new_bp_target,
blueprint: Arc::new(new_blueprint),
});
self.tx.send_replace(self.last.clone());
json!({
"target_id": target_id,
Expand Down Expand Up @@ -268,7 +287,7 @@ mod test {
let initial_blueprint =
rx.borrow_and_update().clone().expect("no initial blueprint");
let update = serde_json::from_value::<TargetUpdate>(value).unwrap();
assert_eq!(update.target_id, initial_blueprint.1.id);
assert_eq!(update.target_id, initial_blueprint.blueprint.id);
assert_eq!(update.status, "first target blueprint");

let (target, blueprint) = create_blueprint(update.target_id);
Expand All @@ -278,7 +297,7 @@ mod test {
datastore.blueprint_insert(&opctx, &blueprint).await.unwrap();
let value = task.activate(&opctx).await;
let update = serde_json::from_value::<TargetUpdate>(value).unwrap();
assert_eq!(update.target_id, initial_blueprint.1.id);
assert_eq!(update.target_id, initial_blueprint.blueprint.id);
assert_eq!(update.status, "target blueprint unchanged");

// Setting a target blueprint makes the loader see it and broadcast it
Expand All @@ -288,8 +307,8 @@ mod test {
assert_eq!(update.target_id, blueprint.id);
assert_eq!(update.status, "target blueprint updated");
let rx_update = rx.borrow_and_update().clone().unwrap();
assert_eq!(rx_update.0, target);
assert_eq!(rx_update.1, blueprint);
assert_eq!(rx_update.target, target);
assert_eq!(rx_update.blueprint, blueprint);

// Activation without changing the target blueprint results in no update
let value = task.activate(&opctx).await;
Expand All @@ -310,8 +329,8 @@ mod test {
assert_eq!(update.target_id, new_blueprint.id);
assert_eq!(update.status, "target blueprint updated");
let rx_update = rx.borrow_and_update().clone().unwrap();
assert_eq!(rx_update.0, new_target);
assert_eq!(rx_update.1, new_blueprint);
assert_eq!(rx_update.target, new_target);
assert_eq!(rx_update.blueprint, new_blueprint);

// Activating again without changing the target blueprint results in
// no update
Expand Down
Loading
Loading