Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: json conversion #4893

Merged
merged 13 commits into from
Oct 29, 2024
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", r
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
jsonb = { git = "https://github.com/datafuselabs/jsonb.git", rev = "46ad50fc71cf75afbf98eec455f7892a6387c1fc", default-features = false }
jsonb = { git = "https://github.com/databendlabs/jsonb.git", rev = "46ad50fc71cf75afbf98eec455f7892a6387c1fc", default-features = false }
lazy_static = "1.4"
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "a10facb353b41460eeb98578868ebf19c2084fac" }
mockall = "0.11.4"
Expand Down
10 changes: 9 additions & 1 deletion src/datatypes/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Invalid JSON text: {}", value))]
InvalidJson {
value: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Value exceeds the precision {} bound", precision))]
ValueExceedsPrecision {
precision: u8,
Expand Down Expand Up @@ -222,7 +229,8 @@ impl ErrorExt for Error {
| DefaultValueType { .. }
| DuplicateMeta { .. }
| InvalidTimestampPrecision { .. }
| InvalidPrecisionOrScale { .. } => StatusCode::InvalidArguments,
| InvalidPrecisionOrScale { .. }
| InvalidJson { .. } => StatusCode::InvalidArguments,

ValueExceedsPrecision { .. }
| CastType { .. }
Expand Down
62 changes: 62 additions & 0 deletions src/datatypes/src/vectors/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,36 @@ impl BinaryVector {
pub(crate) fn as_arrow(&self) -> &dyn Array {
&self.array
}

/// Creates a new binary vector of JSONB from a binary vector.
/// The binary vector must contain valid JSON strings.
pub fn convert_binary_to_json(&self) -> Result<BinaryVector> {
let arrow_array = self.to_arrow_array();
let mut vector = vec![];
for binary in arrow_array
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap()
.iter()
{
let jsonb = if let Some(binary) = binary {
match jsonb::from_slice(binary) {
Ok(jsonb) => Some(jsonb.to_vec()),
Err(_) => {
let s = String::from_utf8_lossy(binary);
return error::InvalidJsonSnafu {
value: s.to_string(),
}
.fail();
}
}
} else {
None
};
vector.push(jsonb);
}
Ok(BinaryVector::from(vector))
}
}

impl From<BinaryArray> for BinaryVector {
Expand Down Expand Up @@ -383,4 +413,36 @@ mod tests {
assert_eq!(b"four", vector.get_data(3).unwrap());
assert_eq!(builder.len(), 4);
}

#[test]
fn test_binary_json_conversion() {
let json_strings = vec![
b"{\"hello\": \"world\"}".to_vec(),
b"{\"foo\": 1}".to_vec(),
b"123".to_vec(),
];
let json_vector = BinaryVector::from(json_strings.clone())
.convert_binary_to_json()
.unwrap();
let jsonbs = json_strings
.iter()
.map(|v| jsonb::parse_value(v).unwrap().to_vec())
.collect::<Vec<_>>();
for i in 0..3 {
assert_eq!(
json_vector.get_ref(i).as_binary().unwrap().unwrap(),
jsonbs.get(i).unwrap().as_slice()
);
}

let json_vector = BinaryVector::from(jsonbs.clone())
.convert_binary_to_json()
.unwrap();
for i in 0..3 {
assert_eq!(
json_vector.get_ref(i).as_binary().unwrap().unwrap(),
jsonbs.get(i).unwrap().as_slice()
);
}
}
}
8 changes: 8 additions & 0 deletions src/datatypes/src/vectors/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ mod find_unique;
mod replicate;
mod take;

use std::sync::Arc;

use common_base::BitVec;

use crate::error::{self, Result};
Expand Down Expand Up @@ -89,6 +91,12 @@ macro_rules! impl_scalar_vector_op {
}

fn cast(&self, to_type: &ConcreteDataType) -> Result<VectorRef> {
if to_type == &ConcreteDataType::json_datatype() {
if let Some(vector) = self.as_any().downcast_ref::<BinaryVector>() {
let json_vector = vector.convert_binary_to_json()?;
return Ok(Arc::new(json_vector) as VectorRef);
}
}
cast::cast_non_constant!(self, to_type)
}

Expand Down
4 changes: 2 additions & 2 deletions src/servers/src/postgres/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ pub(super) fn parameters_to_scalar_values(
if let Some(server_type) = &server_type {
match server_type {
ConcreteDataType::Binary(_) => {
ScalarValue::Binary(data.map(|d| jsonb::Value::from(d).to_vec()))
ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
}
_ => {
return Err(invalid_parameter_error(
Expand All @@ -971,7 +971,7 @@ pub(super) fn parameters_to_scalar_values(
}
}
} else {
ScalarValue::Binary(data.map(|d| jsonb::Value::from(d).to_vec()))
ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
}
}
_ => Err(invalid_parameter_error(
Expand Down
35 changes: 30 additions & 5 deletions tests-integration/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ pub async fn test_mysql_crud(store_type: StorageType) {
.unwrap();

sqlx::query(
"create table demo(i bigint, ts timestamp time index default current_timestamp, d date default null, dt datetime default null, b blob default null)",
"create table demo(i bigint, ts timestamp time index default current_timestamp, d date default null, dt datetime default null, b blob default null, j json default null)",
)
.execute(&pool)
.await
Expand All @@ -158,18 +158,30 @@ pub async fn test_mysql_crud(store_type: StorageType) {
let d = NaiveDate::from_yo_opt(2015, 100).unwrap();
let hello = format!("hello{i}");
let bytes = hello.as_bytes();
sqlx::query("insert into demo values(?, ?, ?, ?, ?)")
let jsons = serde_json::json!({
"code": i,
"success": true,
"payload": {
"features": [
"serde",
"json"
],
"homepage": null
}
});
sqlx::query("insert into demo values(?, ?, ?, ?, ?, ?)")
.bind(i)
.bind(i)
.bind(d)
.bind(dt)
.bind(bytes)
.bind(jsons)
.execute(&pool)
.await
.unwrap();
}

let rows = sqlx::query("select i, d, dt, b from demo")
let rows = sqlx::query("select i, d, dt, b, j from demo")
.fetch_all(&pool)
.await
.unwrap();
Expand All @@ -180,6 +192,7 @@ pub async fn test_mysql_crud(store_type: StorageType) {
let d: NaiveDate = row.get("d");
let dt: DateTime<Utc> = row.get("dt");
let bytes: Vec<u8> = row.get("b");
let json: serde_json::Value = row.get("j");
assert_eq!(ret, i as i64);
let expected_d = NaiveDate::from_yo_opt(2015, 100).unwrap();
assert_eq!(expected_d, d);
Expand All @@ -194,6 +207,18 @@ pub async fn test_mysql_crud(store_type: StorageType) {
format!("{}", dt.format("%Y-%m-%d %H:%M:%S"))
);
assert_eq!(format!("hello{i}"), String::from_utf8_lossy(&bytes));
let expected_j = serde_json::json!({
"code": i,
"success": true,
"payload": {
"features": [
"serde",
"json"
],
"homepage": null
}
});
assert_eq!(json, expected_j);
}

let rows = sqlx::query("select i from demo where i=?")
Expand Down Expand Up @@ -396,7 +421,7 @@ pub async fn test_postgres_crud(store_type: StorageType) {
let dt = d.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp_millis();
let bytes = "hello".as_bytes();
let json = serde_json::json!({
"code": 200,
"code": i,
"success": true,
"payload": {
"features": [
Expand Down Expand Up @@ -444,7 +469,7 @@ pub async fn test_postgres_crud(store_type: StorageType) {
assert_eq!("hello".as_bytes(), bytes);

let expected_j = serde_json::json!({
"code": 200,
"code": i,
"success": true,
"payload": {
"features": [
Expand Down