Skip to content

Commit 219c595

Browse files
pavlosptampcode-com
andcommitted
Implement JsonSerde for all datatypes
- Fix typo in FIELD_NAME_SCALE constant - Implement JsonSerde for DataField - Complete serialize_json and deserialize_json for all DataType variants - Add comprehensive test for DataType json serialization/deserialization Resolves issue #39 Amp-Thread-ID: https://ampcode.com/threads/T-12d3046f-c788-4b3e-b1ae-6830b05aa926 Co-authored-by: Amp <[email protected]>
1 parent 86efc93 commit 219c595

File tree

1 file changed

+206
-32
lines changed

1 file changed

+206
-32
lines changed

crates/fluss/src/metadata/json_serde.rs

Lines changed: 206 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
use crate::error::Error::{InvalidTableError, JsonSerdeError};
1919
use crate::error::Result;
20-
use crate::metadata::datatype::{DataType, DataTypes};
20+
use crate::metadata::datatype::{DataField, DataType, DataTypes};
2121
use crate::metadata::table::{Column, Schema, TableDescriptor};
2222
use serde_json::{Value, json};
2323
use std::collections::HashMap;
@@ -58,10 +58,8 @@ impl DataType {
5858
const FIELD_NAME_TYPE_NAME: &'static str = "type";
5959
const FIELD_NAME_NULLABLE: &'static str = "nullable";
6060
const FIELD_NAME_LENGTH: &'static str = "length";
61-
#[allow(dead_code)]
6261
const FIELD_NAME_PRECISION: &'static str = "precision";
63-
#[allow(dead_code)]
64-
const FILED_NAME_SCALE: &'static str = "scale";
62+
const FIELD_NAME_SCALE: &'static str = "scale";
6563
#[allow(dead_code)]
6664
const FIELD_NAME_ELEMENT_TYPE: &'static str = "element_type";
6765
#[allow(dead_code)]
@@ -110,22 +108,46 @@ impl JsonSerde for DataType {
110108
DataType::Binary(_type) => {
111109
obj.insert(Self::FIELD_NAME_LENGTH.to_string(), json!(_type.length()));
112110
}
111+
DataType::Char(_type) => {
112+
obj.insert(Self::FIELD_NAME_LENGTH.to_string(), json!(_type.length()));
113+
}
113114
DataType::Decimal(_type) => {
114-
todo!()
115+
obj.insert(Self::FIELD_NAME_PRECISION.to_string(), json!(_type.precision()));
116+
obj.insert(Self::FIELD_NAME_SCALE.to_string(), json!(_type.scale()));
115117
}
116-
117118
DataType::Time(_type) => {
118-
todo!()
119+
obj.insert(Self::FIELD_NAME_PRECISION.to_string(), json!(_type.precision()));
119120
}
120121
DataType::Timestamp(_type) => {
121-
todo!()
122+
obj.insert(Self::FIELD_NAME_PRECISION.to_string(), json!(_type.precision()));
122123
}
123124
DataType::TimestampLTz(_type) => {
124-
todo!()
125+
obj.insert(Self::FIELD_NAME_PRECISION.to_string(), json!(_type.precision()));
126+
}
127+
DataType::Array(_type) => {
128+
obj.insert(
129+
Self::FIELD_NAME_ELEMENT_TYPE.to_string(),
130+
_type.get_element_type().serialize_json()?,
131+
);
132+
}
133+
DataType::Map(_type) => {
134+
obj.insert(
135+
Self::FIELD_NAME_KEY_TYPE.to_string(),
136+
_type.key_type().serialize_json()?,
137+
);
138+
obj.insert(
139+
Self::FIELD_NAME_VALUE_TYPE.to_string(),
140+
_type.value_type().serialize_json()?,
141+
);
142+
}
143+
DataType::Row(_type) => {
144+
let fields: Vec<Value> = _type
145+
.fields()
146+
.iter()
147+
.map(|field| field.serialize_json())
148+
.collect::<Result<_>>()?;
149+
obj.insert(Self::FIELD_NAME_FIELDS.to_string(), json!(fields));
125150
}
126-
DataType::Array(_type) => todo!(),
127-
DataType::Map(_type) => todo!(),
128-
DataType::Row(_type) => todo!(),
129151
}
130152
Ok(Value::Object(obj))
131153
}
@@ -150,18 +172,85 @@ impl JsonSerde for DataType {
150172
"BIGINT" => DataTypes::bigint(),
151173
"FLOAT" => DataTypes::float(),
152174
"DOUBLE" => DataTypes::double(),
153-
"CHAR" => todo!(),
175+
"CHAR" => {
176+
let length = node
177+
.get(Self::FIELD_NAME_LENGTH)
178+
.and_then(|v| v.as_u64())
179+
.unwrap_or(1) as u32;
180+
DataTypes::char(length)
181+
},
154182
"STRING" => DataTypes::string(),
155-
"DECIMAL" => todo!(),
183+
"DECIMAL" => {
184+
let precision = node
185+
.get(Self::FIELD_NAME_PRECISION)
186+
.and_then(|v| v.as_u64())
187+
.unwrap_or(10) as u32;
188+
let scale = node
189+
.get(Self::FIELD_NAME_SCALE)
190+
.and_then(|v| v.as_u64())
191+
.unwrap_or(0) as u32;
192+
DataTypes::decimal(precision, scale)
193+
},
156194
"DATE" => DataTypes::date(),
157-
"TIME_WITHOUT_TIME_ZONE" => todo!(), // Precision set separately
158-
"TIMESTAMP_WITHOUT_TIME_ZONE" => todo!(), // Precision set separately
159-
"TIMESTAMP_WITH_LOCAL_TIME_ZONE" => todo!(), // Precision set separately
195+
"TIME_WITHOUT_TIME_ZONE" => {
196+
let precision = node
197+
.get(Self::FIELD_NAME_PRECISION)
198+
.and_then(|v| v.as_u64())
199+
.unwrap_or(0) as u32;
200+
DataTypes::time_with_precision(precision)
201+
},
202+
"TIMESTAMP_WITHOUT_TIME_ZONE" => {
203+
let precision = node
204+
.get(Self::FIELD_NAME_PRECISION)
205+
.and_then(|v| v.as_u64())
206+
.unwrap_or(6) as u32;
207+
DataTypes::timestamp_with_precision(precision)
208+
},
209+
"TIMESTAMP_WITH_LOCAL_TIME_ZONE" => {
210+
let precision = node
211+
.get(Self::FIELD_NAME_PRECISION)
212+
.and_then(|v| v.as_u64())
213+
.unwrap_or(6) as u32;
214+
DataTypes::timestamp_ltz_with_precision(precision)
215+
},
160216
"BYTES" => DataTypes::bytes(),
161-
"BINARY" => todo!(),
162-
"ARRAY" => todo!(),
163-
"MAP" => todo!(),
164-
"ROW" => todo!(),
217+
"BINARY" => {
218+
let length = node
219+
.get(Self::FIELD_NAME_LENGTH)
220+
.and_then(|v| v.as_u64())
221+
.unwrap_or(1) as usize;
222+
DataTypes::binary(length)
223+
},
224+
"ARRAY" => {
225+
let element_type_node = node
226+
.get(Self::FIELD_NAME_ELEMENT_TYPE)
227+
.ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::FIELD_NAME_ELEMENT_TYPE)))?;
228+
let element_type = DataType::deserialize_json(element_type_node)?;
229+
DataTypes::array(element_type)
230+
},
231+
"MAP" => {
232+
let key_type_node = node
233+
.get(Self::FIELD_NAME_KEY_TYPE)
234+
.ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::FIELD_NAME_KEY_TYPE)))?;
235+
let key_type = DataType::deserialize_json(key_type_node)?;
236+
let value_type_node = node
237+
.get(Self::FIELD_NAME_VALUE_TYPE)
238+
.ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::FIELD_NAME_VALUE_TYPE)))?;
239+
let value_type = DataType::deserialize_json(value_type_node)?;
240+
DataTypes::map(key_type, value_type)
241+
},
242+
"ROW" => {
243+
let fields_node = node
244+
.get(Self::FIELD_NAME_FIELDS)
245+
.ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::FIELD_NAME_FIELDS)))?
246+
.as_array()
247+
.ok_or_else(|| JsonSerdeError(format!("{} must be an array", Self::FIELD_NAME_FIELDS)))?;
248+
let mut fields = Vec::with_capacity(fields_node.len());
249+
for field_node in fields_node {
250+
fields.push(DataField::deserialize_json(field_node)?);
251+
}
252+
DataTypes::row(fields)
253+
},
165254
_ => return Err(JsonSerdeError(format!("Unknown type root: {type_root}"))),
166255
};
167256

