diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs index ed33b8b..3e48703 100644 --- a/crates/fluss/src/row/datum.rs +++ b/crates/fluss/src/row/datum.rs @@ -44,6 +44,8 @@ pub enum Datum<'a> { #[display("{0}")] Bool(bool), #[display("{0}")] + Int8(i8), + #[display("{0}")] Int16(i16), #[display("{0}")] Int32(i32), @@ -78,6 +80,13 @@ impl Datum<'_> { _ => panic!("not a string: {self:?}"), } } + + pub fn as_blob(&self) -> &[u8] { + match self { + Self::Blob(blob) => blob.as_ref(), + _ => panic!("not a blob: {self:?}"), + } + } } // ----------- implement from @@ -95,6 +104,20 @@ impl<'a> From for Datum<'a> { } } +impl<'a> From for Datum<'a> { + #[inline] + fn from(i: i8) -> Datum<'a> { + Datum::Int8(i) + } +} + +impl<'a> From for Datum<'a> { + #[inline] + fn from(i: i16) -> Datum<'a> { + Datum::Int16(i) + } +} + impl<'a> From<&'a str> for Datum<'a> { #[inline] fn from(s: &'a str) -> Datum<'a> { @@ -134,6 +157,18 @@ impl TryFrom<&Datum<'_>> for i32 { } } +impl TryFrom<&Datum<'_>> for i16 { + type Error = (); + + #[inline] + fn try_from(from: &Datum) -> std::result::Result { + match from { + Datum::Int16(i) => Ok(*i), + _ => Err(()), + } + } +} + impl TryFrom<&Datum<'_>> for i64 { type Error = (); @@ -146,6 +181,42 @@ impl TryFrom<&Datum<'_>> for i64 { } } +impl TryFrom<&Datum<'_>> for f32 { + type Error = (); + + #[inline] + fn try_from(from: &Datum) -> std::result::Result { + match from { + Datum::Float32(f) => Ok(f.into_inner()), + _ => Err(()), + } + } +} + +impl TryFrom<&Datum<'_>> for f64 { + type Error = (); + + #[inline] + fn try_from(from: &Datum) -> std::result::Result { + match from { + Datum::Float64(f) => Ok(f.into_inner()), + _ => Err(()), + } + } +} + +impl TryFrom<&Datum<'_>> for bool { + type Error = (); + + #[inline] + fn try_from(from: &Datum) -> std::result::Result { + match from { + Datum::Bool(b) => Ok(*b), + _ => Err(()), + } + } +} + impl<'a> TryFrom<&Datum<'a>> for &'a str { type Error = (); @@ -158,6 +229,25 @@ impl<'a> TryFrom<&Datum<'a>> for &'a str { } } +impl TryFrom<&Datum<'_>> for i8 { + type Error = (); + + #[inline] + fn try_from(from: &Datum) -> std::result::Result { + match from { + Datum::Int8(i) => Ok(*i), + _ => Err(()), + } + } +} + +impl<'a> From for Datum<'a> { + #[inline] + fn from(b: bool) -> Datum<'a> { + Datum::Bool(b) + } +} + pub trait ToArrow { fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()>; } @@ -184,6 +274,7 @@ impl Datum<'_> { match self { Datum::Null => { + append_null_to_arrow!(Int8Builder); append_null_to_arrow!(BooleanBuilder); append_null_to_arrow!(Int16Builder); append_null_to_arrow!(Int32Builder); @@ -194,6 +285,7 @@ impl Datum<'_> { append_null_to_arrow!(BinaryBuilder); } Datum::Bool(v) => append_value_to_arrow!(BooleanBuilder, *v), + Datum::Int8(v) => append_value_to_arrow!(Int8Builder, *v), Datum::Int16(v) => append_value_to_arrow!(Int16Builder, *v), Datum::Int32(v) => append_value_to_arrow!(Int32Builder, *v), Datum::Int64(v) => append_value_to_arrow!(Int64Builder, *v), diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs index aa2c411..a3b8885 100644 --- a/crates/fluss/src/row/mod.rs +++ b/crates/fluss/src/row/mod.rs @@ -85,16 +85,16 @@ impl<'a> InternalRow for GenericRow<'a> { false } - fn get_boolean(&self, _pos: usize) -> bool { - todo!() + fn get_boolean(&self, pos: usize) -> bool { + self.values.get(pos).unwrap().try_into().unwrap() } - fn get_byte(&self, _pos: usize) -> i8 { - todo!() + fn get_byte(&self, pos: usize) -> i8 { + self.values.get(pos).unwrap().try_into().unwrap() } - fn get_short(&self, _pos: usize) -> i16 { - todo!() + fn get_short(&self, pos: usize) -> i16 { + self.values.get(pos).unwrap().try_into().unwrap() } fn get_int(&self, pos: usize) -> i32 { @@ -105,28 +105,36 @@ impl<'a> InternalRow for GenericRow<'a> { self.values.get(_pos).unwrap().try_into().unwrap() } - fn get_float(&self, _pos: usize) -> f32 { - todo!() + fn get_float(&self, pos: usize) -> f32 { + self.values.get(pos).unwrap().try_into().unwrap() } - fn get_double(&self, _pos: usize) -> f64 { - todo!() + fn get_double(&self, pos: usize) -> f64 { + self.values.get(pos).unwrap().try_into().unwrap() } - fn get_char(&self, _pos: usize, _length: usize) -> String { - todo!() + fn get_char(&self, pos: usize, length: usize) -> String { + let value = self.get_string(pos); + if value.len() != length { + panic!( + "Length mismatch for fixed-size char: expected {}, got {}", + length, + value.len() + ); + } + value.to_string() } fn get_string(&self, pos: usize) -> &str { self.values.get(pos).unwrap().try_into().unwrap() } - fn get_binary(&self, _pos: usize, _length: usize) -> Vec { - todo!() + fn get_binary(&self, pos: usize, _length: usize) -> Vec { + self.values.get(pos).unwrap().as_blob().to_vec() } - fn get_bytes(&self, _pos: usize) -> Vec { - todo!() + fn get_bytes(&self, pos: usize) -> Vec { + self.values.get(pos).unwrap().as_blob().to_vec() } }