diff --git a/arrow-select/src/cleanup_non_empty_nulls.rs b/arrow-select/src/cleanup_non_empty_nulls.rs new file mode 100644 index 000000000000..136b97f8eb34 --- /dev/null +++ b/arrow-select/src/cleanup_non_empty_nulls.rs @@ -0,0 +1,873 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Rewrite variable-length arrays so that null entries point to empty offset +//! ranges, while preserving the original null mask. +//! +//! Some variable-length array types (`Binary`, `Utf8`, `List`, `Map`, ...) can +//! legally hold null entries whose offsets still reference real bytes/values +//! in the underlying buffer. Iterating the child values exposes that data, +//! which is rarely what callers want. This module provides: +//! +//! * [`has_non_empty_nulls`] - cheap check for whether the array contains +//! any such null entries. +//! * [`cleanup_non_empty_nulls`] - produces an equivalent array where every +//! null entry has a zero-length offset range. + +use std::sync::Arc; + +use arrow_array::{ + Array, ArrayRef, GenericByteArray, GenericListArray, MapArray, OffsetSizeTrait, UInt32Array, + cast::AsArray, make_array, new_null_array, types::ByteArrayType, +}; +use arrow_buffer::{Buffer, OffsetBuffer}; +use arrow_schema::{ArrowError, DataType}; + +/// Return true if there are no nulls pointing to non-empty values. +/// +/// This will return true if [`cleanup_non_empty_nulls`] will do anything. +/// +/// This should be called before [`cleanup_non_empty_nulls`] to avoid unnecessary work. +pub fn has_non_empty_nulls(array: &dyn Array) -> Result { + Ok(match array.data_type() { + DataType::Binary => array.as_binary::().has_non_empty_nulls(), + DataType::LargeBinary => array.as_binary::().has_non_empty_nulls(), + DataType::Utf8 => array.as_string::().has_non_empty_nulls(), + DataType::LargeUtf8 => array.as_string::().has_non_empty_nulls(), + DataType::List(_) => array.as_list::().has_non_empty_nulls(), + DataType::LargeList(_) => array.as_list::().has_non_empty_nulls(), + DataType::Map(_, _) => array.as_map().has_non_empty_nulls(), + dt => { + return Err(ArrowError::InvalidArgumentError(format!( + "data type {dt:?} is not supported" + ))); + } + }) +} + +/// Create a new list/map/bytes array with the same nulls as the original list/map/bytes array but all the nulls +/// are pointing to an empty list/map/byte slice. +/// +/// Users should first check if [`has_non_empty_nulls`] returns true before calling this method +/// to avoid unnecessary work. +/// +/// This is useful when wanting to go over the list/map/bytes values +/// and not wanting to deal with values that are not used by the list/map/bytes. +pub fn cleanup_non_empty_nulls(array: ArrayRef) -> Result { + match array.data_type() { + DataType::Binary => array.as_binary::().cleanup_non_empty_nulls(), + DataType::LargeBinary => array.as_binary::().cleanup_non_empty_nulls(), + DataType::Utf8 => array.as_string::().cleanup_non_empty_nulls(), + DataType::LargeUtf8 => array.as_string::().cleanup_non_empty_nulls(), + DataType::List(_) => array.as_list::().cleanup_non_empty_nulls(), + DataType::LargeList(_) => array.as_list::().cleanup_non_empty_nulls(), + DataType::Map(_, _) => array.as_map().cleanup_non_empty_nulls(), + dt => Err(ArrowError::InvalidArgumentError(format!( + "data type {dt:?} is not supported" + ))), + } +} + +/// Helper trait to make it easier to implement the cleanup nulls for variable length with offsets +trait VariableLengthArrayExt: Array + Clone + Sized + 'static { + /// Get the offsets of the variable length array. + fn get_offsets(&self) -> &OffsetBuffer; + + fn has_non_empty_nulls(&self) -> bool { + self.get_offsets().has_non_empty_nulls(self.nulls()) + } + + /// Create a new list/map/bytes array with the same nulls as the original list/map/bytes array but all the nulls + /// are pointing to an empty list/map/byte slice. + /// + /// Users should first check if [`Self::has_non_empty_nulls`] returns true before calling this method + /// to avoid unnecessary work. + /// + /// This is useful when wanting to go over the list/map/bytes values + /// and not wanting to deal with values that are not used by the list/map/bytes. + fn cleanup_non_empty_nulls(&self) -> Result { + let Some(nulls) = self.nulls().filter(|n| n.null_count() > 0) else { + // If no nulls, return as is + return Ok(Arc::new(self.clone())); + }; + + // Find an empty value so we can use the `take` kernel + let index_of_empty_value_to_reuse: Option = self + .get_offsets() + .lengths() + .position(|length| length == 0) + .map(|index| index as u32); + + if let Some(index_of_empty_value) = index_of_empty_value_to_reuse { + let buffer = { + let iter = nulls.iter().enumerate().map(|(i, is_valid)| { + if is_valid { + i as u32 + } else { + index_of_empty_value + } + }); + + // SAFETY: upper bound is trusted because `iter` is just map over `nulls` + unsafe { Buffer::from_trusted_len_iter(iter) } + }; + + let cleanup_array = crate::take::take( + &self, + &UInt32Array::new(buffer.into(), None), + Some(crate::take::TakeOptions { + // The indices are derived from the length + check_bounds: false, + }), + )?; + + let array_data_with_correct_nulls = { + let builder = cleanup_array + .into_data() + .into_builder() + .nulls(self.nulls().cloned()); + unsafe { + // This is safe as we are only updating the nulls + builder.build_unchecked() + } + }; + + let array = make_array(array_data_with_correct_nulls); + + return Ok(array); + } + + // Create a new list with 1 null item so we can use the `interleave` kernel + let array_with_null_item = new_null_array(self.data_type(), 1); + + let interleave_indices = nulls + .iter() + .enumerate() + // If the value is null, we want to take the null item from `list_with_null_item` + .map(|(i, is_valid)| if is_valid { (0, i) } else { (1, 0) }) + .collect::>(); + + let cleaned_up_array = crate::interleave::interleave( + &[&self, &array_with_null_item], + interleave_indices.as_slice(), + )?; + + Ok(cleaned_up_array) + } +} + +impl VariableLengthArrayExt + for GenericListArray +{ + fn get_offsets(&self) -> &OffsetBuffer { + self.offsets() + } +} + +impl VariableLengthArrayExt for MapArray { + fn get_offsets(&self) -> &OffsetBuffer { + self.offsets() + } +} + +impl VariableLengthArrayExt for GenericByteArray { + fn get_offsets(&self) -> &OffsetBuffer { + self.offsets() + } +} + +#[cfg(test)] +mod tests { + use arrow_array::types::Int32Type; + use arrow_array::{ + BinaryArray, Int32Array, LargeBinaryArray, LargeStringArray, StringArray, StructArray, + UInt32Array, + }; + use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer}; + use arrow_schema::{Field, Fields}; + + use super::*; + use std::sync::Arc; + + /// Build a `MapArray` with the given offsets/nulls and `Int32` keys/values. + fn build_map_array( + offsets: OffsetBuffer, + keys: Int32Array, + values: Int32Array, + nulls: Option, + ) -> MapArray { + let entries_fields = Fields::from(vec![ + Field::new("keys", DataType::Int32, false), + Field::new("values", DataType::Int32, false), + ]); + let entries = StructArray::new( + entries_fields.clone(), + vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef], + None, + ); + let field = Arc::new(Field::new( + "entries", + DataType::Struct(entries_fields), + false, + )); + MapArray::new(field, offsets, entries, nulls, false) + } + + // ===== All nulls already point to empty values ===== + // Cleanup should be a no-op (logically equivalent array). + + #[test] + fn cleanup_when_all_nulls_are_empty_binary() { + let values = Buffer::from(b"helloworld".as_slice()); + let offsets = OffsetBuffer::::from_lengths(vec![5, 0, 5]); + let nulls = NullBuffer::from(vec![true, false, true]); + let binary = BinaryArray::new(offsets, values, Some(nulls)); + let array: ArrayRef = Arc::new(binary.clone()); + + assert!(!has_non_empty_nulls(array.as_ref()).unwrap()); + + let cleaned = cleanup_non_empty_nulls(array).unwrap(); + assert_eq!(cleaned.as_binary::(), &binary); + } + + #[test] + fn cleanup_when_all_nulls_are_empty_large_binary() { + let values = Buffer::from(b"helloworld".as_slice()); + let offsets = OffsetBuffer::::from_lengths(vec![5, 0, 5]); + let nulls = NullBuffer::from(vec![true, false, true]); + let binary = LargeBinaryArray::new(offsets, values, Some(nulls)); + let array: ArrayRef = Arc::new(binary.clone()); + + assert!(!has_non_empty_nulls(array.as_ref()).unwrap()); + + let cleaned = cleanup_non_empty_nulls(array).unwrap(); + assert_eq!(cleaned.as_binary::(), &binary); + } + + #[test] + fn cleanup_when_all_nulls_are_empty_string() { + let values = Buffer::from(b"helloworld".as_slice()); + let offsets = OffsetBuffer::::from_lengths(vec![5, 0, 5]); + let nulls = NullBuffer::from(vec![true, false, true]); + let string = StringArray::new(offsets, values, Some(nulls)); + let array: ArrayRef = Arc::new(string.clone()); + + assert!(!has_non_empty_nulls(array.as_ref()).unwrap()); + + let cleaned = cleanup_non_empty_nulls(array).unwrap(); + assert_eq!(cleaned.as_string::(), &string); + } + + #[test] + fn cleanup_when_all_nulls_are_empty_large_string() { + let values = Buffer::from(b"helloworld".as_slice()); + let offsets = OffsetBuffer::::from_lengths(vec![5, 0, 5]); + let nulls = NullBuffer::from(vec![true, false, true]); + let string = LargeStringArray::new(offsets, values, Some(nulls)); + let array: ArrayRef = Arc::new(string.clone()); + + assert!(!has_non_empty_nulls(array.as_ref()).unwrap()); + + let cleaned = cleanup_non_empty_nulls(array).unwrap(); + assert_eq!(cleaned.as_string::(), &string); + } + + #[test] + fn cleanup_when_all_nulls_are_empty_list() { + let list_values = UInt32Array::from(vec![1, 2, 3, 4, 5]); + let offsets = OffsetBuffer::::from_lengths(vec![3, 0, 2]); + let nulls = NullBuffer::from(vec![true, false, true]); + + let list_field = Arc::new(Field::new("item", list_values.data_type().clone(), false)); + let list = GenericListArray::::new( + Arc::clone(&list_field), + offsets, + Arc::new(list_values), + Some(nulls), + ); + let array: ArrayRef = Arc::new(list.clone()); + + assert!(!has_non_empty_nulls(array.as_ref()).unwrap()); + + let cleaned = cleanup_non_empty_nulls(array).unwrap(); + assert_eq!(cleaned.as_list::(), &list); + } + + #[test] + fn cleanup_when_all_nulls_are_empty_large_list() { + let list_values = UInt32Array::from(vec![1, 2, 3, 4, 5]); + let offsets = OffsetBuffer::::from_lengths(vec![3, 0, 2]); + let nulls = NullBuffer::from(vec![true, false, true]); + + let list_field = Arc::new(Field::new("item", list_values.data_type().clone(), false)); + let list = GenericListArray::::new( + Arc::clone(&list_field), + offsets, + Arc::new(list_values), + Some(nulls), + ); + let array: ArrayRef = Arc::new(list.clone()); + + assert!(!has_non_empty_nulls(array.as_ref()).unwrap()); + + let cleaned = cleanup_non_empty_nulls(array).unwrap(); + assert_eq!(cleaned.as_list::(), &list); + } + + #[test] + fn cleanup_when_all_nulls_are_empty_map() { + let keys = Int32Array::from(vec![1, 2, 3, 4, 5]); + let values = Int32Array::from(vec![10, 20, 30, 40, 50]); + let offsets = OffsetBuffer::::from_lengths(vec![3, 0, 2]); + let nulls = NullBuffer::from(vec![true, false, true]); + let map = build_map_array(offsets, keys, values, Some(nulls)); + let array: ArrayRef = Arc::new(map.clone()); + + assert!(!has_non_empty_nulls(array.as_ref()).unwrap()); + + let cleaned = cleanup_non_empty_nulls(array).unwrap(); + assert_eq!(cleaned.as_map(), &map); + } + + // ===== No nulls at all ===== + // Cleanup should return the same array unchanged. + + #[test] + fn list_null_cleanup_on_list_with_no_nulls() { + let list_values = UInt32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]); + let offsets = OffsetBuffer::::from_lengths(vec![3, 4, 2]); + + let list_field = Arc::new(Field::new("item", list_values.data_type().clone(), false)); + let list: ArrayRef = Arc::new(GenericListArray::::new( + Arc::clone(&list_field), + offsets, + Arc::new(list_values), + None, + )); + + let list_with_cleaned_nulls = cleanup_non_empty_nulls(Arc::clone(&list)).unwrap(); + let list_with_cleaned_nulls = list_with_cleaned_nulls.as_list::(); + + assert_eq!( + list.as_list::(), + list_with_cleaned_nulls, + "should have the same list" + ); + } + + #[test] + fn cleanup_when_no_nulls_binary() { + let binary = BinaryArray::from_iter_values([b"foo".as_slice(), b"bar", b"baz"]); + let array: ArrayRef = Arc::new(binary.clone()); + + assert!(!has_non_empty_nulls(array.as_ref()).unwrap()); + + let cleaned = cleanup_non_empty_nulls(array).unwrap(); + assert_eq!(cleaned.as_binary::(), &binary); + } + + #[test] + fn cleanup_when_no_nulls_string() { + let string = StringArray::from_iter_values(["foo", "bar", "baz"]); + let array: ArrayRef = Arc::new(string.clone()); + + assert!(!has_non_empty_nulls(array.as_ref()).unwrap()); + + let cleaned = cleanup_non_empty_nulls(array).unwrap(); + assert_eq!(cleaned.as_string::(), &string); + } + + #[test] + fn cleanup_when_no_nulls_map() { + let keys = Int32Array::from(vec![1, 2, 3, 4, 5]); + let values = Int32Array::from(vec![10, 20, 30, 40, 50]); + let offsets = OffsetBuffer::::from_lengths(vec![3, 0, 2]); + let map = build_map_array(offsets, keys, values, None); + let array: ArrayRef = Arc::new(map.clone()); + + assert!(!has_non_empty_nulls(array.as_ref()).unwrap()); + + let cleaned = cleanup_non_empty_nulls(array).unwrap(); + assert_eq!(cleaned.as_map(), &map); + } + + // ===== Non-empty nulls exist, but only outside the sliced range ===== + // `has_non_empty_nulls` and `cleanup_non_empty_nulls` should respect the slice view. + + #[test] + fn cleanup_when_non_empty_nulls_outside_sliced_binary() { + // Full array: ["foo", NULL->"bar"(non-empty), "baz", NULL->""(empty), "qux"] + // Slicing to [2, 5) excludes the problematic null at index 1. + let values = Buffer::from(b"foobarbazqux".as_slice()); + // offsets: 0 3 6 9 9 12 + let offsets = OffsetBuffer::::new(ScalarBuffer::::from(vec![0, 3, 6, 9, 9, 12])); + let nulls = NullBuffer::from(vec![true, false, true, false, true]); + let binary = BinaryArray::new(offsets, values, Some(nulls)); + let full: ArrayRef = Arc::new(binary); + + assert!( + has_non_empty_nulls(full.as_ref()).unwrap(), + "full array has a non-empty null" + ); + + let sliced = full.slice(2, 3); + assert!( + !has_non_empty_nulls(sliced.as_ref()).unwrap(), + "sliced view has no non-empty nulls" + ); + + let cleaned = cleanup_non_empty_nulls(Arc::clone(&sliced)).unwrap(); + assert_eq!(cleaned.as_binary::(), sliced.as_binary::()); + } + + #[test] + fn cleanup_when_non_empty_nulls_outside_sliced_list() { + // Same idea as the binary slice test, but for a `ListArray`. + let list_values = UInt32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]); + let offsets = OffsetBuffer::::new(ScalarBuffer::::from(vec![0, 2, 5, 7, 7, 9])); + let nulls = NullBuffer::from(vec![true, false, true, false, true]); + let list_field = Arc::new(Field::new("item", list_values.data_type().clone(), false)); + let list = GenericListArray::::new( + Arc::clone(&list_field), + offsets, + Arc::new(list_values), + Some(nulls), + ); + let full: ArrayRef = Arc::new(list); + + assert!(has_non_empty_nulls(full.as_ref()).unwrap()); + + let sliced = full.slice(2, 3); + assert!(!has_non_empty_nulls(sliced.as_ref()).unwrap()); + + let cleaned = cleanup_non_empty_nulls(Arc::clone(&sliced)).unwrap(); + assert_eq!(cleaned.as_list::(), sliced.as_list::()); + } + + // ===== Some nulls point to non-empty, some point to empty ===== + + #[test] + fn list_cleanup_nulls_with_null_pointing_to_non_empty_list_and_have_empty_list() { + let list_values = UInt32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]); + let offsets = OffsetBuffer::::from_lengths(vec![3, 4, 0, 2]); + let input_nulls = NullBuffer::from(vec![true, false, true, true]); + + let list_field = Arc::new(Field::new("item", list_values.data_type().clone(), false)); + let list: ArrayRef = Arc::new(GenericListArray::::new( + Arc::clone(&list_field), + offsets, + Arc::new(list_values), + Some(input_nulls.clone()), + )); + + assert!(has_non_empty_nulls(list.as_ref()).unwrap()); + + let list_with_cleaned_nulls = cleanup_non_empty_nulls(list).unwrap(); + assert!(!has_non_empty_nulls(list_with_cleaned_nulls.as_ref()).unwrap()); + let list_with_cleaned_nulls = list_with_cleaned_nulls.as_list::(); + + let (field, offsets, values, nulls) = list_with_cleaned_nulls.clone().into_parts(); + + assert_eq!(field, list_field, "List field should not change"); + assert_eq!( + offsets.lengths().collect::>(), + vec![3, 0, 0, 2], + "nulls should point to zero length list" + ); + assert_eq!(values.as_ref(), &UInt32Array::from(vec![1, 2, 3, 8, 9])); + assert_eq!(nulls, Some(input_nulls), "Nulls should not change"); + } + + #[test] + fn binary_cleanup_nulls_with_null_pointing_to_non_empty_and_have_empty() { + // values: foo bar baz + // offsets: 0 3 6 6 9 + // nulls: T F T T + // Null at idx 1 points to "bar" (non-empty); null at idx 2 is empty already. + let values = Buffer::from(b"foobarbaz".as_slice()); + let offsets = OffsetBuffer::::new(ScalarBuffer::::from(vec![0, 3, 6, 6, 9])); + let input_nulls = NullBuffer::from(vec![true, false, true, true]); + let binary = BinaryArray::new(offsets, values, Some(input_nulls.clone())); + let array: ArrayRef = Arc::new(binary); + + assert!(has_non_empty_nulls(array.as_ref()).unwrap()); + + let cleaned = cleanup_non_empty_nulls(array).unwrap(); + assert!(!has_non_empty_nulls(cleaned.as_ref()).unwrap()); + let cleaned = cleaned.as_binary::(); + + // All nulls must now have zero length. + assert_eq!( + cleaned.offsets().lengths().collect::>(), + vec![3, 0, 0, 3] + ); + assert_eq!(cleaned.nulls(), Some(&input_nulls)); + assert_eq!(cleaned.value(0), b"foo"); + assert_eq!(cleaned.value(3), b"baz"); + } + + // ===== All nulls point to non-empty values ===== + + #[test] + fn list_cleanup_nulls_with_null_pointing_to_non_empty_list_and_does_not_have_empty_list() { + let list_values = UInt32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]); + let offsets = OffsetBuffer::::from_lengths(vec![3, 4, 2]); + let input_nulls = NullBuffer::from(vec![true, false, true]); + + let list_field = Arc::new(Field::new("item", list_values.data_type().clone(), false)); + let list: ArrayRef = Arc::new(GenericListArray::::new( + Arc::clone(&list_field), + offsets, + Arc::new(list_values), + Some(input_nulls.clone()), + )); + + assert!(has_non_empty_nulls(list.as_ref()).unwrap()); + + let list_with_cleaned_nulls = cleanup_non_empty_nulls(list).unwrap(); + assert!(!has_non_empty_nulls(list_with_cleaned_nulls.as_ref()).unwrap()); + let list_with_cleaned_nulls = list_with_cleaned_nulls.as_list::(); + + let (field, offsets, values, nulls) = list_with_cleaned_nulls.clone().into_parts(); + + assert_eq!(field, list_field, "List field should not change"); + assert_eq!( + offsets.lengths().collect::>(), + vec![3, 0, 2], + "nulls should point to zero length list" + ); + assert_eq!(values.as_ref(), &UInt32Array::from(vec![1, 2, 3, 8, 9])); + assert_eq!(nulls, Some(input_nulls), "Nulls should not change"); + } + + #[test] + fn binary_cleanup_when_all_nulls_point_to_non_empty_and_no_empty_exists() { + let values = Buffer::from(b"foobarbaz".as_slice()); + let offsets = OffsetBuffer::::from_lengths(vec![3, 3, 3]); + let input_nulls = NullBuffer::from(vec![true, false, true]); + let binary = BinaryArray::new(offsets, values, Some(input_nulls.clone())); + let array: ArrayRef = Arc::new(binary); + + assert!(has_non_empty_nulls(array.as_ref()).unwrap()); + + let cleaned = cleanup_non_empty_nulls(array).unwrap(); + assert!(!has_non_empty_nulls(cleaned.as_ref()).unwrap()); + let cleaned = cleaned.as_binary::(); + + assert_eq!( + cleaned.offsets().lengths().collect::>(), + vec![3, 0, 3] + ); + assert_eq!(cleaned.nulls(), Some(&input_nulls)); + assert_eq!(cleaned.value(0), b"foo"); + assert_eq!(cleaned.value(2), b"baz"); + } + + #[test] + fn map_cleanup_when_all_nulls_point_to_non_empty() { + let keys = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]); + let values = Int32Array::from(vec![10, 20, 30, 40, 50, 60, 70, 80, 90]); + let offsets = OffsetBuffer::::from_lengths(vec![3, 4, 2]); + let input_nulls = NullBuffer::from(vec![true, false, true]); + let map = build_map_array(offsets, keys, values, Some(input_nulls.clone())); + let array: ArrayRef = Arc::new(map); + + assert!(has_non_empty_nulls(array.as_ref()).unwrap()); + + let cleaned = cleanup_non_empty_nulls(array).unwrap(); + assert!(!has_non_empty_nulls(cleaned.as_ref()).unwrap()); + let cleaned = cleaned.as_map(); + + assert_eq!( + cleaned.offsets().lengths().collect::>(), + vec![3, 0, 2] + ); + assert_eq!(cleaned.nulls(), Some(&input_nulls)); + } + + // ===== Underlying child array is sliced ===== + // Cleanup must drop the child values that were only reachable through nulls. + + #[test] + fn list_cleanup_slices_underlying_values_with_empty_entry() { + let list_values = UInt32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]); + let offsets = OffsetBuffer::::from_lengths(vec![3, 4, 0, 2]); + let input_nulls = NullBuffer::from(vec![true, false, true, true]); + let list_field = Arc::new(Field::new("item", list_values.data_type().clone(), false)); + let list: ArrayRef = Arc::new(GenericListArray::::new( + list_field, + offsets, + Arc::new(list_values), + Some(input_nulls), + )); + + let cleaned = cleanup_non_empty_nulls(list).unwrap(); + let cleaned = cleaned.as_list::(); + + // The null at index 1 originally referenced [4, 5, 6, 7]; those values + // should no longer appear in the underlying child array. + assert_eq!( + cleaned + .values() + .as_primitive::(), + &UInt32Array::from(vec![1, 2, 3, 8, 9]), + ); + } + + #[test] + fn list_cleanup_slices_underlying_values_without_empty_entry() { + let list_values = UInt32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]); + let offsets = OffsetBuffer::::from_lengths(vec![3, 4, 2]); + let input_nulls = NullBuffer::from(vec![true, false, true]); + let list_field = Arc::new(Field::new("item", list_values.data_type().clone(), false)); + let list: ArrayRef = Arc::new(GenericListArray::::new( + list_field, + offsets, + Arc::new(list_values), + Some(input_nulls), + )); + + let cleaned = cleanup_non_empty_nulls(list).unwrap(); + let cleaned = cleaned.as_list::(); + + assert_eq!( + cleaned + .values() + .as_primitive::(), + &UInt32Array::from(vec![1, 2, 3, 8, 9]), + ); + } + + #[test] + fn map_cleanup_slices_underlying_entries_with_empty_entry() { + let keys = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]); + let values = Int32Array::from(vec![10, 20, 30, 40, 50, 60, 70, 80, 90]); + let offsets = OffsetBuffer::::from_lengths(vec![3, 4, 0, 2]); + let input_nulls = NullBuffer::from(vec![true, false, true, true]); + let map = build_map_array(offsets, keys, values, Some(input_nulls)); + let array: ArrayRef = Arc::new(map); + + let cleaned = cleanup_non_empty_nulls(array).unwrap(); + let cleaned = cleaned.as_map(); + + assert_eq!( + cleaned.keys().as_primitive::(), + &Int32Array::from(vec![1, 2, 3, 8, 9]), + ); + assert_eq!( + cleaned.values().as_primitive::(), + &Int32Array::from(vec![10, 20, 30, 80, 90]), + ); + } + + #[test] + fn map_cleanup_slices_underlying_entries_without_empty_entry() { + let keys = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]); + let values = Int32Array::from(vec![10, 20, 30, 40, 50, 60, 70, 80, 90]); + let offsets = OffsetBuffer::::from_lengths(vec![3, 4, 2]); + let input_nulls = NullBuffer::from(vec![true, false, true]); + let map = build_map_array(offsets, keys, values, Some(input_nulls)); + let array: ArrayRef = Arc::new(map); + + let cleaned = cleanup_non_empty_nulls(array).unwrap(); + let cleaned = cleaned.as_map(); + + assert_eq!( + cleaned.keys().as_primitive::(), + &Int32Array::from(vec![1, 2, 3, 8, 9]), + ); + assert_eq!( + cleaned.values().as_primitive::(), + &Int32Array::from(vec![10, 20, 30, 80, 90]), + ); + } + + // ===== Cleanup does not recurse into inner arrays ===== + + #[test] + fn list_cleanup_does_not_recurse_into_inner_string_nulls() { + // Inner string: the null at index 1 points to the bytes "bar" + // (a non-empty null at the inner level). + let inner_values = Buffer::from(b"foobarbazqux".as_slice()); + let inner_offsets = + OffsetBuffer::::new(ScalarBuffer::::from(vec![0, 3, 6, 9, 9, 12])); + let inner_nulls = NullBuffer::from(vec![true, false, true, true, true]); + let inner = StringArray::new(inner_offsets, inner_values, Some(inner_nulls)); + assert!( + has_non_empty_nulls(&inner).unwrap(), + "sanity: inner string has a non-empty null" + ); + + // Outer list wrapping the inner string. Index 1 is an outer null whose + // offsets still cover real entries; index 2 is an empty entry so cleanup + // can use it as the substitute for the cleaned-up outer null. + let outer_offsets = OffsetBuffer::::from_lengths(vec![2, 2, 0, 1]); + let outer_nulls = NullBuffer::from(vec![true, false, true, true]); + let list_field = Arc::new(Field::new("item", DataType::Utf8, true)); + let list: ArrayRef = Arc::new(GenericListArray::::new( + list_field, + outer_offsets, + Arc::new(inner), + Some(outer_nulls), + )); + assert!(has_non_empty_nulls(list.as_ref()).unwrap()); + + let cleaned = cleanup_non_empty_nulls(list).unwrap(); + + // Outer list is fully cleaned at its own level. + assert!(!has_non_empty_nulls(cleaned.as_ref()).unwrap()); + + // The inner string array still has a non-empty null - cleanup did not + // recurse into it. + let inner_after = cleaned.as_list::().values().as_string::(); + assert!( + has_non_empty_nulls(inner_after).unwrap(), + "inner non-empty nulls must be preserved" + ); + } + + // ===== Empty arrays (length 0) ===== + // Cleanup is a no-op and `has_non_empty_nulls` is false. + + fn assert_cleanup_is_noop_on_empty(array: ArrayRef) { + assert_eq!(array.len(), 0); + assert!(!has_non_empty_nulls(array.as_ref()).unwrap()); + let cleaned = cleanup_non_empty_nulls(Arc::clone(&array)).unwrap(); + assert_eq!(cleaned.len(), 0); + assert!(!has_non_empty_nulls(cleaned.as_ref()).unwrap()); + assert_eq!(cleaned.to_data(), array.to_data()); + } + + #[test] + fn cleanup_on_empty_binary() { + let binary = BinaryArray::new( + OffsetBuffer::::new_empty(), + Buffer::from(&[] as &[u8]), + None, + ); + assert_cleanup_is_noop_on_empty(Arc::new(binary)); + } + + #[test] + fn cleanup_on_empty_large_binary() { + let binary = LargeBinaryArray::new( + OffsetBuffer::::new_empty(), + Buffer::from(&[] as &[u8]), + None, + ); + assert_cleanup_is_noop_on_empty(Arc::new(binary)); + } + + #[test] + fn cleanup_on_empty_string() { + let string = StringArray::new( + OffsetBuffer::::new_empty(), + Buffer::from(&[] as &[u8]), + None, + ); + assert_cleanup_is_noop_on_empty(Arc::new(string)); + } + + #[test] + fn cleanup_on_empty_large_string() { + let string = LargeStringArray::new( + OffsetBuffer::::new_empty(), + Buffer::from(&[] as &[u8]), + None, + ); + assert_cleanup_is_noop_on_empty(Arc::new(string)); + } + + #[test] + fn cleanup_on_empty_list() { + let list_field = Arc::new(Field::new("item", DataType::UInt32, false)); + let list = GenericListArray::::new( + list_field, + OffsetBuffer::::new_empty(), + Arc::new(UInt32Array::from(Vec::::new())), + None, + ); + assert_cleanup_is_noop_on_empty(Arc::new(list)); + } + + #[test] + fn cleanup_on_empty_large_list() { + let list_field = Arc::new(Field::new("item", DataType::UInt32, false)); + let list = GenericListArray::::new( + list_field, + OffsetBuffer::::new_empty(), + Arc::new(UInt32Array::from(Vec::::new())), + None, + ); + assert_cleanup_is_noop_on_empty(Arc::new(list)); + } + + #[test] + fn cleanup_on_empty_map() { + let map = build_map_array( + OffsetBuffer::::new_empty(), + Int32Array::from(Vec::::new()), + Int32Array::from(Vec::::new()), + None, + ); + assert_cleanup_is_noop_on_empty(Arc::new(map)); + } + + // Empty arrays produced via slicing a non-empty array down to length 0. + + #[test] + fn cleanup_on_zero_length_slice_of_list_with_non_empty_nulls() { + // The full array has a non-empty null, but slicing to length 0 hides it. + let list_values = UInt32Array::from(vec![1, 2, 3, 4, 5]); + let offsets = OffsetBuffer::::new(ScalarBuffer::::from(vec![0, 3, 5])); + let nulls = NullBuffer::from(vec![true, false]); + let list_field = Arc::new(Field::new("item", DataType::UInt32, false)); + let full: ArrayRef = Arc::new(GenericListArray::::new( + list_field, + offsets, + Arc::new(list_values), + Some(nulls), + )); + assert!(has_non_empty_nulls(full.as_ref()).unwrap()); + + let sliced = full.slice(0, 0); + assert_cleanup_is_noop_on_empty(sliced); + } + + #[test] + fn cleanup_on_zero_length_slice_of_map_with_non_empty_nulls() { + let keys = Int32Array::from(vec![1, 2, 3, 4, 5]); + let values = Int32Array::from(vec![10, 20, 30, 40, 50]); + let offsets = OffsetBuffer::::new(ScalarBuffer::::from(vec![0, 3, 5])); + let nulls = NullBuffer::from(vec![true, false]); + let map = build_map_array(offsets, keys, values, Some(nulls)); + let full: ArrayRef = Arc::new(map); + assert!(has_non_empty_nulls(full.as_ref()).unwrap()); + + let sliced = full.slice(0, 0); + assert_cleanup_is_noop_on_empty(sliced); + } + + // ===== Unsupported types ===== + + #[test] + fn unsupported_type_returns_error() { + let array: ArrayRef = Arc::new(UInt32Array::from(vec![1, 2, 3])); + assert!(has_non_empty_nulls(array.as_ref()).is_err()); + assert!(cleanup_non_empty_nulls(array).is_err()); + } +} diff --git a/arrow-select/src/lib.rs b/arrow-select/src/lib.rs index 33c1ee8ddb0a..5a04d9917518 100644 --- a/arrow-select/src/lib.rs +++ b/arrow-select/src/lib.rs @@ -24,6 +24,7 @@ #![cfg_attr(docsrs, feature(doc_cfg))] #![warn(missing_docs)] +pub mod cleanup_non_empty_nulls; pub mod coalesce; pub mod concat; pub mod dictionary; diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index 8e56457ff0a5..8cde42da67ce 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -195,6 +195,11 @@ name = "zip_kernels" harness = false required-features = ["test_utils"] +[[bench]] +name = "cleanup_non_empty_nulls_kernel" +harness = false +required-features = ["test_utils"] + [[bench]] name = "length_kernel" harness = false diff --git a/arrow/benches/cleanup_non_empty_nulls_kernel.rs b/arrow/benches/cleanup_non_empty_nulls_kernel.rs new file mode 100644 index 000000000000..a40aaf58d7fc --- /dev/null +++ b/arrow/benches/cleanup_non_empty_nulls_kernel.rs @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmarks for the `cleanup_non_empty_nulls` kernel. +//! +//! The kernel rewrites a variable-length array so that its null entries point +//! to empty offset ranges. The interesting workload is therefore an array +//! whose null entries still reference real data in the values buffer - the +//! state these benches construct via `inject_nulls`. + +use std::hint; +use std::sync::Arc; + +use arrow::array::*; +use arrow::util::bench_util::*; +use arrow_buffer::{NullBuffer, OffsetBuffer}; +use arrow_schema::{DataType, Field}; +use arrow_select::cleanup_non_empty_nulls::{cleanup_non_empty_nulls, has_non_empty_nulls}; +use criterion::{Criterion, criterion_group, criterion_main}; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; + +const SIZE: usize = 8192; +const NULL_DENSITIES: &[f32] = &[0.1, 0.5]; +const SEED: u64 = 42; + +/// Build a random `NullBuffer` of the given length and null density. +fn random_nulls(len: usize, null_density: f32, seed: u64) -> NullBuffer { + let mut rng = StdRng::seed_from_u64(seed); + NullBuffer::from_iter((0..len).map(|_| rng.random::() >= null_density)) +} + +/// Overlay a fresh null mask on an array's existing offsets/values. The new +/// nulls land on top of positions whose offset ranges are non-empty - exactly +/// the case `cleanup_non_empty_nulls` exists to fix. +fn inject_nulls_string(array: StringArray, null_density: f32) -> StringArray { + let (offsets, values, _) = array.into_parts(); + let nulls = random_nulls(offsets.len() - 1, null_density, SEED + 1); + StringArray::new(offsets, values, Some(nulls)) +} + +fn inject_nulls_list(array: ListArray, null_density: f32) -> ListArray { + let (field, offsets, values, _) = array.into_parts(); + let nulls = random_nulls(offsets.len() - 1, null_density, SEED + 1); + ListArray::new(field, offsets, values, Some(nulls)) +} + +/// Build a `ListArray` of `size` entries with random lengths in `length_range`. +/// Set the range's lower bound to 1 to forbid empty entries; use 0 to allow them. +fn build_list_array(size: usize, length_range: std::ops::Range) -> ListArray { + let mut rng = StdRng::seed_from_u64(SEED); + let lengths: Vec = (0..size) + .map(|_| rng.random_range(length_range.clone())) + .collect(); + let total: usize = lengths.iter().sum(); + let values: arrow_array::Int32Array = (0..total as i32).collect(); + let offsets = OffsetBuffer::::from_lengths(lengths); + let field = Arc::new(Field::new("item", DataType::Int32, true)); + ListArray::new(field, offsets, Arc::new(values), None) +} + +/// Build a string array containing no zero-length entries. +fn build_string_array_no_empty(size: usize) -> StringArray { + create_string_array_with_len_range_and_prefix_and_seed::(size, 0.0, 1, 16, "", SEED) +} + +fn bench_pair(c: &mut Criterion, name: &str, array: ArrayRef) { + c.bench_function(&format!("has_non_empty_nulls {name}"), |b| { + b.iter(|| hint::black_box(has_non_empty_nulls(array.as_ref()).unwrap())) + }); + c.bench_function(&format!("cleanup_non_empty_nulls {name}"), |b| { + b.iter(|| hint::black_box(cleanup_non_empty_nulls(Arc::clone(&array)).unwrap())) + }); +} + +fn add_benchmark(c: &mut Criterion) { + // For each type and null density, bench two input shapes: + // * "N% nulls" - array also contains some empty entries + // * "N% nulls, no empty entries" - every entry has length >= 1 + + for &density in NULL_DENSITIES { + let pct = (density * 100.0).round() as u32; + + // ----- String ----- + let string_mixed = inject_nulls_string(create_string_array::(SIZE, 0.0), density); + let string_no_empty = inject_nulls_string(build_string_array_no_empty(SIZE), density); + bench_pair(c, &format!("string {pct}% nulls"), Arc::new(string_mixed)); + bench_pair( + c, + &format!("string {pct}% nulls, no empty entries"), + Arc::new(string_no_empty), + ); + + // ----- List ----- + let list_mixed = inject_nulls_list(build_list_array(SIZE, 0..8), density); + let list_no_empty = inject_nulls_list(build_list_array(SIZE, 1..8), density); + bench_pair(c, &format!("list {pct}% nulls"), Arc::new(list_mixed)); + bench_pair( + c, + &format!("list {pct}% nulls, no empty entries"), + Arc::new(list_no_empty), + ); + } +} + +criterion_group!(benches, add_benchmark); +criterion_main!(benches);