diff --git a/connectorx/src/destinations/arrow2/arrow_assoc.rs b/connectorx/src/destinations/arrow2/arrow_assoc.rs index db6d5854b..a7f0c3659 100644 --- a/connectorx/src/destinations/arrow2/arrow_assoc.rs +++ b/connectorx/src/destinations/arrow2/arrow_assoc.rs @@ -289,6 +289,57 @@ impl ArrowAssoc for NaiveDate { } } +impl ArrowAssoc for Vec { + type Builder = MutableListArray>; + + fn builder(nrows: usize) -> Self::Builder { + MutableListArray::with_capacity(nrows) + } + + fn push(builder: &mut Self::Builder, value: Vec) { + let mut date_array: Vec> = vec![]; + for sub_value in value { + date_array.push(Some(naive_date_to_date32(sub_value))) + } + builder.try_push(Some(date_array)).unwrap(); + } + + fn field(header: &str) -> Field { + Field::new(header, ArrowDataType::Date32, true) + } + +} + + +impl ArrowAssoc for Option> { + type Builder = MutableListArray>; + + fn builder(nrows: usize) -> Self::Builder { + MutableListArray::with_capacity(nrows) + } + + fn push(builder: &mut Self::Builder, value: Self) { + let mut date_array: Vec> = vec![]; + match value { + Some(value) => { + for sub_value in value { + date_array.push(Some(naive_date_to_date32(sub_value))) + } + let _ = builder.try_push(Some(date_array)); + } + None => { + let _ = builder.try_push(Some(date_array)); + } + }; + } + + fn field(header: &str) -> Field { + Field::new(header, ArrowDataType::Date32, false) + } +} + + + impl ArrowAssoc for Option { type Builder = MutablePrimitiveArray; diff --git a/connectorx/src/destinations/arrow2/typesystem.rs b/connectorx/src/destinations/arrow2/typesystem.rs index ffb222bce..9c23f871a 100644 --- a/connectorx/src/destinations/arrow2/typesystem.rs +++ b/connectorx/src/destinations/arrow2/typesystem.rs @@ -24,6 +24,7 @@ pub enum Arrow2TypeSystem { Float32Array(bool), Float64Array(bool), Utf8Array(bool), + DateArray32(bool), } impl_typesystem! { @@ -50,5 +51,6 @@ impl_typesystem! { { Float32Array => Vec } { Float64Array => Vec } { Utf8Array => Vec } + { DateArray32 => Vec} } } diff --git a/connectorx/src/sources/postgres/mod.rs b/connectorx/src/sources/postgres/mod.rs index 0799d17af..4be8a3058 100644 --- a/connectorx/src/sources/postgres/mod.rs +++ b/connectorx/src/sources/postgres/mod.rs @@ -478,10 +478,11 @@ impl_produce!( NaiveTime, // NaiveDateTime, // DateTime, - // NaiveDate, + NaiveDate, Uuid, Value, Vec, + Vec, ); impl<'r, 'a> Produce<'r, NaiveDateTime> for PostgresBinarySourcePartitionParser<'a> { @@ -550,38 +551,6 @@ impl<'r, 'a> Produce<'r, Option>> for PostgresBinarySourcePartitio } } -impl<'r, 'a> Produce<'r, NaiveDate> for PostgresBinarySourcePartitionParser<'a> { - type Error = PostgresSourceError; - - #[throws(PostgresSourceError)] - fn produce(&'r mut self) -> NaiveDate { - let (ridx, cidx) = self.next_loc()?; - let row = &self.rowbuf[ridx]; - let val = row.try_get(cidx)?; - match val { - postgres::types::Date::PosInfinity => NaiveDate::MAX, - postgres::types::Date::NegInfinity => NaiveDate::MIN, - postgres::types::Date::Value(t) => t, - } - } -} - -impl<'r, 'a> Produce<'r, Option> for PostgresBinarySourcePartitionParser<'a> { - type Error = PostgresSourceError; - - #[throws(PostgresSourceError)] - fn produce(&'r mut self) -> Option { - let (ridx, cidx) = self.next_loc()?; - let row = &self.rowbuf[ridx]; - let val = row.try_get(cidx)?; - match val { - Some(postgres::types::Date::PosInfinity) => Some(NaiveDate::MAX), - Some(postgres::types::Date::NegInfinity) => Some(NaiveDate::MIN), - Some(postgres::types::Date::Value(t)) => t, - None => None - } - } -} impl<'r, 'a> Produce<'r, HashMap>> for PostgresBinarySourcePartitionParser<'a> @@ -806,6 +775,59 @@ impl<'r, 'a> Produce<'r, Option> for PostgresCSVSourceParser<'a> { } } +impl<'r, 'a> Produce<'r, Vec> for PostgresCSVSourceParser<'a> { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Vec{ + let (ridx, cidx) = self.next_loc()?; + let s = &self.rowbuf[ridx][cidx][..]; + let naive_date_array: Vec = match s { + "{}" => vec![], + s => { + let mut date_array: Vec = vec![]; + let elements = s[1..s.len()-1].split(','); + for e in elements { + match NaiveDate::parse_from_str(e,"%Y-%m-%d") { + Ok(date) => {date_array.push(date)} + Err(_) => throw!(ConnectorXError::cannot_produce::(Some(s.into()))) + } + } + date_array + + } + }; + naive_date_array +} +} + +impl<'r, 'a> Produce<'r, Option>> for PostgresCSVSourceParser<'a> { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option>{ + let (ridx, cidx) = self.next_loc()?; + let s = &self.rowbuf[ridx][cidx][..]; + let naive_date_array: Vec = match s { + "{}" => vec![], + s => { + let mut date_array: Vec = vec![]; + let elements = s[1..s.len()-1].split(','); + for e in elements { + match NaiveDate::parse_from_str(e,"%Y-%m-%d") { + Ok(date) => {date_array.push(date)} + Err(_) => throw!(ConnectorXError::cannot_produce::(Some(s.into()))) + } + } + date_array + + } + }; + Some(naive_date_array) +} +} + + impl<'r, 'a> Produce<'r, Vec> for PostgresCSVSourceParser<'a> { type Error = PostgresSourceError; @@ -1301,6 +1323,54 @@ impl<'r, 'a> Produce<'r, NaiveDate> for PostgresRawSourceParser<'a> { } } +impl<'r, 'a> Produce<'r, Vec> for PostgresRawSourceParser<'a> { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Vec { + let (ridx, cidx) = self.next_loc()?; + let row = &self.rowbuf[ridx]; + let val: Vec> = row.try_get(cidx)?; + let mut values: Vec = vec![]; + for v in val { + match v { + postgres::types::Date::PosInfinity => {values.push(NaiveDate::MAX)} + postgres::types::Date::NegInfinity => {values.push(NaiveDate::MIN)} + postgres::types::Date::Value(t) => {values.push(t)} + + } + } + + + values + } + +} + +impl<'r, 'a> Produce<'r, Option>> for PostgresRawSourceParser<'a> { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&'r mut self) -> Option> { + let (ridx, cidx) = self.next_loc()?; + let row = &self.rowbuf[ridx]; + let val: Vec> = row.try_get(cidx)?; + let mut values: Vec = vec![]; + for v in val { + match v { + postgres::types::Date::PosInfinity => {values.push(NaiveDate::MAX)} + postgres::types::Date::NegInfinity => {values.push(NaiveDate::MIN)} + postgres::types::Date::Value(t) => {values.push(t)} + + } + } + + + Some(values) + } + +} + impl<'r, 'a> Produce<'r, Option> for PostgresRawSourceParser<'a> { type Error = PostgresSourceError; @@ -1702,7 +1772,7 @@ macro_rules! impl_simple_vec_produce { )+ }; } -impl_simple_vec_produce!(i16, i32, i64, f32, f64, Decimal, String,); +impl_simple_vec_produce!(i16, i32, i64, f32, f64, Decimal, String,NaiveDate,); impl<'r> Produce<'r, Vec> for PostgresSimpleSourceParser { type Error = PostgresSourceError; diff --git a/connectorx/src/sources/postgres/typesystem.rs b/connectorx/src/sources/postgres/typesystem.rs index 5119c8dbf..21900281f 100644 --- a/connectorx/src/sources/postgres/typesystem.rs +++ b/connectorx/src/sources/postgres/typesystem.rs @@ -23,6 +23,7 @@ pub enum PostgresTypeSystem { Int8Array(bool), VarcharArray(bool), TextArray(bool), + DateArray(bool), Date(bool), Char(bool), BpChar(bool), @@ -57,6 +58,7 @@ impl_typesystem! { { Float8Array => Vec } { NumericArray => Vec } { VarcharArray | TextArray => Vec} + { DateArray => Vec} { Bool => bool } { Char => i8 } { Text | BpChar | VarChar | Enum | Name => &'r str } { ByteA => Vec } @@ -89,6 +91,7 @@ impl<'a> From<&'a Type> for PostgresTypeSystem { "_numeric" => NumericArray(true), "_varchar" => VarcharArray(true), "_text" => TextArray(true), + "_date" => DateArray(true), "bool" => Bool(true), "char" => Char(true), "text" | "citext" | "ltree" | "lquery" | "ltxtquery" | "name" => Text(true), diff --git a/connectorx/src/transports/postgres_arrow2.rs b/connectorx/src/transports/postgres_arrow2.rs index 7d3f31af0..1bcd20ce3 100644 --- a/connectorx/src/transports/postgres_arrow2.rs +++ b/connectorx/src/transports/postgres_arrow2.rs @@ -69,8 +69,9 @@ macro_rules! impl_postgres_transport { { Float4Array[Vec] => Float64Array[Vec] | conversion auto_vec } { Float8Array[Vec] => Float64Array[Vec] | conversion auto } { NumericArray[Vec] => Float64Array[Vec] | conversion option } - { VarcharArray[Vec] => Utf8Array[Vec] | conversion none } - { TextArray[Vec] => Utf8Array[Vec] | conversion auto } + { VarcharArray[Vec] => Utf8Array[Vec] | conversion none } + { TextArray[Vec] => Utf8Array[Vec] | conversion auto } + { DateArray[Vec] => DateArray32[Vec] | conversion option } } ); @@ -86,6 +87,15 @@ impl_postgres_transport!(CursorProtocol, MakeTlsConnector); impl_postgres_transport!(SimpleProtocol, NoTls); impl_postgres_transport!(SimpleProtocol, MakeTlsConnector); + +impl TypeConversion, Vec> for PostgresArrow2Transport { + fn convert(val: Vec) -> Vec { + val + } +} + + + impl TypeConversion for PostgresArrow2Transport { fn convert(val: Uuid) -> String { val.to_string() diff --git a/connectorx/tests/test_polars.rs b/connectorx/tests/test_polars.rs index 8fc63ebf3..fdf061905 100644 --- a/connectorx/tests/test_polars.rs +++ b/connectorx/tests/test_polars.rs @@ -1,3 +1,4 @@ +use chrono::NaiveDate; use connectorx::{ constants::RECORD_BATCH_SIZE, destinations::arrow2::Arrow2Destination, @@ -297,3 +298,34 @@ fn test_pg_pl_name() { println!("{:?}", df); assert_eq!(df, test_df); } + +fn test_pg_pl_date_array() { + let _ = env_logger::builder().is_test(true).try_init(); + let dburl = env::var("POSTGRES_URL").unwrap(); + let queries = [CXQuery::naked("select test_datearray from test_types")]; + let url = Url::parse(dburl.as_str()).unwrap(); + let (config, _tls) = rewrite_tls_args(&url).unwrap(); + let builder = PostgresSource::::new(config, NoTls, 2).unwrap(); + let mut destination = Arrow2Destination::new(); + let dispatcher = Dispatcher::<_, _, PostgresArrow2Transport>::new( + builder, + &mut destination, + &queries, + Some(format!("select * from test_types")), + ); + + dispatcher.run().expect("run dispatcher"); + + let s1: Vec = vec![NaiveDate::new("1970-01-01"), NaiveDate::new("2000-02-28")]; + let s2 = vec![NaiveDate::new("2038-01-18"), NaiveDate::new("2038-02-18")]; + let s3 = vec![]; + let s4 = vec![NaiveDate::new(""), NaiveDate::new("2038-01-18")]; + + let df: DataFrame = destination.polars().unwrap(); + let test_df: DataFrame = df!( + "test_textdatearray" => &[s1,s2,s3,s4] + ) + .unwrap(); + + assert_eq!(df, test_df); +} diff --git a/scripts/postgres.sql b/scripts/postgres.sql index 9b9bb1b04..0de222d20 100644 --- a/scripts/postgres.sql +++ b/scripts/postgres.sql @@ -85,12 +85,13 @@ CREATE TABLE IF NOT EXISTS test_types( test_varchararray VARCHAR[], test_textarray TEXT[], test_name NAME + test_datearray DATE[] ); -INSERT INTO test_types VALUES ('1970-01-01', '1970-01-01 00:00:01', '1970-01-01 00:00:01-00', 0, -9223372036854775808, NULL, NULL, 'a', 'a', NULL, '86b494cc-96b2-11eb-9298-3e22fbb9fe9d', '08:12:40', '1 year 2 months 3 days', '{"customer": "John Doe", "items": {"product": "Beer","qty": 6}}', '{"product": "Beer","qty": 6}', NULL, 'happy','{}', '{}', '{}', '{true, false}', '{-1, 0, 1}', '{-1, 0, 1123}', '{-9223372036854775808, 9223372036854775807}', 'str_citext', 'A.B.C.D', '*.B.*', 'A & B*',ARRAY['str1','str2'],ARRAY['text1','text2'],'0'); -INSERT INTO test_types VALUES ('2000-02-28', '2000-02-28 12:00:10', '2000-02-28 12:00:10-04', 1, 0, 3.1415926535, 521.34, 'bb', 'b', 'bb', '86b49b84-96b2-11eb-9298-3e22fbb9fe9d', NULL, '2 weeks ago', '{"customer": "Lily Bush", "items": {"product": "Diaper","qty": 24}}', '{"product": "Diaper","qty": 24}', 'Здра́вствуйте', 'very happy', NULL, NULL, NULL, '{}', '{}', '{}', '{}', '', 'A.B.E', 'A.*', 'A | B','{"0123456789","abcdefghijklmnopqrstuvwxyz","!@#$%^&*()_-+=~`:;<>?/"}','{"0123456789","abcdefghijklmnopqrstuvwxyz","!@#$%^&*()_-+=~`:;<>?/"}','21'); -INSERT INTO test_types VALUES ('2038-01-18', '2038-01-18 23:59:59', '2038-01-18 23:59:59+08', 2, 9223372036854775807, 2.71, '1e-130', 'ccc', NULL, 'c', '86b49c42-96b2-11eb-9298-3e22fbb9fe9d', '23:00:10', '3 months 2 days ago', '{"customer": "Josh William", "items": {"product": "Toy Car","qty": 1}}', '{"product": "Toy Car","qty": 1}', '', 'ecstatic', '{123.123}', '{-1e-307, 1e308}', '{521.34}', '{true}', '{-32768, 32767}', '{-2147483648, 2147483647}', '{0}', 's', 'A', '*', 'A@',ARRAY['',' '],ARRAY['',' '],'someName'); -INSERT INTO test_types VALUES (NULL, NULL, NULL, 3, NULL, 0.00, -1e-37, NULL, 'd', 'defghijklm', NULL, '18:30:00', '3 year', NULL, NULL, '😜', NULL, '{-1e-37, 1e37}', '{0.000234, -12.987654321}', '{0.12, 333.33, 22.22}', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,'{}','{}','101203203-1212323-22131235'); +INSERT INTO test_types VALUES ('1970-01-01', '1970-01-01 00:00:01', '1970-01-01 00:00:01-00', 0, -9223372036854775808, NULL, NULL, 'a', 'a', NULL, '86b494cc-96b2-11eb-9298-3e22fbb9fe9d', '08:12:40', '1 year 2 months 3 days', '{"customer": "John Doe", "items": {"product": "Beer","qty": 6}}', '{"product": "Beer","qty": 6}', NULL, 'happy','{}', '{}', '{}', '{true, false}', '{-1, 0, 1}', '{-1, 0, 1123}', '{-9223372036854775808, 9223372036854775807}', 'str_citext', 'A.B.C.D', '*.B.*', 'A & B*',ARRAY['str1','str2'],ARRAY['text1','text2'],'0',ARRAY['1970-01-01','2000-02-28']); +INSERT INTO test_types VALUES ('2000-02-28', '2000-02-28 12:00:10', '2000-02-28 12:00:10-04', 1, 0, 3.1415926535, 521.34, 'bb', 'b', 'bb', '86b49b84-96b2-11eb-9298-3e22fbb9fe9d', NULL, '2 weeks ago', '{"customer": "Lily Bush", "items": {"product": "Diaper","qty": 24}}', '{"product": "Diaper","qty": 24}', 'Здра́вствуйте', 'very happy', NULL, NULL, NULL, '{}', '{}', '{}', '{}', '', 'A.B.E', 'A.*', 'A | B','{"0123456789","abcdefghijklmnopqrstuvwxyz","!@#$%^&*()_-+=~`:;<>?/"}','{"0123456789","abcdefghijklmnopqrstuvwxyz","!@#$%^&*()_-+=~`:;<>?/"}','21','{"2038-01-18","2038-02-18"}'); +INSERT INTO test_types VALUES ('2038-01-18', '2038-01-18 23:59:59', '2038-01-18 23:59:59+08', 2, 9223372036854775807, 2.71, '1e-130', 'ccc', NULL, 'c', '86b49c42-96b2-11eb-9298-3e22fbb9fe9d', '23:00:10', '3 months 2 days ago', '{"customer": "Josh William", "items": {"product": "Toy Car","qty": 1}}', '{"product": "Toy Car","qty": 1}', '', 'ecstatic', '{123.123}', '{-1e-307, 1e308}', '{521.34}', '{true}', '{-32768, 32767}', '{-2147483648, 2147483647}', '{0}', 's', 'A', '*', 'A@',ARRAY['',' '],ARRAY['',' '],'someName','{}'); +INSERT INTO test_types VALUES (NULL, NULL, NULL, 3, NULL, 0.00, -1e-37, NULL, 'd', 'defghijklm', NULL, '18:30:00', '3 year', NULL, NULL, '😜', NULL, '{-1e-37, 1e37}', '{0.000234, -12.987654321}', '{0.12, 333.33, 22.22}', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,'{}','{}','101203203-1212323-22131235',ARRAY[NULL, '2038-01-18']); CREATE OR REPLACE FUNCTION increment(i integer) RETURNS integer AS $$ BEGIN