Skip to content

Commit 840c462

Browse files
committed
refactor: Reduced duplication
1 parent b5422d0 commit 840c462

File tree

1 file changed

+146
-130
lines changed

1 file changed

+146
-130
lines changed

datafusion/functions-nested/src/extract.rs

Lines changed: 146 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@
1818
//! [`ScalarUDFImpl`] definitions for array_element, array_slice, array_pop_front, array_pop_back, and array_any_value functions.
1919
2020
use arrow::array::{
21-
cast::AsArray, Array, ArrayRef, ArrowNativeTypeOp, Capacities, GenericListArray,
22-
GenericListViewArray, Int64Array, MutableArrayData, NullArray, NullBufferBuilder,
23-
OffsetSizeTrait,
21+
cast::AsArray, Array, ArrayRef, Capacities, GenericListArray, GenericListViewArray,
22+
Int64Array, MutableArrayData, NullArray, NullBufferBuilder, OffsetSizeTrait,
2423
};
2524
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
2625
use arrow::datatypes::DataType;
@@ -529,6 +528,81 @@ where
529528
}
530529
}
531530

531+
/// Describes how to realize a single row's slice once the bounds and stride
532+
/// have been normalized.
533+
enum SlicePlan<O: OffsetSizeTrait> {
534+
/// No values should be produced.
535+
Empty,
536+
/// A contiguous run starting at `start` (relative to the row) with `len`
537+
/// elements can be copied in one go.
538+
Contiguous { start: O, len: O },
539+
/// Arbitrary positions (already relative to the row) must be copied in
540+
/// sequence.
541+
Indices(Vec<O>),
542+
}
543+
544+
fn compute_slice_plan<O: OffsetSizeTrait>(
545+
len: O,
546+
from_raw: i64,
547+
to_raw: i64,
548+
stride_raw: Option<i64>,
549+
) -> Result<SlicePlan<O>>
550+
where
551+
i64: TryInto<O>,
552+
{
553+
if len == O::usize_as(0) {
554+
return Ok(SlicePlan::Empty);
555+
}
556+
557+
let from_index = adjusted_from_index::<O>(from_raw, len)?;
558+
let to_index = adjusted_to_index::<O>(to_raw, len)?;
559+
560+
let (Some(from), Some(to)) = (from_index, to_index) else {
561+
return Ok(SlicePlan::Empty);
562+
};
563+
564+
let stride_value = stride_raw.unwrap_or(1);
565+
if stride_value == 0 {
566+
return exec_err!(
567+
"array_slice got invalid stride: {:?}, it cannot be 0",
568+
stride_value
569+
);
570+
}
571+
572+
if (from < to && stride_value.is_negative())
573+
|| (from > to && stride_value.is_positive())
574+
{
575+
return Ok(SlicePlan::Empty);
576+
}
577+
578+
let stride: O = stride_value.try_into().map_err(|_| {
579+
internal_datafusion_err!("array_slice got invalid stride: {}", stride_value)
580+
})?;
581+
582+
if from <= to && stride_value.is_positive() {
583+
if stride_value == 1 {
584+
let len = to - from + O::usize_as(1);
585+
Ok(SlicePlan::Contiguous { start: from, len })
586+
} else {
587+
let mut indices = Vec::new();
588+
let mut index = from;
589+
while index <= to {
590+
indices.push(index);
591+
index += stride;
592+
}
593+
Ok(SlicePlan::Indices(indices))
594+
}
595+
} else {
596+
let mut indices = Vec::new();
597+
let mut index = from;
598+
while index >= to {
599+
indices.push(index);
600+
index += stride;
601+
}
602+
Ok(SlicePlan::Indices(indices))
603+
}
604+
}
605+
532606
fn general_array_slice<O: OffsetSizeTrait>(
533607
array: &GenericListArray<O>,
534608
from_array: &Int64Array,
@@ -545,6 +619,9 @@ where
545619
let mut mutable =
546620
MutableArrayData::with_capacities(vec![&original_data], true, capacity);
547621

622+
// We have the slice syntax compatible with DuckDB v0.8.1.
623+
// The rule `adjusted_from_index` and `adjusted_to_index` follows the rule of array_slice in duckdb.
624+
548625
let mut offsets = vec![O::usize_as(0)];
549626
let mut null_builder = NullBufferBuilder::new(array.len());
550627

@@ -571,72 +648,32 @@ where
571648
continue;
572649
}
573650

