diff --git a/crates/fluss/src/metadata/json_serde.rs b/crates/fluss/src/metadata/json_serde.rs
index 1c7604c..447b0f9 100644
--- a/crates/fluss/src/metadata/json_serde.rs
+++ b/crates/fluss/src/metadata/json_serde.rs
@@ -17,7 +17,7 @@
 use crate::error::Error::{InvalidTableError, JsonSerdeError};
 use crate::error::Result;
-use crate::metadata::datatype::{DataType, DataTypes};
+use crate::metadata::datatype::{DataField, DataType, DataTypes};
 use crate::metadata::table::{Column, Schema, TableDescriptor};
 use serde_json::{Value, json};
 use std::collections::HashMap;
@@ -58,10 +58,8 @@ impl DataType {
     const FIELD_NAME_TYPE_NAME: &'static str = "type";
     const FIELD_NAME_NULLABLE: &'static str = "nullable";
     const FIELD_NAME_LENGTH: &'static str = "length";
-    #[allow(dead_code)]
     const FIELD_NAME_PRECISION: &'static str = "precision";
-    #[allow(dead_code)]
-    const FILED_NAME_SCALE: &'static str = "scale";
+    const FIELD_NAME_SCALE: &'static str = "scale";
     #[allow(dead_code)]
     const FIELD_NAME_ELEMENT_TYPE: &'static str = "element_type";
     #[allow(dead_code)]
@@ -111,21 +109,54 @@ impl JsonSerde for DataType {
                 obj.insert(Self::FIELD_NAME_LENGTH.to_string(), json!(_type.length()));
             }
             DataType::Decimal(_type) => {
-                todo!()
+                obj.insert(
+                    Self::FIELD_NAME_PRECISION.to_string(),
+                    json!(_type.precision()),
+                );
+                obj.insert(Self::FIELD_NAME_SCALE.to_string(), json!(_type.scale()));
             }
             DataType::Time(_type) => {
-                todo!()
+                obj.insert(
+                    Self::FIELD_NAME_PRECISION.to_string(),
+                    json!(_type.precision()),
+                );
             }
             DataType::Timestamp(_type) => {
-                todo!()
+                obj.insert(
+                    Self::FIELD_NAME_PRECISION.to_string(),
+                    json!(_type.precision()),
+                );
             }
             DataType::TimestampLTz(_type) => {
-                todo!()
+                obj.insert(
+                    Self::FIELD_NAME_PRECISION.to_string(),
+                    json!(_type.precision()),
+                );
+            }
+            DataType::Array(_type) => {
+                obj.insert(
+                    Self::FIELD_NAME_ELEMENT_TYPE.to_string(),
+                    _type.get_element_type().serialize_json()?,
+                );
+            }
+            DataType::Map(_type) => {
+                obj.insert(
+                    Self::FIELD_NAME_KEY_TYPE.to_string(),
+                    _type.key_type().serialize_json()?,
+                );
+                obj.insert(
+                    Self::FIELD_NAME_VALUE_TYPE.to_string(),
+                    _type.value_type().serialize_json()?,
+                );
+            }
+            DataType::Row(_type) => {
+                let fields: Vec<Value> = _type
+                    .fields()
+                    .iter()
+                    .map(|field| field.serialize_json())
+                    .collect::<Result<Vec<Value>>>()?;
+                obj.insert(Self::FIELD_NAME_FIELDS.to_string(), json!(fields));
             }
-            DataType::Array(_type) => todo!(),
-            DataType::Map(_type) => todo!(),
-            DataType::Row(_type) => todo!(),
         }
         Ok(Value::Object(obj))
     }
