diff --git a/collectors/wlf-binlog-collector/src/lib.rs b/collectors/wlf-binlog-collector/src/lib.rs
index c37304b..fae0c34 100644
--- a/collectors/wlf-binlog-collector/src/lib.rs
+++ b/collectors/wlf-binlog-collector/src/lib.rs
@@ -10,10 +10,10 @@ use mysql_cdc::{
 use serde::Deserialize;
 use sql_analyzer::SqlAnalyzer;
-use tracing::{error, info};
+use tracing::{info, warn};
 use wlf_core::{
     event_router::{EventRouter, EventRouterApi},
-    value, ComponentApi, ComponentKind, Event, EventMeta,
+    value, ComponentApi, ComponentKind, Event, EventMeta, Value,
 };
 
 mod error;
@@ -70,10 +70,11 @@ impl ComponentApi for BinlogCollector {
         let mut sql_parser = SqlAnalyzer::new();
         while let Some(Ok((event_header, binlog_event))) = events_stream.next().await {
-            info!("new binlog event:\n{event_header:#?}\n{binlog_event:#?}");
+            info!("new binlog event:\n\t{event_header:?}\n\t{binlog_event:?}");
             match into_wlf_event(&mut sql_parser, event_header, binlog_event) {
-                Ok(event) => router.send_event(event, &self.destination).await?,
-                Err(e) => error!("failed to convert binlog event, {e}"),
+                Ok(Some(event)) => router.send_event(event, &self.destination).await?,
+                Ok(None) => {}
+                Err(e) => warn!("failed to convert binlog event, {e}"),
             }
         }
 
@@ -83,32 +84,77 @@ impl ComponentApi for BinlogCollector {
 /// The event structure is largely borrowed from [maxwells](https://maxwells-daemon.io/dataformat/)
 fn into_wlf_event(
-    sql_parser: &mut SqlAnalyzer,
+    sql_analyzer: &mut SqlAnalyzer,
     event_header: EventHeader,
     binlog_event: BinlogEvent,
-) -> Result<Event, Error> {
+) -> Result<Option<Event>, Error> {
+    let LocalResult::Single(timestamp) = Utc.timestamp_opt(event_header.timestamp as i64, 0) else {
+        return Err(Error::Other("failed to convert timestamp".to_string()));
+    };
     match binlog_event {
         BinlogEvent::QueryEvent(e) => {
-            info!("receive query event {e:?}");
-
-            let LocalResult::Single(timestamp) = Utc.timestamp_opt(event_header.timestamp as i64, 0) else {
-                return Err(Error::Other("failed to convert timestamp".to_string()));
-            };
-            let meta = value!({
-                "database": e.database_name,
+            let mut value = value!({
                 "timestamp": timestamp,
                 "server_id": event_header.server_id,
                 "thread_id": e.thread_id,
             });
-            let properties = sql_parser.analyze(&e.sql_statement)?;
-            let value = value!({"meta": meta, "sql": properties});
+            let mut sql_properties = sql_analyzer.analyze(&e.database_name, &e.sql_statement)?;
+            if sql_properties.is_null() {
+                return Ok(None);
+            }
+
+            value
+                .as_object_mut()
+                .unwrap()
+                .append(sql_properties.as_object_mut().unwrap());
 
-            Ok(Event {
+            Ok(Some(Event {
+                value,
+                meta: EventMeta {},
+            }))
+        }
+        BinlogEvent::TableMapEvent(e) => {
+            sql_analyzer.map_table(&e.database_name, &e.table_name, e.table_id);
+            Ok(None)
+        }
+        BinlogEvent::WriteRowsEvent(e) => {
+            let (database, table) = sql_analyzer.get_table_info(e.table_id)?;
+            let columns = sql_analyzer.get_column_defs(e.table_id)?;
+            let mut value = value!({
+                "database": database,
+                "table": table,
+                "type": "insert",
+                "timestamp": timestamp,
+                "server_id": event_header.server_id,
+                "data": []
+            });
+            let data = value.pointer_mut("/data").unwrap().as_array_mut().unwrap();
+            for r in &e.rows {
+                let mut row_value = value!({});
+                let row_data = row_value.as_object_mut().unwrap();
+                // pair each cell with its column definition by cell index
+                for (i, c) in r.cells.iter().enumerate() {
+                    let Some(def) = columns.get(i) else {
+                        warn!("row data and column definitions do not match");
+                        break;
+                    };
+                    row_data.insert(
+                        def.name.to_string(),
+                        c.as_ref().map_or(Value::Null, |v| format!("{v:?}").into()),
+                    );
+                }
+                data.push(row_value);
+            }
+
+            Ok(Some(Event {
                 value,
                 meta: EventMeta {},
-            })
+            }))
         }
+        BinlogEvent::RotateEvent(_)
+        | BinlogEvent::UnknownEvent
+        | BinlogEvent::FormatDescriptionEvent(_)
+        | BinlogEvent::HeartbeatEvent(_)
+        | BinlogEvent::XidEvent(_) => Ok(None),
         _ => Err(Error::Other("unsupported binlog event".to_string())),
     }
 }
diff --git a/collectors/wlf-binlog-collector/src/sql_analyzer.rs b/collectors/wlf-binlog-collector/src/sql_analyzer.rs
index 7bd3d08..0e724f0 100644
--- a/collectors/wlf-binlog-collector/src/sql_analyzer.rs
+++ b/collectors/wlf-binlog-collector/src/sql_analyzer.rs
@@ -1,5 +1,7 @@
+use std::collections::HashMap;
+
 use sqlparser::{
-    ast::Statement,
+    ast::{ColumnDef, Statement},
     dialect::MySqlDialect,
     parser::{Parser, ParserError},
 };
@@ -10,18 +12,31 @@ use wlf_core::{value, Value};
 pub enum Error {
     #[error("failed to parse sql, {0}")]
     Parse(#[from] ParserError),
+    #[error("table {0} not found")]
+    TableIdNotFound(u64),
+    #[error("table {1} not found in database {0}")]
+    TableNotFound(String, String),
     #[error("{0}")]
     Other(String),
 }
 
-pub(super) struct SqlAnalyzer;
+/// Database and table name
+pub(crate) type TableRef = (String, String);
+
+pub(crate) struct SqlAnalyzer {
+    table_map: HashMap<u64, TableRef>,
+    columns_map: HashMap<TableRef, Vec<ColumnDef>>,
+}
 
 impl SqlAnalyzer {
-    pub(super) fn new() -> Self {
-        Self
+    pub(crate) fn new() -> Self {
+        Self {
+            table_map: HashMap::new(),
+            columns_map: HashMap::new(),
+        }
     }
 
-    pub(super) fn analyze(&self, sql: &str) -> Result<Value, Error> {
+    pub(crate) fn analyze(&mut self, database: &str, sql: &str) -> Result<Value, Error> {
         // parse
         let mut ast = Parser::parse_sql(&MySqlDialect {}, sql)?;
         if ast.len() != 1 {
@@ -34,18 +49,61 @@ impl SqlAnalyzer {
             Statement::Insert { table_name, .. } => {
                 value!({
                     "type": "insert",
+                    "database": database,
                     "table": table_name.to_string()
                 })
             }
-            Statement::CreateTable { name, .. } => {
-                value!({
+            Statement::CreateTable { name, columns, .. } => {
+                let mut value = value!({
                     "type": "table-create",
-                    "table": name.to_string()
+                    "database": database,
+                    "table": name.to_string(),
+                    "columns": {}
+                });
+                let map = value
+                    .pointer_mut("/columns")
+                    .unwrap()
+                    .as_object_mut()
+                    .unwrap();
+                for column in &columns {
+                    map.insert(column.name.to_string(), column.data_type.to_string().into());
+                }
+                self.columns_map
+                    .insert((database.to_string(), name.to_string()), columns);
+                value
+            }
+            Statement::CreateDatabase { .. } => {
+                value!({
+                    "database": database,
+                    "type": "database-create",
                 })
             }
-            _ => value!({}),
+            // return null if the sql is not worth analyzing, e.g., a transaction BEGIN
+            _ => value!(null),
         };
 
         Ok(properties)
     }
+
+    pub(crate) fn map_table(&mut self, database: &str, table: &str, id: u64) {
+        self.table_map
+            .insert(id, (database.to_string(), table.to_string()));
+    }
+
+    pub(crate) fn get_table_info(&self, table_id: u64) -> Result<&TableRef, Error> {
+        self.table_map
+            .get(&table_id)
+            .ok_or(Error::TableIdNotFound(table_id))
+    }
+
+    pub(crate) fn get_column_defs(&self, table_id: u64) -> Result<&Vec<ColumnDef>, Error> {
+        let table_ref = self
+            .table_map
+            .get(&table_id)
+            .ok_or(Error::TableIdNotFound(table_id))?;
+        self.columns_map.get(table_ref).ok_or(Error::TableNotFound(
+            table_ref.0.to_owned(),
+            table_ref.1.to_owned(),
+        ))
+    }
 }
diff --git a/dispatchers/wlf-kafka-dispatcher/src/lib.rs b/dispatchers/wlf-kafka-dispatcher/src/lib.rs
index 65409a5..60ea57e 100644
--- a/dispatchers/wlf-kafka-dispatcher/src/lib.rs
+++ b/dispatchers/wlf-kafka-dispatcher/src/lib.rs
@@ -11,7 +11,7 @@ use rskafka::{
 };
 use serde::Deserialize;
 use thiserror::Error;
-use tracing::{error, info, warn};
+use tracing::{error, info};
 use utils::substitute_with_event;
 use wlf_core::{
     event_router::{EventRouter, EventRouterApi},
@@ -53,10 +53,10 @@ impl ComponentApi for KafkaDispatcher {
         let controller_client = client.controller_client()?;
         let mut topics_cache = client.list_topics().await?;
         while let Ok(event) = router.poll_event(self.id()).await {
-            info!("receive new event:\n{event:#?}");
+            info!("{} receives new event:\n\t{event:?}", self.id);
 
+            // get the topic
             let Ok(topic_name) = substitute_with_event(&self.topic, &event) else {
-                warn!("can't generate topic_name for event");
                 continue;
             };
diff --git a/dispatchers/wlf-redis-dispatcher/src/lib.rs b/dispatchers/wlf-redis-dispatcher/src/lib.rs
index 3d30d78..bcd19b9 100644
--- a/dispatchers/wlf-redis-dispatcher/src/lib.rs
+++ b/dispatchers/wlf-redis-dispatcher/src/lib.rs
@@ -7,7 +7,7 @@ use redis::{
 };
 use serde::Deserialize;
 use thiserror::Error;
-use tracing::{info, warn};
+use tracing::info;
 use utils::substitute_with_event;
 use wlf_core::{
     event_router::{EventRouter, EventRouterApi},
@@ -132,12 +132,11 @@ impl ComponentApi for RedisDispatcher {
             .await?;
 
         while let Ok(event) = router.poll_event(&self.id).await {
-            info!("receive new event:\n{event:#?}");
+            info!("{} receives new event:\n\t{event:?}", self.id);
 
             match &self.mode {
                 Mode::LPush { key } => {
                     let Ok(key) = substitute_with_event(key, &event) else {
-                        warn!("can't generate key for event");
                         continue;
                     };
                     let value = serde_json::to_string(&event)?;
@@ -146,7 +145,6 @@ impl ComponentApi for RedisDispatcher {
                 }
                 Mode::RPush { key } => {
                     let Ok(key) = substitute_with_event(key, &event) else {
-                        warn!("can't generate key for event");
                         continue;
                     };
                     let value = serde_json::to_string(&event)?;
@@ -155,7 +153,6 @@ impl ComponentApi for RedisDispatcher {
                 }
                 Mode::Pub { channel } => {
                     let Ok(channel) = substitute_with_event(channel, &event) else {
-                        warn!("can't generate channel for event");
                         continue;
                     };
                     let value = serde_json::to_string(&event)?;
@@ -164,7 +161,6 @@ impl ComponentApi for RedisDispatcher {
                 }
                 Mode::XADD { key } => {
                     let Ok(key) = substitute_with_event(key, &event) else {
-                        warn!("can't generate key for event");
                         continue;
                     };
                     let value = serde_json::to_string(&event)?;
diff --git a/examples/binlog_to_kafka_redis.yaml b/examples/binlog_to_kafka_redis.yaml
index ef284f9..c68a720 100644
--- a/examples/binlog_to_kafka_redis.yaml
+++ b/examples/binlog_to_kafka_redis.yaml
@@ -22,10 +22,10 @@ transformers:
 dispatchers:
   - id: kafka
     type: KafkaDispatcher
-    topic: logFlex.%{/meta/database}.%{/sql/table}
+    topic: logFlex.%{/database}.%{/table}
     bootstrap_brokers: ["127.0.0.1:9092"]
   - id: redis
     type: RedisDispatcher
     mode:
       type: Pub
-      channel: logFlex.%{/meta/database}.%{/sql/table}
+      channel: logFlex.%{/database}.%{/table}
diff --git a/transformers/wlf-binlog-filter/src/lib.rs b/transformers/wlf-binlog-filter/src/lib.rs
index 594fc48..a6861ab 100644
--- a/transformers/wlf-binlog-filter/src/lib.rs
+++ b/transformers/wlf-binlog-filter/src/lib.rs
@@ -35,7 +35,7 @@ impl ComponentApi for BinlogFilter {
     async fn run(&self, router: Arc<EventRouter>) -> Result<(), Box<dyn Error + Send + Sync>> {
         while let Ok(event) = router.poll_event(self.id()).await {
-            info!("{} receives new event:\n{event:#?}", self.id);
+            info!("{} receives new event:\n\t{event:?}", self.id);
 
             if !self.rules.eval(&event) {
                 continue;
@@ -93,7 +93,7 @@ impl BinlogFilterRules {
     fn eval(&self, event: &Event) -> bool {
         self.rules.iter().fold(true, |st, rule| match rule {
             BinlogFilterRule::Include { database, table } => {
-                let Some(Value::String(d)) = event.value.pointer("/meta/database") else {
+                let Some(Value::String(d)) = event.value.pointer("/database") else {
                     return st;
                 };
                 if d != database {
@@ -104,13 +104,13 @@ impl BinlogFilterRules {
                     return true;
                 };
 
-                match event.value.pointer("/sql/table") {
+                match event.value.pointer("/table") {
                     Some(Value::String(t)) if t == table => true,
                     _ => st,
                 }
             }
             BinlogFilterRule::Exclude { database, table } => {
-                let Some(Value::String(d)) = event.value.pointer("/meta/database") else {
+                let Some(Value::String(d)) = event.value.pointer("/database") else {
                     return st;
                 };
                 if d != database {
@@ -121,7 +121,7 @@ impl BinlogFilterRules {
                     return false;
                 };
 
-                match event.value.pointer("/sql/table") {
+                match event.value.pointer("/table") {
                     Some(Value::String(t)) if t == table => false,
                     _ => st,
                 }
diff --git a/transformers/wlf-event-replicator/src/lib.rs b/transformers/wlf-event-replicator/src/lib.rs
index ff19b03..96888b8 100644
--- a/transformers/wlf-event-replicator/src/lib.rs
+++ b/transformers/wlf-event-replicator/src/lib.rs
@@ -33,7 +33,7 @@ impl ComponentApi for EventReplicator {
     async fn run(&self, router: Arc<EventRouter>) -> Result<(), Box<dyn Error + Send + Sync>> {
         while let Ok(event) = router.poll_event(self.id()).await {
-            info!("{} receives new event:\n{event:#?}", self.id);
+            info!("{} receives new event:\n\t{event:?}", self.id);
 
             for d in &self.destinations {
                 router.send_event(event.clone(), d).await?;
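---

Review notes (illustrative sketches, not part of the patch):

MySQL row events identify tables only by a numeric `table_id`, so the collector resolves names and column layouts from earlier events: parsed `CREATE TABLE` statements populate `columns_map`, and each `TableMapEvent` binds a `table_id` to a `(database, table)` pair that the `WriteRowsEvent` handling then looks up. A minimal standalone sketch of that bookkeeping, using plain `String` column names in place of `sqlparser`'s `ColumnDef`; the `Catalog` type and its method names are hypothetical stand-ins for `SqlAnalyzer`:

```rust
use std::collections::HashMap;

type TableRef = (String, String); // (database, table)

struct Catalog {
    table_map: HashMap<u64, TableRef>,           // table_id -> names
    columns_map: HashMap<TableRef, Vec<String>>, // names -> column names
}

impl Catalog {
    // CREATE TABLE: remember the column layout under (database, table).
    fn on_create_table(&mut self, db: &str, table: &str, cols: Vec<String>) {
        self.columns_map.insert((db.into(), table.into()), cols);
    }

    // TableMapEvent: bind the numeric table id to the names.
    fn on_table_map(&mut self, id: u64, db: &str, table: &str) {
        self.table_map.insert(id, (db.into(), table.into()));
    }

    // WriteRowsEvent: resolve id -> names -> columns, like get_column_defs.
    fn columns_for(&self, id: u64) -> Option<&Vec<String>> {
        self.columns_map.get(self.table_map.get(&id)?)
    }
}

fn main() {
    let mut c = Catalog {
        table_map: HashMap::new(),
        columns_map: HashMap::new(),
    };
    c.on_create_table("shop", "orders", vec!["id".into(), "price".into()]);
    c.on_table_map(7, "shop", "orders");
    assert_eq!(c.columns_for(7).map(|v| v.len()), Some(2));
}
```

A consequence worth noting: since `columns_map` is filled only from query events parsed at runtime, `get_column_defs` appears to succeed only for tables whose `CREATE TABLE` the collector has observed during its run.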
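The config change from `%{/meta/database}`/`%{/sql/table}` to `%{/database}`/`%{/table}` follows from the flattened event layout: the analyzer output is now merged into the top-level object rather than nested under `meta`/`sql`. A sketch of the placeholder resolution the templates rely on, assuming `utils::substitute_with_event` (not shown in this patch) treats each `%{...}` as a JSON pointer into the event value and fails when the pointer is missing, which the dispatchers now skip silently; the `substitute` function here is a hypothetical stand-in:

```rust
use serde_json::{json, Value};

// Replace each %{/json/pointer} placeholder with the string found at that
// JSON pointer in `event`; fail if a pointer is unclosed or unresolvable.
fn substitute(template: &str, event: &Value) -> Result<String, String> {
    let mut out = String::new();
    let mut rest = template;
    while let Some(start) = rest.find("%{") {
        out.push_str(&rest[..start]);
        let end = rest[start..]
            .find('}')
            .ok_or_else(|| "unclosed placeholder".to_string())?
            + start;
        let pointer = &rest[start + 2..end];
        match event.pointer(pointer) {
            Some(Value::String(s)) => out.push_str(s),
            _ => return Err(format!("no string at {pointer}")),
        }
        rest = &rest[end + 1..];
    }
    out.push_str(rest);
    Ok(out)
}

fn main() {
    // With the patch, `database` and `table` sit at the top level of the
    // event value, so the example config uses %{/database} and %{/table}.
    let event = json!({"database": "shop", "table": "orders", "type": "insert"});
    assert_eq!(
        substitute("logFlex.%{/database}.%{/table}", &event).unwrap(),
        "logFlex.shop.orders"
    );
}
```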