Skip to content

Commit e059a0e

Browse files
pavlosptampcode-com
andcommitted
Implement JsonSerde for all datatypes
- Fix typo in FIELD_NAME_SCALE constant - Implement JsonSerde for DataField - Complete serialize_json and deserialize_json for all DataType variants - Add comprehensive test for DataType json serialization/deserialization Resolves issue #39 Amp-Thread-ID: https://ampcode.com/threads/T-12d3046f-c788-4b3e-b1ae-6830b05aa926 Co-authored-by: Amp <[email protected]>
1 parent dffc59a commit e059a0e

File tree

1 file changed

+206
-39
lines changed

1 file changed

+206
-39
lines changed

crates/fluss/src/metadata/json_serde.rs

Lines changed: 206 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
use crate::error::Error::{InvalidTableError, JsonSerdeError};
1919
use crate::error::Result;
20-
use crate::metadata::datatype::{DataType, DataTypes};
20+
use crate::metadata::datatype::{DataField, DataType, DataTypes};
2121
use crate::metadata::table::{Column, Schema, TableDescriptor};
2222
use serde_json::{Value, json};
2323
use std::collections::HashMap;
@@ -58,24 +58,15 @@ impl DataType {
5858
const FIELD_NAME_TYPE_NAME: &'static str = "type";
5959
const FIELD_NAME_NULLABLE: &'static str = "nullable";
6060
const FIELD_NAME_LENGTH: &'static str = "length";
61-
#[allow(dead_code)]
6261
const FIELD_NAME_PRECISION: &'static str = "precision";
63-
#[allow(dead_code)]
64-
const FILED_NAME_SCALE: &'static str = "scale";
65-
#[allow(dead_code)]
62+
const FIELD_NAME_SCALE: &'static str = "scale";
6663
const FIELD_NAME_ELEMENT_TYPE: &'static str = "element_type";
67-
#[allow(dead_code)]
6864
const FIELD_NAME_KEY_TYPE: &'static str = "key_type";
69-
#[allow(dead_code)]
7065
const FIELD_NAME_VALUE_TYPE: &'static str = "value_type";
71-
#[allow(dead_code)]
7266
const FIELD_NAME_FIELDS: &'static str = "fields";
73-
#[allow(dead_code)]
7467
const FIELD_NAME_FIELD_NAME: &'static str = "name";
7568
// ROW
76-
#[allow(dead_code)]
7769
const FIELD_NAME_FIELD_TYPE: &'static str = "field_type";
78-
#[allow(dead_code)]
7970
const FIELD_NAME_FIELD_DESCRIPTION: &'static str = "description";
8071
}
8172

