diff --git a/coprocessor/fhevm-engine/scheduler/src/dfg.rs b/coprocessor/fhevm-engine/scheduler/src/dfg.rs
index 30dcf87ff..a204b739e 100644
--- a/coprocessor/fhevm-engine/scheduler/src/dfg.rs
+++ b/coprocessor/fhevm-engine/scheduler/src/dfg.rs
@@ -11,6 +11,7 @@ use daggy::petgraph::{
 };
 use daggy::{petgraph::graph::node_index, Dag, NodeIndex};
 use fhevm_engine_common::types::{Handle, SupportedFheCiphertexts, SupportedFheOperations};
+use tracing::{error, warn};
 
 #[derive(Debug)]
 pub struct DFGOp {
@@ -136,7 +137,12 @@ impl DFTxGraph {
         for (consumer, tx) in self.graph.node_references() {
             for i in tx.inputs.keys() {
                 if let Some(producer) = self.allowed_map.get(i) {
-                    dependence_pairs.push((producer, consumer));
+                    if *producer == consumer {
+                        warn!(target: "scheduler", { },
+                              "Self-dependence on node");
+                    } else {
+                        dependence_pairs.push((*producer, consumer));
+                    }
                 } else {
                     self.needed_map
                         .entry(i.clone())
@@ -145,14 +151,72 @@ impl DFTxGraph {
                 }
             }
         }
+
+        // We build a replica of the graph and map it to the
+        // underlying DiGraph so we can identify cycles.
+        let mut digraph = self.graph.map(|idx, _| idx, |_, _| ()).graph().clone();
         // Add transaction dependence edges
-        for (producer, consumer) in dependence_pairs {
-            // Error only occurs in case of cyclic dependence which
-            // shoud not be possible between transactions. In that
-            // case, the whole cycle should be put in an error state.
-            self.graph
-                .add_edge(*producer, consumer, ())
-                .map_err(|_| SchedulerError::CyclicDependence)?;
+        for (producer, consumer) in dependence_pairs.iter() {
+            digraph.add_edge(*producer, *consumer, ());
+        }
+        let mut tarjan = daggy::petgraph::algo::TarjanScc::new();
+        let mut sccs = Vec::new();
+        tarjan.run(&digraph, |scc| {
+            if scc.len() > 1 {
+                // All non-singleton SCCs in a directed graph are
+                // dependence cycles
+                sccs.push(scc.to_vec());
+            }
+        });
+        if !sccs.is_empty() {
+            for scc in sccs {
+                error!(target: "scheduler", { cycle_size = ?scc.len() },
+                       "Dependence cycle detected");
+                for idx in scc {
+                    let idx = digraph
+                        .node_weight(idx)
+                        .ok_or(SchedulerError::DataflowGraphError)?;
+                    let tx = self
+                        .graph
+                        .node_weight_mut(*idx)
+                        .ok_or(SchedulerError::DataflowGraphError)?;
+                    // Mark the node as uncomputable so we don't go
+                    // and mark as completed operations that are in
+                    // error.
+                    tx.is_uncomputable = true;
+                    error!(target: "scheduler", { transaction_id = ?hex::encode(tx.transaction_id.clone()) },
+                           "Transaction is part of a dependence cycle");
+                    for (_, op) in tx.graph.graph.node_references() {
+                        self.results.push(DFGTxResult {
+                            transaction_id: tx.transaction_id.clone(),
+                            handle: op.result_handle.to_vec(),
+                            compressed_ct: Err(SchedulerError::CyclicDependence.into()),
+                        });
+                    }
+                }
+            }
+            return Err(SchedulerError::CyclicDependence.into());
+        } else {
+            // If no dependence cycles were found, then we can
+            // complete the graph and proceed to execution
+            for (producer, consumer) in dependence_pairs.iter() {
+                // The error case here should not happen as we've
+                // already covered it by testing for SCCs in the graph
+                // first
+                if self.graph.add_edge(*producer, *consumer, ()).is_err() {
+                    let prod = self
+                        .graph
+                        .node_weight(*producer)
+                        .ok_or(SchedulerError::DataflowGraphError)?;
+                    let cons = self
+                        .graph
+                        .node_weight(*consumer)
+                        .ok_or(SchedulerError::DataflowGraphError)?;
+                    error!(target: "scheduler", { producer_id = ?hex::encode(prod.transaction_id.clone()), consumer_id = ?hex::encode(cons.transaction_id.clone()) },
+                           "Dependence cycle when adding dependence - initial cycle detection failed");
+                    return Err(SchedulerError::CyclicDependence.into());
+                }
+            }
         }
         Ok(())
     }
@@ -220,7 +284,17 @@ impl DFTxGraph {
             .graph
             .node_weight_mut(tx_node_index)
             .ok_or(SchedulerError::DataflowGraphError)?;
+        if tx_node.is_uncomputable {
+            return Ok(());
+        }
         tx_node.is_uncomputable = true;
+        for (_idx, op) in tx_node.graph.graph.node_references() {
+            self.results.push(DFGTxResult {
+                transaction_id: tx_node.transaction_id.clone(),
+                handle: op.result_handle.to_vec(),
+                compressed_ct: Err(SchedulerError::MissingInputs.into()),
+            });
+        }
         for edge in edges.edges_directed(tx_node_index, Direction::Outgoing) {
             let dependent_tx_index = edge.target();
             self.set_uncomputable(dependent_tx_index, edges)?;
diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs b/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs
index f85a2842c..c85436861 100644
--- a/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs
+++ b/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs
@@ -430,7 +430,13 @@ async fn build_transaction_graph_and_execute<'a>(
     loop_ctx: &opentelemetry::Context,
 ) -> Result<DFTxGraph> {
     let mut tx_graph = DFTxGraph::default();
-    tx_graph.build(tenant_txs)?;
+    if let Err(e) = tx_graph.build(tenant_txs) {
+        // If we had an error while building the graph, we don't
+        // execute anything and return to allow any set results
+        // (essentially errors) to be set in DB.
+        warn!(target: "tfhe_worker", { error = %e }, "error while building transaction graph");
+        return Ok(tx_graph);
+    }
     let cts_to_query = tx_graph.needed_map.keys().cloned().collect::<Vec<_>>();
     let ciphertext_map =
         query_ciphertexts(&cts_to_query, *tenant_id, trx, tracer, loop_ctx).await?;
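
For context on the check added in `DFTxGraph::build`: Tarjan's algorithm partitions a directed graph into strongly connected components, and any SCC with more than one node necessarily contains a dependence cycle. Below is a minimal, self-contained sketch of that idea against plain `petgraph` (which `daggy` re-exports); the `detect_cycles` helper and the sample graph are illustrative, not part of the PR.

```rust
use petgraph::algo::TarjanScc;
use petgraph::graph::{DiGraph, NodeIndex};

/// Returns every non-singleton strongly connected component, i.e. every
/// dependence cycle involving two or more nodes.
fn detect_cycles(graph: &DiGraph<&str, ()>) -> Vec<Vec<NodeIndex>> {
    let mut tarjan = TarjanScc::new();
    let mut cycles = Vec::new();
    tarjan.run(graph, |scc| {
        // Singleton SCCs are ordinary nodes; larger SCCs are cycles.
        if scc.len() > 1 {
            cycles.push(scc.to_vec());
        }
    });
    cycles
}

fn main() {
    let mut graph = DiGraph::new();
    let a = graph.add_node("tx-a");
    let b = graph.add_node("tx-b");
    let c = graph.add_node("tx-c");
    // a -> b -> c -> a forms a dependence cycle.
    graph.extend_with_edges([(a, b), (b, c), (c, a)]);
    let cycles = detect_cycles(&graph);
    assert_eq!(cycles.len(), 1);
    println!("found {} cycle(s): {:?}", cycles.len(), cycles);
}
```

Note that the `scc.len() > 1` test does not report single-node self-loops, which is presumably why the build step filters out self-dependence pairs (the `*producer == consumer` check) before any edges are added.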
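The `set_uncomputable` change follows a related pattern: once a transaction is marked uncomputable, the error is propagated to every transitive dependent, and the new early return ensures each transaction's error results are pushed only once even when it is reachable along several paths. A hedged sketch of that propagation, with a plain `Vec<bool>` standing in for the `is_uncomputable` flag and a hypothetical `poison` helper in place of the real method:

```rust
use petgraph::graph::{DiGraph, NodeIndex};
use petgraph::Direction;

/// Mark `node` and all of its transitive dependents as uncomputable.
/// The early return deduplicates work when a node is reachable through
/// multiple paths and bounds the recursion.
fn poison(graph: &DiGraph<&str, ()>, node: NodeIndex, uncomputable: &mut [bool]) {
    if uncomputable[node.index()] {
        return; // already poisoned: do not emit duplicate error results
    }
    uncomputable[node.index()] = true;
    // ...the real code pushes one error result per operation here...
    let dependents: Vec<NodeIndex> = graph
        .neighbors_directed(node, Direction::Outgoing)
        .collect();
    for dep in dependents {
        poison(graph, dep, uncomputable);
    }
}

fn main() {
    let mut g = DiGraph::new();
    let a = g.add_node("tx-a");
    let b = g.add_node("tx-b");
    let c = g.add_node("tx-c");
    // b and c depend on a; c also depends on b, so c is reachable twice.
    g.extend_with_edges([(a, b), (a, c), (b, c)]);
    let mut uncomputable = vec![false; g.node_count()];
    poison(&g, a, &mut uncomputable);
    assert!(uncomputable.iter().all(|&u| u));
}
```

This is also what makes the worker-side change safe: when `build` fails, the graph already carries per-operation error results, so `build_transaction_graph_and_execute` can return the graph as-is and let those errors be written to the DB.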