@@ -175,6 +264,51 @@ impl JsonSerde for DataType {
175264
}
176265
}
177266

267+
impl DataField {
268+
const NAME: &'static str = "name";
269+
const FIELD_TYPE: &'static str = "field_type";
270+
const DESCRIPTION: &'static str = "description";
271+
}
272+
273+
impl JsonSerde for DataField {
274+
fn serialize_json(&self) -> Result<Value> {
275+
let mut obj = serde_json::Map::new();
276+
277+
obj.insert(Self::NAME.to_string(), json!(self.name()));
278+
obj.insert(
279+
Self::FIELD_TYPE.to_string(),
280+
self.data_type.serialize_json()?,
281+
);
282+
283+
if let Some(description) = &self.description {
284+
obj.insert(Self::DESCRIPTION.to_string(), json!(description));
285+
}
286+
287+
Ok(Value::Object(obj))
288+
}
289+
290+
fn deserialize_json(node: &Value) -> Result<DataField> {
291+
let name = node
292+
.get(Self::NAME)
293+
.and_then(|v| v.as_str())
294+
.ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::NAME)))?
295+
.to_string();
296+
297+
let field_type_node = node
298+
.get(Self::FIELD_TYPE)
299+
.ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::FIELD_TYPE)))?;
300+
301+
let data_type = DataType::deserialize_json(field_type_node)?;
302+
303+
let description = node
304+
.get(Self::DESCRIPTION)
305+
.and_then(|v| v.as_str())
306+
.map(|s| s.to_string());
307+
308+
Ok(DataField::new(name, data_type, description))
309+
}
310+
}
311+
178312
impl Column {
179313
const NAME: &'static str = "name";
180314
const DATA_TYPE: &'static str = "data_type";
@@ -263,7 +397,7 @@ impl JsonSerde for Schema {
263397
JsonSerdeError(format!("Missing required field: {}", Self::COLUMNS_NAME))
264398
})?
265399
.as_array()
266-
.ok_or_else(|| JsonSerdeError(format!("{} should be an array", Self::COLUMNS_NAME)))?;
400+
.ok_or_else(|| JsonSerdeError(format!("{} must be an array", Self::COLUMNS_NAME)))?;
267401

