Vastly improved support for deserialized iteration (#7024)
Whether we like it or not, sometimes we've got to deserialize.

Until now, the new APIs only provided a nice UX for the deserialization-free
paths, while the deserialized path was particularly painful to use.

The source of the pain is, as is so often the case, the lack of lending iterators
in the stdlib. This PR works around that.

Essentially, this:
```rust
let all_fill_mode_chunks = results.get_optional_chunks(&FillMode::name());
let all_fill_modes = results.iter_as(timeline, FillMode::name());
let mut all_fill_mode_iters = all_fill_mode_chunks
    .iter()
    .map(|chunk| chunk.iter_component::<FillMode>())
    .collect_vec();
let mut all_fill_modes_indexed = {
    let all_fill_modes =
        all_fill_mode_iters.iter_mut().flat_map(|it| it.into_iter());
    let all_fill_modes_indices = all_fill_mode_chunks.iter().flat_map(|chunk| {
        chunk.iter_component_indices(&timeline, &FillMode::name())
    });
    itertools::izip!(all_fill_modes_indices, all_fill_modes)
};
```
becomes this:
```rust
let all_fill_modes = results.iter_as(timeline, FillMode::name()).component::<FillMode>();
```
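For a sense of what consuming that looks like, here is a hedged sketch continuing the snippet above — the loop body is a placeholder, and `all_fill_modes` is assumed to be the iterator built by the one-liner, yielding `((TimeInt, RowId), ChunkComponentIterItem<FillMode>)` pairs:

```rust
// Illustrative continuation of the snippet above; the loop body is a placeholder.
for ((data_time, row_id), fill_modes) in all_fill_modes {
    // `ChunkComponentIterItem<FillMode>` derefs to `&[FillMode]`, so the deserialized
    // batch can be used like a plain slice.
    for fill_mode in fill_modes.iter() {
        let _ = (data_time, row_id, fill_mode);
    }
}
```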
teh-cmc authored Aug 2, 2024
1 parent 35de029 commit d9deef0
Showing 14 changed files with 164 additions and 146 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock
```diff
@@ -4620,6 +4620,7 @@ dependencies = [
  "ahash",
  "anyhow",
  "backtrace",
+ "bytemuck",
  "clean-path",
  "criterion",
  "crossbeam",
@@ -4724,6 +4725,7 @@ dependencies = [
  "ahash",
  "anyhow",
  "backtrace",
+ "bytemuck",
  "criterion",
  "indent",
  "indexmap 2.1.0",
```
79 changes: 56 additions & 23 deletions crates/store/re_chunk/src/iter.rs
```diff
@@ -529,44 +529,77 @@ impl Chunk {
 
 /// The actual iterator implementation for [`Chunk::iter_component`].
 pub struct ChunkComponentIter<C, IO> {
-    values: Vec<C>,
+    values: Arc<Vec<C>>,
     offsets: IO,
 }
 
-/// The intermediate state for [`ChunkComponentIter`].
-///
-/// Required so that we can return references to the inner data.
-pub struct ChunkComponentIterRef<'a, C, IO> {
-    values: &'a [C],
-    offsets: &'a mut IO,
-}
-
-impl<'a, C: Component, IO: Iterator<Item = (usize, usize)>> IntoIterator
-    for &'a mut ChunkComponentIter<C, IO>
-{
-    type Item = &'a [C];
-
-    type IntoIter = ChunkComponentIterRef<'a, C, IO>;
-
-    #[inline]
-    fn into_iter(self) -> Self::IntoIter {
-        ChunkComponentIterRef {
-            values: &self.values,
-            offsets: &mut self.offsets,
-        }
-    }
-}
-
-impl<'a, C: Component, IO: Iterator<Item = (usize, usize)>> Iterator
-    for ChunkComponentIterRef<'a, C, IO>
-{
-    type Item = &'a [C];
+/// The underlying item type for [`ChunkComponentIter`].
+///
+/// This allows us to cheaply carry slices of deserialized data, while working around the
+/// limitations of Rust's Iterator trait and ecosystem.
+///
+/// See [`ChunkComponentIterItem::as_slice`].
+#[derive(Clone, PartialEq)]
+pub struct ChunkComponentIterItem<C> {
+    values: Arc<Vec<C>>,
+    index: usize,
+    len: usize,
+}
+
+impl<C: PartialEq> PartialEq<[C]> for ChunkComponentIterItem<C> {
+    fn eq(&self, rhs: &[C]) -> bool {
+        self.as_slice().eq(rhs)
+    }
+}
+
+impl<C: PartialEq> PartialEq<Vec<C>> for ChunkComponentIterItem<C> {
+    fn eq(&self, rhs: &Vec<C>) -> bool {
+        self.as_slice().eq(rhs)
+    }
+}
+
+impl<C: Eq> Eq for ChunkComponentIterItem<C> {}
+
+// NOTE: No `C: Default`!
+impl<C> Default for ChunkComponentIterItem<C> {
+    #[inline]
+    fn default() -> Self {
+        Self {
+            values: Arc::new(Vec::new()),
+            index: 0,
+            len: 0,
+        }
+    }
+}
+
+impl<C> ChunkComponentIterItem<C> {
+    #[inline]
+    pub fn as_slice(&self) -> &[C] {
+        &self.values[self.index..self.index + self.len]
+    }
+}
+
+impl<C> std::ops::Deref for ChunkComponentIterItem<C> {
+    type Target = [C];
+
+    #[inline]
+    fn deref(&self) -> &Self::Target {
+        self.as_slice()
+    }
+}
+
+impl<C: Component, IO: Iterator<Item = (usize, usize)>> Iterator for ChunkComponentIter<C, IO> {
+    type Item = ChunkComponentIterItem<C>;
 
     #[inline]
     fn next(&mut self) -> Option<Self::Item> {
         self.offsets
             .next()
-            .map(move |(idx, len)| &self.values[idx..idx + len])
+            .map(move |(index, len)| ChunkComponentIterItem {
+                values: Arc::clone(&self.values),
+                index,
+                len,
+            })
     }
 }
 
@@ -591,7 +624,7 @@ impl Chunk {
     ) -> ChunkComponentIter<C, impl Iterator<Item = (usize, usize)> + '_> {
         let Some(list_array) = self.components.get(&C::name()) else {
             return ChunkComponentIter {
-                values: vec![],
+                values: Arc::new(vec![]),
                 offsets: Either::Left(std::iter::empty()),
             };
         };
@@ -614,15 +647,15 @@ impl Chunk {
                     );
                 }
                 return ChunkComponentIter {
-                    values: vec![],
+                    values: Arc::new(vec![]),
                     offsets: Either::Left(std::iter::empty()),
                 };
            }
        };

        // NOTE: No need for validity checks here, `iter_offsets` already takes care of that.
        ChunkComponentIter {
-            values,
+            values: Arc::new(values),
            offsets: Either::Right(self.iter_component_offsets(&C::name())),
        }
    }
```
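The core trick in the hunk above is easy to miss: instead of lending `&[C]` slices tied to the iterator's lifetime, the iterator now hands out items that own an `Arc` to the shared deserialized buffer plus an `(index, len)` window, so each item is `Clone` and can outlive the iteration. A minimal standalone sketch of that pattern, with illustrative names rather than the actual `re_chunk` types:

```rust
use std::sync::Arc;

/// Iterator that shares one deserialized buffer across all of its items.
struct SharedSlices<T> {
    values: Arc<Vec<T>>,
    spans: std::vec::IntoIter<(usize, usize)>,
}

/// Item: a cheap handle to a window of the shared buffer.
struct SharedSlice<T> {
    values: Arc<Vec<T>>,
    start: usize,
    len: usize,
}

impl<T> std::ops::Deref for SharedSlice<T> {
    type Target = [T];
    fn deref(&self) -> &[T] {
        &self.values[self.start..self.start + self.len]
    }
}

impl<T> Iterator for SharedSlices<T> {
    type Item = SharedSlice<T>;
    fn next(&mut self) -> Option<Self::Item> {
        // Bumping the refcount is all it takes to hand out an independent item.
        self.spans.next().map(|(start, len)| SharedSlice {
            values: Arc::clone(&self.values),
            start,
            len,
        })
    }
}

fn main() {
    let it = SharedSlices {
        values: Arc::new(vec![1, 2, 3, 4, 5]),
        spans: vec![(0, 2), (2, 3)].into_iter(),
    };
    // Items can be collected and used after iteration, with no lending-iterator machinery:
    let batches: Vec<SharedSlice<i32>> = it.collect();
    assert_eq!(&*batches[0], &[1, 2]);
    assert_eq!(&*batches[1], &[3, 4, 5]);
}
```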
1 change: 1 addition & 0 deletions crates/store/re_chunk/src/lib.rs
```diff
@@ -24,6 +24,7 @@ pub use self::builder::{ChunkBuilder, ChunkTimelineBuilder};
 pub use self::chunk::{Chunk, ChunkError, ChunkResult, ChunkTimeline};
 pub use self::helpers::{ChunkShared, UnitChunkShared};
 pub use self::id::{ChunkId, RowId};
+pub use self::iter::{ChunkComponentIter, ChunkComponentIterItem, ChunkIndicesIter};
 pub use self::latest_at::LatestAtQuery;
 pub use self::range::RangeQuery;
 pub use self::transport::TransportChunk;
```
1 change: 1 addition & 0 deletions crates/store/re_log_types/Cargo.toml
```diff
@@ -60,6 +60,7 @@ arrow2 = { workspace = true, features = [
   "compute_concatenate",
 ] }
 backtrace.workspace = true
+bytemuck.workspace = true
 clean-path.workspace = true
 document-features.workspace = true
 fixed = { workspace = true, default-features = false }
```
10 changes: 6 additions & 4 deletions crates/store/re_log_types/src/example_components.rs
```diff
@@ -40,7 +40,8 @@ impl re_types_core::Archetype for MyPoints {
 
 // ----------------------------------------------------------------------------
 
-#[derive(Clone, Copy, Debug, Default, PartialEq)]
+#[derive(Clone, Copy, Debug, Default, PartialEq, bytemuck::Pod, bytemuck::Zeroable)]
+#[repr(C)]
 pub struct MyPoint {
     pub x: f32,
     pub y: f32,
@@ -148,7 +149,8 @@ impl Loggable for MyPoint {
 
 // ----------------------------------------------------------------------------
 
-#[derive(Clone, Copy, Debug, Default, PartialEq)]
+#[derive(Clone, Copy, Debug, Default, PartialEq, bytemuck::Pod, bytemuck::Zeroable)]
+#[repr(C)]
 pub struct MyPoint64 {
     pub x: f64,
     pub y: f64,
@@ -256,7 +258,7 @@ impl Loggable for MyPoint64 {
 
 // ----------------------------------------------------------------------------
 
-#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq, bytemuck::Pod, bytemuck::Zeroable)]
 #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
 #[repr(transparent)]
 pub struct MyColor(pub u32);
@@ -381,7 +383,7 @@ impl Loggable for MyLabel {
 
 // ----------------------------------------------------------------------------
 
-#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq, bytemuck::Pod, bytemuck::Zeroable)]
 #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
 #[repr(transparent)]
 pub struct MyIndex(pub u64);
```
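The `bytemuck::Pod`/`Zeroable` derives plus the `#[repr(C)]`/`#[repr(transparent)]` attributes are what allow the updated tests to compare typed example components against raw columns without copying, via `bytemuck::cast_slice`. A self-contained sketch of that idea (my own minimal reproduction, assuming `bytemuck` with its `derive` feature; not code from this PR):

```rust
use bytemuck::{Pod, Zeroable};

// Mirrors the shape of the example component: a `repr(transparent)` newtype over `u32`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Pod, Zeroable)]
#[repr(transparent)]
struct MyColor(pub u32);

fn main() {
    // `cast_slice` reinterprets the memory in place, no copy involved.
    let colors = [MyColor(0xFF00_00FF), MyColor(0x00FF_00FF)];
    let raw: &[u32] = bytemuck::cast_slice(&colors);
    assert_eq!(raw, &[0xFF00_00FF, 0x00FF_00FF]);

    // And back again, from a raw column to typed components.
    let typed: &[MyColor] = bytemuck::cast_slice(raw);
    assert_eq!(typed, &colors);
}
```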
1 change: 1 addition & 0 deletions crates/store/re_query2/Cargo.toml
```diff
@@ -55,6 +55,7 @@ thiserror.workspace = true
 
 
 [dev-dependencies]
+bytemuck.workspace = true
 criterion.workspace = true
 mimalloc.workspace = true
 rand = { workspace = true, features = ["std", "std_rng"] }
```
43 changes: 18 additions & 25 deletions crates/store/re_query2/examples/range.rs
```diff
@@ -43,38 +43,29 @@ fn main() -> anyhow::Result<()> {
     // once for the whole column, and will then return references into that data.
     // This is why you have to process the data in two-steps: the iterator needs to have somewhere
     // to reference to.
-    let mut all_points_iters = all_points_chunks
-        .iter()
-        .map(|chunk| chunk.iter_component::<MyPoint>())
-        .collect_vec();
-    let all_points_indexed = {
-        let all_points = all_points_iters.iter_mut().flat_map(|it| it.into_iter());
-        let all_points_indices = all_points_chunks
-            .iter()
-            .flat_map(|chunk| chunk.iter_component_indices(&query.timeline(), &MyPoint::name()));
-        izip!(all_points_indices, all_points)
-    };
-    let mut all_labels_iters = all_labels_chunks
-        .unwrap_or_default()
-        .iter()
-        .map(|chunk| chunk.iter_component::<MyLabel>())
-        .collect_vec();
-    let all_labels_indexed = {
-        let all_labels = all_labels_iters.iter_mut().flat_map(|it| it.into_iter());
-        let all_labels_indices = all_labels_chunks
-            .unwrap_or_default()
-            .iter()
-            .flat_map(|chunk| chunk.iter_component_indices(&query.timeline(), &MyLabel::name()));
-        izip!(all_labels_indices, all_labels)
-    };
+    let all_points_indexed = all_points_chunks.iter().flat_map(|chunk| {
+        izip!(
+            chunk.iter_component_indices(&query.timeline(), &MyPoint::name()),
+            chunk.iter_component::<MyPoint>()
+        )
+    });
+    let all_labels_indexed = all_labels_chunks
+        .unwrap_or_default()
+        .iter()
+        .flat_map(|chunk| {
+            izip!(
+                chunk.iter_component_indices(&query.timeline(), &MyLabel::name()),
+                chunk.iter_component::<MyLabel>()
+            )
+        });
 
     // Or, if you want every last bit of performance you can get, you can manipulate the raw
     // data directly:
     let all_colors_indexed = all_colors_chunks
         .unwrap_or_default()
         .iter()
         .flat_map(|chunk| {
-            itertools::izip!(
+            izip!(
                 chunk.iter_component_indices(&query.timeline(), &MyColor::name()),
                 chunk.iter_primitive::<u32>(&MyColor::name()),
             )
@@ -90,8 +81,10 @@ fn main() -> anyhow::Result<()> {
 
     eprintln!("results:");
     for ((data_time, row_id), points, colors, labels) in all_frames {
-        let colors = colors.unwrap_or(&[]).iter().map(|c| Some(MyColor(*c)));
-        let labels = labels.unwrap_or(&[]).iter().cloned().map(Some);
+        let points = points.as_slice();
+        let colors = colors.unwrap_or_default().iter().map(|c| Some(MyColor(*c)));
+        let labels = labels.unwrap_or_default();
+        let labels = labels.as_slice().iter().cloned().map(Some);
 
         // Apply your instance-level joining logic, if any:
         let results =
```
55 changes: 27 additions & 28 deletions crates/store/re_query2/tests/range.rs
```diff
@@ -3,7 +3,7 @@
 
 use std::sync::Arc;
 
-use itertools::Itertools as _;
+use itertools::Itertools;
 
 use re_chunk::{RowId, Timeline};
 use re_chunk_store::{
@@ -1039,42 +1039,41 @@ fn query_and_compare(
         );
 
         let all_points_chunks = cached.get_required(&MyPoint::name()).unwrap();
-        let mut all_points_iters = all_points_chunks
-            .iter()
-            .map(|chunk| chunk.iter_component::<MyPoint>())
-            .collect_vec();
-        let all_points_indexed = {
-            let all_points = all_points_iters.iter_mut().flat_map(|it| it.into_iter());
-            let all_points_indices = all_points_chunks.iter().flat_map(|chunk| {
-                chunk.iter_component_indices(&query.timeline(), &MyPoint::name())
-            });
-            itertools::izip!(all_points_indices, all_points)
-        };
+        let all_points_indexed = all_points_chunks
+            .iter()
+            .flat_map(|chunk| {
+                itertools::izip!(
+                    chunk.iter_component_indices(&query.timeline(), &MyPoint::name()),
+                    chunk.iter_component::<MyPoint>()
+                )
+            })
+            .collect_vec();
+        // Only way I've managed to make `rustc` realize there's a `PartialEq` available.
+        let all_points_indexed = all_points_indexed
+            .iter()
+            .map(|(index, points)| (*index, points.as_slice()))
+            .collect_vec();
 
         let all_colors_chunks = cached.get(&MyColor::name()).unwrap_or_default();
-        let mut all_colors_iters = all_colors_chunks
-            .iter()
-            .map(|chunk| chunk.iter_component::<MyColor>())
-            .collect_vec();
-        let all_colors_indexed = {
-            let all_colors = all_colors_iters.iter_mut().flat_map(|it| it.into_iter());
-            let all_colors_indices = all_colors_chunks.iter().flat_map(|chunk| {
-                chunk.iter_component_indices(&query.timeline(), &MyColor::name())
-            });
-            itertools::izip!(all_colors_indices, all_colors)
-        };
+        let all_colors_indexed = all_colors_chunks
+            .iter()
+            .flat_map(|chunk| {
+                itertools::izip!(
+                    chunk.iter_component_indices(&query.timeline(), &MyColor::name()),
+                    chunk.iter_primitive::<u32>(&MyColor::name()),
+                )
+            })
+            .collect_vec();
+        // Only way I've managed to make `rustc` realize there's a `PartialEq` available.
+        let all_colors_indexed = all_colors_indexed
+            .iter()
+            .map(|(index, colors)| (*index, bytemuck::cast_slice(colors)))
+            .collect_vec();
 
         eprintln!("{query:?}");
         eprintln!("{store}");
 
-        similar_asserts::assert_eq!(
-            expected_all_points_indexed,
-            all_points_indexed.collect_vec(),
-        );
-
-        similar_asserts::assert_eq!(
-            expected_all_colors_indexed,
-            all_colors_indexed.collect_vec(),
-        );
+        similar_asserts::assert_eq!(expected_all_points_indexed, all_points_indexed);
+        similar_asserts::assert_eq!(expected_all_colors_indexed, all_colors_indexed);
     }
 }
```
16 changes: 15 additions & 1 deletion crates/viewer/re_space_view/src/results_ext2.rs
```diff
@@ -364,7 +364,7 @@ impl<'a> RangeResultsExt for HybridResults<'a> {
 
 // ---
 
-use re_chunk::{RowId, TimeInt, Timeline};
+use re_chunk::{ChunkComponentIterItem, RowId, TimeInt, Timeline};
 use re_chunk_store::external::{re_chunk, re_chunk::external::arrow2};
 
 /// The iterator type backing [`HybridResults::iter_as`].
@@ -375,6 +375,20 @@ pub struct HybridResultsChunkIter<'a> {
 }
 
 impl<'a> HybridResultsChunkIter<'a> {
+    /// Iterate as indexed deserialized batches.
+    ///
+    /// See [`Chunk::iter_component`] for more information.
+    pub fn component<C: re_types_core::Component>(
+        &'a self,
+    ) -> impl Iterator<Item = ((TimeInt, RowId), ChunkComponentIterItem<C>)> + 'a {
+        self.chunks.iter().flat_map(move |chunk| {
+            itertools::izip!(
+                chunk.iter_component_indices(&self.timeline, &self.component_name),
+                chunk.iter_component::<C>(),
+            )
+        })
+    }
+
     /// Iterate as indexed primitives.
     ///
     /// See [`Chunk::iter_primitive`] for more information.
```
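Since the item type is spelled out in the signature above, downstream code can accept the iterator generically without ever naming the concrete iterator type. A hypothetical helper (not part of this PR) to illustrate:

```rust
use re_chunk::{ChunkComponentIterItem, RowId, TimeInt};

// Hypothetical consumer: works for any component type, whatever produced the iterator.
fn visit_batches<C>(
    batches: impl Iterator<Item = ((TimeInt, RowId), ChunkComponentIterItem<C>)>,
) {
    for ((data_time, row_id), batch) in batches {
        // Each batch derefs to `&[C]`.
        eprintln!("{data_time:?} {row_id:?}: {} instance(s)", batch.len());
    }
}
```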