diff --git a/Cargo.toml b/Cargo.toml index 1c0f7e3..3d4d4c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,12 +5,13 @@ members = [ "collectors/wlf-binlog-collector", # transformers "transformers/wlf-binlog-filter", + "transformers/wlf-event-replicator", # dispatchers "dispatchers/wlf-kafka-dispatcher", "dispatchers/wlf-redis-dispatcher", # others - "demos", "utils", + "wlf-aio", ] [patch.crates-io] diff --git a/collectors/wlf-binlog-collector/Cargo.toml b/collectors/wlf-binlog-collector/Cargo.toml index 395f193..ce43260 100644 --- a/collectors/wlf-binlog-collector/Cargo.toml +++ b/collectors/wlf-binlog-collector/Cargo.toml @@ -22,4 +22,5 @@ futures-core = { version = "0.3", default-features = false } futures-util = { version = "0.3.28" } chrono = "0.4.26" sqlparser = { version = "0.35.0", features = ["visitor"] } -serde_json = "1.0.99" +serde = { version = "1.0", features = ["derive"] } +async-trait = "0.1.68" diff --git a/collectors/wlf-binlog-collector/src/lib.rs b/collectors/wlf-binlog-collector/src/lib.rs index e1393d8..c37304b 100644 --- a/collectors/wlf-binlog-collector/src/lib.rs +++ b/collectors/wlf-binlog-collector/src/lib.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use async_trait::async_trait; use chrono::{LocalResult, TimeZone, Utc}; use futures_util::{pin_mut, StreamExt}; use mysql_cdc::{ @@ -7,12 +8,12 @@ use mysql_cdc::{ events::{binlog_event::BinlogEvent, event_header::EventHeader}, }; -use serde_json::json; +use serde::Deserialize; use sql_analyzer::SqlAnalyzer; use tracing::{error, info}; use wlf_core::{ event_router::{EventRouter, EventRouterApi}, - ComponentApi, ComponentKind, Event, EventMeta, + value, ComponentApi, ComponentKind, Event, EventMeta, }; mod error; @@ -23,12 +24,27 @@ pub use mysql_cdc::binlog_options::BinlogOptions; pub use mysql_cdc::replica_options::ReplicaOptions; pub use mysql_cdc::ssl_mode::SslMode; +#[derive(Deserialize, Debug)] pub struct BinlogCollector { - id: String, - destination: String, - replica_options: ReplicaOptions, + pub id: String, + pub destination: String, + #[serde(default = "default_host")] + pub host: String, + pub user: String, + pub password: String, + #[serde(default = "default_port")] + pub port: u16, } +pub fn default_host() -> String { + "localhost".to_string() +} + +pub const fn default_port() -> u16 { + 3306 +} + +#[async_trait] impl ComponentApi for BinlogCollector { fn id(&self) -> &str { self.id.as_str() @@ -36,24 +52,17 @@ impl ComponentApi for BinlogCollector { fn kind(&self) -> ComponentKind { ComponentKind::Collector } -} - -impl BinlogCollector { - pub fn new( - id: impl Into, - destination: impl Into, - replica_options: ReplicaOptions, - ) -> Self { - Self { - id: id.into(), - destination: destination.into(), - replica_options, - } - } - pub async fn start_collecting(self, router: Arc) -> Result<(), Error> { + async fn run(&self, router: Arc) -> Result<(), Box> { // create the binlog client - let mut client = BinlogClient::new(self.replica_options); + let mut client = BinlogClient::new(ReplicaOptions { + username: self.user.clone(), + password: self.password.clone(), + ssl_mode: SslMode::Disabled, + binlog: BinlogOptions::from_end(), + ..Default::default() + }); + let events_stream = client.replicate().await?; pin_mut!(events_stream); @@ -85,7 +94,7 @@ fn into_wlf_event( let LocalResult::Single(timestamp) = Utc.timestamp_opt(event_header.timestamp as i64, 0) else { return Err(Error::Other("failed to convert timestamp".to_string())); }; - let meta = json!({ + let meta = value!({ "database": e.database_name, "timestamp": timestamp, "server_id": event_header.server_id, @@ -93,7 +102,7 @@ fn into_wlf_event( }); let properties = sql_parser.analyze(&e.sql_statement)?; - let value = json!({"meta": meta, "sql": properties}); + let value = value!({"meta": meta, "sql": properties}); Ok(Event { value, @@ -108,28 +117,24 @@ fn into_wlf_event( mod tests { use std::sync::Arc; - use mysql_cdc::{ - binlog_options::BinlogOptions, replica_options::ReplicaOptions, ssl_mode::SslMode, - }; use utils::test_utils::DummyComponent; use wlf_core::{ event_router::{EventRouter, EventRouterApi}, - ComponentKind, + ComponentApi, ComponentKind, }; - use crate::BinlogCollector; + use crate::{default_host, default_port, BinlogCollector}; #[tokio::test] async fn collect() { - let options = ReplicaOptions { - username: String::from("root"), - password: String::from("password"), - blocking: true, - ssl_mode: SslMode::Disabled, - binlog: BinlogOptions::from_start(), - ..Default::default() + let collector = BinlogCollector { + id: "binlog_collector".to_string(), + user: "root".to_string(), + destination: "dispatcher".to_string(), + host: default_host(), + password: "password".to_string(), + port: default_port(), }; - let collector = BinlogCollector::new("binlog_collector", "test", options); let dummy_dispatcher = DummyComponent::new("dispatcher", ComponentKind::Dispatcher); @@ -138,7 +143,13 @@ mod tests { router.register_component(&dummy_dispatcher); let router = Arc::new(router); - tokio::spawn(collector.start_collecting(Arc::clone(&router))); + let router_c = Arc::clone(&router); + tokio::spawn(async move { + collector + .run(Arc::clone(&router_c)) + .await + .expect("failed to run collector"); + }); while let Ok(event) = router.poll_event("dispatcher").await { println!("{event:#?}"); diff --git a/collectors/wlf-binlog-collector/src/sql_analyzer.rs b/collectors/wlf-binlog-collector/src/sql_analyzer.rs index 7c5dc62..7bd3d08 100644 --- a/collectors/wlf-binlog-collector/src/sql_analyzer.rs +++ b/collectors/wlf-binlog-collector/src/sql_analyzer.rs @@ -1,10 +1,10 @@ -use serde_json::{Map, Value}; use sqlparser::{ ast::Statement, dialect::MySqlDialect, parser::{Parser, ParserError}, }; use thiserror::Error; +use wlf_core::{value, Value}; #[derive(Debug, Error)] pub enum Error { @@ -30,20 +30,22 @@ impl SqlAnalyzer { let st = ast.remove(0); // extract info - let mut properties: Map = Map::new(); - properties.insert("statement".to_string(), sql.into()); - match st { + let properties = match st { Statement::Insert { table_name, .. } => { - properties.insert("type".to_string(), "insert".into()); - properties.insert("table".to_string(), table_name.to_string().into()); + value!({ + "type": "insert", + "table": table_name.to_string() + }) } Statement::CreateTable { name, .. } => { - properties.insert("type".to_string(), "table-create".into()); - properties.insert("table".to_string(), name.to_string().into()); + value!({ + "type": "table-create", + "table": name.to_string() + }) } - _ => {} - } + _ => value!({}), + }; - Ok(properties.into()) + Ok(properties) } } diff --git a/demos/src/bin/mysql_table_kafka.rs b/demos/src/bin/mysql_table_kafka.rs deleted file mode 100644 index 31ce9fe..0000000 --- a/demos/src/bin/mysql_table_kafka.rs +++ /dev/null @@ -1,66 +0,0 @@ -use std::sync::Arc; - -use wlf_binlog_collector::{BinlogCollector, BinlogOptions, ReplicaOptions, SslMode}; -use wlf_binlog_filter::{BinlogFilter, BinlogFilterRules}; -use wlf_core::event_router::{EventRouter, EventRouterApi}; -use wlf_kafka_dispatcher::KafkaDispatcher; - -#[tokio::main(flavor = "current_thread")] -async fn main() { - tracing_subscriber::fmt::init(); - // create an event router - let mut router = EventRouter::new(); - - // create a collector, a transformer, and a dispatcher first - let options = ReplicaOptions { - username: String::from("root"), - password: String::from("password"), - blocking: true, - ssl_mode: SslMode::Disabled, - binlog: BinlogOptions::from_end(), - ..Default::default() - }; - let collector = BinlogCollector::new("binlog_collector", "binlog_parser", options); - - let rules = BinlogFilterRules::new() - .exclude("d1", None) - .include("d1", Some("t1")); - let transformer = BinlogFilter::new("binlog_parser", "kafka_dispatcher", rules); - - let mut dispatcher = - KafkaDispatcher::new("kafka_dispatcher", vec!["127.0.0.1:9092".to_string()]); - dispatcher.set_topic_template(r"logFlex.%{/meta/database}.%{/sql/table}"); - - // register them in the router - router.register_component(&collector); - router.register_component(&transformer); - router.register_component(&dispatcher); - - // start all the components - let router = Arc::new(router); - let router_arc = Arc::clone(&router); - tokio::task::spawn(async move { - collector - .start_collecting(router_arc) - .await - .expect("collector exit unexpectedly"); - }); - let router_arc = Arc::clone(&router); - tokio::spawn(async move { - transformer - .start_filtering(router_arc) - .await - .expect("filter exit unexpectedly"); - }); - let router_arc = Arc::clone(&router); - let handle = tokio::spawn(async move { - dispatcher - .start_dispatching(router_arc) - .await - .expect("kafka dispatcher exit unexpectedly"); - }); - - // should output mysql table event is forwarded to ... - - handle.await.expect("failed"); -} diff --git a/demos/src/lib.rs b/demos/src/lib.rs deleted file mode 100644 index 2a951f9..0000000 --- a/demos/src/lib.rs +++ /dev/null @@ -1 +0,0 @@ -//! See `bin` for all the demo binaries diff --git a/dispatchers/wlf-kafka-dispatcher/Cargo.toml b/dispatchers/wlf-kafka-dispatcher/Cargo.toml index bf81344..033b3c2 100644 --- a/dispatchers/wlf-kafka-dispatcher/Cargo.toml +++ b/dispatchers/wlf-kafka-dispatcher/Cargo.toml @@ -14,3 +14,5 @@ thiserror = "1.0.40" serde_json = "1.0.99" chrono = "0.4.26" utils = { path = "../../utils" } +serde = { version = "1.0", features = ["derive"] } +async-trait = "0.1.68" diff --git a/dispatchers/wlf-kafka-dispatcher/src/lib.rs b/dispatchers/wlf-kafka-dispatcher/src/lib.rs index 1887d28..65409a5 100644 --- a/dispatchers/wlf-kafka-dispatcher/src/lib.rs +++ b/dispatchers/wlf-kafka-dispatcher/src/lib.rs @@ -1,5 +1,6 @@ use std::{collections::BTreeMap, sync::Arc}; +use async_trait::async_trait; use chrono::Utc; use rskafka::{ client::{ @@ -8,8 +9,9 @@ use rskafka::{ }, record::Record, }; +use serde::Deserialize; use thiserror::Error; -use tracing::{error, info}; +use tracing::{error, info, warn}; use utils::substitute_with_event; use wlf_core::{ event_router::{EventRouter, EventRouterApi}, @@ -24,16 +26,17 @@ pub enum Error { KafkaClient(#[from] rskafka::client::error::Error), #[error("serialize/deserialize error, {0}")] Serde(#[from] serde_json::Error), - #[error("failed to generate topic name, {0}")] - TopicName(String), } +#[derive(Deserialize, Debug)] pub struct KafkaDispatcher { id: String, + #[serde(default = "default_topic")] topic: String, bootstrap_brokers: Vec, } +#[async_trait] impl ComponentApi for KafkaDispatcher { fn id(&self) -> &str { self.id.as_str() @@ -42,23 +45,8 @@ impl ComponentApi for KafkaDispatcher { fn kind(&self) -> ComponentKind { ComponentKind::Dispatcher } -} - -impl KafkaDispatcher { - pub fn new(id: impl Into, bootstrap_brokers: Vec) -> Self { - Self { - id: id.into(), - topic: default_topic().to_string(), - bootstrap_brokers, - } - } - pub fn set_topic_template(&mut self, topic: impl Into) -> &mut Self { - self.topic = topic.into(); - self - } - - pub async fn start_dispatching(self, router: Arc) -> Result<(), Error> { + async fn run(&self, router: Arc) -> Result<(), Box> { let client = ClientBuilder::new(self.bootstrap_brokers.clone()) .build() .await?; @@ -67,8 +55,10 @@ impl KafkaDispatcher { while let Ok(event) = router.poll_event(self.id()).await { info!("receive new event:\n{event:#?}"); // get the topic - let topic_name = - substitute_with_event(&self.topic, &event).map_err(Error::TopicName)?; + let Ok(topic_name) = substitute_with_event(&self.topic, &event) else { + warn!("can't generate topic_name for event"); + continue; + }; // create the topic in kafka if topic does not exist if !topics_cache.iter().any(|topic| topic.name == topic_name) { @@ -101,6 +91,6 @@ impl KafkaDispatcher { } } -const fn default_topic() -> &'static str { - "wasm-log-flex" +fn default_topic() -> String { + "wasm-log-flex".to_string() } diff --git a/dispatchers/wlf-redis-dispatcher/Cargo.toml b/dispatchers/wlf-redis-dispatcher/Cargo.toml index c156123..48997e9 100644 --- a/dispatchers/wlf-redis-dispatcher/Cargo.toml +++ b/dispatchers/wlf-redis-dispatcher/Cargo.toml @@ -13,6 +13,9 @@ thiserror = "1.0.40" utils = { path = "../../utils" } tracing = "0.1.37" serde_json = "1.0.99" +serde = { version = "1.0", features = ["derive"] } +async-trait = "0.1.68" + [dev-dependencies] tracing-subscriber = "0.3.17" diff --git a/dispatchers/wlf-redis-dispatcher/src/lib.rs b/dispatchers/wlf-redis-dispatcher/src/lib.rs index c76e913..3d30d78 100644 --- a/dispatchers/wlf-redis-dispatcher/src/lib.rs +++ b/dispatchers/wlf-redis-dispatcher/src/lib.rs @@ -1,11 +1,13 @@ use std::sync::Arc; +use async_trait::async_trait; use redis::{ AsyncCommands, ConnectionAddr, ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisError, }; +use serde::Deserialize; use thiserror::Error; -use tracing::info; +use tracing::{info, warn}; use utils::substitute_with_event; use wlf_core::{ event_router::{EventRouter, EventRouterApi}, @@ -16,22 +18,28 @@ use wlf_core::{ pub enum Error { #[error("redis error, {0}")] Redis(#[from] RedisError), - #[error("failed to generate redis key, {0}")] - TopicName(String), #[error("serialize/deserialize error, {0}")] Serde(#[from] serde_json::Error), } +#[derive(Deserialize, Debug)] pub struct RedisDispatcher { id: String, + #[serde(default)] mode: Mode, + // TODO: use default here after https://github.com/serde-rs/serde/issues/1626 is fixed + #[serde(flatten)] config: Config, } +#[derive(Deserialize, Debug, Clone)] pub struct Config { + #[serde(default = "default_host")] host: String, + #[serde(default = "default_port")] port: u16, auth: Option, + #[serde(default = "default_database_number")] database_number: u8, } @@ -51,14 +59,28 @@ impl IntoConnectionInfo for Config { impl Default for Config { fn default() -> Self { Self { - host: "localhost".to_string(), - port: 6379, + host: default_host(), + port: default_port(), auth: None, - database_number: 0, + database_number: default_database_number(), } } } +pub fn default_host() -> String { + "localhost".to_string() +} + +pub fn default_port() -> u16 { + 6379 +} + +pub fn default_database_number() -> u8 { + 0 +} + +#[derive(Deserialize, Debug)] +#[serde(tag = "type")] pub enum Mode { LPush { key: String }, RPush { key: String }, @@ -92,9 +114,20 @@ impl RedisDispatcher { self.config.auth = Some(password.into()); self } +} + +#[async_trait] +impl ComponentApi for RedisDispatcher { + fn id(&self) -> &str { + self.id.as_str() + } + + fn kind(&self) -> ComponentKind { + ComponentKind::Dispatcher + } - pub async fn start_dispatching(self, router: Arc) -> Result<(), Error> { - let mut redis_client = redis::Client::open(self.config)? + async fn run(&self, router: Arc) -> Result<(), Box> { + let mut redis_client = redis::Client::open(self.config.clone())? .get_async_connection() .await?; @@ -103,27 +136,42 @@ impl RedisDispatcher { match &self.mode { Mode::LPush { key } => { - let key = substitute_with_event(key, &event).map_err(Error::TopicName)?; + let Ok(key) = substitute_with_event(key, &event) else { + warn!("can't generate key for event"); + continue; + }; let value = serde_json::to_string(&event)?; - redis_client.lpush(key, value).await?; + redis_client.lpush(&key, value).await?; + info!("event is dispatched to list {key}"); } Mode::RPush { key } => { - let key = substitute_with_event(key, &event).map_err(Error::TopicName)?; + let Ok(key) = substitute_with_event(key, &event) else { + warn!("can't generate key for event"); + continue; + }; let value = serde_json::to_string(&event)?; - redis_client.rpush(key, value).await?; + redis_client.rpush(&key, value).await?; + info!("event is dispatched to list {key}"); } Mode::Pub { channel } => { - let channel = - substitute_with_event(channel, &event).map_err(Error::TopicName)?; + let Ok(channel) = substitute_with_event(channel, &event) else { + warn!("can't generate channel for event"); + continue; + }; let value = serde_json::to_string(&event)?; - redis_client.publish(channel, value).await?; + redis_client.publish(&channel, value).await?; + info!("event is dispatched to channel {channel}"); } Mode::XADD { key } => { - let key = substitute_with_event(key, &event).map_err(Error::TopicName)?; + let Ok(key) = substitute_with_event(key, &event) else { + warn!("can't generate key for event"); + continue; + }; let value = serde_json::to_string(&event)?; redis_client - .xadd(key, "*", &[("event".to_string(), value)]) + .xadd(&key, "*", &[("event".to_string(), value)]) .await?; + info!("event is dispatched to stream {key}"); } } } @@ -132,16 +180,6 @@ impl RedisDispatcher { } } -impl ComponentApi for RedisDispatcher { - fn id(&self) -> &str { - self.id.as_str() - } - - fn kind(&self) -> ComponentKind { - ComponentKind::Dispatcher - } -} - #[cfg(test)] mod tests { use std::{error::Error, time::Duration}; @@ -204,7 +242,7 @@ mod tests { } }); - dispatcher.start_dispatching(router).await?; + dispatcher.run(router).await?; Ok(()) } diff --git a/demos/env/mysql_table_kafka.docker-compose.yaml b/examples/binlog_to_kafka_redis.docker-compose.yaml similarity index 88% rename from demos/env/mysql_table_kafka.docker-compose.yaml rename to examples/binlog_to_kafka_redis.docker-compose.yaml index 35ab5a9..fc6a77a 100644 --- a/demos/env/mysql_table_kafka.docker-compose.yaml +++ b/examples/binlog_to_kafka_redis.docker-compose.yaml @@ -17,3 +17,8 @@ services: network_mode: host environment: - MYSQL_ROOT_PASSWORD=password + + redis: + container_name: redis + image: redis + network_mode: host diff --git a/examples/binlog_to_kafka_redis.yaml b/examples/binlog_to_kafka_redis.yaml new file mode 100644 index 0000000..ef284f9 --- /dev/null +++ b/examples/binlog_to_kafka_redis.yaml @@ -0,0 +1,31 @@ +collectors: + - id: binlog_collector + type: BinlogCollector + destination: filter + user: root + password: password +transformers: + - id: filter + type: BinlogFilter + destination: replicator + rules: + - exclude: + database: d1 + - include: + database: d1 + table: t1 + - id: replicator + type: EventReplicator + destinations: + - redis + - kafka +dispatchers: + - id: kafka + type: KafkaDispatcher + topic: logFlex.%{/meta/database}.%{/sql/table} + bootstrap_brokers: ["127.0.0.1:9092"] + - id: redis + type: RedisDispatcher + mode: + type: Pub + channel: logFlex.%{/meta/database}.%{/sql/table} diff --git a/transformers/wlf-binlog-filter/Cargo.toml b/transformers/wlf-binlog-filter/Cargo.toml index 8d6d248..a28d2e1 100644 --- a/transformers/wlf-binlog-filter/Cargo.toml +++ b/transformers/wlf-binlog-filter/Cargo.toml @@ -10,4 +10,5 @@ wlf-core = { path = "../../wlf-core" } tokio_wasi = { version = "1", features = ["rt", "time", "test-util", "macros"] } tracing = "0.1.37" thiserror = "1.0.40" -serde_json = "1.0.99" +async-trait = "0.1.68" +serde = { version = "1.0", features = ["derive"] } diff --git a/transformers/wlf-binlog-filter/src/lib.rs b/transformers/wlf-binlog-filter/src/lib.rs index ac2a6b6..594fc48 100644 --- a/transformers/wlf-binlog-filter/src/lib.rs +++ b/transformers/wlf-binlog-filter/src/lib.rs @@ -1,16 +1,19 @@ use std::sync::Arc; -use serde_json::Value; +use async_trait::async_trait; +use serde::Deserialize; use thiserror::Error; use tracing::info; use wlf_core::{ event_router::{EventRouter, EventRouterApi}, - ComponentApi, ComponentKind, Event, + ComponentApi, ComponentKind, Event, Value, }; +#[derive(Deserialize, Debug)] pub struct BinlogFilter { id: String, destination: String, + #[serde(flatten)] rules: BinlogFilterRules, } @@ -20,6 +23,7 @@ pub enum Error { EventRouter(#[from] wlf_core::event_router::Error), } +#[async_trait] impl ComponentApi for BinlogFilter { fn id(&self) -> &str { self.id.as_str() @@ -28,21 +32,8 @@ impl ComponentApi for BinlogFilter { fn kind(&self) -> ComponentKind { ComponentKind::Transformer } -} -impl BinlogFilter { - pub fn new( - id: impl Into, - destination: impl Into, - rules: BinlogFilterRules, - ) -> Self { - Self { - id: id.into(), - destination: destination.into(), - rules, - } - } - pub async fn start_filtering(self, router: Arc) -> Result<(), Error> { + async fn run(&self, router: Arc) -> Result<(), Box> { while let Ok(event) = router.poll_event(self.id()).await { info!("{} receives new event:\n{event:#?}", self.id); @@ -56,11 +47,13 @@ impl BinlogFilter { } } -#[derive(Default)] +#[derive(Default, Deserialize, Debug)] pub struct BinlogFilterRules { rules: Vec, } +#[derive(Deserialize, Debug)] +#[serde(rename_all = "lowercase")] enum BinlogFilterRule { Include { database: String, diff --git a/transformers/wlf-event-replicator/Cargo.toml b/transformers/wlf-event-replicator/Cargo.toml new file mode 100644 index 0000000..1afcdb1 --- /dev/null +++ b/transformers/wlf-event-replicator/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "wlf-event-replicator" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +wlf-core = { path = "../../wlf-core" } +tokio_wasi = { version = "1", features = ["rt", "time", "test-util", "macros"] } +tracing = "0.1.37" +thiserror = "1.0.40" +async-trait = "0.1.68" +serde = { version = "1.0", features = ["derive"] } diff --git a/transformers/wlf-event-replicator/src/lib.rs b/transformers/wlf-event-replicator/src/lib.rs new file mode 100644 index 0000000..ff19b03 --- /dev/null +++ b/transformers/wlf-event-replicator/src/lib.rs @@ -0,0 +1,44 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use serde::Deserialize; +use thiserror::Error; +use tracing::info; +use wlf_core::{ + event_router::{EventRouter, EventRouterApi}, + ComponentApi, ComponentKind, +}; + +#[derive(Deserialize, Debug)] +pub struct EventReplicator { + id: String, + destinations: Vec, +} + +#[derive(Error, Debug)] +pub enum Error { + #[error("event router error, {0}")] + EventRouter(#[from] wlf_core::event_router::Error), +} + +#[async_trait] +impl ComponentApi for EventReplicator { + fn id(&self) -> &str { + self.id.as_str() + } + + fn kind(&self) -> ComponentKind { + ComponentKind::Transformer + } + + async fn run(&self, router: Arc) -> Result<(), Box> { + while let Ok(event) = router.poll_event(self.id()).await { + info!("{} receives new event:\n{event:#?}", self.id); + + for d in &self.destinations { + router.send_event(event.clone(), d).await?; + } + } + Ok(()) + } +} diff --git a/utils/Cargo.toml b/utils/Cargo.toml index 9a1c797..8efd8eb 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -9,3 +9,4 @@ edition = "2021" wlf-core = { path = "../wlf-core" } regex = "1.8.4" once_cell = "1.18.0" +async-trait = "0.1.68" diff --git a/utils/src/test_utils.rs b/utils/src/test_utils.rs index 4d7abae..a362874 100644 --- a/utils/src/test_utils.rs +++ b/utils/src/test_utils.rs @@ -1,6 +1,9 @@ //! Utilities for testing -use wlf_core::{ComponentApi, ComponentKind}; +use std::sync::Arc; + +use async_trait::async_trait; +use wlf_core::{event_router::EventRouter, ComponentApi, ComponentKind}; pub struct DummyComponent { id: String, @@ -16,6 +19,7 @@ impl DummyComponent { } } +#[async_trait] impl ComponentApi for DummyComponent { fn id(&self) -> &str { &self.id @@ -24,4 +28,8 @@ impl ComponentApi for DummyComponent { fn kind(&self) -> ComponentKind { self.kind } + + async fn run(&self, _router: Arc) -> Result<(), Box> { + Ok(()) + } } diff --git a/demos/Cargo.toml b/wlf-aio/Cargo.toml similarity index 62% rename from demos/Cargo.toml rename to wlf-aio/Cargo.toml index 60d0610..b2536fa 100644 --- a/demos/Cargo.toml +++ b/wlf-aio/Cargo.toml @@ -1,15 +1,15 @@ [package] -name = "demos" +name = "wlf-aio" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] wlf-core = { path = "../wlf-core" } wlf-binlog-collector = { path = "../collectors/wlf-binlog-collector" } wlf-binlog-filter = { path = "../transformers/wlf-binlog-filter" } +wlf-event-replicator = { path = "../transformers/wlf-event-replicator" } wlf-kafka-dispatcher = { path = "../dispatchers/wlf-kafka-dispatcher" } +wlf-redis-dispatcher = { path = "../dispatchers/wlf-redis-dispatcher" } tokio_wasi = { version = "1", features = [ "rt", "time", @@ -19,3 +19,7 @@ tokio_wasi = { version = "1", features = [ ] } futures-core = { version = "0.3", default-features = false } tracing-subscriber = "0.3.17" +clap = { version = "4.3.15", features = ["derive"] } +serde = { version = "1.0", features = ["derive"] } +serde_yaml = "0.9.24" +tracing = "0.1.37" diff --git a/wlf-aio/src/config.rs b/wlf-aio/src/config.rs new file mode 100644 index 0000000..ebe2b4c --- /dev/null +++ b/wlf-aio/src/config.rs @@ -0,0 +1,63 @@ +use serde::Deserialize; +use wlf_binlog_collector::BinlogCollector; +use wlf_binlog_filter::BinlogFilter; +use wlf_core::ComponentApi; +use wlf_event_replicator::EventReplicator; +use wlf_kafka_dispatcher::KafkaDispatcher; +use wlf_redis_dispatcher::RedisDispatcher; + +#[derive(Deserialize, Debug)] +pub(crate) struct Config { + #[serde(default)] + pub(crate) collectors: Vec, + #[serde(default)] + pub(crate) transformers: Vec, + #[serde(default)] + pub(crate) dispatchers: Vec, +} + +#[derive(Deserialize, Debug)] +#[serde(tag = "type")] +pub(crate) enum Collector { + BinlogCollector(BinlogCollector), +} + +impl Collector { + pub(crate) fn as_component(&self) -> &dyn ComponentApi { + match self { + Collector::BinlogCollector(c) => c, + } + } +} + +#[derive(Deserialize, Debug)] +#[serde(tag = "type")] +pub(crate) enum Transformer { + BinlogFilter(BinlogFilter), + EventReplicator(EventReplicator), +} + +impl Transformer { + pub(crate) fn as_component(&self) -> &dyn ComponentApi { + match self { + Transformer::BinlogFilter(t) => t, + Transformer::EventReplicator(t) => t, + } + } +} + +#[derive(Deserialize, Debug)] +#[serde(tag = "type")] +pub(crate) enum Dispatcher { + KafkaDispatcher(KafkaDispatcher), + RedisDispatcher(RedisDispatcher), +} + +impl Dispatcher { + pub(crate) fn as_component(&self) -> &dyn ComponentApi { + match self { + Dispatcher::KafkaDispatcher(d) => d, + Dispatcher::RedisDispatcher(d) => d, + } + } +} diff --git a/wlf-aio/src/main.rs b/wlf-aio/src/main.rs new file mode 100644 index 0000000..f1686da --- /dev/null +++ b/wlf-aio/src/main.rs @@ -0,0 +1,71 @@ +use std::{error::Error, fs::File, io::BufReader, path::PathBuf, sync::Arc}; + +use clap::{command, Parser}; +use tracing::error; +use wlf_core::event_router::{EventRouter, EventRouterApi}; + +use crate::config::Config; + +mod config; + +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +struct Cli { + #[arg(short, long, value_name = "FILE")] + config: PathBuf, +} + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let cli = Cli::parse(); + let config: Config = serde_yaml::from_reader(BufReader::new(File::open(cli.config)?))?; + + let mut router = EventRouter::new(); + for c in &config.collectors { + router.register_component(c.as_component()); + } + for t in &config.transformers { + router.register_component(t.as_component()); + } + for d in &config.dispatchers { + router.register_component(d.as_component()); + } + let router = Arc::new(router); + + let mut handles = vec![]; + for c in config.collectors { + let router_c = Arc::clone(&router); + let handle = tokio::spawn(async move { + if let Err(e) = c.as_component().run(router_c).await { + error!(e); + } + }); + handles.push(handle); + } + for t in config.transformers { + let router_c = Arc::clone(&router); + let handle = tokio::spawn(async move { + if let Err(e) = t.as_component().run(router_c).await { + error!(e); + } + }); + handles.push(handle); + } + for d in config.dispatchers { + let router_c = Arc::clone(&router); + let handle = tokio::spawn(async move { + if let Err(e) = d.as_component().run(router_c).await { + error!(e); + } + }); + handles.push(handle); + } + + for handle in handles { + handle.await.expect("something went wrong"); + } + + Ok(()) +} diff --git a/wlf-core/src/event_router.rs b/wlf-core/src/event_router.rs index 55feaa7..8696d98 100644 --- a/wlf-core/src/event_router.rs +++ b/wlf-core/src/event_router.rs @@ -11,7 +11,7 @@ use tracing::error; pub trait EventRouterApi { async fn send_event(&self, event: Event, component_id: &str) -> Result<(), Error>; async fn poll_event(&self, component_id: &str) -> Result; - fn register_component(&mut self, collector: &impl ComponentApi); + fn register_component(&mut self, collector: &dyn ComponentApi); } #[derive(Debug, Error)] @@ -79,7 +79,7 @@ impl EventRouterApi for EventRouter { }; Ok(rx.recv_async().await?) } - fn register_component(&mut self, component: &impl ComponentApi) { + fn register_component(&mut self, component: &dyn ComponentApi) { if self.registry.contains_key(component.id()) { error!("component {} has already been registered", component.id()); return; diff --git a/wlf-core/src/lib.rs b/wlf-core/src/lib.rs index 5b035d3..b08be21 100644 --- a/wlf-core/src/lib.rs +++ b/wlf-core/src/lib.rs @@ -1,7 +1,12 @@ mod event; pub mod event_router; +use std::{error::Error, sync::Arc}; + +use async_trait::async_trait; pub use event::{Event, EventMeta}; +use event_router::EventRouter; +pub use serde_json::json as value; pub use serde_json::Value; #[derive(Debug, PartialEq, Eq, Clone, Copy)] @@ -11,7 +16,9 @@ pub enum ComponentKind { Dispatcher, } +#[async_trait] pub trait ComponentApi: 'static + Send + Sync { fn id(&self) -> &str; fn kind(&self) -> ComponentKind; + async fn run(&self, router: Arc) -> Result<(), Box>; }