Skip to content

Commit a328989

Browse files
Add batch processing for failed pending transactions in EOA executor (#92)
1 parent 2dbbab2 commit a328989

File tree

2 files changed

+110
-18
lines changed

2 files changed

+110
-18
lines changed

executors/src/eoa/store/atomic.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,84 @@ impl AtomicEoaExecutorStore {
625625
Ok(())
626626
}
627627

628+
/// Fail multiple transactions that are in the pending state in a single batch operation
629+
/// This is more efficient than calling fail_pending_transaction multiple times
630+
/// when there are many failures at once
631+
pub async fn fail_pending_transactions_batch(
632+
&self,
633+
failures: Vec<(&PendingTransaction, EoaExecutorWorkerError)>,
634+
webhook_queue: Arc<twmq::Queue<WebhookJobHandler>>,
635+
) -> Result<(), TransactionStoreError> {
636+
if failures.is_empty() {
637+
return Ok(());
638+
}
639+
640+
let mut pipeline = twmq::redis::pipe();
641+
pipeline.atomic();
642+
643+
let pending_key = self.pending_transactions_zset_name();
644+
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
645+
646+
// Remove all transaction IDs from pending state in a single ZREM operation
647+
let transaction_ids: Vec<&str> = failures
648+
.iter()
649+
.map(|(p, _)| p.transaction_id.as_str())
650+
.collect();
651+
pipeline.zrem(&pending_key, &transaction_ids);
652+
653+
// Update transaction data with failure for each transaction
654+
for (pending_transaction, error) in &failures {
655+
let tx_data_key = self.transaction_data_key_name(&pending_transaction.transaction_id);
656+
657+
pipeline.hset(&tx_data_key, "completed_at", now);
658+
pipeline.hset(&tx_data_key, "failure_reason", error.to_string());
659+
pipeline.hset(&tx_data_key, "status", "failed");
660+
}
661+
662+
// Queue webhooks for all failures
663+
let mut tx_context = webhook_queue.transaction_context_from_pipeline(&mut pipeline);
664+
for (pending_transaction, error) in &failures {
665+
let event = EoaExecutorEvent {
666+
transaction_id: pending_transaction.transaction_id.clone(),
667+
address: pending_transaction.user_request.from,
668+
};
669+
670+
let fail_envelope = event.transaction_failed_envelope(error.clone(), 1);
671+
672+
if !pending_transaction.user_request.webhook_options.is_empty() {
673+
if let Err(e) = queue_webhook_envelopes(
674+
fail_envelope,
675+
pending_transaction.user_request.webhook_options.clone(),
676+
&mut tx_context,
677+
webhook_queue.clone(),
678+
) {
679+
tracing::error!(
680+
transaction_id = %pending_transaction.transaction_id,
681+
error = ?e,
682+
"Failed to queue webhook for batch fail"
683+
);
684+
}
685+
}
686+
}
687+
688+
// Execute the pipeline once
689+
let mut conn = self.redis.clone();
690+
pipeline
691+
.query_async::<Vec<twmq::redis::Value>>(&mut conn)
692+
.await?;
693+
694+
tracing::info!(
695+
count = failures.len(),
696+
eoa = ?self.eoa(),
697+
chain_id = self.chain_id(),
698+
worker_id = %self.worker_id(),
699+
"JOB_LIFECYCLE - Batch deleted {} failed pending transactions from EOA",
700+
failures.len()
701+
);
702+
703+
Ok(())
704+
}
705+
628706
pub async fn clean_submitted_transactions(
629707
&self,
630708
confirmed_transactions: &[ConfirmedTransaction],

executors/src/eoa/worker/send.rs

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,25 @@ impl<C: Chain> EoaExecutorWorker<C> {
9595
);
9696

9797
// 3. Only proceed to new nonces if we successfully used all recycled nonces
98+
let clean_start = current_timestamp_ms();
9899
let remaining_recycled = self.store.clean_and_get_recycled_nonces().await?.len();
100+
101+
tracing::info!(
102+
duration_seconds = calculate_duration_seconds(clean_start, current_timestamp_ms()),
103+
remaining_recycled = remaining_recycled,
104+
eoa = ?self.eoa,
105+
chain_id = self.chain_id,
106+
worker_id = %self.store.worker_id,
107+
"JOB_LIFECYCLE - send_flow: Cleaned and got recycled nonces"
108+
);
109+
99110
if remaining_recycled == 0 {
111+
let budget_start = current_timestamp_ms();
100112
let inflight_budget = self.store.get_inflight_budget(self.max_inflight).await?;
101113

102114
tracing::info!(
103-
duration_seconds = calculate_duration_seconds(start_time, current_timestamp_ms()),
115+
duration_seconds = calculate_duration_seconds(budget_start, current_timestamp_ms()),
116+
total_duration_seconds = calculate_duration_seconds(start_time, current_timestamp_ms()),
104117
inflight_budget = inflight_budget,
105118
eoa = ?self.eoa,
106119
chain_id = self.chain_id,
@@ -304,6 +317,7 @@ impl<C: Chain> EoaExecutorWorker<C> {
304317
let mut cleaned_results = Vec::new();
305318
let mut balance_threshold_update_needed = false;
306319
let mut failure_occurred = false;
320+
let mut non_retryable_failures = Vec::new();
307321

308322
for (pending, result) in results.into_iter() {
309323
match (failure_occurred, result) {
@@ -330,35 +344,35 @@ impl<C: Chain> EoaExecutorWorker<C> {
330344
balance_threshold_update_needed = true;
331345
}
332346

333-
// For deterministic build failures, fail the transaction immediately
347+
// For deterministic build failures, collect for batch processing
334348
if !is_retryable_preparation_error(&e) {
335349
tracing::error!(
336350
error = ?e,
337351
transaction_id = pending.transaction_id,
338352
"Transaction permanently failed due to non-retryable preparation error",
339353
);
340-
if let Err(e) = self
341-
.store
342-
.fail_pending_transaction(
343-
pending,
344-
e.clone(),
345-
self.webhook_queue.clone(),
346-
)
347-
.await
348-
{
349-
tracing::error!(
350-
error = ?e,
351-
transaction_id = pending.transaction_id,
352-
"Failed to mark transaction as failed - transaction may be stuck in pending state"
353-
);
354-
// Don't propagate the error, continue processing
355-
}
354+
non_retryable_failures.push((pending, e.clone()));
356355
}
357356
}
358357
(true, Ok(_)) => continue,
359358
}
360359
}
361360

361+
// Batch fail all non-retryable failures in a single Redis pipeline
362+
if !non_retryable_failures.is_empty() {
363+
if let Err(e) = self
364+
.store
365+
.fail_pending_transactions_batch(non_retryable_failures, self.webhook_queue.clone())
366+
.await
367+
{
368+
tracing::error!(
369+
error = ?e,
370+
"Failed to batch mark transactions as failed - some transactions may be stuck in pending state"
371+
);
372+
// Don't propagate the error, continue processing
373+
}
374+
}
375+
362376
if balance_threshold_update_needed && let Err(e) = self.update_balance_threshold().await {
363377
tracing::error!(error = ?e, "Failed to update balance threshold");
364378
}

0 commit comments

Comments
 (0)