Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arrays have multiple buffers #1743

Merged
merged 92 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
92 commits
Select commit Hold shift + click to select a range
79e6ba2
Store desired alignment in the array buffer
gatesn Dec 18, 2024
33b911c
Buffer alignment
gatesn Dec 18, 2024
602f509
Buffer alignment
gatesn Dec 18, 2024
b54f546
Buffer alignment
gatesn Dec 18, 2024
7db32d1
Buffer alignment
gatesn Dec 18, 2024
7852ff3
Buffer alignment
gatesn Dec 18, 2024
1525de5
Buffer alignment
gatesn Dec 18, 2024
83ad2ba
Merge branch 'develop' into ngates/buffers
gatesn Dec 18, 2024
8209d76
AlignedBufferMut
gatesn Dec 18, 2024
897d063
AlignedBufferMut
gatesn Dec 18, 2024
1ef552e
AlignedBufferMut
gatesn Dec 18, 2024
b4cb50a
AlignedBufferMut
gatesn Dec 18, 2024
c5ab1ac
AlignedBufferMut
gatesn Dec 18, 2024
59bc34a
AlignedBufferMut
gatesn Dec 18, 2024
d14bc95
AlignedBufferMut
gatesn Dec 19, 2024
3baafa7
AlignedBufferMut
gatesn Dec 19, 2024
50438aa
AlignedBufferMut
gatesn Dec 19, 2024
7689fc8
AlignedBufferMut
gatesn Dec 19, 2024
6f67551
AlignedBufferMut
gatesn Dec 19, 2024
3989cca
Fix transmute
gatesn Dec 19, 2024
1f5eca7
Fix transmute
gatesn Dec 19, 2024
f61bd02
Fix transmute
gatesn Dec 19, 2024
c75b39a
Fix transmute
gatesn Dec 19, 2024
f247f1a
Fix transmute
gatesn Dec 19, 2024
434b635
Fix transmute
gatesn Dec 19, 2024
cf6049f
Fix transmute
gatesn Dec 19, 2024
a21f595
Combine into single ScalarBuffer
gatesn Dec 19, 2024
6b37397
Benchmark from_iter
gatesn Dec 19, 2024
3338746
Benchmark from_iter
gatesn Dec 19, 2024
20e4c7e
Benchmark from_iter
gatesn Dec 19, 2024
f5d3bf0
Rename ScalarBuffer to Buffer
gatesn Dec 19, 2024
12f91ea
Improve performance of from_iter
gatesn Dec 19, 2024
baf9bb1
Improve performance of from_iter
gatesn Dec 19, 2024
aef3e16
Improve performance of from_iter
gatesn Dec 19, 2024
e41a3c5
Improve performance of from_iter
gatesn Dec 19, 2024
0bfce15
Remove maybe_null prefix
gatesn Dec 20, 2024
1bae5c1
Remove maybe_null prefix
gatesn Dec 20, 2024
c680791
Remove maybe_null prefix
gatesn Dec 20, 2024
5be44d5
Remove maybe_null prefix
gatesn Dec 20, 2024
59abd5d
Some updates
gatesn Dec 20, 2024
7100a11
Add push benchmark
gatesn Dec 20, 2024
1d3f8ae
Speed up push
gatesn Dec 20, 2024
e7fbef1
Speed up push
gatesn Dec 20, 2024
91a7ef7
More things
gatesn Dec 20, 2024
8bf5025
Migrate to buffer
gatesn Dec 20, 2024
6e26355
Remove copy for ALP decompress
gatesn Dec 20, 2024
0937fdf
Remove copy for ALP decompress
gatesn Dec 20, 2024
1706492
Remove copy for ALP decompress
gatesn Dec 20, 2024
6ff25dd
Remove copy for ALP decompress
gatesn Dec 20, 2024
f25637b
Remove copy for ALP decompress
gatesn Dec 20, 2024
840f4c2
Remove copy for ALP decompress
gatesn Dec 20, 2024
750d979
Remove copy for ALP decompress
gatesn Dec 20, 2024
2f3da8f
Remove copy for ALP decompress
gatesn Dec 20, 2024
298b8dd
Fix cast bug
gatesn Dec 20, 2024
d199163
Fix cast bug
gatesn Dec 20, 2024
8ca57a6
Fix cast bug
gatesn Dec 20, 2024
f1b2fc6
push_n
gatesn Dec 20, 2024
6e953a8
Add nbuffers
gatesn Dec 20, 2024
afc1d32
Merge develop
gatesn Dec 21, 2024
82fd044
Lint
gatesn Dec 21, 2024
4c64819
Lint
gatesn Dec 21, 2024
852b573
Docs|
gatesn Dec 21, 2024
3a1c28c
Docs|
gatesn Dec 21, 2024
302ed43
Make Miri happy by borrowing the uninit region
gatesn Dec 21, 2024
343eaad
More tests
gatesn Dec 21, 2024
da06bd1
More tests
gatesn Dec 21, 2024
f4b65eb
More tests
gatesn Dec 21, 2024
898c8a5
Add some assertions
gatesn Dec 21, 2024
c969cec
Merge
gatesn Dec 21, 2024
af63176
Remove vortex-dtype dependency
gatesn Dec 21, 2024
92f66f8
fix typo
lwwmanning Dec 26, 2024
0f734d7
fix ALP compress to not copy
lwwmanning Dec 26, 2024
ca84348
trivial optimization
lwwmanning Dec 26, 2024
ce9c8fd
remove unnecessary allocations in bitpacking compress
lwwmanning Dec 26, 2024
3c1735a
nits
lwwmanning Dec 26, 2024
885eb4b
more nits
lwwmanning Dec 26, 2024
530c69c
fixup locks
lwwmanning Dec 26, 2024
4b51cf4
Zero-copy IO
gatesn Dec 29, 2024
eba9a69
Zero-copy IO
gatesn Dec 29, 2024
768c692
Zero-copy IO
gatesn Dec 29, 2024
2f46737
Zero-copy IO
gatesn Dec 29, 2024
039ed78
Zero-copy IO
gatesn Dec 29, 2024
aec6081
Zero-copy IO
gatesn Dec 29, 2024
6dafbdc
Zero-copy IO
gatesn Dec 29, 2024
120b1c7
Zero-copy IO
gatesn Dec 29, 2024
d31f847
Zero-copy IO
gatesn Dec 29, 2024
978a2fb
Appease Miri
gatesn Dec 29, 2024
cdf2bf9
Merge remote-tracking branch 'origin/develop' into ngates/buffers
lwwmanning Dec 30, 2024
f0965ad
Merge branch 'develop' into ngates/buffers
gatesn Dec 30, 2024
16dfe81
merge
gatesn Dec 30, 2024
ae4dba1
merge
gatesn Jan 7, 2025
62c0c08
merge
gatesn Jan 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions encodings/bytebool/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl ByteBoolArray {
Arc::new(ByteBoolMetadata {
validity: validity.to_metadata(length)?,
}),
Some(buffer.into_byte_buffer()),
[buffer.into_byte_buffer()].into(),
validity.into_array().into_iter().collect::<Vec<_>>().into(),
StatsSet::default(),
)?
Expand All @@ -63,7 +63,7 @@ impl ByteBoolArray {

pub fn buffer(&self) -> &ByteBuffer {
self.as_ref()
.byte_buffer()
.byte_buffer(0)
.vortex_expect("ByteBoolArray is missing the underlying buffer")
}

Expand Down
4 changes: 2 additions & 2 deletions encodings/fastlanes/src/bitpacking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl BitPackedArray {
dtype,
length,
Arc::new(metadata),
Some(packed),
[packed].into(),
children.into(),
StatsSet::default(),
)?
Expand All @@ -160,7 +160,7 @@ impl BitPackedArray {
#[inline]
pub fn packed(&self) -> &ByteBuffer {
self.as_ref()
.byte_buffer()
.byte_buffer(0)
.vortex_expect("BitPackedArray must contain packed buffer")
}

Expand Down
4 changes: 2 additions & 2 deletions encodings/roaring/src/boolean/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl RoaringBoolArray {
DType::Bool(Nullability::NonNullable),
length,
Arc::new(RoaringBoolMetadata),
Some(ByteBuffer::from(bitmap.serialize::<Native>())),
[ByteBuffer::from(bitmap.serialize::<Native>())].into(),
vec![].into(),
stats,
)?
Expand All @@ -77,7 +77,7 @@ impl RoaringBoolArray {

pub fn buffer(&self) -> &ByteBuffer {
self.as_ref()
.byte_buffer()
.byte_buffer(0)
.vortex_expect("Missing buffer in PrimitiveArray")
}
}
Expand Down
6 changes: 3 additions & 3 deletions encodings/roaring/src/integer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl RoaringIntArray {
DType::Primitive(ptype, NonNullable),
length,
Arc::new(RoaringIntMetadata { ptype }),
Some(ByteBuffer::from(bitmap.serialize::<Portable>())),
[ByteBuffer::from(bitmap.serialize::<Portable>())].into(),
vec![].into(),
stats,
)?
Expand All @@ -78,7 +78,7 @@ impl RoaringIntArray {
pub fn owned_bitmap(&self) -> Bitmap {
Bitmap::deserialize::<Portable>(
self.as_ref()
.byte_buffer()
.byte_buffer(0)
.vortex_expect("RoaringBoolArray buffer is missing")
.as_ref(),
)
Expand Down Expand Up @@ -139,7 +139,7 @@ impl VisitorVTable<RoaringIntArray> for RoaringIntEncoding {
visitor.visit_buffer(
array
.as_ref()
.byte_buffer()
.byte_buffer(0)
.vortex_expect("Missing buffer in RoaringIntArray"),
)
}
Expand Down
6 changes: 3 additions & 3 deletions vortex-array/src/array/bool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ impl BoolArray {
/// Access internal array buffer
pub fn buffer(&self) -> &ByteBuffer {
self.as_ref()
.byte_buffer()
.byte_buffer(0)
.vortex_expect("Missing buffer in BoolArray")
}

/// Convert array into its internal buffer
pub fn into_buffer(self) -> ByteBuffer {
self.into_array()
.into_byte_buffer()
.into_byte_buffer(0)
.vortex_expect("BoolArray must have a buffer")
}

Expand Down Expand Up @@ -127,7 +127,7 @@ impl BoolArray {
validity: validity.to_metadata(buffer_len)?,
first_byte_bit_offset,
}),
Some(ByteBuffer::from_arrow_buffer(inner, Alignment::of::<u8>())),
vec![ByteBuffer::from_arrow_buffer(inner, Alignment::of::<u8>())].into(),
validity.into_array().into_iter().collect(),
StatsSet::default(),
)?
Expand Down
14 changes: 6 additions & 8 deletions vortex-array/src/array/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl PrimitiveArray {
Arc::new(PrimitiveMetadata {
validity: validity.to_metadata(len).vortex_expect("Invalid validity"),
}),
Some(buffer.into_byte_buffer()),
[buffer.into_byte_buffer()].into(),
validity.into_array().into_iter().collect(),
StatsSet::default(),
)
Expand Down Expand Up @@ -102,16 +102,14 @@ impl PrimitiveArray {
}

pub fn byte_buffer(&self) -> &ByteBuffer {
let buffer = self
.as_ref()
.byte_buffer()
.vortex_expect("Missing buffer in PrimitiveArray");
buffer
self.as_ref()
.byte_buffer(0)
.vortex_expect("Missing buffer in PrimitiveArray")
}

pub fn into_byte_buffer(self) -> ByteBuffer {
self.into_array()
.into_byte_buffer()
.into_byte_buffer(0)
.vortex_expect("PrimitiveArray must have a buffer")
}

Expand All @@ -136,7 +134,7 @@ impl PrimitiveArray {
}
Buffer::from_byte_buffer(
self.into_array()
.into_byte_buffer()
.into_byte_buffer(0)
.vortex_expect("PrimitiveArray must have a buffer"),
)
}
Expand Down
33 changes: 23 additions & 10 deletions vortex-array/src/data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl ArrayData {
dtype: DType,
len: usize,
metadata: Arc<dyn ArrayMetadata>,
buffer: Option<ByteBuffer>,
buffers: Arc<[ByteBuffer]>,
children: Arc<[ArrayData]>,
statistics: StatsSet,
) -> VortexResult<Self> {
Expand All @@ -70,7 +70,7 @@ impl ArrayData {
dtype,
len,
metadata,
buffer,
buffers,
children,
stats_set: Arc::new(RwLock::new(statistics)),
#[cfg(feature = "canonical_counter")]
Expand Down Expand Up @@ -247,7 +247,7 @@ impl ArrayData {
.iter()
.map(|child| child.cumulative_nbuffers())
.sum::<usize>()
+ if self.byte_buffer().is_some() { 1 } else { 0 }
+ self.nbuffers()
}

/// Return the buffer offsets and the total length of all buffers, assuming the given alignment.
Expand All @@ -257,7 +257,7 @@ impl ArrayData {
let mut offset = 0;

for col_data in self.depth_first_traversal() {
if let Some(buffer) = col_data.byte_buffer() {
for buffer in col_data.byte_buffers() {
offsets.push(offset as u64);

let buffer_size = buffer.len();
Expand Down Expand Up @@ -320,17 +320,30 @@ impl ArrayData {
}
}

pub fn byte_buffer(&self) -> Option<&ByteBuffer> {
pub fn nbuffers(&self) -> usize {
match &self.0 {
InnerArrayData::Owned(d) => d.byte_buffer(),
InnerArrayData::Viewed(v) => v.byte_buffer(),
InnerArrayData::Owned(o) => o.buffers.len(),
InnerArrayData::Viewed(v) => v.nbuffers(),
}
}

pub fn into_byte_buffer(self) -> Option<ByteBuffer> {
pub fn byte_buffer(&self, index: usize) -> Option<&ByteBuffer> {
match &self.0 {
InnerArrayData::Owned(d) => d.byte_buffer(index),
InnerArrayData::Viewed(v) => v.buffer(index),
}
}

pub fn byte_buffers(&self) -> impl Iterator<Item = ByteBuffer> + '_ {
(0..self.nbuffers())
.map(|i| self.byte_buffer(i).vortex_expect("missing declared buffer"))
.cloned()
}

pub fn into_byte_buffer(self, index: usize) -> Option<ByteBuffer> {
match self.0 {
InnerArrayData::Owned(d) => d.into_byte_buffer(),
InnerArrayData::Viewed(v) => v.byte_buffer().cloned(),
InnerArrayData::Owned(d) => d.into_byte_buffer(index),
InnerArrayData::Viewed(v) => v.buffer(index).cloned(),
}
}

Expand Down
12 changes: 7 additions & 5 deletions vortex-array/src/data/owned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub(super) struct OwnedArrayData {
pub(super) dtype: DType, // FIXME(ngates): Arc?
pub(super) len: usize,
pub(super) metadata: Arc<dyn ArrayMetadata>,
pub(super) buffer: Option<ByteBuffer>,
pub(super) buffers: Arc<[ByteBuffer]>,
pub(super) children: Arc<[ArrayData]>,
pub(super) stats_set: Arc<RwLock<StatsSet>>,
#[cfg(feature = "canonical_counter")]
Expand All @@ -28,12 +28,14 @@ impl OwnedArrayData {
&self.metadata
}

pub fn byte_buffer(&self) -> Option<&ByteBuffer> {
self.buffer.as_ref()
pub fn byte_buffer(&self, index: usize) -> Option<&ByteBuffer> {
self.buffers.get(index)
}

pub fn into_byte_buffer(self) -> Option<ByteBuffer> {
self.buffer
pub fn into_byte_buffer(self, index: usize) -> Option<ByteBuffer> {
// While this does require a clone, we're still "into" because it makes sure the self
// reference is dropped.
self.buffers.get(index).cloned()
}

// We want to allow these panics because they are indicative of implementation error.
Expand Down
18 changes: 14 additions & 4 deletions vortex-array/src/data/viewed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,22 @@ impl ViewedArrayData {
collector.children()
}

pub fn byte_buffer(&self) -> Option<&ByteBuffer> {
pub fn nbuffers(&self) -> usize {
self.flatbuffer()
.buffers()
.and_then(|buffers| {
assert!(buffers.len() <= 1, "Array: expected at most one buffer");
(!buffers.is_empty()).then(|| buffers.get(0) as usize)
.map(|b| b.len())
.unwrap_or_default()
}

pub fn buffer(&self, index: usize) -> Option<&ByteBuffer> {
self.flatbuffer()
.buffers()
.map(|buffers| {
assert!(
index < buffers.len(),
"ArrayView buffer index out of bounds"
);
buffers.get(index) as usize
})
.map(|idx| &self.buffers[idx])
}
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ macro_rules! impl_encoding {
dtype,
len,
std::sync::Arc::new(metadata),
None,
vec![].into(),
children,
stats
)?)
Expand Down
2 changes: 1 addition & 1 deletion vortex-ipc/src/messages/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ mod test {
// Constant arrays have no buffers
let array = ConstantArray::new(10i32, 20).into_array();
assert!(
array.byte_buffer().is_none(),
array.byte_buffer(0).is_none(),
"Array should have no buffers"
);
write_and_read(array);
Expand Down
18 changes: 5 additions & 13 deletions vortex-ipc/src/messages/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl MessageEncoder {

let mut fb_buffers = vec![];
for child in array.depth_first_traversal() {
if let Some(buffer) = child.byte_buffer() {
for buffer in child.byte_buffers() {
let end_excl_padding = self.pos + buffer.len();
let end_incl_padding = end_excl_padding.next_multiple_of(self.alignment);
let padding = u16::try_from(end_incl_padding - end_excl_padding)
Expand Down Expand Up @@ -198,12 +198,9 @@ impl WriteFlatBuffer for ArrayWriter<'_> {
// The second tuple element holds the buffer_index for this Array subtree. If this array
// has a buffer, that is its buffer index. If it does not, that buffer index belongs
// to one of the children.
let child_buffer_idx = self.buffer_idx
+ if self.array.byte_buffer().is_some() {
1
} else {
0
};
let nbuffers = u16::try_from(self.array.nbuffers())
.vortex_expect("Array can have at most u16::MAX buffers");
let child_buffer_idx = self.buffer_idx + nbuffers;

let children = self
.array
Expand All @@ -225,12 +222,7 @@ impl WriteFlatBuffer for ArrayWriter<'_> {
.collect_vec();
let children = Some(fbb.create_vector(&children));

let buffers = self
.array
.byte_buffer()
.is_some()
.then_some(self.buffer_idx)
.map(|buffer_idx| fbb.create_vector_from_iter(std::iter::once(buffer_idx)));
let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));

let stats = Some(self.array.statistics().write_flatbuffer(fbb));

Expand Down
Loading