@@ -110,22 +101,46 @@ impl JsonSerde for DataType {
110101
DataType::Binary(_type) => {
111102
obj.insert(Self::FIELD_NAME_LENGTH.to_string(), json!(_type.length()));
112103
}
104+
DataType::Char(_type) => {
105+
obj.insert(Self::FIELD_NAME_LENGTH.to_string(), json!(_type.length()));
106+
}
113107
DataType::Decimal(_type) => {
114-
todo!()
108+
obj.insert(Self::FIELD_NAME_PRECISION.to_string(), json!(_type.precision()));
109+
obj.insert(Self::FIELD_NAME_SCALE.to_string(), json!(_type.scale()));
115110
}
116-
117111
DataType::Time(_type) => {
118-
todo!()
112+
obj.insert(Self::FIELD_NAME_PRECISION.to_string(), json!(_type.precision()));
119113
}
120114
DataType::Timestamp(_type) => {
121-
todo!()
115+
obj.insert(Self::FIELD_NAME_PRECISION.to_string(), json!(_type.precision()));
122116
}
123117
DataType::TimestampLTz(_type) => {
124-
todo!()
118+
obj.insert(Self::FIELD_NAME_PRECISION.to_string(), json!(_type.precision()));
119+
}
120+
DataType::Array(_type) => {
121+
obj.insert(
122+
Self::FIELD_NAME_ELEMENT_TYPE.to_string(),
123+
_type.get_element_type().serialize_json()?,
124+
);
125+
}
126+
DataType::Map(_type) => {
127+
obj.insert(
128+
Self::FIELD_NAME_KEY_TYPE.to_string(),
129+
_type.key_type().serialize_json()?,
130+
);
131+
obj.insert(
132+
Self::FIELD_NAME_VALUE_TYPE.to_string(),
133+
_type.value_type().serialize_json()?,
134+
);
135+
}
136+
DataType::Row(_type) => {
137+
let fields: Vec<Value> = _type
138+
.fields()
139+
.iter()
140+
.map(|field| field.serialize_json())
141+
.collect::<Result<_>>()?;
142+
obj.insert(Self::FIELD_NAME_FIELDS.to_string(), json!(fields));
125143
}
126-
DataType::Array(_type) => todo!(),
127-
DataType::Map(_type) => todo!(),
128-
DataType::Row(_type) => todo!(),
129144
}
130145
Ok(Value::Object(obj))
131146
}
@@ -150,18 +165,85 @@ impl JsonSerde for DataType {
150165
"BIGINT" => DataTypes::bigint(),
151166
"FLOAT" => DataTypes::float(),
152167
"DOUBLE" => DataTypes::double(),
153-
"CHAR" => todo!(),
168+
"CHAR" => {
169+
let length = node
170+
.get(Self::FIELD_NAME_LENGTH)
171+
.and_then(|v| v.as_u64())
172+
.unwrap_or(1) as u32;
173+
DataTypes::char(length)
174+
},
154175
"STRING" => DataTypes::string(),
155-
"DECIMAL" => todo!(),
176+
"DECIMAL" => {
177+
let precision = node
178+
.get(Self::FIELD_NAME_PRECISION)
179+
.and_then(|v| v.as_u64())
180+
.unwrap_or(10) as u32;
181+
let scale = node
182+
.get(Self::FIELD_NAME_SCALE)
183+
.and_then(|v| v.as_u64())
184+
.unwrap_or(0) as u32;
185+
DataTypes::decimal(precision, scale)
186+
},
156187
"DATE" => DataTypes::date(),
157-
"TIME_WITHOUT_TIME_ZONE" => todo!(), // Precision set separately
158-
"TIMESTAMP_WITHOUT_TIME_ZONE" => todo!(), // Precision set separately
159-
"TIMESTAMP_WITH_LOCAL_TIME_ZONE" => todo!(), // Precision set separately
188+
"TIME_WITHOUT_TIME_ZONE" => {
189+
let precision = node
190+
.get(Self::FIELD_NAME_PRECISION)
191+
.and_then(|v| v.as_u64())
192+
.unwrap_or(0) as u32;
193+
DataTypes::time_with_precision(precision)
194+
},
195+
"TIMESTAMP_WITHOUT_TIME_ZONE" => {
196+
let precision = node
197+
.get(Self::FIELD_NAME_PRECISION)
198+
.and_then(|v| v.as_u64())
199+
.unwrap_or(6) as u32;
200+
DataTypes::timestamp_with_precision(precision)
201+
},
202+
"TIMESTAMP_WITH_LOCAL_TIME_ZONE" => {
203+
let precision = node
204+
.get(Self::FIELD_NAME_PRECISION)
205+
.and_then(|v| v.as_u64())
206+
.unwrap_or(6) as u32;
207+
DataTypes::timestamp_ltz_with_precision(precision)
208+
},
160209
"BYTES" => DataTypes::bytes(),
161-
"BINARY" => todo!(),
162-
"ARRAY" => todo!(),
163-
"MAP" => todo!(),
164-
"ROW" => todo!(),
210+
"BINARY" => {
211+
let length = node
212+
.get(Self::FIELD_NAME_LENGTH)
213+
.and_then(|v| v.as_u64())
214+
.unwrap_or(1) as usize;
215+
DataTypes::binary(length)
216+
},
217+
"ARRAY" => {
218+
let element_type_node = node
219+
.get(Self::FIELD_NAME_ELEMENT_TYPE)
220+
.ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::FIELD_NAME_ELEMENT_TYPE)))?;
221+
let element_type = DataType::deserialize_json(element_type_node)?;
222+
DataTypes::array(element_type)
223+
},
224+
"MAP" => {
225+
let key_type_node = node
226+
.get(Self::FIELD_NAME_KEY_TYPE)
227+
.ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::FIELD_NAME_KEY_TYPE)))?;
228+
let key_type = DataType::deserialize_json(key_type_node)?;
229+
let value_type_node = node
230+
.get(Self::FIELD_NAME_VALUE_TYPE)
231+
.ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::FIELD_NAME_VALUE_TYPE)))?;
232+
let value_type = DataType::deserialize_json(value_type_node)?;
233+
DataTypes::map(key_type, value_type)
234+
},
235+
"ROW" => {
236+
let fields_node = node
237+
.get(Self::FIELD_NAME_FIELDS)
238+
.ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::FIELD_NAME_FIELDS)))?
239+
.as_array()
240+
.ok_or_else(|| JsonSerdeError(format!("{} should be an array", Self::FIELD_NAME_FIELDS)))?;
241+
let mut fields = Vec::with_capacity(fields_node.len());
242+
for field_node in fields_node {
243+
fields.push(DataField::deserialize_json(field_node)?);
244+
}
245+
DataTypes::row(fields)
246+
},
165247
_ => return Err(JsonSerdeError(format!("Unknown type root: {type_root}"))),
166248
};
167249

