Skip to content

Commit a0d76d7

Browse files
Improve EOA lock release and reduce max_inflight to 50 (#87)
<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

## Release Notes

* **Bug Fixes**
  * Improved error handling and lock management for EOA operations, ensuring locks are properly released even when errors occur.
  * Enhanced failure detection in transaction preparation to trigger break-on-failure conditions more reliably.
* **Performance & Optimization**
  * Adjusted concurrency limits to optimize resource utilization for EOA job processing.

<sub>✏️ Tip: You can customize this high-level summary in your review settings.</sub>

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 35745c2 commit a0d76d7

File tree

4 files changed

+53
-62
lines changed

4 files changed

+53
-62
lines changed

executors/src/eoa/worker/mod.rs

Lines changed: 30 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,14 @@ use alloy::primitives::{Address, U256};
33
use alloy::providers::Provider;
44
use engine_core::{
55
chain::{Chain, ChainService},
6-
credentials::{SigningCredential, KmsClientCache},
6+
credentials::{KmsClientCache, SigningCredential},
77
error::AlloyRpcErrorToEngineError,
88
signer::EoaSigner,
99
};
1010
use engine_eip7702_core::delegated_account::DelegatedAccount;
1111
use serde::{Deserialize, Serialize};
1212
use std::{sync::Arc, time::Duration};
1313
use twmq::Queue;
14-
use twmq::redis::AsyncCommands;
1514
use twmq::redis::aio::ConnectionManager;
1615
use twmq::{
1716
DurableExecution, FailHookData, NackHookData, SuccessHookData,
@@ -20,10 +19,7 @@ use twmq::{
2019
};
2120

2221
use crate::eoa::authorization_cache::EoaAuthorizationCache;
23-
use crate::eoa::store::{
24-
AtomicEoaExecutorStore, EoaExecutorStore, EoaExecutorStoreKeys, EoaHealth, SubmissionResult,
25-
TransactionStoreError,
26-
};
22+
use crate::eoa::store::{AtomicEoaExecutorStore, EoaExecutorStore, EoaHealth, SubmissionResult};
2723
use crate::metrics::{
2824
EoaMetrics, calculate_duration_seconds, current_timestamp_ms, record_eoa_job_processing_time,
2925
};
@@ -127,7 +123,7 @@ where
127123

128124
// EOA metrics abstraction with encapsulated configuration
129125
pub eoa_metrics: EoaMetrics,
130-
126+
131127
// KMS client cache for AWS KMS credentials
132128
pub kms_client_cache: KmsClientCache,
133129
}
@@ -186,7 +182,10 @@ where
186182
let chain_id = chain.chain_id();
187183

188184
// Inject KMS cache into the noop signing credential (after deserialization from Redis)
189-
let noop_signing_credential = data.noop_signing_credential.clone().with_aws_kms_cache(&self.kms_client_cache);
185+
let noop_signing_credential = data
186+
.noop_signing_credential
187+
.clone()
188+
.with_aws_kms_cache(&self.kms_client_cache);
190189

191190
let worker = EoaExecutorWorker {
192191
store: scoped,
@@ -209,16 +208,31 @@ where
209208
};
210209

211210
let job_start_time = current_timestamp_ms();
212-
let result = worker.execute_main_workflow().await?;
211+
let workflow_result = worker.execute_main_workflow().await;
212+
213+
// Always release lock, regardless of workflow success/failure
213214
if let Err(e) = worker.release_eoa_lock().await {
214215
tracing::error!(error = ?e, worker_id = worker_id, "Error releasing EOA lock");
215216
}
216217

218+
// Propagate workflow error after releasing lock
219+
let result = workflow_result?;
220+
217221
// Record EOA job processing metrics
218222
let job_end_time = current_timestamp_ms();
219223
let job_duration = calculate_duration_seconds(job_start_time, job_end_time);
220224
record_eoa_job_processing_time(data.chain_id, job_duration);
221225

226+
tracing::info!(
227+
eoa = ?data.eoa_address,
228+
chain_id = data.chain_id,
229+
worker_id = worker_id,
230+
job_duration_seconds = job_duration,
231+
work_remaining = result.is_work_remaining(),
232+
result = ?result,
233+
"EOA executor job completed"
234+
);
235+
222236
let delay = if is_minimal_account {
223237
Some(Duration::from_secs(2))
224238
} else {
@@ -243,65 +257,29 @@ where
243257

244258
async fn on_success(
245259
&self,
246-
job: &BorrowedJob<Self::JobData>,
260+
_job: &BorrowedJob<Self::JobData>,
247261
_success_data: SuccessHookData<'_, Self::Output>,
248262
_tx: &mut TransactionContext<'_>,
249263
) {
250-
self.soft_release_eoa_lock(&job.job.data).await;
264+
// Lock is already released in process() with ownership checking
251265
}
252266

253267
async fn on_nack(
254268
&self,
255-
job: &BorrowedJob<Self::JobData>,
269+
_job: &BorrowedJob<Self::JobData>,
256270
_nack_data: NackHookData<'_, Self::ErrorData>,
257271
_tx: &mut TransactionContext<'_>,
258272
) {
259-
self.soft_release_eoa_lock(&job.job.data).await;
273+
// Lock is already released in process() with ownership checking
260274
}
261275

262-
#[tracing::instrument(name = "eoa_executor_worker_on_fail", skip_all, fields(eoa = ?job.job.data.eoa_address, chain_id = job.job.data.chain_id, job_id = ?job.job.id))]
263276
async fn on_fail(
264277
&self,
265-
job: &BorrowedJob<Self::JobData>,
266-
fail_data: FailHookData<'_, Self::ErrorData>,
278+
_job: &BorrowedJob<Self::JobData>,
279+
_fail_data: FailHookData<'_, Self::ErrorData>,
267280
_tx: &mut TransactionContext<'_>,
268281
) {
269-
if let EoaExecutorWorkerError::StoreError { inner_error, .. } = &fail_data.error {
270-
if let TransactionStoreError::LockLost { .. } = &inner_error {
271-
tracing::error!(
272-
eoa = ?job.job.data.eoa_address,
273-
chain_id = job.job.data.chain_id,
274-
"Encountered lock lost store error, skipping soft release of EOA lock"
275-
);
276-
return;
277-
}
278-
} else {
279-
self.soft_release_eoa_lock(&job.job.data).await;
280-
}
281-
}
282-
}
283-
284-
impl<CS> EoaExecutorJobHandler<CS>
285-
where
286-
CS: ChainService + Send + Sync + 'static,
287-
{
288-
async fn soft_release_eoa_lock(&self, job_data: &EoaExecutorWorkerJobData) {
289-
let keys = EoaExecutorStoreKeys::new(
290-
job_data.eoa_address,
291-
job_data.chain_id,
292-
self.namespace.clone(),
293-
);
294-
295-
let lock_key = keys.eoa_lock_key_name();
296-
let mut conn = self.redis.clone();
297-
if let Err(e) = conn.del::<&str, ()>(&lock_key).await {
298-
tracing::error!(
299-
eoa = ?job_data.eoa_address,
300-
chain_id = job_data.chain_id,
301-
error = ?e,
302-
"Failed to release EOA lock"
303-
);
304-
}
282+
// Lock is already released in process() with ownership checking
305283
}
306284
}
307285

executors/src/eoa/worker/send.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,10 @@ impl<C: Chain> EoaExecutorWorker<C> {
251251
(_, Err(e)) => {
252252
// Track balance threshold issues
253253

254+
if should_break_on_failure {
255+
failure_occurred = true;
256+
}
257+
254258
if let EoaExecutorWorkerError::TransactionSimulationFailed {
255259
inner_error, ..
256260
} = &e
@@ -287,8 +291,6 @@ impl<C: Chain> EoaExecutorWorker<C> {
287291
);
288292
// Don't propagate the error, continue processing
289293
}
290-
} else if should_break_on_failure {
291-
failure_occurred = true;
292294
}
293295
}
294296
(true, Ok(_)) => continue,

