diff --git a/Cargo.lock b/Cargo.lock index 5b1e25ac539..4baef63b935 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9000,6 +9000,7 @@ dependencies = [ "pem-rfc7468", "percent-encoding", "petgraph 0.6.5", + "petgraph 0.8.2", "phf_shared 0.11.2", "pkcs8", "portable-atomic", @@ -10159,6 +10160,7 @@ dependencies = [ "hashbrown 0.15.4", "indexmap 2.12.0", "serde", + "serde_derive", ] [[package]] diff --git a/dev-tools/omdb/Cargo.toml b/dev-tools/omdb/Cargo.toml index ce376d4b67c..115f4015fb8 100644 --- a/dev-tools/omdb/Cargo.toml +++ b/dev-tools/omdb/Cargo.toml @@ -63,7 +63,7 @@ oxide-tokio-rt.workspace = true oximeter-client.workspace = true oximeter-db = { workspace = true, default-features = false, features = [ "oxql" ] } oxnet.workspace = true -petgraph.workspace = true +petgraph = { workspace = true, features = ["serde-1"] } # See omicron-rpaths for more about the "pq-sys" dependency. pq-sys = "*" ratatui.workspace = true diff --git a/dev-tools/omdb/src/bin/omdb/db/saga.rs b/dev-tools/omdb/src/bin/omdb/db/saga.rs index 91ca84ca79e..6375bad48ce 100644 --- a/dev-tools/omdb/src/bin/omdb/db/saga.rs +++ b/dev-tools/omdb/src/bin/omdb/db/saga.rs @@ -13,6 +13,8 @@ use anyhow::Context; use anyhow::anyhow; use anyhow::bail; use async_bb8_diesel::AsyncRunQueryDsl; +use chrono::DateTime; +use chrono::Utc; use clap::Args; use clap::Subcommand; use diesel::prelude::*; @@ -29,6 +31,11 @@ use nexus_db_queries::db::datastore::SQL_BATCH_SIZE; use nexus_db_queries::db::pagination::Paginator; use nexus_db_queries::db::pagination::paginated; use owo_colors::OwoColorize; +use petgraph::Graph; +use petgraph::graph::NodeIndex; +use serde::Deserialize; +use serde::Serialize; +use std::collections::BTreeMap; use std::collections::HashSet; use std::sync::Arc; use tabled::Tabled; @@ -74,6 +81,9 @@ enum SagaCommands { /// while all inactive Nexuses are unreachable, an unreachable Nexus is not /// necessarily inactive. Abandon(SagaAbandonArgs), + + /// Show the execution of a saga + Show(SagaShowArgs), } #[derive(Clone, Debug, Args)] @@ -94,6 +104,11 @@ struct SagaAbandonArgs { bypass_sec_check: bool, } +#[derive(Clone, Copy, Debug, Args)] +struct SagaShowArgs { + saga_id: Uuid, +} + impl SagaArgs { pub async fn exec( &self, @@ -114,6 +129,10 @@ impl SagaArgs { let token = omdb.check_allow_destructive()?; cmd_sagas_abandon(omdb, opctx, datastore, *args, token).await } + + SagaCommands::Show(args) => { + cmd_sagas_show(omdb, opctx, datastore, *args).await + } } } } @@ -126,29 +145,7 @@ async fn cmd_sagas_running( let sagas = get_all_sagas_in_state(&conn, SagaState::Running).await?; - #[derive(Tabled)] - struct SagaRow { - id: Uuid, - current_sec: String, - time_created: String, - name: String, - state: String, - } - - let rows: Vec<_> = sagas - .into_iter() - .map(|saga: Saga| SagaRow { - id: saga.id.0.into(), - current_sec: if let Some(current_sec) = saga.current_sec { - current_sec.0.to_string() - } else { - String::from("-") - }, - time_created: datetime_rfc3339_concise(&saga.time_created), - name: saga.name, - state: format!("{:?}", saga.saga_state), - }) - .collect(); + let rows: Vec<_> = sagas.into_iter().map(SagaRow::from).collect(); let table = tabled::Table::new(rows) .with(tabled::settings::Style::psql()) @@ -159,6 +156,43 @@ async fn cmd_sagas_running( Ok(()) } +#[derive(Tabled)] +struct SagaRow { + id: Uuid, + current_sec: String, + #[tabled(display_with = "datetime_rfc3339_concise")] + time_created: DateTime, + name: String, + state: String, +} + +impl From for SagaRow { + fn from(saga: Saga) -> Self { + let Saga { + id, + creator: _, + time_created, + name, + saga_dag: _, + saga_state, + current_sec, + adopt_generation: _, + adopt_time: _, + } = saga; + Self { + id: id.0.into(), + current_sec: if let Some(current_sec) = current_sec { + current_sec.0.to_string() + } else { + String::from("-") + }, + time_created, + name, + state: format!("{saga_state:?}"), + } + } +} + async fn cmd_sagas_inject_error( omdb: &Omdb, opctx: &OpContext, @@ -646,3 +680,351 @@ async fn get_saga_sec_status( }, } } + +// Copy some types from Steno, because steno uses pub(crate) everywhere. We +// don't want to change Steno to make the internals public, but these types +// should be fairly stable. +#[derive(Serialize, Deserialize)] +struct StenoDag { + pub saga_name: String, + pub graph: Graph, + pub start_node: NodeIndex, + pub end_node: NodeIndex, +} + +impl StenoDag { + pub fn get(&self, node_index: NodeIndex) -> Option<&StenoNode> { + self.graph.node_weight(node_index) + } + + pub fn get_from_saga_node_id( + &self, + saga_node_id: &nexus_db_model::saga_types::SagaNodeId, + ) -> Option<&StenoNode> { + self.get(u32::from(saga_node_id.0).into()) + } +} + +#[derive(Serialize, Deserialize, Debug)] +enum StenoNode { + Start { params: Arc }, + End, + Action { name: String, label: String, action_name: String }, + Constant { name: String, value: Arc }, + SubsagaStart { saga_name: String, params_node_name: String }, + SubsagaEnd { name: String }, +} + +async fn cmd_sagas_show( + _omdb: &Omdb, + _opctx: &OpContext, + datastore: &DataStore, + SagaShowArgs { saga_id }: SagaShowArgs, +) -> anyhow::Result<()> { + let conn = datastore.pool_connection_for_tests().await?; + let mut nodes = Vec::new(); + let mut paginator = + Paginator::new(SQL_BATCH_SIZE, dropshot::PaginationOrder::Ascending); + + while let Some(p) = paginator.next() { + use nexus_db_schema::schema::saga_node_event::dsl; + + let batch = paginated( + dsl::saga_node_event, + dsl::event_time, + &p.current_pagparams(), + ) + .filter(dsl::saga_id.eq(saga_id)) + .order_by(dsl::event_time) + .select(SagaNodeEvent::as_select()) + .load_async(&*conn) + .await + .with_context(|| format!("error fetching saga nodes for {saga_id}"))?; + paginator = + p.found_batch(&batch, &|node: &SagaNodeEvent| node.event_time); + nodes.extend(batch); + } + + let saga = { + use nexus_db_schema::schema::saga::dsl; + dsl::saga + .filter(dsl::id.eq(saga_id)) + .select(Saga::as_select()) + .first_async(&*conn) + .await + .with_context(|| format!("error fetching saga {saga_id}"))? + }; + + print_saga_nodes(Some(saga), nodes); + + Ok(()) +} + +/// Print a table showing saga nodes. If a Saga object is supplied as the first +/// argument, then look up the saga node's name and use that for output instead +/// of a node id. +fn print_saga_nodes(saga: Option, saga_nodes: Vec) { + let dag: Option = saga.as_ref().map(|saga: &Saga| { + serde_json::from_value(saga.saga_dag.clone()).unwrap() + }); + + if let Some(saga) = saga { + let dag = saga.saga_dag.clone(); + + let table = tabled::Table::new(Some(SagaRow::from(saga))) + .with(tabled::settings::Style::psql()) + .to_string(); + + println!("{table}\n"); + println!("DAG: {}", dag); + println!(); + } + + struct SagaNodeRow { + saga_id: Uuid, + event_time: String, + sub_saga_id: Option, + node_id: String, + event_type: String, + data: String, + } + + // Keep track of which saga nodes are subsaga start nodes. + let mut sub_saga_starts: BTreeMap< + nexus_db_model::saga_types::SagaNodeId, + u32, + > = BTreeMap::default(); + let mut sub_saga_counter = 0; + + // Keep track of which nodes belong to which sub sagas + let mut sub_saga_map: BTreeMap< + nexus_db_model::saga_types::SagaNodeId, + u32, + > = BTreeMap::default(); + + let mut rows: Vec<_> = Vec::with_capacity(saga_nodes.len()); + + for saga_node in saga_nodes { + let (node_id, sub_saga_id, data) = if let Some(dag) = &dag { + let dag_node = + dag.get_from_saga_node_id(&saga_node.node_id).unwrap(); + + let sub_saga_id: Option = match sub_saga_map + .get(&saga_node.node_id) + { + Some(id) => { + // We already determined this node was part of a sub + // saga. + Some(*id) + } + + None => { + // Figure out if we're in an existing sub saga + let mut this_sub_saga_start_node_id = None; + let mut this_sub_saga_id = None; + + for (sub_saga_start_node_id, sub_saga_id) in + &sub_saga_starts + { + // Is there a path from the start of the sub saga to + // this node? If so, then this node is in the sub saga. + let from: u32 = sub_saga_start_node_id.0.into(); + let to: u32 = saga_node.node_id.0.into(); + + if petgraph::algo::has_path_connecting( + &dag.graph, + from.into(), + to.into(), + None, + ) { + this_sub_saga_start_node_id = + Some(*sub_saga_start_node_id); + this_sub_saga_id = Some(*sub_saga_id); + break; + } + } + + if let Some(this_sub_saga_start_node_id) = + this_sub_saga_start_node_id + { + // Is the sub saga over? + if matches!(dag_node, StenoNode::SubsagaEnd { .. }) { + sub_saga_starts + .remove(&this_sub_saga_start_node_id); + } + } else { + // Did a new sub saga start? + if matches!(dag_node, StenoNode::SubsagaStart { .. }) + && saga_node.event_type == *"started" + { + sub_saga_starts + .insert(saga_node.node_id, sub_saga_counter); + sub_saga_counter += 1; + + this_sub_saga_id = Some(sub_saga_counter - 1); + } else { + // Not in a sub saga + } + } + + if let Some(this_sub_saga_id) = this_sub_saga_id { + sub_saga_map + .insert(saga_node.node_id, this_sub_saga_id); + } + + this_sub_saga_id + } + }; + + let node_id = format!( + "{:3}: {}", + saga_node.node_id.0, + match dag_node { + StenoNode::Start { .. } => String::from("start"), + StenoNode::End => String::from("end"), + StenoNode::Action { action_name, .. } => + action_name.clone(), + StenoNode::Constant { name, .. } => name.clone(), + StenoNode::SubsagaStart { saga_name, params_node_name } => + format!( + "subsaga start {} ({})", + saga_name, params_node_name + ), + StenoNode::SubsagaEnd { name } => + format!("subsaga end {}", name), + }, + ); + + // If the saga node produced data, label it with the saga name, not the action name + let data = match dag_node { + StenoNode::Action { name, .. } => { + if let Some(saga_node_data) = saga_node.data { + match saga_node_data { + serde_json::Value::Null => String::from(""), + + x => { + format!( + "\"{}\" => {}", + name, + serde_json::to_string(&x).unwrap(), + ) + } + } + } else { + String::from("") + } + } + + _ => String::from(""), + }; + + (node_id, sub_saga_id, data) + } else { + let node_id = format!("{}", saga_node.node_id.0); + + let data = if let Some(saga_node_data) = saga_node.data { + match saga_node_data { + serde_json::Value::Null => String::from(""), + x => serde_json::to_string(&x).unwrap(), + } + } else { + String::from("") + }; + + (node_id, None, data) + }; + + rows.push(SagaNodeRow { + saga_id: saga_node.saga_id.0.into(), + event_time: datetime_rfc3339_concise(&saga_node.event_time), + sub_saga_id, + node_id, + event_type: saga_node.event_type, + data, + }) + } + + if rows.is_empty() { + return; + } + + // We avoid `Tabled` here because some columns could be very wide, and its + // auto-sizing would cause all rows to be that wide, resulting in unreadable + // output. + let row_char_counts: Vec<_> = rows + .iter() + .map(|x| { + ( + format!("{}", x.saga_id).chars().count(), + x.event_time.chars().count(), + if let Some(sub_saga_id) = x.sub_saga_id { + format!("{}", sub_saga_id).chars().count() + } else { + 0 + }, + x.node_id.chars().count(), + x.event_type.chars().count(), + x.data.chars().count(), + ) + }) + .collect(); + + let (width0, width1, width2, width3, width4): ( + usize, + usize, + usize, + usize, + usize, + ) = ( + row_char_counts.iter().map(|x| x.0).max().unwrap(), + row_char_counts.iter().map(|x| x.1).max().unwrap(), + std::cmp::max( + row_char_counts.iter().map(|x| x.2).max().unwrap(), + "sub saga".len(), + ), + std::cmp::max( + row_char_counts.iter().map(|x| x.3).max().unwrap(), + "node id".len(), + ), + std::cmp::max( + row_char_counts.iter().map(|x| x.4).max().unwrap(), + "event type".len(), + ), + ); + + println!( + "{:>width0$} | {:width1$} | {:width2$} | {:width3$} | {:width4$} | {}", + String::from("saga id"), + String::from("event time"), + String::from("sub saga"), + String::from("node id"), + String::from("event type"), + String::from("data"), + ); + + println!( + "{:>width0$} | {:width1$} | {:width2$} | {:width3$} | {:width4$} | {}", + (0..width0).map(|_| "-").collect::(), + (0..width1).map(|_| "-").collect::(), + (0..width2).map(|_| "-").collect::(), + (0..width3).map(|_| "-").collect::(), + (0..width4).map(|_| "-").collect::(), + String::from("---"), + ); + + for row in rows { + println!( + "{:>width0$} | {:width1$} | {:>width2$} | {:width3$} | {:width4$} | {}", + row.saga_id, + row.event_time, + if let Some(sub_saga_id) = row.sub_saga_id { + format!("{}", sub_saga_id) + } else { + String::from("") + }, + row.node_id, + row.event_type, + row.data, + ); + } +} diff --git a/dev-tools/omdb/tests/usage_errors.out b/dev-tools/omdb/tests/usage_errors.out index b8f1f4d15f5..fded651a425 100644 --- a/dev-tools/omdb/tests/usage_errors.out +++ b/dev-tools/omdb/tests/usage_errors.out @@ -761,6 +761,7 @@ Commands: running List running sagas inject-error Inject an error into a saga's currently running node(s) abandon Prevent new Nexus processes from resuming execution of a saga + show Show the execution of a saga help Print this message or the help of the given subcommand(s) Options: diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 92e06434c33..f80a5193bd7 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -94,7 +94,8 @@ openapiv3 = { version = "2.2.0", default-features = false, features = ["skip_ser peg-runtime = { version = "0.8.5", default-features = false, features = ["std"] } pem-rfc7468 = { version = "0.7.0", default-features = false, features = ["std"] } percent-encoding = { version = "2.3.2" } -petgraph = { version = "0.6.5", features = ["serde-1"] } +petgraph-3b31131e45eafb45 = { package = "petgraph", version = "0.6.5", features = ["serde-1"] } +petgraph-c38e5c1d305a1b54 = { package = "petgraph", version = "0.8.2", features = ["serde-1"] } phf_shared = { version = "0.11.2" } pkcs8 = { version = "0.10.2", default-features = false, features = ["encryption", "pem", "std"] } portable-atomic = { version = "1.11.0" } @@ -234,7 +235,8 @@ openapiv3 = { version = "2.2.0", default-features = false, features = ["skip_ser peg-runtime = { version = "0.8.5", default-features = false, features = ["std"] } pem-rfc7468 = { version = "0.7.0", default-features = false, features = ["std"] } percent-encoding = { version = "2.3.2" } -petgraph = { version = "0.6.5", features = ["serde-1"] } +petgraph-3b31131e45eafb45 = { package = "petgraph", version = "0.6.5", features = ["serde-1"] } +petgraph-c38e5c1d305a1b54 = { package = "petgraph", version = "0.8.2", features = ["serde-1"] } phf_shared = { version = "0.11.2" } pkcs8 = { version = "0.10.2", default-features = false, features = ["encryption", "pem", "std"] } portable-atomic = { version = "1.11.0" }