@@ -175,6 +257,51 @@ impl JsonSerde for DataType {
175257
}
176258
}
177259

260+
impl DataField {
261+
const NAME: &'static str = "name";
262+
const FIELD_TYPE: &'static str = "field_type";
263+
const DESCRIPTION: &'static str = "description";
264+
}
265+
266+
impl JsonSerde for DataField {
267+
fn serialize_json(&self) -> Result<Value> {
268+
let mut obj = serde_json::Map::new();
269+
270+
obj.insert(Self::NAME.to_string(), json!(self.name()));
271+
obj.insert(
272+
Self::FIELD_TYPE.to_string(),
273+
self.data_type.serialize_json()?,
274+
);
275+
276+
if let Some(description) = &self.description {
277+
obj.insert(Self::DESCRIPTION.to_string(), json!(description));
278+
}
279+
280+
Ok(Value::Object(obj))
281+
}
282+
283+
fn deserialize_json(node: &Value) -> Result<DataField> {
284+
let name = node
285+
.get(Self::NAME)
286+
.and_then(|v| v.as_str())
287+
.ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::NAME)))?
288+
.to_string();
289+
290+
let field_type_node = node
291+
.get(Self::FIELD_TYPE)
292+
.ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::FIELD_TYPE)))?;
293+
294+
let data_type = DataType::deserialize_json(field_type_node)?;
295+
296+
let description = node
297+
.get(Self::DESCRIPTION)
298+
.and_then(|v| v.as_str())
299+
.map(|s| s.to_string());
300+
301+
Ok(DataField::new(name, data_type, description))
302+
}
303+
}
304+
178305
impl Column {
179306
const NAME: &'static str = "name";
180307
const DATA_TYPE: &'static str = "data_type";
@@ -263,7 +390,7 @@ impl JsonSerde for Schema {
263390
JsonSerdeError(format!("Missing required field: {}", Self::COLUMNS_NAME))
264391
})?
265392
.as_array()
266-
.ok_or_else(|| JsonSerdeError(format!("{} should be an array", Self::COLUMNS_NAME)))?;
393+
.ok_or_else(|| JsonSerdeError(format!("{} must be an array", Self::COLUMNS_NAME)))?;
267394

