Skip to content

Commit

Permalink
feat: support more types for canal json (#10053)
Browse files Browse the repository at this point in the history
  • Loading branch information
tabVersion authored and lmatz committed May 31, 2023
1 parent f67b9b5 commit fd246bd
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 72 deletions.
76 changes: 26 additions & 50 deletions src/connector/src/parser/canal/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::str::FromStr;

use anyhow::anyhow;
use futures_async_stream::try_stream;
use risingwave_common::cast::{
str_to_date, str_to_time, str_to_timestamp, str_with_time_zone_to_timestamptz,
};
use risingwave_common::error::ErrorCode::{InternalError, ProtocolError};
use risingwave_common::error::ErrorCode::ProtocolError;
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::{DataType, Datum, Decimal, ScalarImpl};
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::iter_util::ZipEqFast;
use simd_json::{BorrowedValue, StaticNode, ValueAccess};

use crate::impl_common_parser_logic;
use crate::parser::canal::operators::*;
use crate::parser::common::json_object_smart_get_value;
use crate::parser::common::{do_parse_simd_json_value, json_object_smart_get_value};
use crate::parser::util::at_least_one_ok;
use crate::parser::{SourceStreamChunkRowWriter, WriteGuard};
use crate::source::{SourceColumnDesc, SourceContextRef};
use crate::{ensure_rust_type, ensure_str, impl_common_parser_logic};
use crate::source::{SourceColumnDesc, SourceContextRef, SourceFormat};

const AFTER: &str = "data";
const BEFORE: &str = "old";
Expand Down Expand Up @@ -104,7 +99,6 @@ impl CanalJsonParser {
})
})
.collect::<Vec<Result<_>>>();

at_least_one_ok(results)
}
CANAL_UPDATE_EVENT => {
Expand Down Expand Up @@ -152,7 +146,6 @@ impl CanalJsonParser {
})
})
.collect::<Vec<Result<_>>>();

