Skip to content

Commit

Permalink
add create table parsing and write row event support
Browse files Browse the repository at this point in the history
  • Loading branch information
markcty committed Jul 21, 2023
1 parent ae100ae commit 4cce0c2
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 44 deletions.
82 changes: 64 additions & 18 deletions collectors/wlf-binlog-collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use mysql_cdc::{

use serde::Deserialize;
use sql_analyzer::SqlAnalyzer;
use tracing::{error, info};
use tracing::{info, warn};
use wlf_core::{
event_router::{EventRouter, EventRouterApi},
value, ComponentApi, ComponentKind, Event, EventMeta,
value, ComponentApi, ComponentKind, Event, EventMeta, Value,
};

mod error;
Expand Down Expand Up @@ -70,10 +70,11 @@ impl ComponentApi for BinlogCollector {
let mut sql_parser = SqlAnalyzer::new();

while let Some(Ok((event_header, binlog_event))) = events_stream.next().await {
info!("new binlog event:\n{event_header:#?}\n{binlog_event:#?}");
info!("new binlog event:\n\t{event_header:?}\n\t{binlog_event:?}");
match into_wlf_event(&mut sql_parser, event_header, binlog_event) {
Ok(event) => router.send_event(event, &self.destination).await?,
Err(e) => error!("failed to convert binlog event, {e}"),
Ok(Some(event)) => router.send_event(event, &self.destination).await?,
Ok(None) => {}
Err(e) => warn!("failed to convert binlog event, {e}"),
}
}

Expand All @@ -83,32 +84,77 @@ impl ComponentApi for BinlogCollector {

/// The event structure is largely borrowed from [maxwells](https://maxwells-daemon.io/dataformat/)
fn into_wlf_event(
sql_parser: &mut SqlAnalyzer,
sql_analyzer: &mut SqlAnalyzer,
event_header: EventHeader,
binlog_event: BinlogEvent,
) -> Result<Event, Error> {
) -> Result<Option<Event>, Error> {
let LocalResult::Single(timestamp) = Utc.timestamp_opt(event_header.timestamp as i64, 0) else {
return Err(Error::Other("failed to convert timestamp".to_string()));
};
match binlog_event {
BinlogEvent::QueryEvent(e) => {
info!("receive query event {e:?}");

let LocalResult::Single(timestamp) = Utc.timestamp_opt(event_header.timestamp as i64, 0) else {
return Err(Error::Other("failed to convert timestamp".to_string()));
};
let meta = value!({
"database": e.database_name,
let mut value = value!({
"timestamp": timestamp,
"server_id": event_header.server_id,
"thread_id": e.thread_id,
});
let properties = sql_parser.analyze(&e.sql_statement)?;

let value = value!({"meta": meta, "sql": properties});
let mut sql_properties = sql_analyzer.analyze(&e.database_name, &e.sql_statement)?;
if sql_properties.is_null() {
return Ok(None);
}

value
.as_object_mut()
.unwrap()
.append(sql_properties.as_object_mut().unwrap());

Ok(Event {
Ok(Some(Event {
value,
meta: EventMeta {},
}))
}
BinlogEvent::TableMapEvent(e) => {
sql_analyzer.map_table(&e.database_name, &e.table_name, e.table_id);
Ok(None)
}
BinlogEvent::WriteRowsEvent(e) => {
let (database, table) = sql_analyzer.get_table_info(e.table_id)?;
let columns = sql_analyzer.get_column_defs(e.table_id)?;
let mut value = value!({
"database": database,
"table": table,
"type": "insert",
"timestamp": timestamp,
"server_id": event_header.server_id,
"data": []
});
let data = value.pointer_mut("/data").unwrap().as_array_mut().unwrap();
for (i, r) in e.rows.iter().enumerate() {
let mut row_value = value!({});
let row_data = row_value.as_object_mut().unwrap();
for c in &r.cells {
let Some(def) = columns.get(i) else {
warn!("row data and column definitions do not match");
break;
};
row_data.insert(
def.name.to_string(),
c.as_ref().map_or(Value::Null, |v| format!("{v:?}").into()),
);
}
data.push(row_value);
}
Ok(Some(Event {
value,
meta: EventMeta {},
})
}))
}
BinlogEvent::RotateEvent(_)
| BinlogEvent::UnknownEvent
| BinlogEvent::FormatDescriptionEvent(_)
| BinlogEvent::HeartbeatEvent(_)
| BinlogEvent::XidEvent(_) => Ok(None),
_ => Err(Error::Other("unsupported binlog event".to_string())),
}
}
Expand Down
76 changes: 67 additions & 9 deletions collectors/wlf-binlog-collector/src/sql_analyzer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::HashMap;

use sqlparser::{
ast::Statement,
ast::{ColumnDef, Statement},
dialect::MySqlDialect,
parser::{Parser, ParserError},
};
Expand All @@ -10,18 +12,31 @@ use wlf_core::{value, Value};
pub enum Error {
/// The SQL text could not be parsed; wraps the parser's own error.
#[error("failed to parse sql, {0}")]
Parse(#[from] ParserError),
/// No table has been mapped to the given binlog table id.
#[error("table {0} not found")]
TableIdNotFound(u64),
/// The table id is mapped, but no column definitions are recorded for it.
/// Fields are `(database, table)`, in that order.
#[error("table {1} not found in database {0}")]
TableNotFound(String, String),
/// Any other failure, described by a free-form message.
#[error("{0}")]
Other(String),
}

pub(super) struct SqlAnalyzer;
/// Database and Table name
pub(crate) type TableRef = (String, String);

/// Analyzes SQL statements and keeps the table metadata needed to resolve
/// later lookups by binlog table id.
pub(crate) struct SqlAnalyzer {
/// Binlog table id -> (database, table), filled in by `map_table`.
table_map: HashMap<u64, TableRef>,
/// (database, table) -> column definitions, recorded by `analyze` when it
/// sees a `CREATE TABLE` statement.
columns_map: HashMap<TableRef, Vec<ColumnDef>>,
}

impl SqlAnalyzer {
pub(super) fn new() -> Self {
Self
pub(crate) fn new() -> Self {
Self {
table_map: HashMap::new(),
columns_map: HashMap::new(),
}
}

pub(super) fn analyze(&self, sql: &str) -> Result<Value, Error> {
pub(crate) fn analyze(&mut self, database: &str, sql: &str) -> Result<Value, Error> {
// parse
let mut ast = Parser::parse_sql(&MySqlDialect {}, sql)?;
if ast.len() != 1 {
Expand All @@ -34,18 +49,61 @@ impl SqlAnalyzer {
Statement::Insert { table_name, .. } => {
value!({
"type": "insert",
"database" : database,
"table": table_name.to_string()
})
}
Statement::CreateTable { name, .. } => {
value!({
Statement::CreateTable { name, columns, .. } => {
let mut value = value!({
"type": "table-create",
"table": name.to_string()
"database" : database,
"table": name.to_string(),
"columns": {}
});
let map = value
.pointer_mut("/columns")
.unwrap()
.as_object_mut()
.unwrap();
for column in &columns {
map.insert(column.name.to_string(), column.data_type.to_string().into());
}
self.columns_map
.insert((database.to_string(), name.to_string()), columns);
value
}
Statement::CreateDatabase { .. } => {
value!({
"database" : database,
"type": "database-create",
})
}
_ => value!({}),
// return null if the sql is not worth analyzing. e.g., transaction BEGIN
_ => value!(null),
};

Ok(properties)
}

/// Records that binlog table id `id` refers to `table` in `database`.
///
/// An existing mapping for the same id is replaced.
pub(crate) fn map_table(&mut self, database: &str, table: &str, id: u64) {
    let table_ref = (database.to_string(), table.to_string());
    self.table_map.insert(id, table_ref);
}

pub(crate) fn get_table_info(&self, table_id: u64) -> Result<&TableRef, Error> {
self.table_map
.get(&table_id)
.ok_or(Error::TableIdNotFound(table_id))
}

pub(crate) fn get_column_defs(&self, table_id: u64) -> Result<&Vec<ColumnDef>, Error> {
let table_ref = self
.table_map
.get(&table_id)
.ok_or(Error::TableIdNotFound(table_id))?;
self.columns_map.get(table_ref).ok_or(Error::TableNotFound(
table_ref.0.to_owned(),
table_ref.1.to_owned(),
))
}
}
6 changes: 3 additions & 3 deletions dispatchers/wlf-kafka-dispatcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use rskafka::{
};
use serde::Deserialize;
use thiserror::Error;
use tracing::{error, info, warn};
use tracing::{error, info};
use utils::substitute_with_event;
use wlf_core::{
event_router::{EventRouter, EventRouterApi},
Expand Down Expand Up @@ -53,10 +53,10 @@ impl ComponentApi for KafkaDispatcher {
let controller_client = client.controller_client()?;
let mut topics_cache = client.list_topics().await?;
while let Ok(event) = router.poll_event(self.id()).await {
info!("receive new event:\n{event:#?}");
info!("{} receives new event:\n\t{event:?}", self.id);

// get the topic
let Ok(topic_name) = substitute_with_event(&self.topic, &event) else {
warn!("can't generate topic_name for event");
continue;
};

Expand Down
8 changes: 2 additions & 6 deletions dispatchers/wlf-redis-dispatcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use redis::{
};
use serde::Deserialize;
use thiserror::Error;
use tracing::{info, warn};
use tracing::info;
use utils::substitute_with_event;
use wlf_core::{
event_router::{EventRouter, EventRouterApi},
Expand Down Expand Up @@ -132,12 +132,11 @@ impl ComponentApi for RedisDispatcher {
.await?;

while let Ok(event) = router.poll_event(&self.id).await {
info!("receive new event:\n{event:#?}");
info!("{} receives new event:\n\t{event:?}", self.id);

match &self.mode {
Mode::LPush { key } => {
let Ok(key) = substitute_with_event(key, &event) else {
warn!("can't generate key for event");
continue;
};
let value = serde_json::to_string(&event)?;
Expand All @@ -146,7 +145,6 @@ impl ComponentApi for RedisDispatcher {
}
Mode::RPush { key } => {
let Ok(key) = substitute_with_event(key, &event) else {
warn!("can't generate key for event");
continue;
};
let value = serde_json::to_string(&event)?;
Expand All @@ -155,7 +153,6 @@ impl ComponentApi for RedisDispatcher {
}
Mode::Pub { channel } => {
let Ok(channel) = substitute_with_event(channel, &event) else {
warn!("can't generate channel for event");
continue;
};
let value = serde_json::to_string(&event)?;
Expand All @@ -164,7 +161,6 @@ impl ComponentApi for RedisDispatcher {
}
Mode::XADD { key } => {
let Ok(key) = substitute_with_event(key, &event) else {
warn!("can't generate key for event");
continue;
};
let value = serde_json::to_string(&event)?;
Expand Down
4 changes: 2 additions & 2 deletions examples/binlog_to_kafka_redis.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ transformers:
dispatchers:
- id: kafka
type: KafkaDispatcher
topic: logFlex.%{/meta/database}.%{/sql/table}
topic: logFlex.%{/database}.%{/table}
bootstrap_brokers: ["127.0.0.1:9092"]
- id: redis
type: RedisDispatcher
mode:
type: Pub
channel: logFlex.%{/meta/database}.%{/sql/table}
channel: logFlex.%{/database}.%{/table}
10 changes: 5 additions & 5 deletions transformers/wlf-binlog-filter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl ComponentApi for BinlogFilter {

async fn run(&self, router: Arc<EventRouter>) -> Result<(), Box<dyn std::error::Error>> {
while let Ok(event) = router.poll_event(self.id()).await {
info!("{} receives new event:\n{event:#?}", self.id);
info!("{} receives new event:\n\t{event:?}", self.id);

if !self.rules.eval(&event) {
continue;
Expand Down Expand Up @@ -93,7 +93,7 @@ impl BinlogFilterRules {
fn eval(&self, event: &Event) -> bool {
self.rules.iter().fold(true, |st, rule| match rule {
BinlogFilterRule::Include { database, table } => {
let Some(Value::String(d)) = event.value.pointer("/meta/database") else {
let Some(Value::String(d)) = event.value.pointer("/database") else {
return st;
};
if d != database {
Expand All @@ -104,13 +104,13 @@ impl BinlogFilterRules {
return true;
};

match event.value.pointer("/sql/table") {
match event.value.pointer("/table") {
Some(Value::String(t)) if t == table => true,
_ => st,
}
}
BinlogFilterRule::Exclude { database, table } => {
let Some(Value::String(d)) = event.value.pointer("/meta/database") else {
let Some(Value::String(d)) = event.value.pointer("/database") else {
return st;
};
if d != database {
Expand All @@ -121,7 +121,7 @@ impl BinlogFilterRules {
return false;
};

match event.value.pointer("/sql/table") {
match event.value.pointer("/table") {
Some(Value::String(t)) if t == table => false,
_ => st,
}
Expand Down
2 changes: 1 addition & 1 deletion transformers/wlf-event-replicator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl ComponentApi for EventReplicator {

async fn run(&self, router: Arc<EventRouter>) -> Result<(), Box<dyn std::error::Error>> {
while let Ok(event) = router.poll_event(self.id()).await {
info!("{} receives new event:\n{event:#?}", self.id);
info!("{} receives new event:\n\t{event:?}", self.id);

for d in &self.destinations {
router.send_event(event.clone(), d).await?;
Expand Down

0 comments on commit 4cce0c2

Please sign in to comment.