268395
let mut columns = Vec::with_capacity(columns_node.len());
269396
for col_node in columns_node {
@@ -275,14 +402,14 @@ impl JsonSerde for Schema {
275402
if let Some(pk_node) = node.get(Self::PRIMARY_KEY_NAME) {
276403
let pk_array = pk_node
277404
.as_array()
278-
.ok_or_else(|| InvalidTableError("Primary key is not an array".to_string()))?;
405+
.ok_or_else(|| InvalidTableError("Primary key must be an array".to_string()))?;
279406

280407
let mut primary_keys = Vec::with_capacity(pk_array.len());
281408
for name_node in pk_array {
282409
primary_keys.push(
283410
name_node
284411
.as_str()
285-
.ok_or_else(|| InvalidTableError("Primary key is not string".to_string()))?
412+
.ok_or_else(|| InvalidTableError("Primary key element must be a string".to_string()))?
286413
.to_string(),
287414
);
288415
}
@@ -307,16 +434,16 @@ impl TableDescriptor {
307434

308435
fn deserialize_properties(node: &Value) -> Result<HashMap<String, String>> {
309436
let obj = node
310-
.as_object()
311-
.ok_or_else(|| JsonSerdeError("Properties should be an object".to_string()))?;
437+
.as_object()
438+
.ok_or_else(|| JsonSerdeError("Properties must be an object".to_string()))?;
312439

313440
let mut properties = HashMap::with_capacity(obj.len());
314441
for (key, value) in obj {
315442
properties.insert(
316443
key.clone(),
317444
value
318445
.as_str()
319-
.ok_or_else(|| JsonSerdeError("Properties should be an object".to_string()))?
446+
.ok_or_else(|| JsonSerdeError("Property value must be a string".to_string()))?
320447
.to_owned(),
321448
);
322449
}
@@ -384,7 +511,7 @@ impl JsonSerde for TableDescriptor {
384511
let comment = comment_node
385512
.as_str()
386513
.ok_or_else(|| {
387-
JsonSerdeError(format!("{} should be a string", Self::COMMENT_NAME))
514+
JsonSerdeError(format!("{} must be a string", Self::COMMENT_NAME))
388515
})?
389516
.to_owned();
390517
builder = builder.comment(comment.as_str());
@@ -400,7 +527,7 @@ impl JsonSerde for TableDescriptor {
400527
})?
401528
.as_array()
402529
.ok_or_else(|| {
403-
JsonSerdeError(format!("{} should be an array", Self::PARTITION_KEY_NAME))
530+
JsonSerdeError(format!("{} must be an array", Self::PARTITION_KEY_NAME))
404531
})?;
405532

406533
let mut partition_keys = Vec::with_capacity(partition_node.len());
@@ -409,7 +536,7 @@ impl JsonSerde for TableDescriptor {
409536
key_node
410537
.as_str()
411538
.ok_or_else(|| {
412-
JsonSerdeError(format!("{} should be a string", Self::PARTITION_KEY_NAME))
539+
JsonSerdeError(format!("{} element must be a string", Self::PARTITION_KEY_NAME))
413540
})?
414541
.to_owned(),
415542
);
@@ -420,14 +547,14 @@ impl JsonSerde for TableDescriptor {
420547
let mut bucket_keys = vec![];
421548
if let Some(bucket_key_node) = node.get(Self::BUCKET_KEY_NAME) {
422549
let bucket_key_node = bucket_key_node.as_array().ok_or_else(|| {
423-
JsonSerdeError(format!("{} should be an array", Self::BUCKET_COUNT_NAME))
550+
JsonSerdeError(format!("{} must be an array", Self::BUCKET_KEY_NAME))
424551
})?;
425552

426553
for key_node in bucket_key_node {
427554
bucket_keys.push(
428555
key_node
429556
.as_str()
430-
.ok_or_else(|| JsonSerdeError("Bucket key should be a string".to_string()))?
557+
.ok_or_else(|| JsonSerdeError("Bucket key must be a string".to_string()))?
431558
.to_owned(),
432559
);
433560
}
@@ -462,3 +589,43 @@ impl JsonSerde for TableDescriptor {
462589
builder.build()
463590
}
464591
}
592+
593+
#[cfg(test)]
594+
mod tests {
595+
use super::*;
596+
use crate::metadata::DataTypes;
597+
598+
#[test]
599+
fn test_datatype_json_serde() {
600+
let data_types = vec![
601+
DataTypes::boolean(),
602+
DataTypes::tinyint(),
603+
DataTypes::smallint(),
604+
DataTypes::int(),
605+
DataTypes::bigint(),
606+
DataTypes::float(),
607+
DataTypes::double(),
608+
DataTypes::char(10),
609+
DataTypes::string(),
610+
DataTypes::decimal(10, 2),
611+
DataTypes::date(),
612+
DataTypes::time(),
613+
DataTypes::timestamp(),
614+
DataTypes::timestamp_ltz(),
615+
DataTypes::bytes(),
616+
DataTypes::binary(100),
617+
DataTypes::array(DataTypes::int()),
618+
DataTypes::map(DataTypes::string(), DataTypes::int()),
619+
DataTypes::row(vec![
620+
crate::metadata::DataField::new("f1".to_string(), DataTypes::int(), None),
621+
crate::metadata::DataField::new("f2".to_string(), DataTypes::string(), Some("desc".to_string())),
622+
]),
623+
];
624+
625+
for dt in data_types {
626+
let json = dt.serialize_json().unwrap();
627+
let deserialized = DataType::deserialize_json(&json).unwrap();
628+
assert_eq!(dt, deserialized);
629+
}
630+
}
631+
}

0 commit comments

Comments
 (0)