Skip to content

Commit b5f37f9

Browse files
pavlospt and Copilot committed
Implement JsonSerde for all datatypes
- Fix typo in FIELD_NAME_SCALE constant
- Implement JsonSerde for DataField
- Complete serialize_json and deserialize_json for all DataType variants
- Add comprehensive test for DataType json serialization/deserialization

Resolves issue #39

Amp-Thread-ID: https://ampcode.com/threads/T-12d3046f-c788-4b3e-b1ae-6830b05aa926
Co-authored-by: Amp <[email protected]>

Update crates/fluss/src/metadata/json_serde.rs

Co-authored-by: Copilot <[email protected]>

Update crates/fluss/src/metadata/json_serde.rs

Co-authored-by: Copilot <[email protected]>

Update crates/fluss/src/metadata/json_serde.rs

Co-authored-by: Copilot <[email protected]>

Update crates/fluss/src/metadata/json_serde.rs

Co-authored-by: Copilot <[email protected]>
1 parent 86efc93 commit b5f37f9

File tree

1 file changed

+203
-32
lines changed

1 file changed

+203
-32
lines changed

crates/fluss/src/metadata/json_serde.rs

Lines changed: 203 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
use crate::error::Error::{InvalidTableError, JsonSerdeError};
1919
use crate::error::Result;
20-
use crate::metadata::datatype::{DataType, DataTypes};
20+
use crate::metadata::datatype::{DataField, DataType, DataTypes};
2121
use crate::metadata::table::{Column, Schema, TableDescriptor};
2222
use serde_json::{Value, json};
2323
use std::collections::HashMap;
@@ -58,10 +58,8 @@ impl DataType {
5858
const FIELD_NAME_TYPE_NAME: &'static str = "type";
5959
const FIELD_NAME_NULLABLE: &'static str = "nullable";
6060
const FIELD_NAME_LENGTH: &'static str = "length";
61-
#[allow(dead_code)]
6261
const FIELD_NAME_PRECISION: &'static str = "precision";
63-
#[allow(dead_code)]
64-
const FILED_NAME_SCALE: &'static str = "scale";
62+
const FIELD_NAME_SCALE: &'static str = "scale";
6563
#[allow(dead_code)]
6664
const FIELD_NAME_ELEMENT_TYPE: &'static str = "element_type";
6765
#[allow(dead_code)]
@@ -111,21 +109,42 @@ impl JsonSerde for DataType {
111109
obj.insert(Self::FIELD_NAME_LENGTH.to_string(), json!(_type.length()));
112110
}
113111
DataType::Decimal(_type) => {
114-
todo!()
112+
obj.insert(Self::FIELD_NAME_PRECISION.to_string(), json!(_type.precision()));
113+
obj.insert(Self::FIELD_NAME_SCALE.to_string(), json!(_type.scale()));
115114
}
116-
117115
DataType::Time(_type) => {
118-
todo!()
116+
obj.insert(Self::FIELD_NAME_PRECISION.to_string(), json!(_type.precision()));
119117
}
120118
DataType::Timestamp(_type) => {
121-
todo!()
119+
obj.insert(Self::FIELD_NAME_PRECISION.to_string(), json!(_type.precision()));
122120
}
123121
DataType::TimestampLTz(_type) => {
124-
todo!()
122+
obj.insert(Self::FIELD_NAME_PRECISION.to_string(), json!(_type.precision()));
123+
}
124+
DataType::Array(_type) => {
125+
obj.insert(
126+
Self::FIELD_NAME_ELEMENT_TYPE.to_string(),
127+
_type.get_element_type().serialize_json()?,
128+
);
129+
}
130+
DataType::Map(_type) => {
131+
obj.insert(
132+
Self::FIELD_NAME_KEY_TYPE.to_string(),
133+
_type.key_type().serialize_json()?,
134+
);
135+
obj.insert(
136+
Self::FIELD_NAME_VALUE_TYPE.to_string(),
137+
_type.value_type().serialize_json()?,
138+
);
139+
}
140+
DataType::Row(_type) => {
141+
let fields: Vec<Value> = _type
142+
.fields()
143+
.iter()
144+
.map(|field| field.serialize_json())
145+
.collect::<Result<_>>()?;
146+
obj.insert(Self::FIELD_NAME_FIELDS.to_string(), json!(fields));
125147
}
126-
DataType::Array(_type) => todo!(),
127-
DataType::Map(_type) => todo!(),
128-
DataType::Row(_type) => todo!(),
129148
}
130149
Ok(Value::Object(obj))
131150
}
@@ -150,18 +169,85 @@ impl JsonSerde for DataType {
150169
"BIGINT" => DataTypes::bigint(),
151170
"FLOAT" => DataTypes::float(),
152171
"DOUBLE" => DataTypes::double(),
153-
"CHAR" => todo!(),
172+
"CHAR" => {
173+
let length = node
174+
.get(Self::FIELD_NAME_LENGTH)
175+
.and_then(|v| v.as_u64())
176+
.ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::FIELD_NAME_LENGTH)))? as u32;
177+
DataTypes::char(length)
178+
}
154179
"STRING" => DataTypes::string(),
155-
"DECIMAL" => todo!(),
180+
"DECIMAL" => {
181+
let precision = node
182+
.get(Self::FIELD_NAME_PRECISION)
183+
.and_then(|v| v.as_u64())
184+
.ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::FIELD_NAME_PRECISION)))? as u32;
185+
let scale = node
186+
.get(Self::FIELD_NAME_SCALE)
187+
.and_then(|v| v.as_u64())
188+
.unwrap_or(0) as u32;
189+
DataTypes::decimal(precision, scale)
190+
}
156191
"DATE" => DataTypes::date(),
157-
"TIME_WITHOUT_TIME_ZONE" => todo!(), // Precision set separately
158-
"TIMESTAMP_WITHOUT_TIME_ZONE" => todo!(), // Precision set separately
159-
"TIMESTAMP_WITH_LOCAL_TIME_ZONE" => todo!(), // Precision set separately
192+
"TIME_WITHOUT_TIME_ZONE" => {
193+
let precision = node
194+
.get(Self::FIELD_NAME_PRECISION)
195+
.and_then(|v| v.as_u64())
196+
.unwrap_or(0) as u32;
197+
DataTypes::time_with_precision(precision)
198+
},
199+
"TIMESTAMP_WITHOUT_TIME_ZONE" => {
200+
let precision = node
201+
.get(Self::FIELD_NAME_PRECISION)
202+
.and_then(|v| v.as_u64())
203+
.unwrap_or(6) as u32;
204+
DataTypes::timestamp_with_precision(precision)
205+
},
206+
"TIMESTAMP_WITH_LOCAL_TIME_ZONE" => {
207+
let precision = node
208+
.get(Self::FIELD_NAME_PRECISION)
209+
.and_then(|v| v.as_u64())
210+
.unwrap_or(6) as u32;
211+
DataTypes::timestamp_ltz_with_precision(precision)
212+
},
160213
"BYTES" => DataTypes::bytes(),
161-
"BINARY" => todo!(),
162-
"ARRAY" => todo!(),
163-
"MAP" => todo!(),
164-
"ROW" => todo!(),
214+
"BINARY" => {
215+
let length = node
216+
.get(Self::FIELD_NAME_LENGTH)
217+
.and_then(|v| v.as_u64())
218+
.unwrap_or(1) as usize;
219+
DataTypes::binary(length)
220+
},
221+
"ARRAY" => {
222+
let element_type_node = node
223+
.get(Self::FIELD_NAME_ELEMENT_TYPE)
224+
.ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::FIELD_NAME_ELEMENT_TYPE)))?;
225+
let element_type = DataType::deserialize_json(element_type_node)?;
226+
DataTypes::array(element_type)
227+
},
228+
"MAP" => {
229+
let key_type_node = node
230+
.get(Self::FIELD_NAME_KEY_TYPE)
231+
.ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::FIELD_NAME_KEY_TYPE)))?;
232+
let key_type = DataType::deserialize_json(key_type_node)?;
233+
let value_type_node = node
234+
.get(Self::FIELD_NAME_VALUE_TYPE)
235+
.ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::FIELD_NAME_VALUE_TYPE)))?;
236+
let value_type = DataType::deserialize_json(value_type_node)?;
237+
DataTypes::map(key_type, value_type)
238+
},
239+
"ROW" => {
240+
let fields_node = node
241+
.get(Self::FIELD_NAME_FIELDS)
242+
.ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::FIELD_NAME_FIELDS)))?
243+
.as_array()
244+
.ok_or_else(|| JsonSerdeError(format!("{} must be an array", Self::FIELD_NAME_FIELDS)))?;
245+
let mut fields = Vec::with_capacity(fields_node.len());
246+
for field_node in fields_node {
247+
fields.push(DataField::deserialize_json(field_node)?);
248+
}
249+
DataTypes::row(fields)
250+
},
165251
_ => return Err(JsonSerdeError(format!("Unknown type root: {type_root}"))),
166252
};
167253