268402
let mut columns = Vec::with_capacity(columns_node.len());
269403
for col_node in columns_node {
@@ -275,14 +409,14 @@ impl JsonSerde for Schema {
275409
if let Some(pk_node) = node.get(Self::PRIMARY_KEY_NAME) {
276410
let pk_array = pk_node
277411
.as_array()
278-
.ok_or_else(|| InvalidTableError("Primary key is not an array".to_string()))?;
412+
.ok_or_else(|| InvalidTableError("Primary key must be an array".to_string()))?;
279413

280414
let mut primary_keys = Vec::with_capacity(pk_array.len());
281415
for name_node in pk_array {
282416
primary_keys.push(
283417
name_node
284418
.as_str()
285-
.ok_or_else(|| InvalidTableError("Primary key is not string".to_string()))?
419+
.ok_or_else(|| InvalidTableError("Primary key element must be a string".to_string()))?
286420
.to_string(),
287421
);
288422
}
@@ -307,16 +441,16 @@ impl TableDescriptor {
307441

308442
fn deserialize_properties(node: &Value) -> Result<HashMap<String, String>> {
309443
let obj = node
310-
.as_object()
311-
.ok_or_else(|| JsonSerdeError("Properties should be an object".to_string()))?;
444+
.as_object()
445+
.ok_or_else(|| JsonSerdeError("Properties must be an object".to_string()))?;
312446

313447
let mut properties = HashMap::with_capacity(obj.len());
314448
for (key, value) in obj {
315449
properties.insert(
316450
key.clone(),
317451
value
318452
.as_str()
319-
.ok_or_else(|| JsonSerdeError("Properties should be an object".to_string()))?
453+
.ok_or_else(|| JsonSerdeError("Property value must be a string".to_string()))?
320454
.to_owned(),
321455
);
322456
}
@@ -384,7 +518,7 @@ impl JsonSerde for TableDescriptor {
384518
let comment = comment_node
385519
.as_str()
386520
.ok_or_else(|| {
387-
JsonSerdeError(format!("{} should be a string", Self::COMMENT_NAME))
521+
JsonSerdeError(format!("{} must be a string", Self::COMMENT_NAME))
388522
})?
389523
.to_owned();
390524
builder = builder.comment(comment.as_str());
@@ -400,7 +534,7 @@ impl JsonSerde for TableDescriptor {
400534
})?
401535
.as_array()
402536
.ok_or_else(|| {
403-
JsonSerdeError(format!("{} should be an array", Self::PARTITION_KEY_NAME))
537+
JsonSerdeError(format!("{} must be an array", Self::PARTITION_KEY_NAME))
404538
})?;
405539

406540
let mut partition_keys = Vec::with_capacity(partition_node.len());
@@ -409,7 +543,7 @@ impl JsonSerde for TableDescriptor {
409543
key_node
410544
.as_str()
411545
.ok_or_else(|| {
412-
JsonSerdeError(format!("{} should be a string", Self::PARTITION_KEY_NAME))
546+
JsonSerdeError(format!("{} element must be a string", Self::PARTITION_KEY_NAME))
413547
})?
414548
.to_owned(),
415549
);
@@ -420,14 +554,14 @@ impl JsonSerde for TableDescriptor {
420554
let mut bucket_keys = vec![];
421555
if let Some(bucket_key_node) = node.get(Self::BUCKET_KEY_NAME) {
422556
let bucket_key_node = bucket_key_node.as_array().ok_or_else(|| {
423-
JsonSerdeError(format!("{} should be an array", Self::BUCKET_COUNT_NAME))
557+
JsonSerdeError(format!("{} must be an array", Self::BUCKET_KEY_NAME))
424558
})?;
425559

426560
for key_node in bucket_key_node {
427561
bucket_keys.push(
428562
key_node
429563
.as_str()
430-
.ok_or_else(|| JsonSerdeError("Bucket key should be a string".to_string()))?
564+
.ok_or_else(|| JsonSerdeError("Bucket key must be a string".to_string()))?
431565
.to_owned(),
432566
);
433567
}
@@ -462,3 +596,43 @@ impl JsonSerde for TableDescriptor {
462596
builder.build()
463597
}
464598
}
599+
600+
#[cfg(test)]
601+
mod tests {
602+
use super::*;
603+
use crate::metadata::DataTypes;
604+
605+
#[test]
606+
fn test_datatype_json_serde() {
607+
let data_types = vec![
608+
DataTypes::boolean(),
609+
DataTypes::tinyint(),
610+
DataTypes::smallint(),
611+
DataTypes::int(),
612+
DataTypes::bigint(),
613+
DataTypes::float(),
614+
DataTypes::double(),
615+
DataTypes::char(10),
616+
DataTypes::string(),
617+
DataTypes::decimal(10, 2),
618+
DataTypes::date(),
619+
DataTypes::time(),
620+
DataTypes::timestamp(),
621+
DataTypes::timestamp_ltz(),
622+
DataTypes::bytes(),
623+
DataTypes::binary(100),
624+
DataTypes::array(DataTypes::int()),
625+
DataTypes::map(DataTypes::string(), DataTypes::int()),
626+
DataTypes::row(vec![
627+
crate::metadata::DataField::new("f1".to_string(), DataTypes::int(), None),
628+
crate::metadata::DataField::new("f2".to_string(), DataTypes::string(), Some("desc".to_string())),
629+
]),
630+
];
631+
632+
for dt in data_types {
633+
let json = dt.serialize_json().unwrap();
634+
let deserialized = DataType::deserialize_json(&json).unwrap();
635+
assert_eq!(dt, deserialized);
636+
}
637+
}
638+
}

0 commit comments

Comments
 (0)