Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions crates/fluss/src/row/datum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub enum Datum<'a> {
#[display("{0}")]
Bool(bool),
#[display("{0}")]
Int8(i8),
#[display("{0}")]
Int16(i16),
#[display("{0}")]
Int32(i32),
Expand Down Expand Up @@ -78,6 +80,13 @@ impl Datum<'_> {
_ => panic!("not a string: {self:?}"),
}
}

pub fn as_blob(&self) -> &[u8] {
match self {
Self::Blob(blob) => blob.as_ref(),
_ => panic!("not a blob: {self:?}"),
}
}
}

// ----------- implement from
Expand All @@ -95,6 +104,20 @@ impl<'a> From<i64> for Datum<'a> {
}
}

impl<'a> From<i8> for Datum<'a> {
#[inline]
fn from(i: i8) -> Datum<'a> {
Datum::Int8(i)
}
}

impl<'a> From<i16> for Datum<'a> {
#[inline]
fn from(i: i16) -> Datum<'a> {
Datum::Int16(i)
}
}

impl<'a> From<&'a str> for Datum<'a> {
#[inline]
fn from(s: &'a str) -> Datum<'a> {
Expand Down Expand Up @@ -134,6 +157,18 @@ impl TryFrom<&Datum<'_>> for i32 {
}
}

impl TryFrom<&Datum<'_>> for i16 {
type Error = ();

#[inline]
fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
match from {
Datum::Int16(i) => Ok(*i),
_ => Err(()),
}
}
}

impl TryFrom<&Datum<'_>> for i64 {
type Error = ();

Expand All @@ -146,6 +181,42 @@ impl TryFrom<&Datum<'_>> for i64 {
}
}

impl TryFrom<&Datum<'_>> for f32 {
type Error = ();

#[inline]
fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
match from {
Datum::Float32(f) => Ok(f.into_inner()),
_ => Err(()),
}
}
}

impl TryFrom<&Datum<'_>> for f64 {
type Error = ();

#[inline]
fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
match from {
Datum::Float64(f) => Ok(f.into_inner()),
_ => Err(()),
}
}
}

impl TryFrom<&Datum<'_>> for bool {
type Error = ();

#[inline]
fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
match from {
Datum::Bool(b) => Ok(*b),
_ => Err(()),
}
}
}

impl<'a> TryFrom<&Datum<'a>> for &'a str {
type Error = ();

Expand All @@ -158,6 +229,25 @@ impl<'a> TryFrom<&Datum<'a>> for &'a str {
}
}

impl TryFrom<&Datum<'_>> for i8 {
type Error = ();

#[inline]
fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
match from {
Datum::Int8(i) => Ok(*i),
_ => Err(()),
}
}
}

impl<'a> From<bool> for Datum<'a> {
#[inline]
fn from(b: bool) -> Datum<'a> {
Datum::Bool(b)
}
}

pub trait ToArrow {
fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()>;
}
Expand All @@ -184,6 +274,7 @@ impl Datum<'_> {

match self {
Datum::Null => {
append_null_to_arrow!(Int8Builder);
append_null_to_arrow!(BooleanBuilder);
append_null_to_arrow!(Int16Builder);
append_null_to_arrow!(Int32Builder);
Expand All @@ -194,6 +285,7 @@ impl Datum<'_> {
append_null_to_arrow!(BinaryBuilder);
}
Datum::Bool(v) => append_value_to_arrow!(BooleanBuilder, *v),
Datum::Int8(v) => append_value_to_arrow!(Int8Builder, *v),
Datum::Int16(v) => append_value_to_arrow!(Int16Builder, *v),
Datum::Int32(v) => append_value_to_arrow!(Int32Builder, *v),
Datum::Int64(v) => append_value_to_arrow!(Int64Builder, *v),
Expand Down
40 changes: 24 additions & 16 deletions crates/fluss/src/row/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,16 @@ impl<'a> InternalRow for GenericRow<'a> {
false
}

fn get_boolean(&self, _pos: usize) -> bool {
todo!()
fn get_boolean(&self, pos: usize) -> bool {
self.values.get(pos).unwrap().try_into().unwrap()
}

fn get_byte(&self, _pos: usize) -> i8 {
todo!()
fn get_byte(&self, pos: usize) -> i8 {
self.values.get(pos).unwrap().try_into().unwrap()
}

fn get_short(&self, _pos: usize) -> i16 {
todo!()
fn get_short(&self, pos: usize) -> i16 {
self.values.get(pos).unwrap().try_into().unwrap()
}

fn get_int(&self, pos: usize) -> i32 {
Expand All @@ -105,28 +105,36 @@ impl<'a> InternalRow for GenericRow<'a> {
self.values.get(_pos).unwrap().try_into().unwrap()
}

fn get_float(&self, _pos: usize) -> f32 {
todo!()
fn get_float(&self, pos: usize) -> f32 {
self.values.get(pos).unwrap().try_into().unwrap()
}

fn get_double(&self, _pos: usize) -> f64 {
todo!()
fn get_double(&self, pos: usize) -> f64 {
self.values.get(pos).unwrap().try_into().unwrap()
}

fn get_char(&self, _pos: usize, _length: usize) -> String {
todo!()
fn get_char(&self, pos: usize, length: usize) -> String {
let value = self.get_string(pos);
if value.len() != length {
panic!(
"Length mismatch for fixed-size char: expected {}, got {}",
length,
value.len()
);
}
value.to_string()
}

fn get_string(&self, pos: usize) -> &str {
self.values.get(pos).unwrap().try_into().unwrap()
}

fn get_binary(&self, _pos: usize, _length: usize) -> Vec<u8> {
todo!()
fn get_binary(&self, pos: usize, _length: usize) -> Vec<u8> {
self.values.get(pos).unwrap().as_blob().to_vec()
}

fn get_bytes(&self, _pos: usize) -> Vec<u8> {
todo!()
fn get_bytes(&self, pos: usize) -> Vec<u8> {
self.values.get(pos).unwrap().as_blob().to_vec()
}
}

Expand Down
Loading