@@ -175,6 +261,51 @@ impl JsonSerde for DataType {
175261
}
176262
}
177263

264+
impl DataField {
265+
const NAME: &'static str = "name";
266+
const FIELD_TYPE: &'static str = "field_type";
267+
const DESCRIPTION: &'static str = "description";
268+
}
269+
270+
impl JsonSerde for DataField {
271+
fn serialize_json(&self) -> Result<Value> {
272+
let mut obj = serde_json::Map::new();
273+
274+
obj.insert(Self::NAME.to_string(), json!(self.name()));
275+
obj.insert(
276+
Self::FIELD_TYPE.to_string(),
277+
self.data_type.serialize_json()?,
278+
);
279+
280+
if let Some(description) = &self.description {
281+
obj.insert(Self::DESCRIPTION.to_string(), json!(description));
282+
}
283+
284+
Ok(Value::Object(obj))
285+
}
286+
287+
fn deserialize_json(node: &Value) -> Result<DataField> {
288+
let name = node
289+
.get(Self::NAME)
290+
.and_then(|v| v.as_str())
291+
.ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::NAME)))?
292+
.to_string();
293+
294+
let field_type_node = node
295+
.get(Self::FIELD_TYPE)
296+
.ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::FIELD_TYPE)))?;
297+
298+
let data_type = DataType::deserialize_json(field_type_node)?;
299+
300+
let description = node
301+
.get(Self::DESCRIPTION)
302+
.and_then(|v| v.as_str())
303+
.map(|s| s.to_string());
304+
305+
Ok(DataField::new(name, data_type, description))
306+
}
307+
}
308+
178309
impl Column {
179310
const NAME: &'static str = "name";
180311
const DATA_TYPE: &'static str = "data_type";
@@ -203,7 +334,7 @@ impl JsonSerde for Column {
203334
let name = node
204335
.get(Self::NAME)
205336
.and_then(|v| v.as_str())
206-
.unwrap_or_else(|| panic!("{}", format!("Missing required field: {}", Self::NAME)))
337+
.ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::NAME)))?
207338
.to_string();
208339

