diff --git a/src/connector/src/parser/canal/simd_json_parser.rs b/src/connector/src/parser/canal/simd_json_parser.rs index 8158fdca547d9..092a1af94aab9 100644 --- a/src/connector/src/parser/canal/simd_json_parser.rs +++ b/src/connector/src/parser/canal/simd_json_parser.rs @@ -12,25 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::str::FromStr; - use anyhow::anyhow; use futures_async_stream::try_stream; -use risingwave_common::cast::{ - str_to_date, str_to_time, str_to_timestamp, str_with_time_zone_to_timestamptz, -}; -use risingwave_common::error::ErrorCode::{InternalError, ProtocolError}; +use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::{Result, RwError}; -use risingwave_common::types::{DataType, Datum, Decimal, ScalarImpl}; +use risingwave_common::types::{DataType, Datum}; use risingwave_common::util::iter_util::ZipEqFast; use simd_json::{BorrowedValue, StaticNode, ValueAccess}; +use crate::impl_common_parser_logic; use crate::parser::canal::operators::*; -use crate::parser::common::json_object_smart_get_value; +use crate::parser::common::{do_parse_simd_json_value, json_object_smart_get_value}; use crate::parser::util::at_least_one_ok; use crate::parser::{SourceStreamChunkRowWriter, WriteGuard}; -use crate::source::{SourceColumnDesc, SourceContextRef}; -use crate::{ensure_rust_type, ensure_str, impl_common_parser_logic}; +use crate::source::{SourceColumnDesc, SourceContextRef, SourceFormat}; const AFTER: &str = "data"; const BEFORE: &str = "old"; @@ -104,7 +99,6 @@ impl CanalJsonParser { }) }) .collect::>>(); - at_least_one_ok(results) } CANAL_UPDATE_EVENT => { @@ -152,7 +146,6 @@ impl CanalJsonParser { }) }) .collect::>>(); - at_least_one_ok(results) } CANAL_DELETE_EVENT => { @@ -195,55 +188,24 @@ fn cannal_simd_json_parse_value( ) -> Result { match value { None | Some(BorrowedValue::Static(StaticNode::Null)) => Ok(None), - Some(v) => Ok(Some(cannal_do_parse_simd_json_value(dtype, v).map_err( - |e| { + Some(v) => Ok(Some( + do_parse_simd_json_value(&SourceFormat::CanalJson, dtype, v).map_err(|e| { tracing::warn!("failed to parse type '{}' from json: {}", dtype, e); anyhow!("failed to parse type '{}' from json: {}", dtype, e) - }, - )?)), + })?, + )), } } -#[inline] -fn cannal_do_parse_simd_json_value(dtype: &DataType, v: &BorrowedValue<'_>) -> Result { - let v = match dtype { - // mysql use tinyint to represent boolean - DataType::Boolean => ScalarImpl::Bool(ensure_rust_type!(v, i16) != 0), - DataType::Int16 => ScalarImpl::Int16(ensure_rust_type!(v, i16)), - DataType::Int32 => ScalarImpl::Int32(ensure_rust_type!(v, i32)), - DataType::Int64 => ScalarImpl::Int64(ensure_rust_type!(v, i64)), - DataType::Float32 => ScalarImpl::Float32(ensure_rust_type!(v, f32).into()), - DataType::Float64 => ScalarImpl::Float64(ensure_rust_type!(v, f64).into()), - // FIXME: decimal should have more precision than f64 - DataType::Decimal => Decimal::from_str(ensure_str!(v, "string")) - .map_err(|_| anyhow!("parse decimal from string err {}", v))? - .into(), - DataType::Varchar => ensure_str!(v, "varchar").to_string().into(), - DataType::Date => str_to_date(ensure_str!(v, "date"))?.into(), - DataType::Time => str_to_time(ensure_str!(v, "time"))?.into(), - DataType::Timestamp => str_to_timestamp(ensure_str!(v, "string"))?.into(), - DataType::Timestamptz => { - str_with_time_zone_to_timestamptz(ensure_str!(v, "string"))?.into() - } - _ => { - return Err(RwError::from(InternalError(format!( - "cannal data source not support type {}", - dtype - )))) - } - }; - Ok(v) -} - #[cfg(test)] mod tests { - use std::str::FromStr; use risingwave_common::array::Op; use risingwave_common::cast::str_to_timestamp; use risingwave_common::row::Row; - use risingwave_common::types::{DataType, Decimal, ScalarImpl, ToOwnedDatum}; + use risingwave_common::types::{DataType, Decimal, JsonbVal, ScalarImpl, ToOwnedDatum}; + use serde_json::Value; use super::*; use crate::parser::SourceStreamChunkBuilder; @@ -251,7 +213,7 @@ mod tests { #[tokio::test] async fn test_data_types() { - let payload = br#"{"id":0,"database":"test","table":"data_type","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1682057341424,"ts":1682057382913,"sql":"","sqlType":{"id":4,"tinyint":-6,"smallint":5,"mediumint":4,"int":4,"bigint":-5,"float":7,"double":8,"decimal":3,"date":91,"datetime":93,"time":92,"timestamp":93,"char":1,"varchar":12,"binary":2004,"varbinary":2004,"blob":2004,"text":2005,"enum":4,"set":-7},"mysqlType":{"binary":"binary","varbinary":"varbinary","enum":"enum","set":"set","bigint":"bigint","float":"float","datetime":"datetime","varchar":"varchar","smallint":"smallint","mediumint":"mediumint","double":"double","date":"date","char":"char","id":"int","tinyint":"tinyint","decimal":"decimal","blob":"blob","text":"text","int":"int","time":"time","timestamp":"timestamp"},"old":null,"data":[{"id":"1","tinyint":"5","smallint":"136","mediumint":"172113","int":"1801160058","bigint":"3916589616287113937","float":"0","double":"0.15652","decimal":"1.20364700","date":"2023-04-20","datetime":"2023-02-15 13:01:36","time":"20:23:41","timestamp":"2022-10-13 12:12:54","char":"Kathleen","varchar":"atque esse fugiat et quibusdam qui.","binary":"Joseph\u0000\u0000\u0000\u0000","varbinary":"Douglas","blob":"ducimus ut in commodi necessitatibus error magni repellat exercitationem!","text":"rerum sunt nulla quo quibusdam velit doloremque.","enum":"1","set":"1"}]}"#; + let payload = br#"{"id":0,"database":"test","table":"data_type","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1682057341424,"ts":1682057382913,"sql":"","sqlType":{"id":4,"tinyint":-6,"smallint":5,"mediumint":4,"int":4,"bigint":-5,"float":7,"double":8,"decimal":3,"date":91,"datetime":93,"time":92,"timestamp":93,"char":1,"varchar":12,"binary":2004,"varbinary":2004,"blob":2004,"text":2005,"enum":4,"set":-7,"json":12},"mysqlType":{"binary":"binary","varbinary":"varbinary","enum":"enum","set":"set","bigint":"bigint","float":"float","datetime":"datetime","varchar":"varchar","smallint":"smallint","mediumint":"mediumint","double":"double","date":"date","char":"char","id":"int","tinyint":"tinyint","decimal":"decimal","blob":"blob","text":"text","int":"int","time":"time","timestamp":"timestamp","json":"json"},"old":null,"data":[{"id":"1","tinyint":"5","smallint":"136","mediumint":"172113","int":"1801160058","bigint":"3916589616287113937","float":"0","double":"0.15652","decimal":"1.20364700","date":"2023-04-20","datetime":"2023-02-15 13:01:36","time":"20:23:41","timestamp":"2022-10-13 12:12:54","char":"Kathleen","varchar":"atque esse fugiat et quibusdam qui.","binary":"Joseph\u0000\u0000\u0000\u0000","varbinary":"Douglas","blob":"ducimus ut in commodi necessitatibus error magni repellat exercitationem!","text":"rerum sunt nulla quo quibusdam velit doloremque.","enum":"1","set":"1","json":"{\"a\": 1, \"b\": 2}"}]}"#; let descs = vec![ SourceColumnDesc::simple("id", DataType::Int32, 0.into()), SourceColumnDesc::simple("date", DataType::Date, 1.into()), @@ -259,6 +221,8 @@ mod tests { SourceColumnDesc::simple("time", DataType::Time, 3.into()), SourceColumnDesc::simple("timestamp", DataType::Timestamp, 4.into()), SourceColumnDesc::simple("char", DataType::Varchar, 5.into()), + SourceColumnDesc::simple("binary", DataType::Bytea, 6.into()), + SourceColumnDesc::simple("json", DataType::Jsonb, 7.into()), ]; let parser = CanalJsonParser::new(descs.clone(), Default::default()).unwrap(); @@ -299,6 +263,18 @@ mod tests { row.datum_at(5).to_owned_datum(), Some(ScalarImpl::Utf8(Box::from("Kathleen".to_string()))) ); + assert_eq!( + row.datum_at(6).to_owned_datum(), + Some(ScalarImpl::Bytea(Box::from( + "Joseph\u{0}\u{0}\u{0}\u{0}".as_bytes() + ))) + ); + assert_eq!( + row.datum_at(7).to_owned_datum(), + Some(ScalarImpl::Jsonb(JsonbVal::from(Value::from( + "{\"a\": 1, \"b\": 2}".to_string() + )))) + ); } #[tokio::test] diff --git a/src/connector/src/parser/common.rs b/src/connector/src/parser/common.rs index a1854a5f6a5d7..61583432c26e4 100644 --- a/src/connector/src/parser/common.rs +++ b/src/connector/src/parser/common.rs @@ -30,7 +30,9 @@ use simd_json::value::StaticNode; use simd_json::{BorrowedValue, ValueAccess}; use crate::source::SourceFormat; -use crate::{ensure_i16, ensure_i32, ensure_i64, ensure_str, simd_json_ensure_float}; +use crate::{ + ensure_i16, ensure_i32, ensure_i64, ensure_rust_type, ensure_str, simd_json_ensure_float, +}; pub(crate) fn json_object_smart_get_value<'a, 'b>( v: &'b simd_json::BorrowedValue<'a>, key: Cow<'b, str>, @@ -47,13 +49,14 @@ pub(crate) fn json_object_smart_get_value<'a, 'b>( None } -fn do_parse_simd_json_value( +pub(crate) fn do_parse_simd_json_value( format: &SourceFormat, dtype: &DataType, v: &BorrowedValue<'_>, ) -> Result { - let v = match dtype { - DataType::Boolean => match v { + let v = match (dtype, format) { + (DataType::Boolean, SourceFormat::CanalJson) => (ensure_rust_type!(v, i16) != 0).into(), + (DataType::Boolean, _) => match v { BorrowedValue::Static(StaticNode::Bool(b)) => (*b).into(), // debezium converts bool to int, false -> 0, true -> 1, for mysql and postgres BorrowedValue::Static(v) => match v.as_i64() { @@ -63,13 +66,18 @@ fn do_parse_simd_json_value( }, _ => anyhow::bail!("expect bool, but found {v}"), }, - DataType::Int16 => ensure_i16!(v, i16).into(), - DataType::Int32 => ensure_i32!(v, i32).into(), - DataType::Int64 => ensure_i64!(v, i64).into(), - DataType::Int256 => Int256::from_str(ensure_str!(v, "quoted int256"))?.into(), - DataType::Serial => anyhow::bail!("serial should not be parsed"), + (DataType::Int16, SourceFormat::CanalJson) => ensure_rust_type!(v, i16).into(), + (DataType::Int16, _) => ensure_i16!(v, i16).into(), + (DataType::Int32, SourceFormat::CanalJson) => ensure_rust_type!(v, i32).into(), + (DataType::Int32, _) => ensure_i32!(v, i32).into(), + (DataType::Int64, SourceFormat::CanalJson) => ensure_rust_type!(v, i64).into(), + (DataType::Int64, _) => ensure_i64!(v, i64).into(), + (DataType::Int256, _) => Int256::from_str(ensure_str!(v, "quoted int256"))?.into(), + (DataType::Serial, _) => anyhow::bail!("serial should not be parsed"), + // if the value is too large, str parsing to f32 will fail + (DataType::Float32, SourceFormat::CanalJson) => ensure_rust_type!(v, f32).into(), // when f32 overflows, the value is converted to `inf` which is inappropriate - DataType::Float32 => { + (DataType::Float32, _) => { let scalar_val = ScalarImpl::Float32((simd_json_ensure_float!(v, f32) as f32).into()); if let ScalarImpl::Float32(f) = scalar_val { if f.0.is_infinite() { @@ -78,13 +86,17 @@ fn do_parse_simd_json_value( } scalar_val } - DataType::Float64 => simd_json_ensure_float!(v, f64).into(), + (DataType::Float64, SourceFormat::CanalJson) => ensure_rust_type!(v, f64).into(), + (DataType::Float64, _) => simd_json_ensure_float!(v, f64).into(), + (DataType::Decimal, SourceFormat::CanalJson) => Decimal::from_str(ensure_str!(v, "string")) + .map_err(|_| anyhow!("parse decimal from string err {}", v))? + .into(), // FIXME: decimal should have more precision than f64 - DataType::Decimal => Decimal::try_from(simd_json_ensure_float!(v, Decimal)) + (DataType::Decimal, _) => Decimal::try_from(simd_json_ensure_float!(v, Decimal)) .map_err(|_| anyhow!("expect decimal"))? .into(), - DataType::Varchar => ensure_str!(v, "varchar").to_string().into(), - DataType::Bytea => match format { + (DataType::Varchar, _) => ensure_str!(v, "varchar").to_string().into(), + (DataType::Bytea, _) => match format { // debezium converts postgres bytea to base64 format SourceFormat::DebeziumJson => ScalarImpl::Bytea( base64::engine::general_purpose::STANDARD @@ -94,7 +106,7 @@ fn do_parse_simd_json_value( ), _ => ScalarImpl::Bytea(str_to_bytea(ensure_str!(v, "bytea")).map_err(|e| anyhow!(e))?), }, - DataType::Date => match v { + (DataType::Date, _) => match v { BorrowedValue::String(s) => str_to_date(s).map_err(|e| anyhow!(e))?.into(), BorrowedValue::Static(_) => { // debezium converts date to i32 for mysql and postgres @@ -102,7 +114,7 @@ fn do_parse_simd_json_value( } _ => anyhow::bail!("expect date, but found {v}"), }, - DataType::Time => { + (DataType::Time, _) => { match v { BorrowedValue::String(s) => str_to_time(s).map_err(|e| anyhow!(e))?.into(), BorrowedValue::Static(_) => { @@ -123,14 +135,14 @@ fn do_parse_simd_json_value( _ => anyhow::bail!("expect time, but found {v}"), } } - DataType::Timestamp => match v { + (DataType::Timestamp, _) => match v { BorrowedValue::String(s) => str_to_timestamp(s).map_err(|e| anyhow!(e))?.into(), BorrowedValue::Static(_) => i64_to_timestamp(ensure_i64!(v, i64)) .map_err(|e| anyhow!(e))? .into(), _ => anyhow::bail!("expect timestamp, but found {v}"), }, - DataType::Timestamptz => match v { + (DataType::Timestamptz, _) => match v { BorrowedValue::String(s) => str_with_time_zone_to_timestamptz(s) .map_err(|e| anyhow!(e))? .into(), @@ -139,7 +151,7 @@ fn do_parse_simd_json_value( .into(), _ => anyhow::bail!("expect timestamptz, but found {v}"), }, - DataType::Jsonb => { + (DataType::Jsonb, _) => { // jsonb will be output as a string in debezium format if *format == SourceFormat::DebeziumJson { ScalarImpl::Jsonb(JsonbVal::from_str(ensure_str!(v, "jsonb"))?) @@ -149,7 +161,7 @@ fn do_parse_simd_json_value( ScalarImpl::Jsonb(JsonbVal::from_serde(v)) } } - DataType::Struct(struct_type_info) => { + (DataType::Struct(struct_type_info), _) => { let fields: Vec> = struct_type_info .field_names .iter() @@ -164,7 +176,7 @@ fn do_parse_simd_json_value( .collect::>>()?; ScalarImpl::Struct(StructValue::new(fields)) } - DataType::List(item_type) => { + (DataType::List(item_type), _) => { if let BorrowedValue::Array(values) = v { let values = values .iter() @@ -179,7 +191,7 @@ fn do_parse_simd_json_value( return Err(anyhow!(err_msg)); } } - DataType::Interval => match format { + (DataType::Interval, _) => match format { SourceFormat::DebeziumJson => { ScalarImpl::Interval(Interval::from_iso_8601(ensure_str!(v, "interval"))?) }