Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
320 changes: 300 additions & 20 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4719,6 +4719,14 @@ impl ScalarValue {
/// This can be relevant when `self` is a list or contains a list as a nested value, as
/// a single list holds an Arc to its entire original array buffer.
pub fn compact(&mut self) {
// copy_array_data + compact_view_buffers + downcast back, all in one step.
macro_rules! compact_array {
($arr:expr, $from_type:ty, $($as_method:tt)+) => {
*Arc::make_mut($arr) = ScalarValue::compact_view_buffers(
Arc::new(<$from_type>::from(copy_array_data(&$arr.to_data()))) as ArrayRef,
).$($as_method)+.clone()
};
}
match self {
ScalarValue::Null
| ScalarValue::Boolean(_)
Expand Down Expand Up @@ -4762,33 +4770,20 @@ impl ScalarValue {
| ScalarValue::LargeBinary(_)
| ScalarValue::BinaryView(_) => (),
ScalarValue::FixedSizeList(arr) => {
let array = copy_array_data(&arr.to_data());
*Arc::make_mut(arr) = FixedSizeListArray::from(array);
}
ScalarValue::List(arr) => {
let array = copy_array_data(&arr.to_data());
*Arc::make_mut(arr) = ListArray::from(array);
compact_array!(arr, FixedSizeListArray, as_fixed_size_list())
}
ScalarValue::List(arr) => compact_array!(arr, ListArray, as_list::<i32>()),
ScalarValue::LargeList(arr) => {
let array = copy_array_data(&arr.to_data());
*Arc::make_mut(arr) = LargeListArray::from(array)
compact_array!(arr, LargeListArray, as_list::<i64>())
}
ScalarValue::ListView(arr) => {
let array = copy_array_data(&arr.to_data());
*Arc::make_mut(arr) = ListViewArray::from(array);
compact_array!(arr, ListViewArray, as_list_view::<i32>())
}
ScalarValue::LargeListView(arr) => {
let array = copy_array_data(&arr.to_data());
*Arc::make_mut(arr) = LargeListViewArray::from(array)
}
ScalarValue::Struct(arr) => {
let array = copy_array_data(&arr.to_data());
*Arc::make_mut(arr) = StructArray::from(array);
}
ScalarValue::Map(arr) => {
let array = copy_array_data(&arr.to_data());
*Arc::make_mut(arr) = MapArray::from(array);
compact_array!(arr, LargeListViewArray, as_list_view::<i64>())
}
ScalarValue::Struct(arr) => compact_array!(arr, StructArray, as_struct()),
ScalarValue::Map(arr) => compact_array!(arr, MapArray, as_map()),
ScalarValue::Union(val, _, _) => {
if let Some((_, value)) = val.as_mut() {
value.compact();
Expand All @@ -4809,6 +4804,95 @@ impl ScalarValue {
self
}

/// Recursively compacts the backing buffers of any [`StringViewArray`] or
/// [`BinaryViewArray`] nested within `array`.
///
/// View-typed arrays keep an `Arc` reference to their original backing
/// buffers, so a single scalar extracted from a large batch still retains
/// the entire buffer. Calling [`.gc()`][StringViewArray::gc] copies only
/// the bytes that are actually referenced by the surviving views, releasing
/// the rest.
///
/// Container types (`List`, `LargeList`, `FixedSizeList`, `ListView`,
/// `LargeListView`, `Struct`, `Map`) are handled by recursing into their
/// child / values arrays and reconstructing the parent with the compacted
/// children. All other types are returned unchanged.
fn compact_view_buffers(array: ArrayRef) -> ArrayRef {
// Macro for the i32/i64-offset list pair (List / LargeList).
macro_rules! gc_list {
($field:expr, $offset_type:ty, $array_type:ty) => {{
let list = array.as_list::<$offset_type>();
Arc::new(<$array_type>::new(
Arc::clone($field),
list.offsets().clone(),
ScalarValue::compact_view_buffers(Arc::clone(list.values())),
list.nulls().cloned(),
)) as ArrayRef
}};
}
// Macro for the i32/i64-offset list-view pair (ListView / LargeListView).
macro_rules! gc_list_view {
($field:expr, $offset_type:ty, $array_type:ty) => {{
let list = array.as_list_view::<$offset_type>();
Arc::new(<$array_type>::new(
Arc::clone($field),
list.offsets().clone(),
list.sizes().clone(),
ScalarValue::compact_view_buffers(Arc::clone(list.values())),
list.nulls().cloned(),
)) as ArrayRef
}};
}

match array.data_type() {
DataType::Utf8View => Arc::new(array.as_string_view().gc()),
DataType::BinaryView => Arc::new(array.as_binary_view().gc()),
DataType::Struct(_) => {
let s = array.as_struct();
let columns = s
.columns()
.iter()
.map(|c| ScalarValue::compact_view_buffers(Arc::clone(c)))
.collect();
Arc::new(StructArray::new(
s.fields().clone(),
columns,
s.nulls().cloned(),
))
}
DataType::List(field) => gc_list!(field, i32, ListArray),
DataType::LargeList(field) => gc_list!(field, i64, LargeListArray),
DataType::FixedSizeList(field, size) => {
let list = array.as_fixed_size_list();
Arc::new(FixedSizeListArray::new(
Arc::clone(field),
*size,
ScalarValue::compact_view_buffers(Arc::clone(list.values())),
list.nulls().cloned(),
))
}
DataType::ListView(field) => gc_list_view!(field, i32, ListViewArray),
DataType::LargeListView(field) => {
gc_list_view!(field, i64, LargeListViewArray)
}
DataType::Map(field, ordered) => {
let map = array.as_map();
let entries = ScalarValue::compact_view_buffers(Arc::new(
map.entries().clone(),
)
as ArrayRef);
Arc::new(MapArray::new(
Arc::clone(field),
map.offsets().clone(),
entries.as_struct().clone(),
map.nulls().cloned(),
*ordered,
))
}
_ => array,
}
}

/// Returns the minimum value for the given numeric `DataType`.
///
/// This function returns the smallest representable value for numeric
Expand Down Expand Up @@ -10708,4 +10792,200 @@ mod tests {
]
);
}

// ── compact / compact_view_buffers ───────────────────────────────────────
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested that these tests fail when I back out the code changes

Details
diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs
index 63cbce10ae..73f4f0b76c 100644
--- a/datafusion/common/src/scalar/mod.rs
+++ b/datafusion/common/src/scalar/mod.rs
@@ -4719,14 +4719,6 @@ impl ScalarValue {
     /// This can be relevant when `self` is a list or contains a list as a nested value, as
     /// a single list holds an Arc to its entire original array buffer.
     pub fn compact(&mut self) {
-        // copy_array_data + compact_view_buffers + downcast back, all in one step.
-        macro_rules! compact_array {
-            ($arr:expr, $from_type:ty, $($as_method:tt)+) => {
-                *Arc::make_mut($arr) = ScalarValue::compact_view_buffers(
-                    Arc::new(<$from_type>::from(copy_array_data(&$arr.to_data()))) as ArrayRef,
-                ).$($as_method)+.clone()
-            };
-        }
         match self {
             ScalarValue::Null
             | ScalarValue::Boolean(_)
@@ -4770,20 +4762,33 @@ impl ScalarValue {
             | ScalarValue::LargeBinary(_)
             | ScalarValue::BinaryView(_) => (),
             ScalarValue::FixedSizeList(arr) => {
-                compact_array!(arr, FixedSizeListArray, as_fixed_size_list())
+                let array = copy_array_data(&arr.to_data());
+                *Arc::make_mut(arr) = FixedSizeListArray::from(array);
+            }
+            ScalarValue::List(arr) => {
+                let array = copy_array_data(&arr.to_data());
+                *Arc::make_mut(arr) = ListArray::from(array);
             }
-            ScalarValue::List(arr) => compact_array!(arr, ListArray, as_list::<i32>()),
             ScalarValue::LargeList(arr) => {
-                compact_array!(arr, LargeListArray, as_list::<i64>())
+                let array = copy_array_data(&arr.to_data());
+                *Arc::make_mut(arr) = LargeListArray::from(array)
             }
             ScalarValue::ListView(arr) => {
-                compact_array!(arr, ListViewArray, as_list_view::<i32>())
+                let array = copy_array_data(&arr.to_data());
+                *Arc::make_mut(arr) = ListViewArray::from(array);
             }
             ScalarValue::LargeListView(arr) => {
-                compact_array!(arr, LargeListViewArray, as_list_view::<i64>())
+                let array = copy_array_data(&arr.to_data());
+                *Arc::make_mut(arr) = LargeListViewArray::from(array)
+            }
+            ScalarValue::Struct(arr) => {
+                let array = copy_array_data(&arr.to_data());
+                *Arc::make_mut(arr) = StructArray::from(array);
+            }
+            ScalarValue::Map(arr) => {
+                let array = copy_array_data(&arr.to_data());
+                *Arc::make_mut(arr) = MapArray::from(array);
             }
-            ScalarValue::Struct(arr) => compact_array!(arr, StructArray, as_struct()),
-            ScalarValue::Map(arr) => compact_array!(arr, MapArray, as_map()),
-              ScalarValue::Union(val, _, _) => {
                 if let Some((_, value)) = val.as_mut() {
                     value.compact();
@@ -4804,95 +4809,6 @@ impl ScalarValue {
         self
     }

-    /// Recursively compacts the backing buffers of any [`StringViewArray`] or
-    /// [`BinaryViewArray`] nested within `array`.
-    ///
-    /// View-typed arrays keep an `Arc` reference to their original backing
-    /// buffers, so a single scalar extracted from a large batch still retains
-    /// the entire buffer.  Calling [`.gc()`][StringViewArray::gc] copies only
-    /// the bytes that are actually referenced by the surviving views, releasing
-    /// the rest.
-    ///
-    /// Container types (`List`, `LargeList`, `FixedSizeList`, `ListView`,
-    /// `LargeListView`, `Struct`, `Map`) are handled by recursing into their
-    /// child / values arrays and reconstructing the parent with the compacted
-    /// children.  All other types are returned unchanged.
-    fn compact_view_buffers(array: ArrayRef) -> ArrayRef {
-        // Macro for the i32/i64-offset list pair (List / LargeList).
-        macro_rules! gc_list {
-            ($field:expr, $offset_type:ty, $array_type:ty) => {{
-                let list = array.as_list::<$offset_type>();
-                Arc::new(<$array_type>::new(
-                    Arc::clone($field),
-                    list.offsets().clone(),
-                    ScalarValue::compact_view_buffers(Arc::clone(list.values())),
-                    list.nulls().cloned(),
-                )) as ArrayRef
-            }};
-        }
-        // Macro for the i32/i64-offset list-view pair (ListView / LargeListView).
-        macro_rules! gc_list_view {
-            ($field:expr, $offset_type:ty, $array_type:ty) => {{
-                let list = array.as_list_view::<$offset_type>();
-                Arc::new(<$array_type>::new(
-                    Arc::clone($field),
-                    list.offsets().clone(),
-                    list.sizes().clone(),
-                    ScalarValue::compact_view_buffers(Arc::clone(list.values())),
-                    list.nulls().cloned(),
-                )) as ArrayRef
-            }};
-        }
-
-        match array.data_type() {
-            DataType::Utf8View => Arc::new(array.as_string_view().gc()),
-            DataType::BinaryView => Arc::new(array.as_binary_view().gc()),
-            DataType::Struct(_) => {
-                let s = array.as_struct();
-                let columns = s
-                    .columns()
-                    .iter()
-                    .map(|c| ScalarValue::compact_view_buffers(Arc::clone(c)))
-                    .collect();
-                Arc::new(StructArray::new(
-                    s.fields().clone(),
-                    columns,
-                    s.nulls().cloned(),
-                ))
-            }
- 
-            DataType::List(field) => gc_list!(field, i32, ListArray),
-            DataType::LargeList(field) => gc_list!(field, i64, LargeListArray),
-            DataType::FixedSizeList(field, size) => {
-                let list = array.as_fixed_size_list();
-                Arc::new(FixedSizeListArray::new(
-                    Arc::clone(field),
-                    *size,
-                    ScalarValue::compact_view_buffers(Arc::clone(list.values())),
-                    list.nulls().cloned(),
-                ))
-            }
-            DataType::ListView(field) => gc_list_view!(field, i32, ListViewArray),
-            DataType::LargeListView(field) => {
-                gc_list_view!(field, i64, LargeListViewArray)
-            }
-            DataType::Map(field, ordered) => {
-                let map = array.as_map();
-                let entries = ScalarValue::compact_view_buffers(Arc::new(
-                    map.entries().clone(),
-                )
-                    as ArrayRef);
-                Arc::new(MapArray::new(
-                    Arc::clone(field),
-                    map.offsets().clone(),
-                    entries.as_struct().clone(),
-                    map.nulls().cloned(),
-                    *ordered,
-                ))
-            }
-            _ => array,
-        }
-    }
-
$ cargo test -p datafusion-common --lib scalar::tests::test_compact
...
failures:

---- scalar::tests::test_compact_struct_utf8view stdout ----

thread 'scalar::tests::test_compact_struct_utf8view' (76301068) panicked at datafusion/common/src/scalar/mod.rs:10864:9:
assertion `left == right` failed
  left: 1300
 right: 26

---- scalar::tests::test_compact_large_list_view_utf8view stdout ----

thread 'scalar::tests::test_compact_large_list_view_utf8view' (76301064) panicked at datafusion/common/src/scalar/mod.rs:10837:9:
assertion `left == right` failed
  left: 1300
 right: 26
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

---- scalar::tests::test_compact_list_view_utf8view stdout ----

thread 'scalar::tests::test_compact_list_view_utf8view' (76301066) panicked at datafusion/common/src/scalar/mod.rs:10814:9:
assertion `left == right` failed
  left: 1300
 right: 26

---- scalar::tests::test_compact_list_utf8view stdout ----

thread 'scalar::tests::test_compact_list_utf8view' (76301065) panicked at datafusion/common/src/scalar/mod.rs:10745:9:
assertion `left == right` failed
  left: 1300
 right: 26

---- scalar::tests::test_compact_large_list_utf8view stdout ----

thread 'scalar::tests::test_compact_large_list_utf8view' (76301063) panicked at datafusion/common/src/scalar/mod.rs:10768:9:
assertion `left == right` failed
  left: 1300
 right: 26

---- scalar::tests::test_compact_map_utf8view stdout ----

thread 'scalar::tests::test_compact_map_utf8view' (76301067) panicked at datafusion/common/src/scalar/mod.rs:10904:9:
assertion `left == right` failed
  left: 1300
 right: 26

---- scalar::tests::test_compact_fixed_size_list_utf8view stdout ----

thread 'scalar::tests::test_compact_fixed_size_list_utf8view' (76301062) panicked at datafusion/common/src/scalar/mod.rs:10791:9:
assertion `left == right` failed
  left: 1300
 right: 26


failures:
    scalar::tests::test_compact_fixed_size_list_utf8view
    scalar::tests::test_compact_large_list_utf8view
    scalar::tests::test_compact_large_list_view_utf8view
    scalar::tests::test_compact_list_utf8view
    scalar::tests::test_compact_list_view_utf8view
    scalar::tests::test_compact_map_utf8view
    scalar::tests::test_compact_struct_utf8view

test result: FAILED. 0 passed; 7 failed; 0 ignored; 0 measured; 450 filtered out; finished in 0.00s

error: test failed, to rerun pass `-p datafusion-common --lib`


/// Builds a `StringViewArray` with `n` strings that are all longer than
/// 12 bytes so they are stored in backing buffers rather than inline.
fn make_long_strings(n: usize) -> StringViewArray {
let mut b = StringViewBuilder::new();
for i in 0..n {
b.append_value(format!("long_string_value_pad_{i:04}"));
}
b.finish()
}

/// Total bytes across all backing buffers of a `StringViewArray`.
fn utf8view_buffer_bytes(a: &StringViewArray) -> usize {
a.data_buffers().iter().map(|b| b.len()).sum()
}

#[test]
fn test_compact_list_utf8view() {
const N: usize = 50;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is an awful lot of repetition in these test

        const N: usize = 50;
        let strings = make_long_strings(N);
        let one_len = strings.value(0).len();
        assert!(utf8view_buffer_bytes(&strings) >= N * one_len);

        let single_row_list_array =
            SingleRowListArrayBuilder::new(Arc::new(strings.slice(0, 1)) as ArrayRef)
                .build_large_list_array();

...

        assert_eq!(
            utf8view_buffer_bytes(arr.values().as_string_view()),
            one_len
        );
        assert_eq!(arr.values().as_string_view().value(0), strings.value(0));

is just copy/pasted in each one.

Maybe we could move it into a fixture to reduce duplication and make it clearer what is different between different tests

let strings = make_long_strings(N);
let one_len = strings.value(0).len();
assert!(utf8view_buffer_bytes(&strings) >= N * one_len);

let single_row_list_array =
SingleRowListArrayBuilder::new(Arc::new(strings.slice(0, 1)) as ArrayRef)
.build_list_array();
let mut scalar = ScalarValue::List(Arc::new(single_row_list_array));
scalar.compact();

let ScalarValue::List(arr) = &scalar else {
panic!("expected List")
};
assert_eq!(
utf8view_buffer_bytes(arr.values().as_string_view()),
one_len
);
assert_eq!(arr.values().as_string_view().value(0), strings.value(0));
}

#[test]
fn test_compact_large_list_utf8view() {
const N: usize = 50;
let strings = make_long_strings(N);
let one_len = strings.value(0).len();
assert!(utf8view_buffer_bytes(&strings) >= N * one_len);

let single_row_list_array =
SingleRowListArrayBuilder::new(Arc::new(strings.slice(0, 1)) as ArrayRef)
.build_large_list_array();
let mut scalar = ScalarValue::LargeList(Arc::new(single_row_list_array));
scalar.compact();

let ScalarValue::LargeList(arr) = &scalar else {
panic!("expected LargeList")
};
assert_eq!(
utf8view_buffer_bytes(arr.values().as_string_view()),
one_len
);
assert_eq!(arr.values().as_string_view().value(0), strings.value(0));
}

#[test]
fn test_compact_fixed_size_list_utf8view() {
const N: usize = 50;
let strings = make_long_strings(N);
let one_len = strings.value(0).len();
assert!(utf8view_buffer_bytes(&strings) >= N * one_len);

let single_row_list_array =
SingleRowListArrayBuilder::new(Arc::new(strings.slice(0, 1)) as ArrayRef)
.build_fixed_size_list_array(1);
let mut scalar = ScalarValue::FixedSizeList(Arc::new(single_row_list_array));
scalar.compact();

let ScalarValue::FixedSizeList(arr) = &scalar else {
panic!("expected FixedSizeList")
};
assert_eq!(
utf8view_buffer_bytes(arr.values().as_string_view()),
one_len
);
assert_eq!(arr.values().as_string_view().value(0), strings.value(0));
}

#[test]
fn test_compact_list_view_utf8view() {
const N: usize = 50;
let strings = make_long_strings(N);
let one_len = strings.value(0).len();
assert!(utf8view_buffer_bytes(&strings) >= N * one_len);

let single_row_list_array =
SingleRowListArrayBuilder::new(Arc::new(strings.slice(0, 1)) as ArrayRef)
.build_list_view_array();
let mut scalar = ScalarValue::ListView(Arc::new(single_row_list_array));
scalar.compact();

let ScalarValue::ListView(arr) = &scalar else {
panic!("expected ListView")
};
assert_eq!(
utf8view_buffer_bytes(arr.values().as_string_view()),
one_len
);
assert_eq!(arr.values().as_string_view().value(0), strings.value(0));
}

#[test]
fn test_compact_large_list_view_utf8view() {
const N: usize = 50;
let strings = make_long_strings(N);
let one_len = strings.value(0).len();
assert!(utf8view_buffer_bytes(&strings) >= N * one_len);

let single_row_list_array =
SingleRowListArrayBuilder::new(Arc::new(strings.slice(0, 1)) as ArrayRef)
.build_large_list_view_array();
let mut scalar = ScalarValue::LargeListView(Arc::new(single_row_list_array));
scalar.compact();

let ScalarValue::LargeListView(arr) = &scalar else {
panic!("expected LargeListView")
};
assert_eq!(
utf8view_buffer_bytes(arr.values().as_string_view()),
one_len
);
assert_eq!(arr.values().as_string_view().value(0), strings.value(0));
}

#[test]
fn test_compact_struct_utf8view() {
const N: usize = 50;
let strings = make_long_strings(N);
let one_len = strings.value(0).len();

let field = Arc::new(Field::new("name", DataType::Utf8View, true));
let struct_arr = StructArray::new(
Fields::from(vec![Arc::clone(&field)]),
vec![Arc::new(strings.slice(0, 1)) as ArrayRef],
None,
);

let mut scalar = ScalarValue::Struct(Arc::new(struct_arr));
scalar.compact();

let ScalarValue::Struct(arr) = &scalar else {
panic!("expected Struct")
};
let col = arr.column(0).as_string_view();
assert_eq!(utf8view_buffer_bytes(col), one_len);
assert_eq!(col.value(0), strings.value(0));
}

#[test]
fn test_compact_map_utf8view() {
const N: usize = 50;
let strings = make_long_strings(N);
let one_len = strings.value(0).len();

let key_field = Arc::new(Field::new("key", DataType::Utf8View, false));
let val_field = Arc::new(Field::new("value", DataType::Int32, true));
let entries = StructArray::new(
Fields::from(vec![Arc::clone(&key_field), Arc::clone(&val_field)]),
vec![
Arc::new(strings.slice(0, 1)) as ArrayRef,
Arc::new(Int32Array::from(vec![1i32])) as ArrayRef,
],
None,
);
let entries_field = Arc::new(Field::new(
"entries",
DataType::Struct(Fields::from(vec![key_field, val_field])),
false,
));
let map = MapArray::new(
entries_field,
OffsetBuffer::new(vec![0i32, 1].into()),
entries,
None,
false,
);

let mut scalar = ScalarValue::Map(Arc::new(map));
scalar.compact();

let ScalarValue::Map(arr) = &scalar else {
panic!("expected Map")
};
let keys = arr.entries().column(0).as_string_view();
assert_eq!(utf8view_buffer_bytes(keys), one_len);
assert_eq!(keys.value(0), strings.value(0));
}
}
Loading