209340
let data_type_node = node.get(Self::DATA_TYPE).ok_or_else(|| {
@@ -263,7 +394,7 @@ impl JsonSerde for Schema {
263394
JsonSerdeError(format!("Missing required field: {}", Self::COLUMNS_NAME))
264395
})?
265396
.as_array()
266-
.ok_or_else(|| JsonSerdeError(format!("{} should be an array", Self::COLUMNS_NAME)))?;
397+
.ok_or_else(|| JsonSerdeError(format!("{} must be an array", Self::COLUMNS_NAME)))?;
267398

268399
let mut columns = Vec::with_capacity(columns_node.len());
269400
for col_node in columns_node {
@@ -275,14 +406,14 @@ impl JsonSerde for Schema {
275406
if let Some(pk_node) = node.get(Self::PRIMARY_KEY_NAME) {
276407
let pk_array = pk_node
277408
.as_array()
278-
.ok_or_else(|| InvalidTableError("Primary key is not an array".to_string()))?;
409+
.ok_or_else(|| InvalidTableError("Primary key must be an array".to_string()))?;
279410

280411
let mut primary_keys = Vec::with_capacity(pk_array.len());
281412
for name_node in pk_array {
282413
primary_keys.push(
283414
name_node
284415
.as_str()
285-
.ok_or_else(|| InvalidTableError("Primary key is not string".to_string()))?
416+
.ok_or_else(|| InvalidTableError("Primary key element must be a string".to_string()))?
286417
.to_string(),
287418
);
288419
}
@@ -308,15 +439,15 @@ impl TableDescriptor {
308439
fn deserialize_properties(node: &Value) -> Result<HashMap<String, String>> {
309440
let obj = node
310441
.as_object()
311-
.ok_or_else(|| JsonSerdeError("Properties should be an object".to_string()))?;
442+
.ok_or_else(|| JsonSerdeError("Properties must be an object".to_string()))?;
312443

313444
let mut properties = HashMap::with_capacity(obj.len());
314445
for (key, value) in obj {
315446
properties.insert(
316447
key.clone(),
317448
value
318449
.as_str()
319-
.ok_or_else(|| JsonSerdeError("Properties should be an object".to_string()))?
450+
.ok_or_else(|| JsonSerdeError("Property value must be a string".to_string()))?
320451
.to_owned(),
321452
);
322453
}
@@ -384,7 +515,7 @@ impl JsonSerde for TableDescriptor {
384515
let comment = comment_node
385516
.as_str()
386517
.ok_or_else(|| {
387-
JsonSerdeError(format!("{} should be a string", Self::COMMENT_NAME))
518+
JsonSerdeError(format!("{} must be a string", Self::COMMENT_NAME))
388519
})?
389520
.to_owned();
390521
builder = builder.comment(comment.as_str());
@@ -400,7 +531,7 @@ impl JsonSerde for TableDescriptor {
400531
})?
401532
.as_array()
402533
.ok_or_else(|| {
403-
JsonSerdeError(format!("{} should be an array", Self::PARTITION_KEY_NAME))
534+
JsonSerdeError(format!("{} must be an array", Self::PARTITION_KEY_NAME))
404535
})?;
405536

406537
let mut partition_keys = Vec::with_capacity(partition_node.len());
@@ -409,7 +540,7 @@ impl JsonSerde for TableDescriptor {
409540
key_node
410541
.as_str()
411542
.ok_or_else(|| {
412-
JsonSerdeError(format!("{} should be a string", Self::PARTITION_KEY_NAME))
543+
JsonSerdeError(format!("{} element must be a string", Self::PARTITION_KEY_NAME))
413544
})?
414545
.to_owned(),
415546
);
@@ -420,14 +551,14 @@ impl JsonSerde for TableDescriptor {
420551
let mut bucket_keys = vec![];
421552
if let Some(bucket_key_node) = node.get(Self::BUCKET_KEY_NAME) {
422553
let bucket_key_node = bucket_key_node.as_array().ok_or_else(|| {
423-
JsonSerdeError(format!("{} should be an array", Self::BUCKET_COUNT_NAME))
554+
JsonSerdeError(format!("{} must be an array", Self::BUCKET_KEY_NAME))
424555
})?;
425556

426557
for key_node in bucket_key_node {
427558
bucket_keys.push(
428559
key_node
429560
.as_str()
430-
.ok_or_else(|| JsonSerdeError("Bucket key should be a string".to_string()))?
561+
.ok_or_else(|| JsonSerdeError("Bucket key must be a string".to_string()))?
431562
.to_owned(),
432563
);
433564
}
@@ -462,3 +593,43 @@ impl JsonSerde for TableDescriptor {
462593
builder.build()
463594
}
464595
}
596+
597+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata::DataTypes;
    use serde_json::json;

    /// Every `DataType` variant must survive a serialize -> deserialize round trip.
    ///
    /// Non-default precisions are included on purpose: the deserializer falls
    /// back to a default precision when the key is absent, so a round trip that
    /// only used default precisions could not detect a serializer that failed
    /// to write the precision at all.
    #[test]
    fn test_datatype_json_serde() {
        let data_types = vec![
            DataTypes::boolean(),
            DataTypes::tinyint(),
            DataTypes::smallint(),
            DataTypes::int().as_non_nullable(),
            DataTypes::bigint(),
            DataTypes::float(),
            DataTypes::double(),
            DataTypes::char(10),
            DataTypes::string(),
            DataTypes::decimal(10, 2),
            DataTypes::date(),
            DataTypes::time(),
            DataTypes::time_with_precision(3),
            DataTypes::timestamp(),
            DataTypes::timestamp_with_precision(3),
            DataTypes::timestamp_ltz(),
            DataTypes::timestamp_ltz_with_precision(3),
            DataTypes::bytes(),
            DataTypes::binary(100),
            DataTypes::array(DataTypes::int()),
            DataTypes::map(DataTypes::string(), DataTypes::int()),
            DataTypes::row(vec![
                DataField::new("f1".to_string(), DataTypes::int(), None),
                DataField::new(
                    "f2".to_string(),
                    DataTypes::string(),
                    Some("desc".to_string()),
                ),
            ]),
            // Nested composite: array of maps whose values are rows.
            DataTypes::array(DataTypes::map(
                DataTypes::string(),
                DataTypes::row(vec![DataField::new(
                    "inner".to_string(),
                    DataTypes::decimal(10, 2),
                    None,
                )]),
            )),
        ];

        for dt in data_types {
            let json = dt.serialize_json().unwrap();
            let deserialized = DataType::deserialize_json(&json).unwrap();
            assert_eq!(dt, deserialized, "round trip changed the type; json = {json}");
        }
    }

    /// Missing required keys must surface as an error rather than a panic.
    #[test]
    fn test_missing_required_field_is_error() {
        // CHAR has no default length.
        assert!(DataType::deserialize_json(&json!({ "type": "CHAR" })).is_err());
        // A data field without a name cannot be built.
        assert!(DataField::deserialize_json(&json!({})).is_err());
    }
}

0 commit comments

Comments
 (0)