|
| 1 | +use arrow_array::{ |
| 2 | + builder::{ |
| 3 | + FixedSizeListBuilder, Float64Builder, Int64Builder, StringBuilder, |
| 4 | + TimestampNanosecondBuilder, UInt8Builder, |
| 5 | + }, |
| 6 | + Array, RecordBatch, |
| 7 | +}; |
| 8 | +use arrow_schema::{DataType, Field, TimeUnit}; |
| 9 | +use chrono::{DateTime, Utc}; |
| 10 | +use narrow::{array::StructArray, ArrayType}; |
| 11 | +use rand::{prelude::SmallRng, Rng, SeedableRng}; |
| 12 | +use std::sync::Arc; |
| 13 | + |
| 14 | +#[derive(ArrayType, Clone, Debug)] |
| 15 | +struct LineItem { |
| 16 | + l_orderkey: i64, |
| 17 | + l_partkey: i64, |
| 18 | + l_suppkey: i64, |
| 19 | + l_linenumber: i64, |
| 20 | + l_quantity: f64, |
| 21 | + l_extendedprice: f64, |
| 22 | + l_discount: f64, |
| 23 | + l_tax: f64, |
| 24 | + l_returnflag: u8, |
| 25 | + l_linestatus: u8, |
| 26 | + l_shipdate: DateTime<Utc>, |
| 27 | + l_commitdate: DateTime<Utc>, |
| 28 | + l_receiptdate: DateTime<Utc>, |
| 29 | + l_shipinstruct: [u8; 25], |
| 30 | + l_shipmode: [u8; 10], |
| 31 | + l_comment: String, |
| 32 | +} |
| 33 | + |
| 34 | +// Convert from an iterator of rows to an Arrow RecordBatch: |
| 35 | +fn make_recordbatch_narrow(rows: impl Iterator<Item = LineItem>) -> RecordBatch { |
| 36 | + rows.into_iter().collect::<StructArray<LineItem>>().into() |
| 37 | +} |
| 38 | + |
| 39 | +// Convert from an iterator of rows to an Arrow RecordBatch: |
| 40 | +struct LineItemBuilder { |
| 41 | + l_orderkey: Int64Builder, |
| 42 | + l_partkey: Int64Builder, |
| 43 | + l_suppkey: Int64Builder, |
| 44 | + l_linenumber: Int64Builder, |
| 45 | + l_quantity: Float64Builder, |
| 46 | + l_extendedprice: Float64Builder, |
| 47 | + l_discount: Float64Builder, |
| 48 | + l_tax: Float64Builder, |
| 49 | + l_returnflag: UInt8Builder, |
| 50 | + l_linestatus: UInt8Builder, |
| 51 | + l_shipdate: TimestampNanosecondBuilder, |
| 52 | + l_commitdate: TimestampNanosecondBuilder, |
| 53 | + l_receiptdate: TimestampNanosecondBuilder, |
| 54 | + l_shipinstruct: FixedSizeListBuilder<UInt8Builder>, |
| 55 | + l_shipmode: FixedSizeListBuilder<UInt8Builder>, |
| 56 | + l_comment: StringBuilder, |
| 57 | +} |
| 58 | + |
| 59 | +impl Default for LineItemBuilder { |
| 60 | + fn default() -> Self { |
| 61 | + Self { |
| 62 | + l_orderkey: Default::default(), |
| 63 | + l_partkey: Default::default(), |
| 64 | + l_suppkey: Default::default(), |
| 65 | + l_linenumber: Default::default(), |
| 66 | + l_quantity: Default::default(), |
| 67 | + l_extendedprice: Default::default(), |
| 68 | + l_discount: Default::default(), |
| 69 | + l_tax: Default::default(), |
| 70 | + l_returnflag: Default::default(), |
| 71 | + l_linestatus: Default::default(), |
| 72 | + l_shipdate: Default::default(), |
| 73 | + l_commitdate: Default::default(), |
| 74 | + l_receiptdate: Default::default(), |
| 75 | + l_shipinstruct: FixedSizeListBuilder::new(Default::default(), 25), |
| 76 | + l_shipmode: FixedSizeListBuilder::new(Default::default(), 10), |
| 77 | + l_comment: Default::default(), |
| 78 | + } |
| 79 | + } |
| 80 | +} |
| 81 | + |
| 82 | +impl LineItemBuilder { |
| 83 | + fn append(&mut self, row: LineItem) { |
| 84 | + self.l_orderkey.append_value(row.l_orderkey); |
| 85 | + self.l_partkey.append_value(row.l_partkey); |
| 86 | + self.l_suppkey.append_value(row.l_suppkey); |
| 87 | + self.l_linenumber.append_value(row.l_linenumber); |
| 88 | + self.l_quantity.append_value(row.l_quantity); |
| 89 | + self.l_extendedprice.append_value(row.l_extendedprice); |
| 90 | + self.l_discount.append_value(row.l_discount); |
| 91 | + self.l_tax.append_value(row.l_tax); |
| 92 | + self.l_returnflag.append_value(row.l_returnflag); |
| 93 | + self.l_linestatus.append_value(row.l_linestatus); |
| 94 | + self.l_shipdate |
| 95 | + .append_option(row.l_shipdate.timestamp_nanos_opt()); |
| 96 | + self.l_commitdate |
| 97 | + .append_option(row.l_commitdate.timestamp_nanos_opt()); |
| 98 | + self.l_receiptdate |
| 99 | + .append_option(row.l_receiptdate.timestamp_nanos_opt()); |
| 100 | + self.l_shipinstruct |
| 101 | + .values() |
| 102 | + .append_values(&row.l_shipinstruct, &[true; 25]); |
| 103 | + self.l_shipinstruct.append(true); |
| 104 | + self.l_shipmode |
| 105 | + .values() |
| 106 | + .append_values(&row.l_shipmode, &[true; 10]); |
| 107 | + self.l_shipmode.append(true); |
| 108 | + self.l_comment.append_value(row.l_comment); |
| 109 | + } |
| 110 | + |
| 111 | + fn finish(mut self) -> RecordBatch { |
| 112 | + let utc: Arc<str> = Arc::from("UTC"); |
| 113 | + let schema = arrow_schema::Schema::new(vec![ |
| 114 | + // There is no API to build non-nullable arrays, or convert nullable arrays |
| 115 | + // to non-nullable arrays, so we just use nullable here. |
| 116 | + Field::new("l_orderkey", DataType::Int64, true), |
| 117 | + Field::new("l_partkey", DataType::Int64, true), |
| 118 | + Field::new("l_suppkey", DataType::Int64, true), |
| 119 | + Field::new("l_linenumber", DataType::Int64, true), |
| 120 | + Field::new("l_quantity", DataType::Float64, true), |
| 121 | + Field::new("l_extendedprice", DataType::Float64, true), |
| 122 | + Field::new("l_discount", DataType::Float64, true), |
| 123 | + Field::new("l_tax", DataType::Float64, true), |
| 124 | + Field::new("l_returnflag", DataType::UInt8, true), |
| 125 | + Field::new("l_linestatus", DataType::UInt8, true), |
| 126 | + Field::new( |
| 127 | + "l_shipdate", |
| 128 | + DataType::Timestamp(TimeUnit::Nanosecond, Some(utc.clone())), |
| 129 | + true, |
| 130 | + ), |
| 131 | + Field::new( |
| 132 | + "l_commitdate", |
| 133 | + DataType::Timestamp(TimeUnit::Nanosecond, Some(utc.clone())), |
| 134 | + true, |
| 135 | + ), |
| 136 | + Field::new( |
| 137 | + "l_receiptdate", |
| 138 | + DataType::Timestamp(TimeUnit::Nanosecond, Some(utc.clone())), |
| 139 | + true, |
| 140 | + ), |
| 141 | + Field::new( |
| 142 | + "l_shipinstruct", |
| 143 | + DataType::FixedSizeList(Arc::new(Field::new("item", DataType::UInt8, true)), 25), |
| 144 | + true, |
| 145 | + ), |
| 146 | + Field::new( |
| 147 | + "l_shipmode", |
| 148 | + DataType::FixedSizeList(Arc::new(Field::new("item", DataType::UInt8, true)), 10), |
| 149 | + true, |
| 150 | + ), |
| 151 | + Field::new("l_comment", DataType::Utf8, true), |
| 152 | + ]); |
| 153 | + |
| 154 | + let columns: Vec<Arc<dyn Array>> = vec![ |
| 155 | + Arc::new(self.l_orderkey.finish()), |
| 156 | + Arc::new(self.l_partkey.finish()), |
| 157 | + Arc::new(self.l_suppkey.finish()), |
| 158 | + Arc::new(self.l_linenumber.finish()), |
| 159 | + Arc::new(self.l_quantity.finish()), |
| 160 | + Arc::new(self.l_extendedprice.finish()), |
| 161 | + Arc::new(self.l_discount.finish()), |
| 162 | + Arc::new(self.l_tax.finish()), |
| 163 | + Arc::new(self.l_returnflag.finish()), |
| 164 | + Arc::new(self.l_linestatus.finish()), |
| 165 | + Arc::new(self.l_shipdate.with_timezone(utc.clone()).finish()), |
| 166 | + Arc::new(self.l_commitdate.with_timezone(utc.clone()).finish()), |
| 167 | + Arc::new(self.l_receiptdate.with_timezone(utc.clone()).finish()), |
| 168 | + Arc::new(self.l_shipinstruct.finish()), |
| 169 | + Arc::new(self.l_shipmode.finish()), |
| 170 | + Arc::new(self.l_comment.finish()), |
| 171 | + ]; |
| 172 | + |
| 173 | + RecordBatch::try_new(Arc::new(schema), columns).unwrap(/* typically handle errors here too */) |
| 174 | + } |
| 175 | +} |
| 176 | + |
| 177 | +fn make_recordbatch_arrow(rows: impl Iterator<Item = LineItem>) -> RecordBatch { |
| 178 | + let mut builder = LineItemBuilder::default(); |
| 179 | + rows.for_each(|row| builder.append(row)); |
| 180 | + builder.finish() |
| 181 | +} |
| 182 | + |
| 183 | +// Create some dummy rows of a given size. |
| 184 | +fn make_native_row_oriented(size: usize) -> Vec<LineItem> { |
| 185 | + let mut rng = SmallRng::seed_from_u64(0); |
| 186 | + |
| 187 | + (0..size) |
| 188 | + .map(|_| LineItem { |
| 189 | + l_orderkey: rng.gen_range(0..i64::MAX), |
| 190 | + l_partkey: rng.gen_range(0..i64::MAX), |
| 191 | + l_suppkey: rng.gen_range(0..i64::MAX), |
| 192 | + l_linenumber: rng.gen_range(0..i64::MAX), |
| 193 | + l_quantity: rng.gen_range(0f64..42f64), |
| 194 | + l_extendedprice: rng.gen_range(0f64..1337f64), |
| 195 | + l_discount: rng.gen_range(0f64..0.1), |
| 196 | + l_tax: rng.gen_range(0f64..0.3), |
| 197 | + l_returnflag: rng.gen_range(0..u8::MAX), |
| 198 | + l_linestatus: rng.gen_range(0..u8::MAX), |
| 199 | + l_shipdate: DateTime::from_timestamp_nanos(rng.gen_range(0..i64::MAX)), |
| 200 | + l_commitdate: DateTime::from_timestamp_nanos(rng.gen_range(0..i64::MAX)), |
| 201 | + l_receiptdate: DateTime::from_timestamp_nanos(rng.gen_range(0..i64::MAX)), |
| 202 | + l_shipinstruct: [rng.gen_range(0..u8::MAX); 25], |
| 203 | + l_shipmode: [rng.gen_range(0..u8::MAX); 10], |
| 204 | + l_comment: String::from_iter( |
| 205 | + (0..rng.gen_range(0..44)).map(|_| rng.gen_range('a'..='z')), |
| 206 | + ), |
| 207 | + }) |
| 208 | + .collect() |
| 209 | +} |
| 210 | + |
| 211 | +const NUM_ROWS: usize = 1 << 20; |
| 212 | + |
| 213 | +#[rustversion::attr(nightly, allow(non_local_definitions))] |
| 214 | +fn main() { |
| 215 | + let input = make_native_row_oriented(NUM_ROWS); |
| 216 | + |
| 217 | + let narrow = make_recordbatch_narrow(input.clone().into_iter()); |
| 218 | + let arrow = make_recordbatch_arrow(input.into_iter()); |
| 219 | + |
| 220 | + // Since nullability differs in the schemas, we can't really compare the entire |
| 221 | + // RecordBatch without doing additional work in removing nullability. |
| 222 | + // assert_eq!(narrow, arrow); |
| 223 | + |
| 224 | + assert_eq!(narrow.num_rows(), arrow.num_rows()); |
| 225 | + assert_eq!(narrow.num_columns(), arrow.num_columns()); |
| 226 | +} |
0 commit comments