diff --git a/CLAUDE.md b/CLAUDE.md
new file mode 100644
index 0000000..2960634
--- /dev/null
+++ b/CLAUDE.md
@@ -0,0 +1,87 @@
+# CLAUDE.md
+
+This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
+
+## Development Commands
+
+### Build and Test
+- `cargo build --release` - Build the production binary
+- `cargo test` - Run all tests, including the Redis integration tests
+- `cargo nextest run -p twmq --profile ci` - Run a single crate's tests (here `twmq`) with the CI profile
+- `RUST_LOG=debug cargo run` - Run the server with debug logging
+
+### Development Setup
+Redis is required for development:
+```bash
+docker run -d --name redis -p 6379:6379 redis:7-alpine
+```
+
+Required environment variables:
+```bash
+export APP__THIRDWEB__SECRET="your_secret_key"
+export APP__THIRDWEB__CLIENT_ID="your_client_id"
+```
+
+## Architecture Overview
+
+This is a **Rust workspace** with 7 crates providing blockchain transaction infrastructure:
+
+- **`server/`** - Main HTTP API server (Axum-based REST API with OpenAPI docs)
+- **`core/`** - Core blockchain functionality (chain management, transactions, UserOps)
+- **`aa-core/`** - Account Abstraction engine (ERC-4337 v0.6/v0.7 support)
+- **`aa-types/`** - Account Abstraction type definitions
+- **`executors/`** - Background job handlers (webhooks, transaction confirmation)
+- **`twmq/`** - Thirdweb Message Queue (Redis-backed job queue with lease-based concurrency)
+- **`thirdweb-core/`** - Thirdweb service integrations (Vault SDK, IAW)
+
+### Key Technologies
+- **Axum** for the HTTP server
+- **Alloy** for Ethereum interactions
+- **Redis** for the job queue and state
+- **Tokio** for the async runtime
+- **Vault SDK** for secure key management
+
+## Configuration System
+
+Hierarchical configuration, highest priority first:
+1. Environment variables (`APP__` prefix)
+2. Environment-specific YAML (`server_development.yaml`, `server_production.yaml`)
+3. Base YAML (`server_base.yaml`)
+
+Configuration files are located in `server/configuration/`.
+
+## Transaction Types Supported
+
+- **EOA transactions** - Traditional wallet transactions
+- **Account Abstraction** - ERC-4337 smart accounts with gas sponsorship
+- **EIP-7702** - Delegated transaction execution
+
+## Key Development Areas
+
+### API Routes
+Located in `server/src/http/routes/` - routes follow RESTful patterns with OpenAPI documentation.
+
+### Background Jobs
+Implemented in `executors/src/` - uses TWMQ for reliable job processing with Redis persistence.
+
+### Blockchain Core
+`core/src/` contains chain management, transaction building, and UserOperation support.
+
+### Account Abstraction
+`aa-core/src/` implements the complete ERC-4337 flow, including bundler integration.
+
+## Error Handling
+
+Uses comprehensive error types with context. All errors are structured and logged with tracing spans.
+
+## Testing
+
+Integration tests require Redis. Tests cover job queue operations, transaction building, and API endpoints.
+
+## Production Features
+
+- Horizontal scaling via a shared Redis backend
+- Graceful shutdown with job-completion guarantees
+- Vault-backed private key management
+- Comprehensive metrics and health checks
+- Docker support with multi-stage builds
\ No newline at end of file
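The configuration loading itself lives in `server/src/config.rs` (`config::get_config()`), which this diff does not touch. As a rough sketch of the layering described above — assuming the `config` crate already present in the server's dependencies, with the `load_config` name purely illustrative:

```rust
use config::{Config, ConfigError, Environment, File};

/// Illustrative only: layer base YAML, then environment-specific YAML, then
/// APP__-prefixed environment variables (highest priority wins).
fn load_config(env_name: &str) -> Result<Config, ConfigError> {
    Config::builder()
        // 3. Base YAML (lowest priority)
        .add_source(File::with_name("server/configuration/server_base"))
        // 2. Environment-specific YAML, e.g. server_development.yaml
        .add_source(
            File::with_name(&format!("server/configuration/server_{env_name}")).required(false),
        )
        // 1. Environment variables, e.g. APP__THIRDWEB__SECRET -> thirdweb.secret
        .add_source(Environment::with_prefix("APP").separator("__"))
        .build()
}
```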
diff --git a/Cargo.lock b/Cargo.lock
index 68ec643..426c29f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1290,12 +1290,28 @@ dependencies = [
  "windows-targets 0.52.6",
 ]
 
+[[package]]
+name = "base-x"
+version = "0.2.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4cbbc9d0964165b47557570cce6c952866c2678457aca742aafc9fb771d30270"
+
 [[package]]
 name = "base16ct"
 version = "0.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf"
 
+[[package]]
+name = "base36"
+version = "0.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b9c26bddc1271f7112e5ec797e8eeba6de2de211c1488e506b9500196dbf77c5"
+dependencies = [
+ "base-x",
+ "failure",
+]
+
 [[package]]
 name = "base64"
 version = "0.21.7"
@@ -1893,6 +1909,42 @@ dependencies = [
  "typenum",
 ]
 
+[[package]]
+name = "cuid"
+version = "1.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d7fe01ebbba358b9af4850d1a2b16d45f765137398e34134643790f19dc935a0"
+dependencies = [
+ "base36",
+ "cuid-util",
+ "cuid2",
+ "hostname",
+ "num",
+ "once_cell",
+ "rand 0.8.5",
+ "uuid",
+]
+
+[[package]]
+name = "cuid-util"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1d59a706635108a7e8eaae7ec8e6154504fafa4a415ef38690d94fccea051757"
+
+[[package]]
+name = "cuid2"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7ffc4ec2422180444acb04e5dda013369c9860fe66e8f558aa8c3f265ad195d3"
+dependencies = [
+ "cuid-util",
+ "getrandom 0.2.16",
+ "num",
+ "rand 0.8.5",
+ "sha3",
+ "web-time",
+]
+
 [[package]]
 name = "curve25519-dalek"
 version = "4.1.3"
@@ -2180,6 +2232,7 @@ name = "engine-executors"
 version = "0.1.0"
 dependencies = [
  "alloy",
+ "async-trait",
  "chrono",
  "engine-aa-core",
  "engine-aa-types",
@@ -2236,6 +2289,28 @@ dependencies = [
  "pin-project-lite",
 ]
 
+[[package]]
+name = "failure"
+version = "0.1.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d32e9bd16cc02eae7db7ef620b392808b89f6a5e16bb3497d159c6b92a0f4f86"
+dependencies = [
+ "backtrace",
+ "failure_derive",
+]
+
+[[package]]
+name = "failure_derive"
+version = "0.1.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aa4da3c766cd7a0db8242e326e9e4e081edd567072893ed320008189715366a4"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+ "synstructure 0.12.6",
+]
+
 [[package]]
 name = "fastrand"
 version = "2.3.0"
@@ -2664,6 +2739,17 @@ dependencies = [
  "digest 0.10.7",
 ]
 
+[[package]]
+name = "hostname"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a56f203cd1c76362b69e3863fd987520ac36cf70a8c92627449b2f64a8cf7d65"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "windows-link",
+]
+
 [[package]]
 name = "http"
 version = "0.2.12"
@@ -3173,6 +3259,18 @@ dependencies = [
  "vcpkg",
 ]
 
+[[package]]
+name = "libz-sys"
+version = "1.1.22"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8b70e7a7df205e92a1a4cd9aaae7898dac0aa555503cc0a649494d0d60e7651d"
+dependencies = [
+ "cc",
+ "libc",
+ "pkg-config",
+ "vcpkg",
+]
+
 [[package]]
 name = "linux-raw-sys"
 version = "0.9.4"
@@ -3377,6 +3475,20 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "num"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23"
+dependencies = [
+ "num-bigint",
+ "num-complex",
+ "num-integer",
+ "num-iter",
+ "num-rational",
+ "num-traits",
+]
+
 [[package]]
 name = "num-bigint"
 version = "0.4.6"
@@ -3387,6 +3499,15 @@ dependencies = [
  "num-traits",
 ]
 
+[[package]]
+name = "num-complex"
+version = "0.4.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495"
+dependencies = [
+ "num-traits",
+]
+
 [[package]]
 name = "num-conv"
 version = "0.1.0"
@@ -3402,6 +3523,28 @@ dependencies = [
  "num-traits",
 ]
 
+[[package]]
+name = "num-iter"
+version = "0.1.45"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf"
+dependencies = [
+ "autocfg",
+ "num-integer",
+ "num-traits",
+]
+
+[[package]]
+name = "num-rational"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824"
+dependencies = [
+ "num-bigint",
+ "num-integer",
+ "num-traits",
+]
+
 [[package]]
 name = "num-traits"
 version = "0.2.19"
@@ -3438,6 +3581,7 @@ version = "0.7.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "77e878c846a8abae00dd069496dbe8751b16ac1c3d6bd2a7283a938e8228f90d"
 dependencies = [
+ "proc-macro-crate",
  "proc-macro2",
  "quote",
  "syn 2.0.104",
@@ -4100,6 +4244,37 @@ dependencies = [
  "crossbeam-utils",
 ]
 
+[[package]]
+name = "rdkafka"
+version = "0.36.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1beea247b9a7600a81d4cc33f659ce1a77e1988323d7d2809c7ed1c21f4c316d"
+dependencies = [
+ "futures-channel",
+ "futures-util",
+ "libc",
+ "log",
+ "rdkafka-sys",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "slab",
+ "tokio",
+]
+
+[[package]]
+name = "rdkafka-sys"
+version = "4.9.0+2.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5230dca48bc354d718269f3e4353280e188b610f7af7e2fcf54b7a79d5802872"
+dependencies = [
+ "libc",
+ "libz-sys",
+ "num_enum",
+ "openssl-sys",
+ "pkg-config",
+]
+
 [[package]]
 name = "redis"
 version = "0.31.0"
@@ -5007,6 +5182,18 @@ dependencies = [
  "futures-core",
 ]
 
+[[package]]
+name = "synstructure"
+version = "0.12.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+ "unicode-xid",
+]
+
 [[package]]
 name = "synstructure"
 version = "0.13.2"
@@ -5088,13 +5275,18 @@ dependencies = [
  "aide",
  "alloy",
  "anyhow",
+ "async-trait",
  "axum",
+ "chrono",
  "config",
+ "cuid",
  "engine-aa-core",
+ "engine-aa-types",
  "engine-core",
  "engine-executors",
  "futures",
  "rand 0.9.1",
+ "rdkafka",
  "schemars 0.8.22",
  "serde",
  "serde-bool",
@@ -6333,7 +6525,7 @@ dependencies = [
  "proc-macro2",
  "quote",
  "syn 2.0.104",
- "synstructure",
+ "synstructure 0.13.2",
 ]
 
 [[package]]
@@ -6374,7 +6566,7 @@ dependencies = [
  "proc-macro2",
  "quote",
  "syn 2.0.104",
- "synstructure",
+ "synstructure 0.13.2",
 ]
 
 [[package]]
diff --git a/executors/Cargo.toml b/executors/Cargo.toml
index 2148d9e..86bd26f 100644
--- a/executors/Cargo.toml
+++ b/executors/Cargo.toml
@@ -22,3 +22,4 @@ uuid = { version = "1.17.0", features = ["v4"] }
 chrono = "0.4.41"
 tokio = { version = "1.45.0", features = ["full"] }
 futures = "0.3.31"
+async-trait = "0.1.83"
diff --git a/executors/src/external_bundler/confirm.rs b/executors/src/external_bundler/confirm.rs
index 1f6e3b6..bdbc888 100644
--- a/executors/src/external_bundler/confirm.rs
+++ b/executors/src/external_bundler/confirm.rs
@@ -15,6 +15,7 @@ use twmq::{
 };
 
 use crate::{
+    kafka_integration::{SharedEventSender, TransactionConfirmedEvent},
     transaction_registry::TransactionRegistry,
     webhook::{
         WebhookJobHandler,
@@ -22,6 +23,8 @@ use crate::{
     },
 };
 
+use std::time::{SystemTime, UNIX_EPOCH};
+
 use super::deployment::RedisDeploymentLock;
 
 // --- Job Payload ---
@@ -103,6 +106,7 @@ where
     pub transaction_registry: Arc<TransactionRegistry>,
     pub max_confirmation_attempts: u32,
     pub confirmation_retry_delay: Duration,
+    pub event_sender: SharedEventSender,
 }
 
 impl<C> UserOpConfirmationHandler<C>
 where
@@ -114,6 +118,7 @@
         deployment_lock: RedisDeploymentLock,
         webhook_queue: Arc<Queue<WebhookJobHandler>>,
         transaction_registry: Arc<TransactionRegistry>,
+        event_sender: SharedEventSender,
     ) -> Self {
         Self {
             chain_service,
@@ -122,6 +127,7 @@
             transaction_registry,
             max_confirmation_attempts: 20, // ~5 minutes with 15 second delays
             confirmation_retry_delay: Duration::from_secs(5),
+            event_sender,
         }
     }
@@ -246,6 +252,23 @@
             );
         }
 
+        // Send Kafka transaction confirmed event
+        let confirmed_event = TransactionConfirmedEvent {
+            transaction_id: job.job.data.transaction_id.clone(),
+            user_op_hash: success_data.result.user_op_hash.clone(),
+            receipt: success_data.result.receipt.clone(),
+            deployment_lock_released: success_data.result.deployment_lock_released,
+            timestamp: SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_secs(),
+            // TODO: Extract team_id and project_id from job data or RPC credentials
+            team_id: "team_placeholder".to_string(),
+            project_id: "prj_placeholder".to_string(),
+        };
+
+        self.event_sender.send_transaction_confirmed(confirmed_event).await;
+
         // Queue success webhook
         if let Err(e) = self.queue_success_webhook(job, success_data, tx) {
             tracing::error!(
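This handler and the send handler below inline the same epoch-seconds computation when stamping events. A small shared helper — hypothetical, not part of this diff — would remove the duplication and the `unwrap()`:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Seconds since the Unix epoch, as used for the `timestamp` field on
/// transaction events. Falls back to 0 instead of panicking in the
/// (practically unreachable) case of a pre-1970 system clock.
pub fn unix_timestamp_secs() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or_default()
}
```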
"team_placeholder".to_string(), + project_id: "prj_placeholder".to_string(), + }; + + self.event_sender.send_transaction_sent(sent_event).await; + if let Err(e) = self.queue_success_webhook(job, success_data, tx) { tracing::error!( transaction_id = %job.job.data.transaction_id, diff --git a/executors/src/kafka_integration.rs b/executors/src/kafka_integration.rs new file mode 100644 index 0000000..42c964d --- /dev/null +++ b/executors/src/kafka_integration.rs @@ -0,0 +1,53 @@ +use alloy::primitives::{Address, Bytes, U256}; +use engine_core::rpc_clients::UserOperationReceipt; +use serde::Serialize; +use std::sync::Arc; + +/// Trait for sending transaction events to external messaging systems +#[async_trait::async_trait] +pub trait TransactionEventSender: Send + Sync { + async fn send_transaction_sent(&self, message: TransactionSentEvent); + async fn send_transaction_confirmed(&self, message: TransactionConfirmedEvent); +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct TransactionSentEvent { + pub transaction_id: String, + pub chain_id: u64, + pub account_address: Address, + pub user_op_hash: Bytes, + pub nonce: U256, + pub deployment_lock_acquired: bool, + pub timestamp: u64, + pub team_id: String, + pub project_id: String, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct TransactionConfirmedEvent { + pub transaction_id: String, + pub user_op_hash: Bytes, + pub receipt: UserOperationReceipt, + pub deployment_lock_released: bool, + pub timestamp: u64, + pub team_id: String, + pub project_id: String, +} + +/// No-op implementation for when messaging is disabled +pub struct NoOpEventSender; + +#[async_trait::async_trait] +impl TransactionEventSender for NoOpEventSender { + async fn send_transaction_sent(&self, _message: TransactionSentEvent) { + // Do nothing + } + + async fn send_transaction_confirmed(&self, _message: TransactionConfirmedEvent) { + // Do nothing + } +} + +pub type SharedEventSender = Arc; \ No newline at end of file diff --git a/executors/src/lib.rs b/executors/src/lib.rs index 842ee7d..4eb801d 100644 --- a/executors/src/lib.rs +++ b/executors/src/lib.rs @@ -1,5 +1,6 @@ pub mod eip7702_executor; pub mod eoa; pub mod external_bundler; +pub mod kafka_integration; pub mod transaction_registry; pub mod webhook; diff --git a/server/Cargo.toml b/server/Cargo.toml index 93c0ab0..cbf935a 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -13,6 +13,7 @@ vault-sdk = { workspace = true } vault-types = { workspace = true } engine-core = { path = "../core" } engine-aa-core = { path = "../aa-core" } +engine-aa-types = { path = "../aa-types" } engine-executors = { path = "../executors" } twmq = { path = "../twmq" } thirdweb-core = { path = "../thirdweb-core" } @@ -40,3 +41,7 @@ utoipa = { version = "5.4.0", features = [ ] } utoipa-axum = "0.2.0" utoipa-scalar = { version = "0.3.0", features = ["axum"] } +rdkafka = { version = "0.36", features = ["ssl"] } +async-trait = "0.1.83" +cuid = "1.3.2" +chrono = "0.4" diff --git a/server/src/config.rs b/server/src/config.rs index d248764..042eb6c 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -9,6 +9,7 @@ pub struct EngineConfig { pub thirdweb: ThirdwebConfig, pub queue: QueueConfig, pub redis: RedisConfig, + pub kafka: Option, } #[derive(Debug, Clone, Deserialize)] @@ -31,6 +32,41 @@ pub struct RedisConfig { pub url: String, } +#[derive(Debug, Clone, Deserialize)] +pub struct KafkaConfig { + pub url: String, + pub username: String, + pub password: 
diff --git a/executors/src/lib.rs b/executors/src/lib.rs
index 842ee7d..4eb801d 100644
--- a/executors/src/lib.rs
+++ b/executors/src/lib.rs
@@ -1,5 +1,6 @@
 pub mod eip7702_executor;
 pub mod eoa;
 pub mod external_bundler;
+pub mod kafka_integration;
 pub mod transaction_registry;
 pub mod webhook;
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 93c0ab0..cbf935a 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -13,6 +13,7 @@ vault-sdk = { workspace = true }
 vault-types = { workspace = true }
 engine-core = { path = "../core" }
 engine-aa-core = { path = "../aa-core" }
+engine-aa-types = { path = "../aa-types" }
 engine-executors = { path = "../executors" }
 twmq = { path = "../twmq" }
 thirdweb-core = { path = "../thirdweb-core" }
@@ -40,3 +41,7 @@ utoipa = { version = "5.4.0", features = [
 ] }
 utoipa-axum = "0.2.0"
 utoipa-scalar = { version = "0.3.0", features = ["axum"] }
+rdkafka = { version = "0.36", features = ["ssl"] }
+async-trait = "0.1.83"
+cuid = "1.3.2"
+chrono = "0.4"
diff --git a/server/src/config.rs b/server/src/config.rs
index d248764..042eb6c 100644
--- a/server/src/config.rs
+++ b/server/src/config.rs
@@ -9,6 +9,7 @@ pub struct EngineConfig {
     pub thirdweb: ThirdwebConfig,
     pub queue: QueueConfig,
     pub redis: RedisConfig,
+    pub kafka: Option<KafkaConfig>,
 }
 
 #[derive(Debug, Clone, Deserialize)]
@@ -31,6 +32,41 @@ pub struct RedisConfig {
     pub url: String,
 }
 
+#[derive(Debug, Clone, Deserialize)]
+pub struct KafkaConfig {
+    pub url: String,
+    pub username: String,
+    pub password: String,
+
+    #[serde(default = "default_batch_size")]
+    pub batch_size: u32,
+
+    #[serde(default = "default_buffer_memory_kb")]
+    pub buffer_memory_kb: u32,
+
+    #[serde(default = "default_request_timeout_ms")]
+    pub request_timeout_ms: u32,
+
+    #[serde(default = "default_max_retries")]
+    pub max_retries: u32,
+}
+
+fn default_batch_size() -> u32 {
+    1000
+}
+
+fn default_buffer_memory_kb() -> u32 {
+    32768 // 32MB
+}
+
+fn default_request_timeout_ms() -> u32 {
+    5000
+}
+
+fn default_max_retries() -> u32 {
+    3
+}
+
 #[derive(Debug, Clone, Deserialize)]
 #[serde(default)]
 pub struct ServerConfig {
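Because only `url`, `username`, and `password` lack a `#[serde(default)]`, a minimal config deserializes with the remaining fields filled in from the default functions. A test sketch that could sit at the bottom of `server/src/config.rs` (the broker values are placeholders):

```rust
#[cfg(test)]
mod kafka_config_tests {
    use super::KafkaConfig;
    use serde_json::json;

    #[test]
    fn kafka_config_defaults_apply() {
        // Only the three required fields are provided; the numeric fields
        // fall back to the #[serde(default = "...")] functions above.
        let cfg: KafkaConfig = serde_json::from_value(json!({
            "url": "broker.example.com:9092",
            "username": "engine",
            "password": "secret"
        }))
        .expect("minimal config should deserialize");

        assert_eq!(cfg.batch_size, 1000);
        assert_eq!(cfg.buffer_memory_kb, 32768);
        assert_eq!(cfg.request_timeout_ms, 5000);
        assert_eq!(cfg.max_retries, 3);
    }
}
```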
diff --git a/server/src/execution_router/mod.rs b/server/src/execution_router/mod.rs
index 5209c99..c57462e 100644
--- a/server/src/execution_router/mod.rs
+++ b/server/src/execution_router/mod.rs
@@ -35,6 +35,7 @@ use vault_types::{
 };
 
 use crate::chains::ThirdwebChainService;
+use crate::kafka::SharedKafkaProducer;
 
 pub struct ExecutionRouter {
     pub webhook_queue: Arc<Queue<WebhookJobHandler>>,
@@ -47,6 +48,7 @@ pub struct ExecutionRouter {
     pub transaction_registry: Arc<TransactionRegistry>,
     pub vault_client: Arc<VaultClient>,
     pub chains: Arc<ThirdwebChainService>,
+    pub kafka_producer: SharedKafkaProducer,
 }
 
 impl ExecutionRouter {
diff --git a/server/src/kafka/mod.rs b/server/src/kafka/mod.rs
new file mode 100644
index 0000000..e80aebf
--- /dev/null
+++ b/server/src/kafka/mod.rs
@@ -0,0 +1,435 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use alloy::primitives::{Address, Bytes, U256};
+use engine_aa_types::VersionedUserOp;
+use engine_core::rpc_clients::UserOperationReceipt;
+use rdkafka::config::ClientConfig;
+use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
+use rdkafka::util::Timeout;
+use serde::{Deserialize, Serialize};
+
+use crate::config::{Environment, KafkaConfig};
+
+/// Kafka event envelope structure that wraps all transaction data
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct KafkaEventEnvelope<T> {
+    pub id: String,
+    pub team_id: String,
+    pub project_id: String,
+    pub created_at: String,
+    pub data: T,
+}
+
+impl<T: Serialize> KafkaEventEnvelope<T> {
+    pub fn new(team_id: String, project_id: String, data: T) -> Self {
+        Self {
+            id: format!("evt_{}", cuid::cuid1().expect("Failed to generate CUID")),
+            team_id,
+            project_id,
+            created_at: chrono::Utc::now().to_rfc3339(),
+            data,
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum TransactionTopic {
+    #[serde(rename = "engine.transaction.sent")]
+    Sent,
+    #[serde(rename = "engine.transaction.confirmed")]
+    Confirmed,
+}
+
+impl TransactionTopic {
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            TransactionTopic::Sent => "engine.transaction.sent",
+            TransactionTopic::Confirmed => "engine.transaction.confirmed",
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TransactionSentMessage {
+    pub transaction_id: String,
+    pub chain_id: u64,
+    pub account_address: Address,
+    pub user_op_hash: Bytes,
+    pub nonce: U256,
+    pub deployment_lock_acquired: bool,
+    pub timestamp: u64,
+}
+
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TransactionConfirmedMessage {
+    pub transaction_id: String,
+    pub user_op_hash: Bytes,
+    pub receipt: UserOperationReceipt,
+    pub deployment_lock_released: bool,
+    pub timestamp: u64,
+}
+
+/// Unified transaction webhook event structure with explicit null fields
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TransactionWebhookEvent {
+    // Core transaction data (always present)
+    pub transaction_id: String,
+    pub chain_id: u64,
+    pub account_address: Address,
+    pub user_op_hash: Bytes,
+    pub nonce: U256,
+    pub timestamp: u64,
+
+    // Send-specific fields (null for confirm events)
+    pub user_operation_sent: Option<VersionedUserOp>,
+    pub deployment_lock_acquired: Option<bool>,
+
+    // Confirm-specific fields (null for send events)
+    pub receipt: Option<UserOperationReceipt>,
+    pub deployment_lock_released: Option<bool>,
+
+    // Transaction status info (null for send events)
+    pub success: Option<bool>,
+    pub actual_gas_cost: Option<U256>,
+    pub actual_gas_used: Option<U256>,
+}
+
+pub struct KafkaProducer {
+    producer: Option<FutureProducer>,
+    config: KafkaConfig,
+}
+
+impl KafkaProducer {
+    pub fn new(kafka_config: KafkaConfig, environment: &Environment) -> Result<Self, String> {
+        let group_id = format!("engine-core-{}", environment.as_str());
+
+        let mut client_config = ClientConfig::new();
+        client_config
+            .set("bootstrap.servers", &kafka_config.url)
+            .set("security.protocol", "SASL_SSL")
+            .set("sasl.mechanism", "PLAIN")
+            .set("sasl.username", &kafka_config.username)
+            .set("sasl.password", &kafka_config.password)
+            .set("group.id", &group_id)
+            // Hardcoded settings as specified
+            .set("acks", "all") // enable.idempotence requires acks=all; librdkafka rejects acks=1 with idempotence on
+            .set("compression.type", "lz4")
+            .set("enable.idempotence", "true")
+            .set("linger.ms", "10")
+            // Configurable settings
+            .set("batch.size", kafka_config.batch_size.to_string())
+            .set("queue.buffering.max.kbytes", kafka_config.buffer_memory_kb.to_string()) // librdkafka's producer buffer limit, in KB; there is no "buffer.memory" property
+            .set("request.timeout.ms", kafka_config.request_timeout_ms.to_string())
+            .set("retries", kafka_config.max_retries.to_string());
+
+        let producer = client_config
+            .create()
+            .map_err(|e| format!("Failed to create Kafka producer: {}", e))?;
+
+        Ok(KafkaProducer {
+            producer: Some(producer),
+            config: kafka_config,
+        })
+    }
+
+    pub fn disabled() -> Self {
+        KafkaProducer {
+            producer: None,
+            config: KafkaConfig {
+                url: String::new(),
+                username: String::new(),
+                password: String::new(),
+                batch_size: 1000,
+                buffer_memory_kb: 32768,
+                request_timeout_ms: 5000,
+                max_retries: 3,
+            },
+        }
+    }
+
+    pub fn is_enabled(&self) -> bool {
+        self.producer.is_some()
+    }
+
+    pub async fn send_transaction_sent(&self, message: TransactionSentMessage, team_id: String, project_id: String) {
+        if !self.is_enabled() {
+            tracing::debug!("Kafka disabled, skipping transaction sent message");
+            return;
+        }
+
+        let envelope = KafkaEventEnvelope::new(team_id, project_id, message);
+        self.send_message(TransactionTopic::Sent, &envelope.data.transaction_id, &envelope)
+            .await;
+    }
+
+    pub async fn send_transaction_confirmed(&self, message: TransactionConfirmedMessage, team_id: String, project_id: String) {
+        if !self.is_enabled() {
+            tracing::debug!("Kafka disabled, skipping transaction confirmed message");
+            return;
+        }
+
+        let envelope = KafkaEventEnvelope::new(team_id, project_id, message);
+        self.send_message(TransactionTopic::Confirmed, &envelope.data.transaction_id, &envelope)
+            .await;
+    }
+
+    /// Helper function to send webhook event with unified structure
+    pub async fn send_webhook_event(&self, topic: &str, event: TransactionWebhookEvent, team_id: String, project_id: String) {
+        if !self.is_enabled() {
+            tracing::debug!("Kafka disabled, skipping webhook event");
+            return;
+        }
+
+        let envelope = KafkaEventEnvelope::new(team_id, project_id, event);
+        let payload_json = match serde_json::to_string(&envelope) {
+            Ok(json) => json,
+            Err(e) => {
+                tracing::error!(
+                    topic = %topic,
+                    transaction_id = %envelope.data.transaction_id,
+                    error = %e,
+                    "Failed to serialize webhook event payload"
+                );
+                return;
+            }
+        };
+
+        let Some(ref producer) = self.producer else {
+            tracing::debug!("Kafka producer not available, skipping webhook event");
+            return;
+        };
+
+        let record = FutureRecord::to(topic)
+            .key(&envelope.data.transaction_id)
+            .payload(&payload_json);
+
+        let timeout = Timeout::After(Duration::from_millis(self.config.request_timeout_ms as u64));
+
+        match producer.send(record, timeout).await {
+            Ok((partition, offset)) => {
+                tracing::debug!(
+                    topic = %topic,
+                    transaction_id = %envelope.data.transaction_id,
+                    partition = %partition,
+                    offset = %offset,
+                    "Successfully sent webhook event to Kafka"
+                );
+            }
+            Err((e, _original_record)) => {
+                tracing::error!(
+                    topic = %topic,
+                    transaction_id = %envelope.data.transaction_id,
+                    error = %e,
+                    "Failed to send webhook event to Kafka"
+                );
+            }
+        }
+    }
+
+    async fn send_message<T: Serialize>(&self, topic: TransactionTopic, key: &str, payload: &T) {
+        let Some(ref producer) = self.producer else {
+            tracing::debug!("Kafka producer not available, skipping message send");
+            return;
+        };
+
+        let payload_json = match serde_json::to_string(payload) {
+            Ok(json) => json,
+            Err(e) => {
+                tracing::error!(
+                    topic = %topic.as_str(),
+                    key = %key,
+                    error = %e,
+                    "Failed to serialize Kafka message payload"
+                );
+                return;
+            }
+        };
+
+        let record = FutureRecord::to(topic.as_str())
+            .key(key)
+            .payload(&payload_json);
+
+        let timeout = Timeout::After(Duration::from_millis(self.config.request_timeout_ms as u64));
+
+        match producer.send(record, timeout).await {
+            Ok((partition, offset)) => {
+                tracing::debug!(
+                    topic = %topic.as_str(),
+                    key = %key,
+                    partition = %partition,
+                    offset = %offset,
+                    "Successfully sent message to Kafka"
+                );
+            }
+            Err((e, _original_record)) => {
+                tracing::error!(
+                    topic = %topic.as_str(),
+                    key = %key,
+                    error = %e,
+                    "Failed to send message to Kafka"
+                );
+            }
+        }
+    }
+
+    pub async fn flush(&self) -> Result<(), String> {
+        let Some(ref producer) = self.producer else {
+            return Ok(());
+        };
+
+        let timeout = Duration::from_millis(self.config.request_timeout_ms as u64);
+        producer
+            .flush(timeout)
+            .map_err(|e| format!("Failed to flush Kafka producer: {}", e))
+    }
+}
+
+impl Drop for KafkaProducer {
+    fn drop(&mut self) {
+        if self.is_enabled() {
+            tracing::debug!("Dropping Kafka producer");
+        }
+    }
+}
+
+pub type SharedKafkaProducer = Arc<KafkaProducer>;
+
+// Helper functions for creating webhook events
+impl TransactionWebhookEvent {
+    /// Create a transaction sent event
+    pub fn transaction_sent(
+        transaction_id: String,
+        chain_id: u64,
+        account_address: Address,
+        user_op_hash: Bytes,
+        nonce: U256,
+        user_operation_sent: VersionedUserOp,
+        deployment_lock_acquired: bool,
+        timestamp: u64,
+    ) -> Self {
+        Self {
+            transaction_id,
+            chain_id,
+            account_address,
+            user_op_hash,
+            nonce,
+            timestamp,
+            user_operation_sent: Some(user_operation_sent),
+            deployment_lock_acquired: Some(deployment_lock_acquired),
+            receipt: None,
+            deployment_lock_released: None,
+            success: None,
+            actual_gas_cost: None,
+            actual_gas_used: None,
+        }
+    }
+
+    /// Create a transaction confirmed event
+    pub fn transaction_confirmed(
+        transaction_id: String,
+        chain_id: u64,
+        account_address: Address,
+        user_op_hash: Bytes,
+        nonce: U256,
+        receipt: UserOperationReceipt,
+        deployment_lock_released: bool,
+        timestamp: u64,
+    ) -> Self {
+        let success = Some(receipt.success);
+        let actual_gas_cost = Some(receipt.actual_gas_cost);
+        let actual_gas_used = Some(receipt.actual_gas_used);
+
+        Self {
+            transaction_id,
+            chain_id,
+            account_address,
+            user_op_hash,
+            nonce,
+            timestamp,
+            user_operation_sent: None,
+            deployment_lock_acquired: None,
+            receipt: Some(receipt),
+            deployment_lock_released: Some(deployment_lock_released),
+            success,
+            actual_gas_cost,
+            actual_gas_used,
+        }
+    }
+}
+
+/// Standalone helper function to send webhook events
+pub async fn send_webhook_event(
+    kafka_producer: &KafkaProducer,
+    topic: &str,
+    event: TransactionWebhookEvent,
+    team_id: String,
+    project_id: String,
+) {
+    kafka_producer.send_webhook_event(topic, event, team_id, project_id).await;
+}
+
+// Adapter to implement TransactionEventSender trait for KafkaProducer
+use engine_executors::kafka_integration::{
+    TransactionEventSender, TransactionSentEvent, TransactionConfirmedEvent,
+    SharedEventSender, NoOpEventSender
+};
+
+#[async_trait::async_trait]
+impl TransactionEventSender for KafkaProducer {
+    async fn send_transaction_sent(&self, message: TransactionSentEvent) {
+        let kafka_message = TransactionSentMessage {
+            transaction_id: message.transaction_id,
+            chain_id: message.chain_id,
+            account_address: message.account_address,
+            user_op_hash: message.user_op_hash,
+            nonce: message.nonce,
+            deployment_lock_acquired: message.deployment_lock_acquired,
+            timestamp: message.timestamp,
+        };
+        self.send_transaction_sent(kafka_message, message.team_id, message.project_id).await;
+    }
+
+    async fn send_transaction_confirmed(&self, message: TransactionConfirmedEvent) {
+        let kafka_message = TransactionConfirmedMessage {
+            transaction_id: message.transaction_id,
+            user_op_hash: message.user_op_hash,
+            receipt: message.receipt,
+            deployment_lock_released: message.deployment_lock_released,
+            timestamp: message.timestamp,
+        };
+        self.send_transaction_confirmed(kafka_message, message.team_id, message.project_id).await;
+    }
+}
+
+// Helper function to create KafkaProducer from config
+pub fn create_kafka_producer(
+    kafka_config: Option<KafkaConfig>,
+    environment: &Environment,
+) -> Result<SharedKafkaProducer, String> {
+    match kafka_config {
+        Some(config) => {
+            tracing::info!("Initializing Kafka producer with WarpStream");
+            let producer = KafkaProducer::new(config, environment)?;
+            Ok(Arc::new(producer))
+        }
+        None => {
+            tracing::info!("Kafka configuration not provided, creating disabled producer");
+            Ok(Arc::new(KafkaProducer::disabled()))
+        }
+    }
+}
+
+// Helper function to create event sender from Kafka producer
+pub fn create_event_sender(kafka_producer: &SharedKafkaProducer) -> SharedEventSender {
+    if kafka_producer.is_enabled() {
+        kafka_producer.clone() as SharedEventSender
+    } else {
+        Arc::new(NoOpEventSender) as SharedEventSender
+    }
+}
\ No newline at end of file
diff --git a/server/src/lib.rs b/server/src/lib.rs
index a629adb..b7f2c35 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -2,4 +2,5 @@ pub mod chains;
 pub mod config;
 pub mod execution_router;
 pub mod http;
+pub mod kafka;
 pub mod queue;
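A minimal usage sketch of the producer API above; the disabled path needs no broker, and the ids and addresses are placeholders:

```rust
use alloy::primitives::{Address, Bytes, U256};
use thirdweb_engine::kafka::{KafkaProducer, TransactionSentMessage};

#[tokio::main]
async fn main() {
    // With no Kafka configured, sends are skipped and logged at debug level.
    let producer = KafkaProducer::disabled();
    assert!(!producer.is_enabled());

    let message = TransactionSentMessage {
        transaction_id: "tx_123".to_string(),
        chain_id: 137,
        account_address: Address::ZERO,
        user_op_hash: Bytes::new(),
        nonce: U256::ZERO,
        deployment_lock_acquired: false,
        timestamp: 0,
    };

    // No-op here; with a real config this would publish to
    // `engine.transaction.sent`, keyed by transaction id.
    producer
        .send_transaction_sent(message, "team_123".into(), "prj_123".into())
        .await;
}
```

For the unified webhook shape, the point of the explicit `Option` fields is that a send-side event still carries all confirm-only keys, serialized as `null` (there is no `skip_serializing_if`). A sketch, taking the hard-to-construct `VersionedUserOp` as a parameter:

```rust
use alloy::primitives::{Address, Bytes, U256};
use engine_aa_types::VersionedUserOp;
use thirdweb_engine::kafka::TransactionWebhookEvent;

/// Serialize a send-side event; the confirm-only fields (`receipt`,
/// `success`, `actualGasCost`, `actualGasUsed`) come out as explicit nulls.
fn sent_event_json(user_op: VersionedUserOp) -> serde_json::Result<String> {
    let event = TransactionWebhookEvent::transaction_sent(
        "tx_123".to_string(), // placeholder transaction id
        137,
        Address::ZERO,
        Bytes::new(),
        U256::ZERO,
        user_op,
        true,
        0,
    );
    serde_json::to_string_pretty(&event)
}
```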
diff --git a/server/src/main.rs b/server/src/main.rs
index 995317b..8528900 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -7,6 +7,7 @@ use thirdweb_engine::{
     config,
     execution_router::ExecutionRouter,
     http::server::{EngineServer, EngineServerState},
+    kafka::create_kafka_producer,
     queue::manager::QueueManager,
 };
 use tracing_subscriber::{filter::EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};
@@ -15,6 +16,12 @@ async fn main() -> anyhow::Result<()> {
     let config = config::get_config();
 
+    // Detect environment for Kafka group ID
+    let environment: config::Environment = std::env::var("APP_ENVIRONMENT")
+        .unwrap_or_else(|_| "local".into())
+        .try_into()
+        .expect("Failed to parse APP_ENVIRONMENT");
+
     let subscriber = tracing_subscriber::registry()
         .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| {
             // Default to info level if RUST_LOG environment variable is not set
@@ -52,12 +59,17 @@ async fn main() -> anyhow::Result<()> {
     });
 
     let eoa_signer = Arc::new(EoaSigner::new(vault_client.clone(), iaw_client));
 
+    let kafka_producer = create_kafka_producer(config.kafka.clone(), &environment)
+        .map_err(|e| anyhow::anyhow!("Failed to create Kafka producer: {}", e))?;
+    tracing::info!("Kafka producer initialized (enabled: {})", kafka_producer.is_enabled());
+
     let queue_manager = QueueManager::new(
         &config.redis,
         &config.queue,
         chains.clone(),
         signer.clone(),
         eoa_signer.clone(),
+        kafka_producer.clone(),
     )
     .await?;
@@ -84,6 +96,7 @@ async fn main() -> anyhow::Result<()> {
         transaction_registry: queue_manager.transaction_registry.clone(),
         vault_client: Arc::new(vault_client.clone()),
         chains: chains.clone(),
+        kafka_producer: kafka_producer.clone(),
     };
 
     let mut server = EngineServer::new(EngineServerState {
@@ -123,5 +136,12 @@ async fn main() -> anyhow::Result<()> {
         tracing::info!("All workers shut down successfully");
     }
 
+    // Flush and shutdown Kafka producer
+    if let Err(e) = kafka_producer.flush().await {
+        tracing::error!("Error flushing Kafka producer during shutdown: {}", e);
+    } else {
+        tracing::info!("Kafka producer flushed successfully");
+    }
+
     Ok(())
 }
diff --git a/server/src/queue/manager.rs b/server/src/queue/manager.rs
index 04e216e..a76a7c9 100644
--- a/server/src/queue/manager.rs
+++ b/server/src/queue/manager.rs
@@ -19,6 +19,7 @@ use twmq::{Queue, queue::QueueOptions, shutdown::ShutdownHandle};
 use crate::{
     chains::ThirdwebChainService,
     config::{QueueConfig, RedisConfig},
+    kafka::{SharedKafkaProducer, create_event_sender},
 };
 
 pub struct QueueManager {
@@ -53,6 +54,7 @@ impl QueueManager {
         chain_service: Arc<ThirdwebChainService>,
         userop_signer: Arc<UserOpSigner>,
         eoa_signer: Arc<EoaSigner>,
+        kafka_producer: SharedKafkaProducer,
     ) -> Result {
         // Create Redis clients
         let redis_client = twmq::redis::Client::open(redis_config.url.as_str())?;
@@ -147,12 +149,16 @@ impl QueueManager {
             .await?
             .arc();
 
+        // Create event sender from Kafka producer
+        let event_sender = create_event_sender(&kafka_producer);
+
         // Create confirmation queues first (needed by send queues)
         let confirm_handler = UserOpConfirmationHandler::new(
             chain_service.clone(),
             deployment_lock.clone(),
             webhook_queue.clone(),
             transaction_registry.clone(),
+            event_sender.clone(),
         );
 
         let userop_confirm_queue = Queue::builder()
@@ -189,6 +195,7 @@ impl QueueManager {
             webhook_queue: webhook_queue.clone(),
             confirm_queue: userop_confirm_queue.clone(),
             transaction_registry: transaction_registry.clone(),
+            event_sender: event_sender.clone(),
         };
 
         let external_bundler_send_queue = Queue::builder()
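End to end, the wiring above degrades gracefully when the `kafka` section is absent from the config: `create_kafka_producer` returns a disabled producer, and `create_event_sender` substitutes the `NoOpEventSender`, so the executors can call the sender unconditionally. A test sketch of that fallback path, using only the public APIs introduced in this diff:

```rust
use std::sync::Arc;

use thirdweb_engine::kafka::{KafkaProducer, create_event_sender};

#[test]
fn disabled_producer_falls_back_to_noop_sender() {
    // No broker, credentials, or config required for the disabled path.
    let producer = Arc::new(KafkaProducer::disabled());
    assert!(!producer.is_enabled());

    // The returned trait object is the NoOpEventSender; sends are no-ops.
    let _sender = create_event_sender(&producer);
}
```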