Skip to content

Commit 384ab88

Browse files
committed
fix(coprocessor): fix handling of dependence cycles
1 parent c120f92 commit 384ab88

File tree

2 files changed

+59
-8
lines changed

2 files changed

+59
-8
lines changed

coprocessor/fhevm-engine/scheduler/src/dfg.rs

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use daggy::petgraph::{
1111
};
1212
use daggy::{petgraph::graph::node_index, Dag, NodeIndex};
1313
use fhevm_engine_common::types::{Handle, SupportedFheCiphertexts, SupportedFheOperations};
14+
use tracing::error;
1415

1516
#[derive(Debug)]
1617
pub struct DFGOp {
@@ -145,14 +146,59 @@ impl DFTxGraph {
145146
}
146147
}
147148
}
149+
150+
// We build a replica of the graph and map it to the
151+
// underlying DiGraph so we can identify cycles.
152+
let mut digraph = self.graph.map(|idx, _| idx, |_, _| ()).graph().clone();
148153
// Add transaction dependence edges
149-
for (producer, consumer) in dependence_pairs {
150-
// Error only occurs in case of cyclic dependence which
151-
// shoud not be possible between transactions. In that
152-
// case, the whole cycle should be put in an error state.
153-
self.graph
154-
.add_edge(*producer, consumer, ())
155-
.map_err(|_| SchedulerError::CyclicDependence)?;
154+
for (producer, consumer) in dependence_pairs.iter() {
155+
digraph.add_edge(**producer, *consumer, ());
156+
}
157+
let mut tarjan = daggy::petgraph::algo::TarjanScc::new();
158+
let mut sccs = Vec::new();
159+
tarjan.run(&self.graph.graph(), |scc| {
160+
if scc.len() > 1 {
161+
// All non-singleton SCCs in a directed graph are
162+
// dependence cycles
163+
sccs.push(scc.to_vec());
164+
}
165+
});
166+
if !sccs.is_empty() {
167+
for scc in sccs {
168+
error!(target: "scheduler", { cycle_size = ?scc.len() },
169+
"Dependence cycle detected");
170+
for idx in scc {
171+
// Mark the node as uncomputable so we don't go
172+
// and mark as completed operations that are in
173+
// error.
174+
self.set_uncomputable(idx, &Dag::new())?;
175+
let tx = self
176+
.graph
177+
.node_weight(idx)
178+
.ok_or(SchedulerError::DataflowGraphError)?;
179+
error!(target: "scheduler", { transaction_id = ?hex::encode(tx.transaction_id.clone()) },
180+
"Transaction is part of a dependence cycle");
181+
for (_, op) in tx.graph.graph.node_references() {
182+
self.results.push(DFGTxResult {
183+
transaction_id: tx.transaction_id.clone(),
184+
handle: op.result_handle.to_vec(),
185+
compressed_ct: Err(SchedulerError::CyclicDependence.into()),
186+
});
187+
}
188+
}
189+
}
190+
return Err(SchedulerError::CyclicDependence.into());
191+
} else {
192+
// If no dependence cycles were found, then we can
193+
// complete the graph and proceed to execution
194+
for (producer, consumer) in dependence_pairs.iter() {
195+
// The error case here should not happen as we've
196+
// already covered it by testing for SCCs in the graph
197+
// first
198+
self.graph
199+
.add_edge(**producer, *consumer, ())
200+
.map_err(|_| SchedulerError::CyclicDependence)?;
201+
}
156202
}
157203
Ok(())
158204
}

coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,12 @@ async fn build_transaction_graph_and_execute<'a>(
443443
loop_ctx: &opentelemetry::Context,
444444
) -> Result<DFTxGraph, Box<dyn std::error::Error + Send + Sync>> {
445445
let mut tx_graph = DFTxGraph::default();
446-
tx_graph.build(tenant_txs)?;
446+
if tx_graph.build(tenant_txs).is_err() {
447+
// If we had an error while building the graph, we don't
448+
// execute anything and return to allow any set results
449+
// (essentially errors) to be set in DB.
450+
return Ok(tx_graph);
451+
}
447452
let cts_to_query = tx_graph.needed_map.keys().cloned().collect::<Vec<_>>();
448453
let ciphertext_map =
449454
query_ciphertexts(&cts_to_query, *tenant_id, trx, tracer, loop_ctx).await?;

0 commit comments

Comments
 (0)