90 changes: 82 additions & 8 deletions coprocessor/fhevm-engine/scheduler/src/dfg.rs
@@ -11,6 +11,7 @@ use daggy::petgraph::{
};
use daggy::{petgraph::graph::node_index, Dag, NodeIndex};
use fhevm_engine_common::types::{Handle, SupportedFheCiphertexts, SupportedFheOperations};
use tracing::{error, warn};

#[derive(Debug)]
pub struct DFGOp {
@@ -136,7 +137,12 @@ impl DFTxGraph {
for (consumer, tx) in self.graph.node_references() {
for i in tx.inputs.keys() {
if let Some(producer) = self.allowed_map.get(i) {
dependence_pairs.push((producer, consumer));
if *producer == consumer {
warn!(target: "scheduler", { },
"Self-dependence on node");
} else {
dependence_pairs.push((*producer, consumer));
}
} else {
self.needed_map
.entry(i.clone())
@@ -145,14 +151,72 @@
}
}
}

// We build a replica of the graph and map it to the
// underlying DiGraph so we can identify cycles.
let mut digraph = self.graph.map(|idx, _| idx, |_, _| ()).graph().clone();
// Add transaction dependence edges
for (producer, consumer) in dependence_pairs {
// Error only occurs in case of cyclic dependence which
should not be possible between transactions. In that
// case, the whole cycle should be put in an error state.
self.graph
.add_edge(*producer, consumer, ())
.map_err(|_| SchedulerError::CyclicDependence)?;
for (producer, consumer) in dependence_pairs.iter() {
digraph.add_edge(*producer, *consumer, ());
}
let mut tarjan = daggy::petgraph::algo::TarjanScc::new();
let mut sccs = Vec::new();
tarjan.run(&digraph, |scc| {
if scc.len() > 1 {
// All non-singleton SCCs in a directed graph are
// dependence cycles
sccs.push(scc.to_vec());
}
});
if !sccs.is_empty() {
for scc in sccs {
error!(target: "scheduler", { cycle_size = ?scc.len() },
"Dependence cycle detected");
for idx in scc {
let idx = digraph
.node_weight(idx)
.ok_or(SchedulerError::DataflowGraphError)?;
let tx = self
.graph
.node_weight_mut(*idx)
.ok_or(SchedulerError::DataflowGraphError)?;
// Mark the node as uncomputable so that operations
// that are in error are not later marked as completed.
tx.is_uncomputable = true;
error!(target: "scheduler", { transaction_id = ?hex::encode(tx.transaction_id.clone()) },
"Transaction is part of a dependence cycle");
for (_, op) in tx.graph.graph.node_references() {
self.results.push(DFGTxResult {
transaction_id: tx.transaction_id.clone(),
handle: op.result_handle.to_vec(),
compressed_ct: Err(SchedulerError::CyclicDependence.into()),
});
}
}
}
return Err(SchedulerError::CyclicDependence.into());
} else {
// If no dependence cycles were found, then we can
// complete the graph and proceed to execution
for (producer, consumer) in dependence_pairs.iter() {
// The error case here should not happen as we've
// already covered it by testing for SCCs in the graph
// first
if self.graph.add_edge(*producer, *consumer, ()).is_err() {
let prod = self
.graph
.node_weight(*producer)
.ok_or(SchedulerError::DataflowGraphError)?;
let cons = self
.graph
.node_weight(*consumer)
.ok_or(SchedulerError::DataflowGraphError)?;
error!(target: "scheduler", { producer_id = ?hex::encode(prod.transaction_id.clone()), consumer_id = ?hex::encode(cons.transaction_id.clone()) },
"Dependence cycle when adding dependence - initial cycle detection failed");
return Err(SchedulerError::CyclicDependence.into());
}
}
}
Ok(())
}
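For context, here is a minimal standalone sketch of the cycle-detection approach used above, written against plain petgraph (daggy re-exports it): every non-singleton strongly connected component reported by `TarjanScc` is a dependence cycle. The graph and transaction names below are hypothetical.

```rust
use petgraph::algo::TarjanScc;
use petgraph::graph::DiGraph;

fn main() {
    // Hypothetical transaction dependence graph; edges point from
    // producer to consumer. tx_a -> tx_b -> tx_c -> tx_a is a cycle.
    let mut g: DiGraph<&str, ()> = DiGraph::new();
    let a = g.add_node("tx_a");
    let b = g.add_node("tx_b");
    let c = g.add_node("tx_c");
    let d = g.add_node("tx_d");
    g.add_edge(a, b, ());
    g.add_edge(b, c, ());
    g.add_edge(c, a, ());
    g.add_edge(c, d, ()); // tx_d consumes from the cycle but is not part of it

    let mut tarjan = TarjanScc::new();
    let mut cycles = Vec::new();
    tarjan.run(&g, |scc| {
        // Every SCC with more than one node is a dependence cycle.
        if scc.len() > 1 {
            cycles.push(scc.to_vec());
        }
    });

    assert_eq!(cycles.len(), 1);
    assert_eq!(cycles[0].len(), 3);
}
```

Note that a self-loop is also a cycle but forms a singleton SCC, so the `scc.len() > 1` test does not catch it; this is why self-dependences are filtered out separately (the `warn!` branch above) before the edge pairs are collected.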
@@ -220,7 +284,17 @@ impl DFTxGraph {
.graph
.node_weight_mut(tx_node_index)
.ok_or(SchedulerError::DataflowGraphError)?;
if tx_node.is_uncomputable {
return Ok(());
}
tx_node.is_uncomputable = true;
for (_idx, op) in tx_node.graph.graph.node_references() {
self.results.push(DFGTxResult {
transaction_id: tx_node.transaction_id.clone(),
handle: op.result_handle.to_vec(),
compressed_ct: Err(SchedulerError::MissingInputs.into()),
});
}
for edge in edges.edges_directed(tx_node_index, Direction::Outgoing) {
let dependent_tx_index = edge.target();
self.set_uncomputable(dependent_tx_index, edges)?;
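A rough sketch of the propagation pattern in `set_uncomputable`, using hypothetical simplified types (the `Tx` struct below is a stand-in): a node is marked once, and the marking is pushed to every downstream transaction, with the early return preventing repeated work when dependents are shared.

```rust
use petgraph::graph::{DiGraph, NodeIndex};
use petgraph::Direction;

// Hypothetical stand-in for a transaction node.
struct Tx {
    is_uncomputable: bool,
}

/// Mark `node` and every transaction that transitively depends on it as
/// uncomputable. Edges point from producer to consumer, so dependents are
/// reached through outgoing edges.
fn set_uncomputable(g: &mut DiGraph<Tx, ()>, node: NodeIndex) {
    if g[node].is_uncomputable {
        return; // already handled; avoids re-visiting shared dependents
    }
    g[node].is_uncomputable = true;
    // (In the real code, an error result is pushed for each of the
    // transaction's operations at this point before recursing.)
    let dependents: Vec<NodeIndex> =
        g.neighbors_directed(node, Direction::Outgoing).collect();
    for dep in dependents {
        set_uncomputable(g, dep);
    }
}
```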
8 changes: 7 additions & 1 deletion coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs
@@ -430,7 +430,13 @@ async fn build_transaction_graph_and_execute<'a>(
loop_ctx: &opentelemetry::Context,
) -> Result<DFTxGraph, Box<dyn std::error::Error + Send + Sync>> {
let mut tx_graph = DFTxGraph::default();
tx_graph.build(tenant_txs)?;
if let Err(e) = tx_graph.build(tenant_txs) {
// If building the graph failed, we don't execute anything
// and return early so that any results already recorded
// (essentially errors) can be written to the DB.
warn!(target: "tfhe_worker", { error = %e }, "error while building transaction graph");
return Ok(tx_graph);
}
let cts_to_query = tx_graph.needed_map.keys().cloned().collect::<Vec<_>>();
let ciphertext_map =
query_ciphertexts(&cts_to_query, *tenant_id, trx, tracer, loop_ctx).await?;
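A compact sketch of the control flow introduced here, with hypothetical simplified types: a failed build does not abort the worker; the partially built graph is returned so the error results it accumulated can still be flushed to the database.

```rust
// Hypothetical, simplified stand-in for DFTxGraph and its results.
#[derive(Default)]
struct TxGraph {
    results: Vec<Result<Vec<u8>, String>>,
}

impl TxGraph {
    // Hypothetical build: records an error result and fails when it
    // detects a problem (in the real code, e.g. a dependence cycle).
    fn build(&mut self, has_cycle: bool) -> Result<(), String> {
        if has_cycle {
            self.results.push(Err("cyclic dependence".to_string()));
            return Err("cyclic dependence".to_string());
        }
        Ok(())
    }
}

fn build_and_maybe_execute(has_cycle: bool) -> TxGraph {
    let mut graph = TxGraph::default();
    if let Err(e) = graph.build(has_cycle) {
        eprintln!("error while building transaction graph: {e}");
        // Return early: the caller persists graph.results (the errors)
        // instead of executing anything.
        return graph;
    }
    // ... otherwise: query ciphertexts and execute the graph ...
    graph
}
```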