@@ -150,18 +181,112 @@ impl JsonSerde for DataType {
             "BIGINT" => DataTypes::bigint(),
             "FLOAT" => DataTypes::float(),
             "DOUBLE" => DataTypes::double(),
-            "CHAR" => todo!(),
+            "CHAR" => {
+                let length = node
+                    .get(Self::FIELD_NAME_LENGTH)
+                    .and_then(|v| v.as_u64())
+                    .ok_or_else(|| {
+                        JsonSerdeError(format!(
+                            "Missing required field: {}",
+                            Self::FIELD_NAME_LENGTH
+                        ))
+                    })? as u32;
+                DataTypes::char(length)
+            }
             "STRING" => DataTypes::string(),
-            "DECIMAL" => todo!(),
+            "DECIMAL" => {
+                let precision = node
+                    .get(Self::FIELD_NAME_PRECISION)
+                    .and_then(|v| v.as_u64())
+                    .ok_or_else(|| {
+                        JsonSerdeError(format!(
+                            "Missing required field: {}",
+                            Self::FIELD_NAME_PRECISION
+                        ))
+                    })? as u32;
+                let scale = node
+                    .get(Self::FIELD_NAME_SCALE)
+                    .and_then(|v| v.as_u64())
+                    .unwrap_or(0) as u32;
+                DataTypes::decimal(precision, scale)
+            }
             "DATE" => DataTypes::date(),
-            "TIME_WITHOUT_TIME_ZONE" => todo!(), // Precision set separately
-            "TIMESTAMP_WITHOUT_TIME_ZONE" => todo!(), // Precision set separately
-            "TIMESTAMP_WITH_LOCAL_TIME_ZONE" => todo!(), // Precision set separately
+            "TIME_WITHOUT_TIME_ZONE" => {
+                let precision = node
+                    .get(Self::FIELD_NAME_PRECISION)
+                    .and_then(|v| v.as_u64())
+                    .unwrap_or(0) as u32;
+                DataTypes::time_with_precision(precision)
+            }
+            "TIMESTAMP_WITHOUT_TIME_ZONE" => {
+                let precision = node
+                    .get(Self::FIELD_NAME_PRECISION)
+                    .and_then(|v| v.as_u64())
+                    .unwrap_or(6) as u32;
+                DataTypes::timestamp_with_precision(precision)
+            }
+            "TIMESTAMP_WITH_LOCAL_TIME_ZONE" => {
+                let precision = node
+                    .get(Self::FIELD_NAME_PRECISION)
+                    .and_then(|v| v.as_u64())
+                    .unwrap_or(6) as u32;
+                DataTypes::timestamp_ltz_with_precision(precision)
+            }
             "BYTES" => DataTypes::bytes(),
-            "BINARY" => todo!(),
-            "ARRAY" => todo!(),
-            "MAP" => todo!(),
-            "ROW" => todo!(),
+            "BINARY" => {
+                let length = node
+                    .get(Self::FIELD_NAME_LENGTH)
+                    .and_then(|v| v.as_u64())
+                    .unwrap_or(1) as usize;
+                DataTypes::binary(length)
+            }
+            "ARRAY" => {
+                let element_type_node =
+                    node.get(Self::FIELD_NAME_ELEMENT_TYPE).ok_or_else(|| {
+                        JsonSerdeError(format!(
+                            "Missing required field: {}",
+                            Self::FIELD_NAME_ELEMENT_TYPE
+                        ))
+                    })?;
+                let element_type = DataType::deserialize_json(element_type_node)?;
+                DataTypes::array(element_type)
+            }
+            "MAP" => {
+                let key_type_node = node.get(Self::FIELD_NAME_KEY_TYPE).ok_or_else(|| {
+                    JsonSerdeError(format!(
+                        "Missing required field: {}",
+                        Self::FIELD_NAME_KEY_TYPE
+                    ))
+                })?;
+                let key_type = DataType::deserialize_json(key_type_node)?;
+                let value_type_node = node.get(Self::FIELD_NAME_VALUE_TYPE).ok_or_else(|| {
+                    JsonSerdeError(format!(
+                        "Missing required field: {}",
+                        Self::FIELD_NAME_VALUE_TYPE
+                    ))
+                })?;
+                let value_type = DataType::deserialize_json(value_type_node)?;
+                DataTypes::map(key_type, value_type)
+            }
+            "ROW" => {
+                let fields_node = node
+                    .get(Self::FIELD_NAME_FIELDS)
+                    .ok_or_else(|| {
+                        JsonSerdeError(format!(
+                            "Missing required field: {}",
+                            Self::FIELD_NAME_FIELDS
+                        ))
+                    })?
+                    .as_array()
+                    .ok_or_else(|| {
+                        JsonSerdeError(format!("{} must be an array", Self::FIELD_NAME_FIELDS))
+                    })?;
+                let mut fields = Vec::with_capacity(fields_node.len());
+                for field_node in fields_node {
+                    fields.push(DataField::deserialize_json(field_node)?);
+                }
+                DataTypes::row(fields)
+            }
             _ => return Err(JsonSerdeError(format!("Unknown type root: {type_root}"))),
         };
 
