Skip to content

Commit 196bf34

Browse files
Add TTL expiration for completed transactions in EOA executor (#94)
1 parent 5c473ef commit 196bf34

File tree

12 files changed

+79
-17
lines changed

12 files changed

+79
-17
lines changed

executors/src/eoa/store/atomic.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,14 @@ impl AtomicEoaExecutorStore {
589589
pipeline.hset(&tx_data_key, "failure_reason", error.to_string());
590590
pipeline.hset(&tx_data_key, "status", "failed");
591591

592+
// Add TTL expiration
593+
let ttl_seconds = self.completed_transaction_ttl_seconds as i64;
594+
pipeline.expire(&tx_data_key, ttl_seconds);
595+
pipeline.expire(
596+
&self.transaction_attempts_list_name(&pending_transaction.transaction_id),
597+
ttl_seconds,
598+
);
599+
592600
let event = EoaExecutorEvent {
593601
transaction_id: pending_transaction.transaction_id.clone(),
594602
address: pending_transaction.user_request.from,
@@ -657,6 +665,14 @@ impl AtomicEoaExecutorStore {
657665
pipeline.hset(&tx_data_key, "completed_at", now);
658666
pipeline.hset(&tx_data_key, "failure_reason", error.to_string());
659667
pipeline.hset(&tx_data_key, "status", "failed");
668+
669+
// Add TTL expiration
670+
let ttl_seconds = self.completed_transaction_ttl_seconds as i64;
671+
pipeline.expire(&tx_data_key, ttl_seconds);
672+
pipeline.expire(
673+
&self.transaction_attempts_list_name(&pending_transaction.transaction_id),
674+
ttl_seconds,
675+
);
660676
}
661677

662678
// Queue webhooks for all failures
@@ -715,6 +731,7 @@ impl AtomicEoaExecutorStore {
715731
keys: &self.keys,
716732
webhook_queue,
717733
eoa_metrics: &self.eoa_metrics,
734+
completed_transaction_ttl_seconds: self.store.completed_transaction_ttl_seconds,
718735
})
719736
.await
720737
}
@@ -732,6 +749,7 @@ impl AtomicEoaExecutorStore {
732749
keys: &self.keys,
733750
webhook_queue,
734751
eoa_metrics: &self.eoa_metrics,
752+
completed_transaction_ttl_seconds: self.store.completed_transaction_ttl_seconds,
735753
})
736754
.await
737755
}

executors/src/eoa/store/borrowed.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pub struct ProcessBorrowedTransactions<'a> {
4141
pub keys: &'a EoaExecutorStoreKeys,
4242
pub webhook_queue: Arc<Queue<WebhookJobHandler>>,
4343
pub eoa_metrics: &'a EoaMetrics,
44+
pub completed_transaction_ttl_seconds: u64,
4445
}
4546

4647
#[derive(Debug, Default)]
@@ -226,6 +227,14 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
226227
pipeline.hset(&tx_data_key, "completed_at", now);
227228
pipeline.hset(&tx_data_key, "failure_reason", err.to_string());
228229

230+
// Add TTL expiration
231+
let ttl_seconds = self.completed_transaction_ttl_seconds as i64;
232+
pipeline.expire(&tx_data_key, ttl_seconds);
233+
pipeline.expire(
234+
&self.keys.transaction_attempts_list_name(transaction_id),
235+
ttl_seconds,
236+
);
237+
229238
// ask for this nonce to be recycled because we did not consume the nonce
230239
pipeline.zadd(self.keys.recycled_nonces_zset_name(), nonce, nonce);
231240

executors/src/eoa/store/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ pub struct TransactionData {
100100
pub struct EoaExecutorStore {
101101
pub redis: ConnectionManager,
102102
pub keys: EoaExecutorStoreKeys,
103+
pub completed_transaction_ttl_seconds: u64,
103104
}
104105

105106
pub struct EoaExecutorStoreKeys {
@@ -298,6 +299,7 @@ impl EoaExecutorStore {
298299
namespace: Option<String>,
299300
eoa: Address,
300301
chain_id: u64,
302+
completed_transaction_ttl_seconds: u64,
301303
) -> Self {
302304
Self {
303305
redis,
@@ -306,6 +308,7 @@ impl EoaExecutorStore {
306308
chain_id,
307309
namespace,
308310
},
311+
completed_transaction_ttl_seconds,
309312
}
310313
}
311314
}

executors/src/eoa/store/submitted.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ pub struct CleanSubmittedTransactions<'a> {
200200
pub keys: &'a EoaExecutorStoreKeys,
201201
pub webhook_queue: Arc<twmq::Queue<WebhookJobHandler>>,
202202
pub eoa_metrics: &'a EoaMetrics,
203+
pub completed_transaction_ttl_seconds: u64,
203204
}
204205