at_least_one_ok(results)
}
CANAL_DELETE_EVENT => {
Expand Down Expand Up @@ -195,70 +188,41 @@ fn cannal_simd_json_parse_value(
) -> Result<Datum> {
match value {
None | Some(BorrowedValue::Static(StaticNode::Null)) => Ok(None),
Some(v) => Ok(Some(cannal_do_parse_simd_json_value(dtype, v).map_err(
|e| {
Some(v) => Ok(Some(
do_parse_simd_json_value(&SourceFormat::CanalJson, dtype, v).map_err(|e| {
tracing::warn!("failed to parse type '{}' from json: {}", dtype, e);
anyhow!("failed to parse type '{}' from json: {}", dtype, e)
},
)?)),
})?,
)),
}
}

#[inline]
fn cannal_do_parse_simd_json_value(dtype: &DataType, v: &BorrowedValue<'_>) -> Result<ScalarImpl> {
let v = match dtype {
// mysql use tinyint to represent boolean
DataType::Boolean => ScalarImpl::Bool(ensure_rust_type!(v, i16) != 0),
DataType::Int16 => ScalarImpl::Int16(ensure_rust_type!(v, i16)),
DataType::Int32 => ScalarImpl::Int32(ensure_rust_type!(v, i32)),
DataType::Int64 => ScalarImpl::Int64(ensure_rust_type!(v, i64)),
DataType::Float32 => ScalarImpl::Float32(ensure_rust_type!(v, f32).into()),
DataType::Float64 => ScalarImpl::Float64(ensure_rust_type!(v, f64).into()),
// FIXME: decimal should have more precision than f64
DataType::Decimal => Decimal::from_str(ensure_str!(v, "string"))
.map_err(|_| anyhow!("parse decimal from string err {}", v))?
.into(),
DataType::Varchar => ensure_str!(v, "varchar").to_string().into(),
DataType::Date => str_to_date(ensure_str!(v, "date"))?.into(),
DataType::Time => str_to_time(ensure_str!(v, "time"))?.into(),
DataType::Timestamp => str_to_timestamp(ensure_str!(v, "string"))?.into(),
DataType::Timestamptz => {
str_with_time_zone_to_timestamptz(ensure_str!(v, "string"))?.into()
}
_ => {
return Err(RwError::from(InternalError(format!(
"cannal data source not support type {}",
dtype
))))
}
};
Ok(v)
}

#[cfg(test)]
mod tests {

use std::str::FromStr;

use risingwave_common::array::Op;
use risingwave_common::cast::str_to_timestamp;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, Decimal, ScalarImpl, ToOwnedDatum};
use risingwave_common::types::{DataType, Decimal, JsonbVal, ScalarImpl, ToOwnedDatum};
use serde_json::Value;

use super::*;
use crate::parser::SourceStreamChunkBuilder;
use crate::source::SourceColumnDesc;

#[tokio::test]
async fn test_data_types() {
let payload = br#"{"id":0,"database":"test","table":"data_type","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1682057341424,"ts":1682057382913,"sql":"","sqlType":{"id":4,"tinyint":-6,"smallint":5,"mediumint":4,"int":4,"bigint":-5,"float":7,"double":8,"decimal":3,"date":91,"datetime":93,"time":92,"timestamp":93,"char":1,"varchar":12,"binary":2004,"varbinary":2004,"blob":2004,"text":2005,"enum":4,"set":-7},"mysqlType":{"binary":"binary","varbinary":"varbinary","enum":"enum","set":"set","bigint":"bigint","float":"float","datetime":"datetime","varchar":"varchar","smallint":"smallint","mediumint":"mediumint","double":"double","date":"date","char":"char","id":"int","tinyint":"tinyint","decimal":"decimal","blob":"blob","text":"text","int":"int","time":"time","timestamp":"timestamp"},"old":null,"data":[{"id":"1","tinyint":"5","smallint":"136","mediumint":"172113","int":"1801160058","bigint":"3916589616287113937","float":"0","double":"0.15652","decimal":"1.20364700","date":"2023-04-20","datetime":"2023-02-15 13:01:36","time":"20:23:41","timestamp":"2022-10-13 12:12:54","char":"Kathleen","varchar":"atque esse fugiat et quibusdam qui.","binary":"Joseph\u0000\u0000\u0000\u0000","varbinary":"Douglas","blob":"ducimus ut in commodi necessitatibus error magni repellat exercitationem!","text":"rerum sunt nulla quo quibusdam velit doloremque.","enum":"1","set":"1"}]}"#;
let payload = br#"{"id":0,"database":"test","table":"data_type","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1682057341424,"ts":1682057382913,"sql":"","sqlType":{"id":4,"tinyint":-6,"smallint":5,"mediumint":4,"int":4,"bigint":-5,"float":7,"double":8,"decimal":3,"date":91,"datetime":93,"time":92,"timestamp":93,"char":1,"varchar":12,"binary":2004,"varbinary":2004,"blob":2004,"text":2005,"enum":4,"set":-7,"json":12},"mysqlType":{"binary":"binary","varbinary":"varbinary","enum":"enum","set":"set","bigint":"bigint","float":"float","datetime":"datetime","varchar":"varchar","smallint":"smallint","mediumint":"mediumint","double":"double","date":"date","char":"char","id":"int","tinyint":"tinyint","decimal":"decimal","blob":"blob","text":"text","int":"int","time":"time","timestamp":"timestamp","json":"json"},"old":null,"data":[{"id":"1","tinyint":"5","smallint":"136","mediumint":"172113","int":"1801160058","bigint":"3916589616287113937","float":"0","double":"0.15652","decimal":"1.20364700","date":"2023-04-20","datetime":"2023-02-15 13:01:36","time":"20:23:41","timestamp":"2022-10-13 12:12:54","char":"Kathleen","varchar":"atque esse fugiat et quibusdam qui.","binary":"Joseph\u0000\u0000\u0000\u0000","varbinary":"Douglas","blob":"ducimus ut in commodi necessitatibus error magni repellat exercitationem!","text":"rerum sunt nulla quo quibusdam velit doloremque.","enum":"1","set":"1","json":"{\"a\": 1, \"b\": 2}"}]}"#;
let descs = vec![
SourceColumnDesc::simple("id", DataType::Int32, 0.into()),
SourceColumnDesc::simple("date", DataType::Date, 1.into()),
SourceColumnDesc::simple("datetime", DataType::Timestamp, 2.into()),
SourceColumnDesc::simple("time", DataType::Time, 3.into()),
SourceColumnDesc::simple("timestamp", DataType::Timestamp, 4.into()),
SourceColumnDesc::simple("char", DataType::Varchar, 5.into()),
SourceColumnDesc::simple("binary", DataType::Bytea, 6.into()),
SourceColumnDesc::simple("json", DataType::Jsonb, 7.into()),
];
let parser = CanalJsonParser::new(descs.clone(), Default::default()).unwrap();

Expand Down Expand Up @@ -299,6 +263,18 @@ mod tests {
row.datum_at(5).to_owned_datum(),
Some(ScalarImpl::Utf8(Box::from("Kathleen".to_string())))
);
assert_eq!(
row.datum_at(6).to_owned_datum(),
Some(ScalarImpl::Bytea(Box::from(
"Joseph\u{0}\u{0}\u{0}\u{0}".as_bytes()
)))
);
assert_eq!(
row.datum_at(7).to_owned_datum(),
Some(ScalarImpl::Jsonb(JsonbVal::from(Value::from(
"{\"a\": 1, \"b\": 2}".to_string()
))))
);
}

#[tokio::test]
Expand Down
56 changes: 34 additions & 22 deletions src/connector/src/parser/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use simd_json::value::StaticNode;
use simd_json::{BorrowedValue, ValueAccess};

use crate::source::SourceFormat;
use crate::{ensure_i16, ensure_i32, ensure_i64, ensure_str, simd_json_ensure_float};
use crate::{
ensure_i16, ensure_i32, ensure_i64, ensure_rust_type, ensure_str, simd_json_ensure_float,
};
pub(crate) fn json_object_smart_get_value<'a, 'b>(
v: &'b simd_json::BorrowedValue<'a>,
key: Cow<'b, str>,
Expand All @@ -47,13 +49,14 @@ pub(crate) fn json_object_smart_get_value<'a, 'b>(
None
}

fn do_parse_simd_json_value(
pub(crate) fn do_parse_simd_json_value(
format: &SourceFormat,
dtype: &DataType,
v: &BorrowedValue<'_>,
) -> Result<ScalarImpl> {
let v = match dtype {
DataType::Boolean => match v {
let v = match (dtype, format) {
(DataType::Boolean, SourceFormat::CanalJson) => (ensure_rust_type!(v, i16) != 0).into(),
(DataType::Boolean, _) => match v {
BorrowedValue::Static(StaticNode::Bool(b)) => (*b).into(),
// debezium converts bool to int, false -> 0, true -> 1, for mysql and postgres
BorrowedValue::Static(v) => match v.as_i64() {
Expand All @@ -63,13 +66,18 @@ fn do_parse_simd_json_value(
},
_ => anyhow::bail!("expect bool, but found {v}"),
},
DataType::Int16 => ensure_i16!(v, i16).into(),
DataType::Int32 => ensure_i32!(v, i32).into(),
DataType::Int64 => ensure_i64!(v, i64).into(),
DataType::Int256 => Int256::from_str(ensure_str!(v, "quoted int256"))?.into(),
DataType::Serial => anyhow::bail!("serial should not be parsed"),
(DataType::Int16, SourceFormat::CanalJson) => ensure_rust_type!(v, i16).into(),
(DataType::Int16, _) => ensure_i16!(v, i16).into(),
(DataType::Int32, SourceFormat::CanalJson) => ensure_rust_type!(v, i32).into(),
(DataType::Int32, _) => ensure_i32!(v, i32).into(),
(DataType::Int64, SourceFormat::CanalJson) => ensure_rust_type!(v, i64).into(),
(DataType::Int64, _) => ensure_i64!(v, i64).into(),
(DataType::Int256, _) => Int256::from_str(ensure_str!(v, "quoted int256"))?.into(),
(DataType::Serial, _) => anyhow::bail!("serial should not be parsed"),
// if the value is too large, str parsing to f32 will fail
(DataType::Float32, SourceFormat::CanalJson) => ensure_rust_type!(v, f32).into(),
// when f32 overflows, the value is converted to `inf` which is inappropriate
DataType::Float32 => {
(DataType::Float32, _) => {
let scalar_val = ScalarImpl::Float32((simd_json_ensure_float!(v, f32) as f32).into());
if let ScalarImpl::Float32(f) = scalar_val {
if f.0.is_infinite() {
Expand All @@ -78,13 +86,17 @@ fn do_parse_simd_json_value(
}
scalar_val
}
DataType::Float64 => simd_json_ensure_float!(v, f64).into(),
(DataType::Float64, SourceFormat::CanalJson) => ensure_rust_type!(v, f64).into(),
(DataType::Float64, _) => simd_json_ensure_float!(v, f64).into(),
(DataType::Decimal, SourceFormat::CanalJson) => Decimal::from_str(ensure_str!(v, "string"))
.map_err(|_| anyhow!("parse decimal from string err {}", v))?
.into(),
// FIXME: decimal should have more precision than f64
DataType::Decimal => Decimal::try_from(simd_json_ensure_float!(v, Decimal))
(DataType::Decimal, _) => Decimal::try_from(simd_json_ensure_float!(v, Decimal))
.map_err(|_| anyhow!("expect decimal"))?
.into(),
DataType::Varchar => ensure_str!(v, "varchar").to_string().into(),
DataType::Bytea => match format {
(DataType::Varchar, _) => ensure_str!(v, "varchar").to_string().into(),
(DataType::Bytea, _) => match format {
// debezium converts postgres bytea to base64 format
SourceFormat::DebeziumJson => ScalarImpl::Bytea(
base64::engine::general_purpose::STANDARD
Expand All @@ -94,15 +106,15 @@ fn do_parse_simd_json_value(
),
_ => ScalarImpl::Bytea(str_to_bytea(ensure_str!(v, "bytea")).map_err(|e| anyhow!(e))?),
},
DataType::Date => match v {
(DataType::Date, _) => match v {
BorrowedValue::String(s) => str_to_date(s).map_err(|e| anyhow!(e))?.into(),
BorrowedValue::Static(_) => {
// debezium converts date to i32 for mysql and postgres
Date::with_days_since_unix_epoch(ensure_i32!(v, i32))?.into()
}
_ => anyhow::bail!("expect date, but found {v}"),
},
DataType::Time => {
(DataType::Time, _) => {
match v {
BorrowedValue::String(s) => str_to_time(s).map_err(|e| anyhow!(e))?.into(),
BorrowedValue::Static(_) => {
Expand All @@ -123,14 +135,14 @@ fn do_parse_simd_json_value(
_ => anyhow::bail!("expect time, but found {v}"),
}
}
DataType::Timestamp => match v {
(DataType::Timestamp, _) => match v {
BorrowedValue::String(s) => str_to_timestamp(s).map_err(|e| anyhow!(e))?.into(),
BorrowedValue::Static(_) => i64_to_timestamp(ensure_i64!(v, i64))
.map_err(|e| anyhow!(e))?
.into(),
_ => anyhow::bail!("expect timestamp, but found {v}"),
},
DataType::Timestamptz => match v {
(DataType::Timestamptz, _) => match v {
BorrowedValue::String(s) => str_with_time_zone_to_timestamptz(s)
.map_err(|e| anyhow!(e))?
.into(),
Expand All @@ -139,7 +151,7 @@ fn do_parse_simd_json_value(
.into(),
_ => anyhow::bail!("expect timestamptz, but found {v}"),
},
DataType::Jsonb => {
(DataType::Jsonb, _) => {
// jsonb will be output as a string in debezium format
if *format == SourceFormat::DebeziumJson {
ScalarImpl::Jsonb(JsonbVal::from_str(ensure_str!(v, "jsonb"))?)
Expand All @@ -149,7 +161,7 @@ fn do_parse_simd_json_value(
ScalarImpl::Jsonb(JsonbVal::from_serde(v))
}
}
DataType::Struct(struct_type_info) => {
(DataType::Struct(struct_type_info), _) => {
let fields: Vec<Option<ScalarImpl>> = struct_type_info
.field_names
.iter()
Expand All @@ -164,7 +176,7 @@ fn do_parse_simd_json_value(
.collect::<Result<Vec<Datum>>>()?;
ScalarImpl::Struct(StructValue::new(fields))
}
DataType::List(item_type) => {
(DataType::List(item_type), _) => {
if let BorrowedValue::Array(values) = v {
let values = values
.iter()
Expand All @@ -179,7 +191,7 @@ fn do_parse_simd_json_value(
return Err(anyhow!(err_msg));
}
}
DataType::Interval => match format {
(DataType::Interval, _) => match format {
SourceFormat::DebeziumJson => {
ScalarImpl::Interval(Interval::from_iso_8601(ensure_str!(v, "interval"))?)
}
Expand Down

0 comments on commit fd246bd

Please sign in to comment.