server/src/main.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
11
use std::{sync::Arc, time::Duration};
22

3-
use engine_core::{signer::{EoaSigner, SolanaSigner}, userop::UserOpSigner, credentials::KmsClientCache};
4-
use engine_executors::{eoa::authorization_cache::EoaAuthorizationCache, metrics::{ExecutorMetrics, initialize_metrics}, solana_executor::rpc_cache::{SolanaRpcCache, SolanaRpcUrls}};
3+
use engine_core::{
4+
credentials::KmsClientCache,
5+
signer::{EoaSigner, SolanaSigner},
6+
userop::UserOpSigner,
7+
};
8+
use engine_executors::{
9+
eoa::authorization_cache::EoaAuthorizationCache,
10+
metrics::{ExecutorMetrics, initialize_metrics},
11+
solana_executor::rpc_cache::{SolanaRpcCache, SolanaRpcUrls},
12+
};
513
use thirdweb_core::{abi::ThirdwebAbiServiceBuilder, auth::ThirdwebAuth, iaw::IAWClient};
614
use thirdweb_engine::{
715
chains::ThirdwebChainService,
@@ -93,7 +101,10 @@ async fn main() -> anyhow::Result<()> {
93101
)
94102
.await?;
95103

96-
tracing::info!("Queue manager initialized");
104+
tracing::info!(
105+
"Queue manager initialized with queue config: {:?}",
106+
config.queue
107+
);
97108

98109
// Start queue workers
99110
tracing::info!("Starting queue workers...");
@@ -123,12 +134,12 @@ async fn main() -> anyhow::Result<()> {
123134

124135
// Initialize metrics registry and executor metrics
125136
let metrics_registry = Arc::new(prometheus::Registry::new());
126-
let executor_metrics = ExecutorMetrics::new(&metrics_registry)
127-
.expect("Failed to create executor metrics");
128-
137+
let executor_metrics =
138+
ExecutorMetrics::new(&metrics_registry).expect("Failed to create executor metrics");
139+
129140
// Initialize the executor metrics globally
130141
initialize_metrics(executor_metrics);
131-
142+
132143
tracing::info!("Executor metrics initialized");
133144

134145
let mut server = EngineServer::new(EngineServerState {

server/src/queue/manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ impl QueueManager {
257257
namespace: queue_config.execution_namespace.clone(),
258258
redis: redis_client.get_connection_manager().await?,
259259
authorization_cache,
260-
max_inflight: 100,
260+
max_inflight: 50,
261261
max_recycled_nonces: 50,
262262
eoa_metrics,
263263
kms_client_cache,

0 commit comments

Comments
 (0)