@@ -175,6 +300,51 @@ impl JsonSerde for DataType {
     }
 }
 
+impl DataField {
+    const NAME: &'static str = "name";
+    const FIELD_TYPE: &'static str = "field_type";
+    const DESCRIPTION: &'static str = "description";
+}
+
+impl JsonSerde for DataField {
+    fn serialize_json(&self) -> Result<Value> {
+        let mut obj = serde_json::Map::new();
+
+        obj.insert(Self::NAME.to_string(), json!(self.name()));
+        obj.insert(
+            Self::FIELD_TYPE.to_string(),
+            self.data_type.serialize_json()?,
+        );
+
+        if let Some(description) = &self.description {
+            obj.insert(Self::DESCRIPTION.to_string(), json!(description));
+        }
+
+        Ok(Value::Object(obj))
+    }
+
+    fn deserialize_json(node: &Value) -> Result<Self> {
+        let name = node
+            .get(Self::NAME)
+            .and_then(|v| v.as_str())
+            .ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::NAME)))?
+            .to_string();
+
+        let field_type_node = node.get(Self::FIELD_TYPE).ok_or_else(|| {
+            JsonSerdeError(format!("Missing required field: {}", Self::FIELD_TYPE))
+        })?;
+
+        let data_type = DataType::deserialize_json(field_type_node)?;
+
+        let description = node
+            .get(Self::DESCRIPTION)
+            .and_then(|v| v.as_str())
+            .map(|s| s.to_string());
+
+        Ok(DataField::new(name, data_type, description))
+    }
+}
+
 impl Column {
     const NAME: &'static str = "name";
     const DATA_TYPE: &'static str = "data_type";
@@ -203,7 +373,7 @@ impl JsonSerde for Column {
         let name = node
             .get(Self::NAME)
            .and_then(|v| v.as_str())
-            .unwrap_or_else(|| panic!("{}", format!("Missing required field: {}", Self::NAME)))
+            .ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::NAME)))?
             .to_string();
 
         let data_type_node = node.get(Self::DATA_TYPE).ok_or_else(|| {
@@ -263,7 +433,7 @@ impl JsonSerde for Schema {
                 JsonSerdeError(format!("Missing required field: {}", Self::COLUMNS_NAME))
             })?
             .as_array()
-            .ok_or_else(|| JsonSerdeError(format!("{} should be an array", Self::COLUMNS_NAME)))?;
+            .ok_or_else(|| JsonSerdeError(format!("{} must be an array", Self::COLUMNS_NAME)))?;
 
         let mut columns = Vec::with_capacity(columns_node.len());
         for col_node in columns_node {
@@ -275,14 +445,16 @@ impl JsonSerde for Schema {
         if let Some(pk_node) = node.get(Self::PRIMARY_KEY_NAME) {
             let pk_array = pk_node
                 .as_array()
-                .ok_or_else(|| InvalidTableError("Primary key is not an array".to_string()))?;
+                .ok_or_else(|| InvalidTableError("Primary key must be an array".to_string()))?;
 
             let mut primary_keys = Vec::with_capacity(pk_array.len());
             for name_node in pk_array {
                 primary_keys.push(
                     name_node
                         .as_str()
-                        .ok_or_else(|| InvalidTableError("Primary key is not string".to_string()))?
+                        .ok_or_else(|| {
+                            InvalidTableError("Primary key element must be a string".to_string())
+                        })?
                         .to_string(),
                 );
             }
@@ -308,7 +480,7 @@ impl TableDescriptor {
     fn deserialize_properties(node: &Value) -> Result<HashMap<String, String>> {
         let obj = node
             .as_object()
-            .ok_or_else(|| JsonSerdeError("Properties should be an object".to_string()))?;
+            .ok_or_else(|| JsonSerdeError("Properties must be an object".to_string()))?;
 
         let mut properties = HashMap::with_capacity(obj.len());
         for (key, value) in obj {
@@ -316,7 +488,7 @@ impl TableDescriptor {
                 key.clone(),
                 value
                     .as_str()
-                    .ok_or_else(|| JsonSerdeError("Properties should be an object".to_string()))?
+                    .ok_or_else(|| JsonSerdeError("Property value must be a string".to_string()))?
                     .to_owned(),
             );
         }
@@ -383,9 +555,7 @@ impl JsonSerde for TableDescriptor {
         if let Some(comment_node) = node.get(Self::COMMENT_NAME) {
             let comment = comment_node
                 .as_str()
-                .ok_or_else(|| {
-                    JsonSerdeError(format!("{} should be a string", Self::COMMENT_NAME))
-                })?
+                .ok_or_else(|| JsonSerdeError(format!("{} must be a string", Self::COMMENT_NAME)))?
                 .to_owned();
             builder = builder.comment(comment.as_str());
         }
@@ -400,7 +570,7 @@ impl JsonSerde for TableDescriptor {
             })?
             .as_array()
             .ok_or_else(|| {
-                JsonSerdeError(format!("{} should be an array", Self::PARTITION_KEY_NAME))
+                JsonSerdeError(format!("{} must be an array", Self::PARTITION_KEY_NAME))
             })?;
 
         let mut partition_keys = Vec::with_capacity(partition_node.len());
@@ -409,7 +579,10 @@ impl JsonSerde for TableDescriptor {
                 key_node
                     .as_str()
                     .ok_or_else(|| {
-                        JsonSerdeError(format!("{} should be a string", Self::PARTITION_KEY_NAME))
+                        JsonSerdeError(format!(
+                            "{} element must be a string",
+                            Self::PARTITION_KEY_NAME
+                        ))
                     })?
                     .to_owned(),
             );
@@ -420,14 +593,14 @@ impl JsonSerde for TableDescriptor {
         let mut bucket_keys = vec![];
         if let Some(bucket_key_node) = node.get(Self::BUCKET_KEY_NAME) {
             let bucket_key_node = bucket_key_node.as_array().ok_or_else(|| {
-                JsonSerdeError(format!("{} should be an array", Self::BUCKET_COUNT_NAME))
+                JsonSerdeError(format!("{} must be an array", Self::BUCKET_KEY_NAME))
             })?;
 
             for key_node in bucket_key_node {
                 bucket_keys.push(
                     key_node
                         .as_str()
-                        .ok_or_else(|| JsonSerdeError("Bucket key should be a string".to_string()))?
+                        .ok_or_else(|| JsonSerdeError("Bucket key must be a string".to_string()))?
                         .to_owned(),
                 );
             }
@@ -462,3 +635,47 @@ impl JsonSerde for TableDescriptor {
         builder.build()
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::DataTypes;
+
+    #[test]
+    fn test_datatype_json_serde() {
+        let data_types = vec![
+            DataTypes::boolean(),
+            DataTypes::tinyint(),
+            DataTypes::smallint(),
+            DataTypes::int().as_non_nullable(),
+            DataTypes::bigint(),
+            DataTypes::float(),
+            DataTypes::double(),
+            DataTypes::char(10),
+            DataTypes::string(),
+            DataTypes::decimal(10, 2),
+            DataTypes::date(),
+            DataTypes::time(),
+            DataTypes::timestamp(),
+            DataTypes::timestamp_ltz(),
+            DataTypes::bytes(),
+            DataTypes::binary(100),
+            DataTypes::array(DataTypes::int()),
+            DataTypes::map(DataTypes::string(), DataTypes::int()),
+            DataTypes::row(vec![
+                DataField::new("f1".to_string(), DataTypes::int(), None),
+                DataField::new(
+                    "f2".to_string(),
+                    DataTypes::string(),
+                    Some("desc".to_string()),
+                ),
+            ]),
+        ];
+
+        for dt in data_types {
+            let json = dt.serialize_json().unwrap();
+            let deserialized = DataType::deserialize_json(&json).unwrap();
+            assert_eq!(dt, deserialized);
+        }
+    }
+}