574-
let from_index = adjusted_from_index::<O>(from_array.value(row_index), len)?;
575-
let to_index = adjusted_to_index::<O>(to_array.value(row_index), len)?;
576-
577-
if let (Some(from), Some(to)) = (from_index, to_index) {
578-
let stride = stride.map(|s| s.value(row_index));
579-
// Default stride is 1 if not provided
580-
let stride = stride.unwrap_or(1);
581-
if stride.is_zero() {
582-
return exec_err!(
583-
"array_slice got invalid stride: {:?}, it cannot be 0",
584-
stride
585-
);
586-
} else if (from < to && stride.is_negative())
587-
|| (from > to && stride.is_positive())
588-
{
589-
// return empty array
590-
offsets.push(offsets[row_index]);
591-
continue;
651+
let slice_plan = compute_slice_plan::<O>(
652+
len,
653+
from_array.value(row_index),
654+
to_array.value(row_index),
655+
stride.map(|s| s.value(row_index)),
656+
)?;
657+
658+
match slice_plan {
659+
SlicePlan::Empty => offsets.push(offsets[row_index]),
660+
SlicePlan::Contiguous {
661+
start: rel_start,
662+
len: slice_len,
663+
} => {
664+
let start_index = (start + rel_start).to_usize().unwrap();
665+
let end_index = (start + rel_start + slice_len).to_usize().unwrap();
666+
mutable.extend(0, start_index, end_index);
667+
offsets.push(offsets[row_index] + slice_len);
592668
}
593-
594-
let stride: O = stride.try_into().map_err(|_| {
595-
internal_datafusion_err!("array_slice got invalid stride: {}", stride)
596-
})?;
597-
598-
if from <= to && stride > O::zero() {
599-
assert!(start + to <= end);
600-
if stride.eq(&O::one()) {
601-
// stride is default to 1
602-
mutable.extend(
603-
0,
604-
(start + from).to_usize().unwrap(),
605-
(start + to + O::usize_as(1)).to_usize().unwrap(),
606-
);
607-
offsets.push(offsets[row_index] + (to - from + O::usize_as(1)));
608-
continue;
609-
}
610-
let mut index = start + from;
611-
let mut cnt = 0;
612-
while index <= start + to {
613-
mutable.extend(
614-
0,
615-
index.to_usize().unwrap(),
616-
index.to_usize().unwrap() + 1,
617-
);
618-
index += stride;
619-
cnt += 1;
669+
SlicePlan::Indices(indices) => {
670+
let count = indices.len();
671+
for rel_index in indices {
672+
let absolute_index = (start + rel_index).to_usize().unwrap();
673+
mutable.extend(0, absolute_index, absolute_index + 1);
620674
}
621-
offsets.push(offsets[row_index] + O::usize_as(cnt));
622-
} else {
623-
let mut index = start + from;
624-
let mut cnt = 0;
625-
while index >= start + to {
626-
mutable.extend(
627-
0,
628-
index.to_usize().unwrap(),
629-
index.to_usize().unwrap() + 1,
630-
);
631-
index += stride;
632-
cnt += 1;
633-
}
634-
// invalid range, return empty array
635-
offsets.push(offsets[row_index] + O::usize_as(cnt));
675+
offsets.push(offsets[row_index] + O::usize_as(count));
636676
}
637-
} else {
638-
// invalid range, return empty array
639-
offsets.push(offsets[row_index]);
640677
}
641678
}
642679

@@ -666,12 +703,15 @@ where
666703
let mut mutable =
667704
MutableArrayData::with_capacities(vec![&original_data], true, capacity);
668705

706+
// We must build `offsets` and `sizes` buffers manually as ListView does not enforce
707+
// monotonically increasing offsets.
669708
let mut offsets = Vec::with_capacity(array.len());
670709
let mut sizes = Vec::with_capacity(array.len());
671710
let mut current_offset = O::usize_as(0);
672711
let mut null_builder = NullBufferBuilder::new(array.len());
673712

674713
for row_index in 0..array.len() {
714+
// Propagate NULL semantics: any NULL input yields a NULL output slot.
675715
if array.is_null(row_index)
676716
|| from_array.is_null(row_index)
677717
|| to_array.is_null(row_index)
@@ -692,79 +732,47 @@ where
692732
continue;
693733
}
694734

695-
let from_index = adjusted_from_index::<O>(from_array.value(row_index), len)?;
696-
let to_index = adjusted_to_index::<O>(to_array.value(row_index), len)?;
697-
698-
if let (Some(from), Some(to)) = (from_index, to_index) {
699-
let stride_value = stride.map(|s| s.value(row_index)).unwrap_or(1);
700-
if stride_value.is_zero() {
701-
return exec_err!(
702-
"array_slice got invalid stride: {:?}, it cannot be 0",
703-
stride_value
704-
);
705-
} else if (from < to && stride_value.is_negative())
706-
|| (from > to && stride_value.is_positive())
707-
{
735+
let slice_plan = compute_slice_plan::<O>(
736+
len,
737+
from_array.value(row_index),
738+
to_array.value(row_index),
739+
stride.map(|s| s.value(row_index)),
740+
)?;
741+
742+
let start = array.value_offset(row_index);
743+
match slice_plan {
744+
SlicePlan::Empty => {
708745
offsets.push(current_offset);
709746
sizes.push(O::usize_as(0));
710-
continue;
711747
}
712-
713-
let stride: O = stride_value.try_into().map_err(|_| {
714-
internal_datafusion_err!("array_slice got invalid stride: {}", stride_value)
715-
})?;
716-
717-
let start = array.value_offset(row_index);
718-
if from <= to && stride > O::zero() {
719-
if stride == O::one() {
720-
let start_index = (start + from).to_usize().unwrap();
721-
let end_index =
722-
(start + to + O::usize_as(1)).to_usize().unwrap();
723-
mutable.extend(0, start_index, end_index);
724-
let length = to - from + O::usize_as(1);
725-
offsets.push(current_offset);
726-
sizes.push(length);
727-
current_offset = current_offset + length;
728-
continue;
729-
}
730-
731-
let mut index = start + from;
732-
let mut cnt = 0usize;
733-
let end_index = start + to;
734-
while index <= end_index {
735-
let idx = index.to_usize().unwrap();
736-
mutable.extend(0, idx, idx + 1);
737-
index += stride;
738-
cnt += 1;
739-
}
740-
let length = O::usize_as(cnt);
748+
SlicePlan::Contiguous {
749+
start: rel_start,
750+
len: slice_len,
751+
} => {
752+
let start_index = (start + rel_start).to_usize().unwrap();
753+
let end_index = (start + rel_start + slice_len).to_usize().unwrap();
754+
mutable.extend(0, start_index, end_index);
741755
offsets.push(current_offset);
742-
sizes.push(length);
743-
current_offset = current_offset + length;
744-
} else {
745-
let mut index = start + from;
746-
let mut cnt = 0usize;
747-
let end_index = start + to;
748-
while index >= end_index {
749-
let idx = index.to_usize().unwrap();
750-
mutable.extend(0, idx, idx + 1);
751-
index += stride;
752-
cnt += 1;
756+
sizes.push(slice_len);
757+
current_offset += slice_len;
758+
}
759+
SlicePlan::Indices(indices) => {
760+
let count = indices.len();
761+
for rel_index in indices {
762+
let absolute_index = (start + rel_index).to_usize().unwrap();
763+
mutable.extend(0, absolute_index, absolute_index + 1);
753764
}
754-
let length = O::usize_as(cnt);
765+
let length = O::usize_as(count);
755766
offsets.push(current_offset);
756767
sizes.push(length);
757-
current_offset = current_offset + length;
768+
current_offset += length;
758769
}
759-
} else {
760-
offsets.push(current_offset);
761-
sizes.push(O::usize_as(0));
762770
}
763771
}
764772

765773
let data = mutable.freeze();
766774
let field = match array.data_type() {
767-
ListView(field) | LargeListView(field) => field.clone(),
775+
ListView(field) | LargeListView(field) => Arc::clone(field),
768776
other => {
769777
return Err(internal_datafusion_err!(
770778
"array_slice got unexpected data type: {}",
@@ -1194,8 +1202,12 @@ mod tests {
11941202
let from = Int64Array::from(vec![2, 1]);
11951203
let to = Int64Array::from(vec![3, 2]);
11961204

1197-
let result =
1198-
general_list_view_array_slice::<i32>(&array, &from, &to, None::<&Int64Array>)?;
1205+
let result = general_list_view_array_slice::<i32>(
1206+
&array,
1207+
&from,
1208+
&to,
1209+
None::<&Int64Array>,
1210+
)?;
11991211
let result = result.as_ref().as_list_view::<i32>();
12001212

12011213
assert_eq!(list_view_values(result), vec![vec![2, 3], vec![4, 5]]);
@@ -1214,8 +1226,12 @@ mod tests {
12141226
let from = Int64Array::from(vec![1, 1]);
12151227
let to = Int64Array::from(vec![2, 2]);
12161228

1217-
let result =
1218-
general_list_view_array_slice::<i32>(&array, &from, &to, None::<&Int64Array>)?;
1229+
let result = general_list_view_array_slice::<i32>(
1230+
&array,
1231+
&from,
1232+
&to,
1233+
None::<&Int64Array>,
1234+
)?;
12191235
let result = result.as_ref().as_list_view::<i32>();
12201236

12211237
assert_eq!(list_view_values(result), vec![vec![4, 5], vec![1, 2]]);

0 commit comments

Comments
 (0)