205206
impl<'a> CleanSubmittedTransactions<'a> {
@@ -360,6 +361,11 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> {
360361
confirmed_tx.receipt_serialized.clone(),
361362
);
362363

364+
// Add TTL expiration
365+
let ttl_seconds = self.completed_transaction_ttl_seconds as i64;
366+
pipeline.expire(&data_key_name, ttl_seconds);
367+
pipeline.expire(&self.keys.transaction_attempts_list_name(id), ttl_seconds);
368+
363369
if let SubmittedTransactionHydrated::Real(tx) = tx {
364370
// Record metrics: transaction queued to mined for confirmed transactions
365371
let confirmed_timestamp = current_timestamp_ms();

executors/src/eoa/worker/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,9 @@ where
126126

127127
// KMS client cache for AWS KMS credentials
128128
pub kms_client_cache: KmsClientCache,
129+
130+
// TTL for completed transactions
131+
pub completed_transaction_ttl_seconds: u64,
129132
}
130133

131134
impl<CS> DurableExecution for EoaExecutorJobHandler<CS>
@@ -161,6 +164,7 @@ where
161164
self.namespace.clone(),
162165
data.eoa_address,
163166
data.chain_id,
167+
self.completed_transaction_ttl_seconds,
164168
)
165169
.acquire_eoa_lock_aggressively(&worker_id, self.eoa_metrics.clone())
166170
.await

scripts/redis-cleanup/index.tsx

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ if (!process.env.REDIS_URL) {
1212
const CONFIG = {
1313
redisUrl: process.env.REDIS_URL,
1414
namespace: "engine-cloud" as string | undefined, // Set to your namespace if needed
15-
batchSize: 2000,
16-
dryRun: false, // Set to false when ready to actually delete
17-
progressInterval: 2000, // Report progress every N transactions
15+
batchSize: 10000,
16+
dryRun: true, // Set to false when ready to actually delete
17+
progressInterval: 10000, // Report progress every N transactions
1818
} as const;
1919

2020
// === TYPES ===
@@ -219,12 +219,12 @@ class EoaRedisCleanup {
219219
return false;
220220
}
221221

222-
// CRITICAL SAFETY CHECK: Only clean transactions that are reasonably old (1 minute minimum)
222+
// CRITICAL SAFETY CHECK: Only clean transactions that are reasonably old (1 month minimum)
223223
// This prevents cleaning transactions that just completed
224-
const oneMinuteAgo = Date.now() - 60 * 1000;
225-
if (completedMs > oneMinuteAgo) {
224+
const oneMonthAgo = Date.now() - 30 * 24 * 60 * 60 * 1000;
225+
if (completedMs > oneMonthAgo) {
226226
this.log(
227-
`🛡️ SAFETY: Skipping tx ${tx.id} - completed too recently (less than 1 minute ago)`
227+
`🛡️ SAFETY: Skipping tx ${tx.id} - completed too recently (less than 1 month ago)`
228228
);
229229
return false;
230230
}
@@ -302,11 +302,11 @@ class EoaRedisCleanup {
302302
if (tx.status === "failed") this.stats.failed++;
303303

304304
const keysToDelete = this.buildKeysToDelete(tx.id);
305-
this.log(
306-
`🔍 [DRY RUN] Would clean: ${tx.id} (${tx.status}) - ${keysToDelete.length} keys`
307-
);
308305
this.stats.cleaned++;
309306
}
307+
this.log(
308+
`🔍 [DRY RUN] Would clean: ${transactions.length} transactions (${this.stats.confirmed} confirmed, ${this.stats.failed} failed)`
309+
);
310310
return;
311311
}
312312

scripts/simple-redis-cleanup.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ if (!process.env.REDIS_URL) {
1010
const CONFIG = {
1111
redisUrl: process.env.REDIS_URL,
1212
batchSize: 5000,
13-
dryRun: false, // Set to false to actually delete
13+
dryRun: true, // Set to false to actually delete
1414
} as const;
1515

1616
class SimpleRedisCleanup {

server/configuration/server_base.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,4 @@ queue:
4040
eoa_send_degradation_threshold_seconds: 30
4141
eoa_confirmation_degradation_threshold_seconds: 60
4242
eoa_stuck_threshold_seconds: 300
43+
completed_transaction_ttl_seconds: 86400 # 1 day

server/src/config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ pub struct QueueConfig {
5050

5151
#[serde(default)]
5252
pub monitoring: MonitoringConfig,
53+
54+
#[serde(default = "default_completed_transaction_ttl_seconds")]
55+
pub completed_transaction_ttl_seconds: u64,
5356
}
5457

5558
#[derive(Debug, Clone, Deserialize)]
@@ -70,6 +73,10 @@ impl Default for MonitoringConfig {
7073
}
7174
}
7275

76+
fn default_completed_transaction_ttl_seconds() -> u64 {
77+
86400 // 1 day in seconds
78+
}
79+
7380
#[derive(Debug, Clone, Deserialize)]
7481
pub struct RedisConfig {
7582
pub url: String,

server/src/execution_router/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,9 @@ impl ExecutionRouter {
453453
self.namespace.clone(),
454454
eoa_execution_options.from,
455455
base_execution_options.chain_id,
456+
self.eoa_executor_queue
457+
.handler
458+
.completed_transaction_ttl_seconds,
456459
);
457460

458461
// Add transaction to the store

0 commit comments

Comments
 (0)