Skip to content

Commit

Permalink
add wlf-aio
Browse files Browse the repository at this point in the history
  • Loading branch information
markcty committed Jul 20, 2023
1 parent 83e3610 commit ae100ae
Show file tree
Hide file tree
Showing 23 changed files with 415 additions and 192 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ members = [
"collectors/wlf-binlog-collector",
# transformers
"transformers/wlf-binlog-filter",
"transformers/wlf-event-replicator",
# dispatchers
"dispatchers/wlf-kafka-dispatcher",
"dispatchers/wlf-redis-dispatcher",
# others
"demos",
"utils",
"wlf-aio",
]

[patch.crates-io]
Expand Down
3 changes: 2 additions & 1 deletion collectors/wlf-binlog-collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ futures-core = { version = "0.3", default-features = false }
futures-util = { version = "0.3.28" }
chrono = "0.4.26"
sqlparser = { version = "0.35.0", features = ["visitor"] }
serde_json = "1.0.99"
serde = { version = "1.0", features = ["derive"] }
async-trait = "0.1.68"
85 changes: 48 additions & 37 deletions collectors/wlf-binlog-collector/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use std::sync::Arc;

use async_trait::async_trait;
use chrono::{LocalResult, TimeZone, Utc};
use futures_util::{pin_mut, StreamExt};
use mysql_cdc::{
binlog_client::BinlogClient,
events::{binlog_event::BinlogEvent, event_header::EventHeader},
};

use serde_json::json;
use serde::Deserialize;
use sql_analyzer::SqlAnalyzer;
use tracing::{error, info};
use wlf_core::{
event_router::{EventRouter, EventRouterApi},
ComponentApi, ComponentKind, Event, EventMeta,
value, ComponentApi, ComponentKind, Event, EventMeta,
};

mod error;
Expand All @@ -23,37 +24,45 @@ pub use mysql_cdc::binlog_options::BinlogOptions;
pub use mysql_cdc::replica_options::ReplicaOptions;
pub use mysql_cdc::ssl_mode::SslMode;

#[derive(Deserialize, Debug)]
pub struct BinlogCollector {
id: String,
destination: String,
replica_options: ReplicaOptions,
pub id: String,
pub destination: String,
#[serde(default = "default_host")]
pub host: String,
pub user: String,
pub password: String,
#[serde(default = "default_port")]
pub port: u16,
}

pub fn default_host() -> String {
"localhost".to_string()
}

pub const fn default_port() -> u16 {
3306
}

#[async_trait]
impl ComponentApi for BinlogCollector {
fn id(&self) -> &str {
self.id.as_str()
}
fn kind(&self) -> ComponentKind {
ComponentKind::Collector
}
}

