Skip to content

Commit d5a373c

Browse files
committed
feat(coprocessor): host-listener, report any update during a catchup
1 parent fadd8c1 commit d5a373c

File tree

3 files changed

+89
-60
lines changed

3 files changed

+89
-60
lines changed

coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -877,27 +877,35 @@ async fn db_insert_block_no_retry(
877877
let mut tfhe_event_log = vec![];
878878
let block_hash = block_logs.summary.hash;
879879
let block_number = block_logs.summary.number;
880+
let mut catchup_insertion = 0;
880881
for log in &block_logs.logs {
881882
let current_address = Some(log.inner.address);
882883
let is_acl_address = &current_address == acl_contract_address;
884+
let transaction_hash = log.transaction_hash;
883885
if acl_contract_address.is_none() || is_acl_address {
884886
if let Ok(event) =
885887
AclContract::AclContractEvents::decode_log(&log.inner)
886888
{
887-
info!(acl_event = ?event, "ACL event");
888889
let handles = acl_result_handles(&event);
889890
for handle in handles {
890891
is_allowed.insert(handle.to_vec());
891892
}
892-
db.handle_acl_event(
893-
&mut tx,
894-
&event,
895-
&log.transaction_hash,
896-
chain_id,
897-
block_hash.as_ref(),
898-
block_number,
899-
)
900-
.await?;
893+
let inserted = db
894+
.handle_acl_event(
895+
&mut tx,
896+
&event,
897+
&log.transaction_hash,
898+
chain_id,
899+
block_hash.as_ref(),
900+
block_number,
901+
)
902+
.await?;
903+
if block_logs.catchup && inserted {
904+
info!(acl_event = ?event, ?transaction_hash, ?block_number, "ACL event missed before");
905+
catchup_insertion += 1;
906+
} else {
907+
info!(acl_event = ?event, ?transaction_hash, ?block_number, "ACL event");
908+
}
901909
continue;
902910
}
903911
}
@@ -927,7 +935,6 @@ async fn db_insert_block_no_retry(
927935
}
928936
}
929937
for tfhe_log in tfhe_event_log {
930-
info!(tfhe_log = ?tfhe_log, "TFHE event");
931938
let is_allowed =
932939
if let Some(result_handle) = tfhe_result_handle(&tfhe_log.event) {
933940
is_allowed.contains(&result_handle.to_vec())
@@ -938,7 +945,18 @@ async fn db_insert_block_no_retry(
938945
is_allowed,
939946
..tfhe_log
940947
};
941-
db.insert_tfhe_event(&mut tx, &tfhe_log).await?;
948+
let inserted = db.insert_tfhe_event(&mut tx, &tfhe_log).await?;
949+
if block_logs.catchup && inserted {
950+
info!(tfhe_log = ?tfhe_log, "TFHE event missed before");
951+
catchup_insertion += 1;
952+
} else {
953+
info!(tfhe_log = ?tfhe_log, "TFHE event");
954+
}
955+
}
956+
if catchup_insertion == block_logs.logs.len() {
957+
info!(block_number, catchup_insertion, "Catchup inserted a block");
958+
} else if catchup_insertion > 0 {
959+
info!(block_number, catchup_insertion, "Catchup inserted events");
942960
}
943961
db.mark_block_as_valid(&mut tx, &block_logs.summary).await?;
944962
tx.commit().await

coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs

Lines changed: 57 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ impl Database {
206206
fhe_operation: FheOperation,
207207
scalar_byte: &FixedBytes<1>,
208208
log: &LogTfhe,
209-
) -> Result<(), SqlxError> {
209+
) -> Result<bool, SqlxError> {
210210
let bucket = self
211211
.sort_computation_into_bucket(
212212
result,
@@ -242,7 +242,7 @@ impl Database {
242242
fhe_operation: FheOperation,
243243
scalar_byte: &FixedBytes<1>,
244244
log: &LogTfhe,
245-
) -> Result<(), SqlxError> {
245+
) -> Result<bool, SqlxError> {
246246
let bucket = self
247247
.sort_computation_into_bucket(
248248
result,
@@ -276,7 +276,7 @@ impl Database {
276276
scalar_byte: &FixedBytes<1>,
277277
log: &LogTfhe,
278278
bucket: &Handle,
279-
) -> Result<(), SqlxError> {
279+
) -> Result<bool, SqlxError> {
280280
let is_scalar = !scalar_byte.is_zero();
281281
let output_handle = result.to_vec();
282282
let query = sqlx::query!(
@@ -303,7 +303,10 @@ impl Database {
303303
log.transaction_hash.map(|txh| txh.to_vec()),
304304
log.is_allowed,
305305
);
306-
query.execute(tx.deref_mut()).await.map(|_| ())
306+
query
307+
.execute(tx.deref_mut())
308+
.await
309+
.map(|result| result.rows_affected() > 0)
307310
}
308311

309312
async fn sort_computation_into_bucket(
@@ -359,7 +362,7 @@ impl Database {
359362
&self,
360363
tx: &mut Transaction<'_>,
361364
log: &LogTfhe,
362-
) -> Result<(), SqlxError> {
365+
) -> Result<bool, SqlxError> {
363366
use TfheContract as C;
364367
use TfheContractEvents as E;
365368
const HAS_SCALAR : FixedBytes::<1> = FixedBytes([1]); // if any dependency is a scalar.
@@ -443,7 +446,7 @@ impl Database {
443446
| E::Initialized(_)
444447
| E::Upgraded(_)
445448
| E::VerifyInput(_)
446-
=> Ok(()),
449+
=> Ok(false),
447450
}
448451
}
449452

@@ -490,7 +493,7 @@ impl Database {
490493
chain_id: u64,
491494
block_hash: &[u8],
492495
block_number: u64,
493-
) -> Result<(), SqlxError> {
496+
) -> Result<bool, SqlxError> {
494497
let data = &event.data;
495498

496499
let transaction_hash = transaction_hash.map(|h| h.to_vec());
@@ -502,30 +505,34 @@ impl Database {
502505
data,
503506
AclContractEvents::Allowed(_)
504507
| AclContractEvents::AllowedForDecryption(_)
508+
| AclContractEvents::DelegatedForUserDecryption(_)
509+
| AclContractEvents::RevokedDelegationForUserDecryption(_)
505510
) {
506511
self.record_transaction_begin(&transaction_hash, block_number)
507512
.await;
508513
}
509-
514+
let mut inserted = false;
510515
match data {
511516
AclContractEvents::Allowed(allowed) => {
512517
let handle = allowed.handle.to_vec();
513518

514-
self.insert_allowed_handle(
515-
tx,
516-
handle.clone(),
517-
allowed.account.to_string(),
518-
AllowEvents::AllowedAccount,
519-
transaction_hash.clone(),
520-
)
521-
.await?;
519+
inserted |= self
520+
.insert_allowed_handle(
521+
tx,
522+
handle.clone(),
523+
allowed.account.to_string(),
524+
AllowEvents::AllowedAccount,
525+
transaction_hash.clone(),
526+
)
527+
.await?;
522528

523-
self.insert_pbs_computations(
524-
tx,
525-
&vec![handle],
526-
transaction_hash,
527-
)
528-
.await?;
529+
inserted |= self
530+
.insert_pbs_computations(
531+
tx,
532+
&vec![handle],
533+
transaction_hash,
534+
)
535+
.await?;
529536
}
530537
AclContractEvents::AllowedForDecryption(allowed_for_decryption) => {
531538
let handles = allowed_for_decryption
@@ -540,26 +547,28 @@ impl Database {
540547
"Allowed for public decryption"
541548
);
542549

543-
self.insert_allowed_handle(
550+
inserted |= self
551+
.insert_allowed_handle(
552+
tx,
553+
handle,
554+
"".to_string(),
555+
AllowEvents::AllowedForDecryption,
556+
transaction_hash.clone(),
557+
)
558+
.await?;
559+
}
560+
561+
inserted |= self
562+
.insert_pbs_computations(
544563
tx,
545-
handle,
546-
"".to_string(),
547-
AllowEvents::AllowedForDecryption,
564+
&handles,
548565
transaction_hash.clone(),
549566
)
550567
.await?;
551-
}
552-
553-
self.insert_pbs_computations(
554-
tx,
555-
&handles,
556-
transaction_hash.clone(),
557-
)
558-
.await?;
559568
}
560569
AclContractEvents::DelegatedForUserDecryption(delegation) => {
561570
info!(?delegation, "Delegation for user decryption");
562-
Self::insert_delegation(
571+
inserted |= Self::insert_delegation(
563572
tx,
564573
delegation.delegator,
565574
delegation.delegate,
@@ -578,7 +587,7 @@ impl Database {
578587
delegation,
579588
) => {
580589
info!(?delegation, "Revoke delegation for user decryption");
581-
Self::insert_delegation(
590+
inserted |= Self::insert_delegation(
582591
tx,
583592
delegation.delegator,
584593
delegation.delegate,
@@ -642,7 +651,7 @@ impl Database {
642651
}
643652
}
644653
self.tick.update();
645-
Ok(())
654+
Ok(inserted)
646655
}
647656

648657
/// Adds handles to the pbs_computations table and alerts the SnS worker
@@ -652,8 +661,9 @@ impl Database {
652661
tx: &mut Transaction<'_>,
653662
handles: &Vec<Vec<u8>>,
654663
transaction_id: Option<Vec<u8>>,
655-
) -> Result<(), SqlxError> {
664+
) -> Result<bool, SqlxError> {
656665
let tenant_id = self.tenant_id;
666+
let mut inserted = false;
657667
for handle in handles {
658668
let query = sqlx::query!(
659669
"INSERT INTO pbs_computations(tenant_id, handle, transaction_id) VALUES($1, $2, $3)
@@ -662,9 +672,10 @@ impl Database {
662672
handle,
663673
transaction_id
664674
);
665-
query.execute(tx.deref_mut()).await?;
675+
inserted |=
676+
query.execute(tx.deref_mut()).await?.rows_affected() > 0;
666677
}
667-
Ok(())
678+
Ok(inserted)
668679
}
669680

670681
/// Add the handle to the allowed_handles table
@@ -675,7 +686,7 @@ impl Database {
675686
account_address: String,
676687
event_type: AllowEvents,
677688
transaction_id: Option<Vec<u8>>,
678-
) -> Result<(), SqlxError> {
689+
) -> Result<bool, SqlxError> {
679690
let tenant_id = self.tenant_id;
680691
let query = sqlx::query!(
681692
"INSERT INTO allowed_handles(tenant_id, handle, account_address, event_type, transaction_id) VALUES($1, $2, $3, $4, $5)
@@ -686,8 +697,8 @@ impl Database {
686697
event_type as i16,
687698
transaction_id
688699
);
689-
query.execute(tx.deref_mut()).await?;
690-
Ok(())
700+
let inserted = query.execute(tx.deref_mut()).await?.rows_affected() > 0;
701+
Ok(inserted)
691702
}
692703

693704
async fn record_transaction_begin(
@@ -720,7 +731,7 @@ impl Database {
720731
block_hash: &[u8],
721732
block_number: u64,
722733
transaction_id: Option<Vec<u8>>,
723-
) -> Result<(), SqlxError> {
734+
) -> Result<bool, SqlxError> {
724735
// ON CONFLIT is done on Unique constraint
725736
let query = sqlx::query!(
726737
"INSERT INTO delegate_user_decrypt(
@@ -738,8 +749,8 @@ impl Database {
738749
block_hash,
739750
transaction_id
740751
);
741-
query.execute(tx.deref_mut()).await?;
742-
Ok(())
752+
let inserted = query.execute(tx.deref_mut()).await?.rows_affected() > 0;
753+
Ok(inserted)
743754
}
744755

745756
pub async fn block_notification(

coprocessor/fhevm-engine/tfhe-worker/src/tests/operators_from_events.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ async fn insert_tfhe_event(
3636
tx: &mut Transaction<'_>,
3737
log: alloy::rpc::types::Log<TfheContractEvents>,
3838
is_allowed: bool,
39-
) -> Result<(), sqlx::Error> {
39+
) -> Result<bool, sqlx::Error> {
4040
let event = LogTfhe {
4141
event: log.inner,
4242
transaction_hash: log.transaction_hash,
@@ -50,7 +50,7 @@ pub async fn allow_handle(
5050
db: &ListenerDatabase,
5151
tx: &mut Transaction<'_>,
5252
handle: &[u8],
53-
) -> Result<(), sqlx::Error> {
53+
) -> Result<bool, sqlx::Error> {
5454
let account_address = String::new();
5555
let event_type = AllowEvents::AllowedForDecryption;
5656
db.insert_allowed_handle(tx, handle.to_owned(), account_address, event_type, None)

0 commit comments

Comments
 (0)