Skip to content

Commit 2613247

Browse files
committed
feat(coprocessor): host-listener, report any update during a catchup
1 parent d6ffb71 commit 2613247

File tree

2 files changed

+29
-20
lines changed

2 files changed

+29
-20
lines changed

coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -922,7 +922,15 @@ async fn db_insert_block_no_retry(
922922
is_allowed,
923923
..tfhe_log
924924
};
925-
db.insert_tfhe_event(&mut tx, &tfhe_log).await?;
925+
let inserted = db.insert_tfhe_event(&mut tx, &tfhe_log).await?;
926+
if block_logs.catchup && inserted {
927+
warn!(
928+
tfhe_log = ?tfhe_log,
929+
block = ?block_logs.summary,
930+
nb_events = block_logs.logs.len(),
931+
"Missed event detected by catchup",
932+
);
933+
}
926934
}
927935
db.mark_block_as_valid(&mut tx, &block_logs.summary).await?;
928936
tx.commit().await

coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ impl Database {
200200
fhe_operation: FheOperation,
201201
scalar_byte: &FixedBytes<1>,
202202
log: &LogTfhe,
203-
) -> Result<(), SqlxError> {
203+
) -> Result<bool, SqlxError> {
204204
let bucket = self
205205
.sort_computation_into_bucket(
206206
result,
@@ -236,7 +236,7 @@ impl Database {
236236
fhe_operation: FheOperation,
237237
scalar_byte: &FixedBytes<1>,
238238
log: &LogTfhe,
239-
) -> Result<(), SqlxError> {
239+
) -> Result<bool, SqlxError> {
240240
let bucket = self
241241
.sort_computation_into_bucket(
242242
result,
@@ -270,7 +270,7 @@ impl Database {
270270
scalar_byte: &FixedBytes<1>,
271271
log: &LogTfhe,
272272
bucket: &Handle,
273-
) -> Result<(), SqlxError> {
273+
) -> Result<bool, SqlxError> {
274274
let is_scalar = !scalar_byte.is_zero();
275275
let output_handle = result.to_vec();
276276
let query = sqlx::query!(
@@ -297,7 +297,7 @@ impl Database {
297297
log.transaction_hash.map(|txh| txh.to_vec()),
298298
log.is_allowed,
299299
);
300-
query.execute(tx.deref_mut()).await.map(|_| ())
300+
query.execute(tx.deref_mut()).await.map(|result| result.rows_affected() > 0)
301301
}
302302

303303
async fn sort_computation_into_bucket(
@@ -353,7 +353,7 @@ impl Database {
353353
&self,
354354
tx: &mut Transaction<'_>,
355355
log: &LogTfhe,
356-
) -> Result<(), SqlxError> {
356+
) -> Result<bool, SqlxError> {
357357
use TfheContract as C;
358358
use TfheContractEvents as E;
359359
const HAS_SCALAR : FixedBytes::<1> = FixedBytes([1]); // if any dependency is a scalar.
@@ -437,7 +437,7 @@ impl Database {
437437
| E::Initialized(_)
438438
| E::Upgraded(_)
439439
| E::VerifyInput(_)
440-
=> Ok(()),
440+
=> Ok(false),
441441
}
442442
}
443443

@@ -482,7 +482,7 @@ impl Database {
482482
event: &Log<AclContractEvents>,
483483
transaction_hash: &Option<Handle>,
484484
block_number: &Option<u64>,
485-
) -> Result<(), SqlxError> {
485+
) -> Result<bool, SqlxError> {
486486
let data = &event.data;
487487

488488
let transaction_hash = transaction_hash.map(|h| h.to_vec());
@@ -498,12 +498,12 @@ impl Database {
498498
self.record_transaction_begin(&transaction_hash, block_number)
499499
.await;
500500
}
501-
501+
let mut inserted = false;
502502
match data {
503503
AclContractEvents::Allowed(allowed) => {
504504
let handle = allowed.handle.to_vec();
505505

506-
self.insert_allowed_handle(
506+
inserted |= self.insert_allowed_handle(
507507
tx,
508508
handle.clone(),
509509
allowed.account.to_string(),
@@ -512,7 +512,7 @@ impl Database {
512512
)
513513
.await?;
514514

515-
self.insert_pbs_computations(
515+
inserted |= self.insert_pbs_computations(
516516
tx,
517517
&vec![handle],
518518
transaction_hash,
@@ -532,7 +532,7 @@ impl Database {
532532
"Allowed for public decryption"
533533
);
534534

535-
self.insert_allowed_handle(
535+
inserted |= self.insert_allowed_handle(
536536
tx,
537537
handle,
538538
"".to_string(),
@@ -542,7 +542,7 @@ impl Database {
542542
.await?;
543543
}
544544

545-
self.insert_pbs_computations(
545+
inserted |= self.insert_pbs_computations(
546546
tx,
547547
&handles,
548548
transaction_hash.clone(),
@@ -612,7 +612,7 @@ impl Database {
612612
}
613613
}
614614
self.tick.update();
615-
Ok(())
615+
Ok(inserted)
616616
}
617617

618618
/// Adds handles to the pbs_computations table and alerts the SnS worker
@@ -622,8 +622,9 @@ impl Database {
622622
tx: &mut Transaction<'_>,
623623
handles: &Vec<Vec<u8>>,
624624
transaction_id: Option<Vec<u8>>,
625-
) -> Result<(), SqlxError> {
625+
) -> Result<bool, SqlxError> {
626626
let tenant_id = self.tenant_id;
627+
let mut inserted = false;
627628
for handle in handles {
628629
let query = sqlx::query!(
629630
"INSERT INTO pbs_computations(tenant_id, handle, transaction_id) VALUES($1, $2, $3)
@@ -632,9 +633,9 @@ impl Database {
632633
handle,
633634
transaction_id
634635
);
635-
query.execute(tx.deref_mut()).await?;
636+
inserted |= query.execute(tx.deref_mut()).await?.rows_affected() > 0;
636637
}
637-
Ok(())
638+
Ok(inserted)
638639
}
639640

640641
/// Add the handle to the allowed_handles table
@@ -645,7 +646,7 @@ impl Database {
645646
account_address: String,
646647
event_type: AllowEvents,
647648
transaction_id: Option<Vec<u8>>,
648-
) -> Result<(), SqlxError> {
649+
) -> Result<bool, SqlxError> {
649650
let tenant_id = self.tenant_id;
650651
let query = sqlx::query!(
651652
"INSERT INTO allowed_handles(tenant_id, handle, account_address, event_type, transaction_id) VALUES($1, $2, $3, $4, $5)
@@ -656,8 +657,8 @@ impl Database {
656657
event_type as i16,
657658
transaction_id
658659
);
659-
query.execute(tx.deref_mut()).await?;
660-
Ok(())
660+
let inserted = query.execute(tx.deref_mut()).await?.rows_affected() > 0;
661+
Ok(inserted)
661662
}
662663

663664
async fn record_transaction_begin(

0 commit comments

Comments
 (0)