impl BinlogCollector {
pub fn new(
id: impl Into<String>,
destination: impl Into<String>,
replica_options: ReplicaOptions,
) -> Self {
Self {
id: id.into(),
destination: destination.into(),
replica_options,
}
}

pub async fn start_collecting(self, router: Arc<EventRouter>) -> Result<(), Error> {
async fn run(&self, router: Arc<EventRouter>) -> Result<(), Box<dyn std::error::Error>> {
// create the binlog client
let mut client = BinlogClient::new(self.replica_options);
let mut client = BinlogClient::new(ReplicaOptions {
username: self.user.clone(),
password: self.password.clone(),
ssl_mode: SslMode::Disabled,
binlog: BinlogOptions::from_end(),
..Default::default()
});

let events_stream = client.replicate().await?;
pin_mut!(events_stream);

Expand Down Expand Up @@ -85,15 +94,15 @@ fn into_wlf_event(
let LocalResult::Single(timestamp) = Utc.timestamp_opt(event_header.timestamp as i64, 0) else {
return Err(Error::Other("failed to convert timestamp".to_string()));
};
let meta = json!({
let meta = value!({
"database": e.database_name,
"timestamp": timestamp,
"server_id": event_header.server_id,
"thread_id": e.thread_id,
});
let properties = sql_parser.analyze(&e.sql_statement)?;

let value = json!({"meta": meta, "sql": properties});
let value = value!({"meta": meta, "sql": properties});

Ok(Event {
value,
Expand All @@ -108,28 +117,24 @@ fn into_wlf_event(
mod tests {
use std::sync::Arc;

use mysql_cdc::{
binlog_options::BinlogOptions, replica_options::ReplicaOptions, ssl_mode::SslMode,
};
use utils::test_utils::DummyComponent;
use wlf_core::{
event_router::{EventRouter, EventRouterApi},
ComponentKind,
ComponentApi, ComponentKind,
};

use crate::BinlogCollector;
use crate::{default_host, default_port, BinlogCollector};

#[tokio::test]
async fn collect() {
let options = ReplicaOptions {
username: String::from("root"),
password: String::from("password"),
blocking: true,
ssl_mode: SslMode::Disabled,
binlog: BinlogOptions::from_start(),
..Default::default()
let collector = BinlogCollector {
id: "binlog_collector".to_string(),
user: "root".to_string(),
destination: "dispatcher".to_string(),
host: default_host(),
password: "password".to_string(),
port: default_port(),
};
let collector = BinlogCollector::new("binlog_collector", "test", options);

let dummy_dispatcher = DummyComponent::new("dispatcher", ComponentKind::Dispatcher);

Expand All @@ -138,7 +143,13 @@ mod tests {
router.register_component(&dummy_dispatcher);
let router = Arc::new(router);

tokio::spawn(collector.start_collecting(Arc::clone(&router)));
let router_c = Arc::clone(&router);
tokio::spawn(async move {
collector
.run(Arc::clone(&router_c))
.await
.expect("failed to run collector");
});

while let Ok(event) = router.poll_event("dispatcher").await {
println!("{event:#?}");
Expand Down
24 changes: 13 additions & 11 deletions collectors/wlf-binlog-collector/src/sql_analyzer.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use serde_json::{Map, Value};
use sqlparser::{
ast::Statement,
dialect::MySqlDialect,
parser::{Parser, ParserError},
};
use thiserror::Error;
use wlf_core::{value, Value};

#[derive(Debug, Error)]
pub enum Error {
Expand All @@ -30,20 +30,22 @@ impl SqlAnalyzer {
let st = ast.remove(0);

// extract info
let mut properties: Map<String, Value> = Map::new();
properties.insert("statement".to_string(), sql.into());
match st {
let properties = match st {
Statement::Insert { table_name, .. } => {
properties.insert("type".to_string(), "insert".into());
properties.insert("table".to_string(), table_name.to_string().into());
value!({
"type": "insert",
"table": table_name.to_string()
})
}
Statement::CreateTable { name, .. } => {
properties.insert("type".to_string(), "table-create".into());
properties.insert("table".to_string(), name.to_string().into());
value!({
"type": "table-create",
"table": name.to_string()
})
}
_ => {}
}
_ => value!({}),
};

Ok(properties.into())
Ok(properties)
}
}
66 changes: 0 additions & 66 deletions demos/src/bin/mysql_table_kafka.rs

This file was deleted.

1 change: 0 additions & 1 deletion demos/src/lib.rs

This file was deleted.

2 changes: 2 additions & 0 deletions dispatchers/wlf-kafka-dispatcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ thiserror = "1.0.40"
serde_json = "1.0.99"
chrono = "0.4.26"
utils = { path = "../../utils" }
serde = { version = "1.0", features = ["derive"] }
async-trait = "0.1.68"
36 changes: 13 additions & 23 deletions dispatchers/wlf-kafka-dispatcher/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::BTreeMap, sync::Arc};

use async_trait::async_trait;
use chrono::Utc;
use rskafka::{
client::{
Expand All @@ -8,8 +9,9 @@ use rskafka::{
},
record::Record,
};
use serde::Deserialize;
use thiserror::Error;
use tracing::{error, info};
use tracing::{error, info, warn};
use utils::substitute_with_event;
use wlf_core::{
event_router::{EventRouter, EventRouterApi},
Expand All @@ -24,16 +26,17 @@ pub enum Error {
KafkaClient(#[from] rskafka::client::error::Error),
#[error("serialize/deserialize error, {0}")]
Serde(#[from] serde_json::Error),
#[error("failed to generate topic name, {0}")]
TopicName(String),
}

#[derive(Deserialize, Debug)]
pub struct KafkaDispatcher {
id: String,
#[serde(default = "default_topic")]
topic: String,
bootstrap_brokers: Vec<String>,
}

#[async_trait]
impl ComponentApi for KafkaDispatcher {
fn id(&self) -> &str {
self.id.as_str()
Expand All @@ -42,23 +45,8 @@ impl ComponentApi for KafkaDispatcher {
fn kind(&self) -> ComponentKind {
ComponentKind::Dispatcher
}
}

impl KafkaDispatcher {
pub fn new(id: impl Into<String>, bootstrap_brokers: Vec<String>) -> Self {
Self {
id: id.into(),
topic: default_topic().to_string(),
bootstrap_brokers,
}
}

pub fn set_topic_template(&mut self, topic: impl Into<String>) -> &mut Self {
self.topic = topic.into();
self
}

pub async fn start_dispatching(self, router: Arc<EventRouter>) -> Result<(), Error> {
async fn run(&self, router: Arc<EventRouter>) -> Result<(), Box<dyn std::error::Error>> {
let client = ClientBuilder::new(self.bootstrap_brokers.clone())
.build()
.await?;
Expand All @@ -67,8 +55,10 @@ impl KafkaDispatcher {
while let Ok(event) = router.poll_event(self.id()).await {
info!("receive new event:\n{event:#?}");
// get the topic
let topic_name =
substitute_with_event(&self.topic, &event).map_err(Error::TopicName)?;
let Ok(topic_name) = substitute_with_event(&self.topic, &event) else {
warn!("can't generate topic_name for event");
continue;
};

// create the topic in kafka if topic does not exist
if !topics_cache.iter().any(|topic| topic.name == topic_name) {
Expand Down Expand Up @@ -101,6 +91,6 @@ impl KafkaDispatcher {
}
}

const fn default_topic() -> &'static str {
"wasm-log-flex"
fn default_topic() -> String {
"wasm-log-flex".to_string()
}
3 changes: 3 additions & 0 deletions dispatchers/wlf-redis-dispatcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ thiserror = "1.0.40"
utils = { path = "../../utils" }
tracing = "0.1.37"
serde_json = "1.0.99"
serde = { version = "1.0", features = ["derive"] }
async-trait = "0.1.68"


[dev-dependencies]
tracing-subscriber = "0.3.17"
Loading

0 comments on commit ae100ae

Please sign in to comment.