From 49d0fad7041728d85c660ddbd16104780550fd68 Mon Sep 17 00:00:00 2001
From: Pavlos-Petros Tournaris
Date: Sun, 26 Oct 2025 21:18:11 +0200
Subject: [PATCH] Implement JsonSerde for all datatypes
- Fix typo in FIELD_NAME_SCALE constant
- Implement JsonSerde for DataField
- Complete serialize_json and deserialize_json for all DataType variants
- Add round-trip test covering JSON serialization/deserialization for all DataType variants
Resolves issue #39
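A minimal round-trip sketch (it mirrors the test added in this patch; the DataTypes constructors and JsonSerde method names are the ones introduced here):

    // serialize_json produces a serde_json::Value; deserialize_json reads it back
    let dt = DataTypes::decimal(10, 2);
    let json = dt.serialize_json().unwrap();
    let restored = DataType::deserialize_json(&json).unwrap();
    assert_eq!(dt, restored);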
Amp-Thread-ID: https://ampcode.com/threads/T-12d3046f-c788-4b3e-b1ae-6830b05aa926
Co-authored-by: Amp
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
crates/fluss/src/metadata/json_serde.rs | 285 +++++++++++++++++++++---
1 file changed, 251 insertions(+), 34 deletions(-)
diff --git a/crates/fluss/src/metadata/json_serde.rs b/crates/fluss/src/metadata/json_serde.rs
index 1c7604c..447b0f9 100644
--- a/crates/fluss/src/metadata/json_serde.rs
+++ b/crates/fluss/src/metadata/json_serde.rs
@@ -17,7 +17,7 @@
use crate::error::Error::{InvalidTableError, JsonSerdeError};
use crate::error::Result;
-use crate::metadata::datatype::{DataType, DataTypes};
+use crate::metadata::datatype::{DataField, DataType, DataTypes};
use crate::metadata::table::{Column, Schema, TableDescriptor};
use serde_json::{Value, json};
use std::collections::HashMap;
@@ -58,10 +58,8 @@ impl DataType {
const FIELD_NAME_TYPE_NAME: &'static str = "type";
const FIELD_NAME_NULLABLE: &'static str = "nullable";
const FIELD_NAME_LENGTH: &'static str = "length";
- #[allow(dead_code)]
const FIELD_NAME_PRECISION: &'static str = "precision";
- #[allow(dead_code)]
- const FILED_NAME_SCALE: &'static str = "scale";
+ const FIELD_NAME_SCALE: &'static str = "scale";
#[allow(dead_code)]
const FIELD_NAME_ELEMENT_TYPE: &'static str = "element_type";
#[allow(dead_code)]
@@ -111,21 +109,54 @@ impl JsonSerde for DataType {
obj.insert(Self::FIELD_NAME_LENGTH.to_string(), json!(_type.length()));
}
DataType::Decimal(_type) => {
- todo!()
+ obj.insert(
+ Self::FIELD_NAME_PRECISION.to_string(),
+ json!(_type.precision()),
+ );
+ obj.insert(Self::FIELD_NAME_SCALE.to_string(), json!(_type.scale()));
}
-
DataType::Time(_type) => {
- todo!()
+ obj.insert(
+ Self::FIELD_NAME_PRECISION.to_string(),
+ json!(_type.precision()),
+ );
}
DataType::Timestamp(_type) => {
- todo!()
+ obj.insert(
+ Self::FIELD_NAME_PRECISION.to_string(),
+ json!(_type.precision()),
+ );
}
DataType::TimestampLTz(_type) => {
- todo!()
+ obj.insert(
+ Self::FIELD_NAME_PRECISION.to_string(),
+ json!(_type.precision()),
+ );
+ }
+ DataType::Array(_type) => {
+ obj.insert(
+ Self::FIELD_NAME_ELEMENT_TYPE.to_string(),
+ _type.get_element_type().serialize_json()?,
+ );
+ }
+ DataType::Map(_type) => {
+ obj.insert(
+ Self::FIELD_NAME_KEY_TYPE.to_string(),
+ _type.key_type().serialize_json()?,
+ );
+ obj.insert(
+ Self::FIELD_NAME_VALUE_TYPE.to_string(),
+ _type.value_type().serialize_json()?,
+ );
+ }
+ DataType::Row(_type) => {
+ let fields: Vec<Value> = _type
+ .fields()
+ .iter()
+ .map(|field| field.serialize_json())
+ .collect::<Result<Vec<_>>>()?;
+ obj.insert(Self::FIELD_NAME_FIELDS.to_string(), json!(fields));
}
- DataType::Array(_type) => todo!(),
- DataType::Map(_type) => todo!(),
- DataType::Row(_type) => todo!(),
}
Ok(Value::Object(obj))
}
@@ -150,18 +181,112 @@ impl JsonSerde for DataType {
"BIGINT" => DataTypes::bigint(),
"FLOAT" => DataTypes::float(),
"DOUBLE" => DataTypes::double(),
- "CHAR" => todo!(),
+ "CHAR" => {
+ let length = node
+ .get(Self::FIELD_NAME_LENGTH)
+ .and_then(|v| v.as_u64())
+ .ok_or_else(|| {
+ JsonSerdeError(format!(
+ "Missing required field: {}",
+ Self::FIELD_NAME_LENGTH
+ ))
+ })? as u32;
+ DataTypes::char(length)
+ }
"STRING" => DataTypes::string(),
- "DECIMAL" => todo!(),
+ "DECIMAL" => {
+ let precision = node
+ .get(Self::FIELD_NAME_PRECISION)
+ .and_then(|v| v.as_u64())
+ .ok_or_else(|| {
+ JsonSerdeError(format!(
+ "Missing required field: {}",
+ Self::FIELD_NAME_PRECISION
+ ))
+ })? as u32;
+ let scale = node
+ .get(Self::FIELD_NAME_SCALE)
+ .and_then(|v| v.as_u64())
+ .unwrap_or(0) as u32;
+ DataTypes::decimal(precision, scale)
+ }
"DATE" => DataTypes::date(),
- "TIME_WITHOUT_TIME_ZONE" => todo!(), // Precision set separately
- "TIMESTAMP_WITHOUT_TIME_ZONE" => todo!(), // Precision set separately
- "TIMESTAMP_WITH_LOCAL_TIME_ZONE" => todo!(), // Precision set separately
+ "TIME_WITHOUT_TIME_ZONE" => {
+ let precision = node
+ .get(Self::FIELD_NAME_PRECISION)
+ .and_then(|v| v.as_u64())
+ .unwrap_or(0) as u32;
+ DataTypes::time_with_precision(precision)
+ }
+ "TIMESTAMP_WITHOUT_TIME_ZONE" => {
+ let precision = node
+ .get(Self::FIELD_NAME_PRECISION)
+ .and_then(|v| v.as_u64())
+ .unwrap_or(6) as u32;
+ DataTypes::timestamp_with_precision(precision)
+ }
+ "TIMESTAMP_WITH_LOCAL_TIME_ZONE" => {
+ let precision = node
+ .get(Self::FIELD_NAME_PRECISION)
+ .and_then(|v| v.as_u64())
+ .unwrap_or(6) as u32;
+ DataTypes::timestamp_ltz_with_precision(precision)
+ }
"BYTES" => DataTypes::bytes(),
- "BINARY" => todo!(),
- "ARRAY" => todo!(),
- "MAP" => todo!(),
- "ROW" => todo!(),
+ "BINARY" => {
+ let length = node
+ .get(Self::FIELD_NAME_LENGTH)
+ .and_then(|v| v.as_u64())
+ .unwrap_or(1) as usize;
+ DataTypes::binary(length)
+ }
+ "ARRAY" => {
+ let element_type_node =
+ node.get(Self::FIELD_NAME_ELEMENT_TYPE).ok_or_else(|| {
+ JsonSerdeError(format!(
+ "Missing required field: {}",
+ Self::FIELD_NAME_ELEMENT_TYPE
+ ))
+ })?;
+ let element_type = DataType::deserialize_json(element_type_node)?;
+ DataTypes::array(element_type)
+ }
+ "MAP" => {
+ let key_type_node = node.get(Self::FIELD_NAME_KEY_TYPE).ok_or_else(|| {
+ JsonSerdeError(format!(
+ "Missing required field: {}",
+ Self::FIELD_NAME_KEY_TYPE
+ ))
+ })?;
+ let key_type = DataType::deserialize_json(key_type_node)?;
+ let value_type_node = node.get(Self::FIELD_NAME_VALUE_TYPE).ok_or_else(|| {
+ JsonSerdeError(format!(
+ "Missing required field: {}",
+ Self::FIELD_NAME_VALUE_TYPE
+ ))
+ })?;
+ let value_type = DataType::deserialize_json(value_type_node)?;
+ DataTypes::map(key_type, value_type)
+ }
+ "ROW" => {
+ let fields_node = node
+ .get(Self::FIELD_NAME_FIELDS)
+ .ok_or_else(|| {
+ JsonSerdeError(format!(
+ "Missing required field: {}",
+ Self::FIELD_NAME_FIELDS
+ ))
+ })?
+ .as_array()
+ .ok_or_else(|| {
+ JsonSerdeError(format!("{} must be an array", Self::FIELD_NAME_FIELDS))
+ })?;
+ let mut fields = Vec::with_capacity(fields_node.len());
+ for field_node in fields_node {
+ fields.push(DataField::deserialize_json(field_node)?);
+ }
+ DataTypes::row(fields)
+ }
_ => return Err(JsonSerdeError(format!("Unknown type root: {type_root}"))),
};
@@ -175,6 +300,51 @@ impl JsonSerde for DataType {
}
}
+impl DataField {
+ const NAME: &'static str = "name";
+ const FIELD_TYPE: &'static str = "field_type";
+ const DESCRIPTION: &'static str = "description";
+}
+
+impl JsonSerde for DataField {
+ fn serialize_json(&self) -> Result<Value> {
+ let mut obj = serde_json::Map::new();
+
+ obj.insert(Self::NAME.to_string(), json!(self.name()));
+ obj.insert(
+ Self::FIELD_TYPE.to_string(),
+ self.data_type.serialize_json()?,
+ );
+
+ if let Some(description) = &self.description {
+ obj.insert(Self::DESCRIPTION.to_string(), json!(description));
+ }
+
+ Ok(Value::Object(obj))
+ }
+
+ fn deserialize_json(node: &Value) -> Result<Self> {
+ let name = node
+ .get(Self::NAME)
+ .and_then(|v| v.as_str())
+ .ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::NAME)))?
+ .to_string();
+
+ let field_type_node = node.get(Self::FIELD_TYPE).ok_or_else(|| {
+ JsonSerdeError(format!("Missing required field: {}", Self::FIELD_TYPE))
+ })?;
+
+ let data_type = DataType::deserialize_json(field_type_node)?;
+
+ let description = node
+ .get(Self::DESCRIPTION)
+ .and_then(|v| v.as_str())
+ .map(|s| s.to_string());
+
+ Ok(DataField::new(name, data_type, description))
+ }
+}
+
impl Column {
const NAME: &'static str = "name";
const DATA_TYPE: &'static str = "data_type";
@@ -203,7 +373,7 @@ impl JsonSerde for Column {
let name = node
.get(Self::NAME)
.and_then(|v| v.as_str())
- .unwrap_or_else(|| panic!("{}", format!("Missing required field: {}", Self::NAME)))
+ .ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::NAME)))?
.to_string();
let data_type_node = node.get(Self::DATA_TYPE).ok_or_else(|| {
@@ -263,7 +433,7 @@ impl JsonSerde for Schema {
JsonSerdeError(format!("Missing required field: {}", Self::COLUMNS_NAME))
})?
.as_array()
- .ok_or_else(|| JsonSerdeError(format!("{} should be an array", Self::COLUMNS_NAME)))?;
+ .ok_or_else(|| JsonSerdeError(format!("{} must be an array", Self::COLUMNS_NAME)))?;
let mut columns = Vec::with_capacity(columns_node.len());
for col_node in columns_node {
@@ -275,14 +445,16 @@ impl JsonSerde for Schema {
if let Some(pk_node) = node.get(Self::PRIMARY_KEY_NAME) {
let pk_array = pk_node
.as_array()
- .ok_or_else(|| InvalidTableError("Primary key is not an array".to_string()))?;
+ .ok_or_else(|| InvalidTableError("Primary key must be an array".to_string()))?;
let mut primary_keys = Vec::with_capacity(pk_array.len());
for name_node in pk_array {
primary_keys.push(
name_node
.as_str()
- .ok_or_else(|| InvalidTableError("Primary key is not string".to_string()))?
+ .ok_or_else(|| {
+ InvalidTableError("Primary key element must be a string".to_string())
+ })?
.to_string(),
);
}
@@ -308,7 +480,7 @@ impl TableDescriptor {
fn deserialize_properties(node: &Value) -> Result<HashMap<String, String>> {
let obj = node
.as_object()
- .ok_or_else(|| JsonSerdeError("Properties should be an object".to_string()))?;
+ .ok_or_else(|| JsonSerdeError("Properties must be an object".to_string()))?;
let mut properties = HashMap::with_capacity(obj.len());
for (key, value) in obj {
@@ -316,7 +488,7 @@ impl TableDescriptor {
key.clone(),
value
.as_str()
- .ok_or_else(|| JsonSerdeError("Properties should be an object".to_string()))?
+ .ok_or_else(|| JsonSerdeError("Property value must be a string".to_string()))?
.to_owned(),
);
}
@@ -383,9 +555,7 @@ impl JsonSerde for TableDescriptor {
if let Some(comment_node) = node.get(Self::COMMENT_NAME) {
let comment = comment_node
.as_str()
- .ok_or_else(|| {
- JsonSerdeError(format!("{} should be a string", Self::COMMENT_NAME))
- })?
+ .ok_or_else(|| JsonSerdeError(format!("{} must be a string", Self::COMMENT_NAME)))?
.to_owned();
builder = builder.comment(comment.as_str());
}
@@ -400,7 +570,7 @@ impl JsonSerde for TableDescriptor {
})?
.as_array()
.ok_or_else(|| {
- JsonSerdeError(format!("{} should be an array", Self::PARTITION_KEY_NAME))
+ JsonSerdeError(format!("{} must be an array", Self::PARTITION_KEY_NAME))
})?;
let mut partition_keys = Vec::with_capacity(partition_node.len());
@@ -409,7 +579,10 @@ impl JsonSerde for TableDescriptor {
key_node
.as_str()
.ok_or_else(|| {
- JsonSerdeError(format!("{} should be a string", Self::PARTITION_KEY_NAME))
+ JsonSerdeError(format!(
+ "{} element must be a string",
+ Self::PARTITION_KEY_NAME
+ ))
})?
.to_owned(),
);
@@ -420,14 +593,14 @@ impl JsonSerde for TableDescriptor {
let mut bucket_keys = vec![];
if let Some(bucket_key_node) = node.get(Self::BUCKET_KEY_NAME) {
let bucket_key_node = bucket_key_node.as_array().ok_or_else(|| {
- JsonSerdeError(format!("{} should be an array", Self::BUCKET_COUNT_NAME))
+ JsonSerdeError(format!("{} must be an array", Self::BUCKET_KEY_NAME))
})?;
for key_node in bucket_key_node {
bucket_keys.push(
key_node
.as_str()
- .ok_or_else(|| JsonSerdeError("Bucket key should be a string".to_string()))?
+ .ok_or_else(|| JsonSerdeError("Bucket key must be a string".to_string()))?
.to_owned(),
);
}
@@ -462,3 +635,47 @@ impl JsonSerde for TableDescriptor {
builder.build()
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::metadata::DataTypes;
+
+ #[test]
+ fn test_datatype_json_serde() {
+ let data_types = vec![
+ DataTypes::boolean(),
+ DataTypes::tinyint(),
+ DataTypes::smallint(),
+ DataTypes::int().as_non_nullable(),
+ DataTypes::bigint(),
+ DataTypes::float(),
+ DataTypes::double(),
+ DataTypes::char(10),
+ DataTypes::string(),
+ DataTypes::decimal(10, 2),
+ DataTypes::date(),
+ DataTypes::time(),
+ DataTypes::timestamp(),
+ DataTypes::timestamp_ltz(),
+ DataTypes::bytes(),
+ DataTypes::binary(100),
+ DataTypes::array(DataTypes::int()),
+ DataTypes::map(DataTypes::string(), DataTypes::int()),
+ DataTypes::row(vec![
+ DataField::new("f1".to_string(), DataTypes::int(), None),
+ DataField::new(
+ "f2".to_string(),
+ DataTypes::string(),
+ Some("desc".to_string()),
+ ),
+ ]),
+ ];
+
+ for dt in data_types {
+ let json = dt.serialize_json().unwrap();
+ let deserialized = DataType::deserialize_json(&json).unwrap();
+ assert_eq!(dt, deserialized);
+ }
+ }
+}