Dataframe v2 Rust: public dataframe symbols #7808

Merged (1 commit, Oct 17, 2024)
1 change: 1 addition & 0 deletions Cargo.lock
@@ -6251,6 +6251,7 @@ dependencies = [
"re_chunk_store",
"re_crash_handler",
"re_data_source",
"re_dataframe",
"re_entity_db",
"re_error",
"re_format",
37 changes: 37 additions & 0 deletions crates/store/re_chunk/src/util.rs
@@ -10,6 +10,8 @@ use arrow2::{
};
use itertools::Itertools;

use crate::TransportChunk;

// ---

/// Returns true if the given `list_array` is semantically empty.
@@ -409,3 +411,38 @@ pub fn take_array<A: ArrowArray + Clone, O: arrow2::types::Index>(
.unwrap()
.clone()
}

// ---

use arrow2::{chunk::Chunk as ArrowChunk, datatypes::Schema as ArrowSchema};

/// Concatenate multiple [`TransportChunk`]s into one.
(Review comment from the author on this function: "Just moved this from somewhere else -- it's very useful as an end user as long as we're stuck with arrow2.")
///
/// This is a temporary method that we use while waiting to migrate towards `arrow-rs`.
/// * `arrow2` doesn't have a `RecordBatch` type, therefore we emulate that using our `TransportChunk`s.
/// * `arrow-rs` does have one, and it natively supports concatenation.
pub fn concatenate_record_batches(
schema: ArrowSchema,
batches: &[TransportChunk],
) -> anyhow::Result<TransportChunk> {
assert!(batches.iter().map(|batch| &batch.schema).all_equal());

let mut arrays = Vec::new();

if !batches.is_empty() {
for (i, _field) in schema.fields.iter().enumerate() {
let array = arrow2::compute::concatenate::concatenate(
&batches
.iter()
.map(|batch| &*batch.data[i] as &dyn ArrowArray)
.collect_vec(),
)?;
arrays.push(array);
}
}

Ok(TransportChunk {
schema,
data: ArrowChunk::new(arrays),
})
}
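
A usage sketch of the new helper, mirroring the call pattern the re_dataframe tests adopt further down in this PR; the only assumption is a QueryHandle obtained from the dataframe API:

use itertools::Itertools as _;
use re_chunk::{util::concatenate_record_batches, TransportChunk};
use re_dataframe::QueryHandle;

/// Drains a query handle and concatenates every yielded batch into a single
/// `TransportChunk`, exactly as the tests below do inline.
fn collect_dataframe(query_handle: QueryHandle<'_>) -> anyhow::Result<TransportChunk> {
    // Grab the schema before consuming the handle into a batch iterator.
    let schema = query_handle.schema().clone();
    let batches = query_handle.into_batch_iter().collect_vec();
    concatenate_record_batches(schema, &batches)
}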
4 changes: 3 additions & 1 deletion crates/store/re_dataframe/src/lib.rs
@@ -9,12 +9,14 @@ pub use self::query::QueryHandle;
#[doc(no_inline)]
pub use self::external::arrow2::chunk::Chunk as ArrowChunk;
#[doc(no_inline)]
pub use self::external::re_chunk::util::concatenate_record_batches;
#[doc(no_inline)]
pub use self::external::re_chunk_store::{
ColumnSelector, ComponentColumnSelector, Index, IndexRange, IndexValue, QueryExpression,
SparseFillStrategy, TimeColumnSelector, ViewContentsSelector,
};
#[doc(no_inline)]
pub use self::external::re_log_types::{TimeInt, Timeline};
pub use self::external::re_log_types::{EntityPathFilter, ResolvedTimeRange, TimeInt, Timeline};
#[doc(no_inline)]
pub use self::external::re_query::Caches as QueryCache;

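The two new re-exports here (EntityPathFilter, ResolvedTimeRange) are what a caller needs to scope a query without depending on re_log_types directly. A minimal sketch; the parse_forgiving constructor is assumed from EntityPathFilter's usual API and is not part of this diff:

use re_dataframe::{EntityPathFilter, ResolvedTimeRange};

// Sketch: restrict a query to everything under /points, over all of time.
fn scope_query() -> (EntityPathFilter, ResolvedTimeRange) {
    // Assumption: `parse_forgiving` builds a filter from `+`/`-` rule strings;
    // its exact signature is not shown in this diff.
    let filter = EntityPathFilter::parse_forgiving("/points/**");
    // `EVERYTHING` spans the whole index, from TimeInt::MIN to TimeInt::MAX.
    let range = ResolvedTimeRange::EVERYTHING;
    (filter, range)
}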
82 changes: 29 additions & 53 deletions crates/store/re_dataframe/src/query.rs
@@ -1179,7 +1179,7 @@ impl<'a> QueryHandle<'a> {
mod tests {
use std::sync::Arc;

use re_chunk::{Chunk, ChunkId, RowId, TimePoint};
use re_chunk::{util::concatenate_record_batches, Chunk, ChunkId, RowId, TimePoint};
use re_chunk_store::{ChunkStore, ChunkStoreConfig, ResolvedTimeRange, TimeInt};
use re_log_types::{
build_frame_nr, build_log_time,
@@ -1248,7 +1248,7 @@ mod tests {
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
)?;
eprintln!("{dataframe}");

let got = format!("{:#?}", dataframe.data.iter().collect_vec());
@@ -1283,7 +1283,7 @@ mod tests {
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
)?;
eprintln!("{dataframe}");

let got = format!("{:#?}", dataframe.data.iter().collect_vec());
@@ -1333,7 +1333,7 @@ mod tests {
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
)?;
eprintln!("{dataframe}");

let got = format!("{:#?}", dataframe.data.iter().collect_vec());
@@ -1382,7 +1382,7 @@ mod tests {
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
)?;
eprintln!("{dataframe}");

let got = format!("{:#?}", dataframe.data.iter().collect_vec());
@@ -1437,7 +1437,7 @@ mod tests {
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
)?;
eprintln!("{dataframe}");

let got = format!("{:#?}", dataframe.data.iter().collect_vec());
@@ -1495,7 +1495,7 @@ mod tests {
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
)?;
eprintln!("{dataframe}");

let got = format!("{:#?}", dataframe.data.iter().collect_vec());
@@ -1538,7 +1538,7 @@ mod tests {
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
)?;
eprintln!("{dataframe}");

let got = format!("{:#?}", dataframe.data.iter().collect_vec());
@@ -1595,7 +1595,7 @@ mod tests {
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
)?;
eprintln!("{dataframe}");

let got = format!("{:#?}", dataframe.data.iter().collect_vec());
@@ -1624,7 +1624,7 @@ mod tests {
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
)?;
eprintln!("{dataframe}");

let got = format!("{:#?}", dataframe.data.iter().collect_vec());
@@ -1653,7 +1653,7 @@ mod tests {
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
)?;
eprintln!("{dataframe}");

let got = format!("{:#?}", dataframe.data.iter().collect_vec());
@@ -1692,7 +1692,7 @@ mod tests {
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
)?;
eprintln!("{dataframe}");

let got = format!("{:#?}", dataframe.data.iter().collect_vec());
@@ -1750,7 +1750,7 @@ mod tests {
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
)?;
eprintln!("{dataframe}");

let got = format!("{:#?}", dataframe.data.iter().collect_vec());
@@ -1790,7 +1790,7 @@ mod tests {
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
)?;
eprintln!("{dataframe}");

let got = format!("{:#?}", dataframe.data.iter().collect_vec());
@@ -1843,7 +1843,7 @@ mod tests {
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
)?;
eprintln!("{dataframe}");

let got = format!("{:#?}", dataframe.data.iter().collect_vec());
@@ -1879,7 +1879,7 @@ mod tests {
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
)?;
eprintln!("{dataframe}");

let got = format!("{:#?}", dataframe.data.iter().collect_vec());
@@ -1930,7 +1930,7 @@ mod tests {
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
)?;
eprintln!("{dataframe}");

let got = format!("{:#?}", dataframe.data.iter().collect_vec());
@@ -2003,7 +2003,7 @@ mod tests {
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
)?;
eprintln!("{dataframe}");

let got = format!("{:#?}", dataframe.data.iter().collect_vec());
@@ -2094,7 +2094,7 @@ mod tests {
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
)?;
eprintln!("{dataframe}");

let got = format!("{:#?}", dataframe.data.iter().collect_vec());
@@ -2151,7 +2151,7 @@ mod tests {
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
)?;
eprintln!("{dataframe}");

let got = format!("{:#?}", dataframe.data.iter().collect_vec());
@@ -2188,7 +2188,7 @@ mod tests {
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.into_batch_iter().collect_vec(),
);
)?;
eprintln!("{dataframe}");

// TODO(#7650): Those null values for `MyColor` on 10 and 20 look completely insane, but then again
@@ -2252,11 +2252,11 @@ mod tests {
let expected = concatenate_record_batches(
query_handle.schema().clone(),
&expected_rows.iter().skip(i).take(3).cloned().collect_vec(),
);
)?;
let got = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.batch_iter().take(3).collect_vec(),
);
)?;

let expected = format!("{:#?}", expected.data.iter().collect_vec());
let got = format!("{:#?}", got.data.iter().collect_vec());
@@ -2293,11 +2293,11 @@ mod tests {
let expected = concatenate_record_batches(
query_handle.schema().clone(),
&expected_rows.iter().skip(i).take(3).cloned().collect_vec(),
);
)?;
let got = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.batch_iter().take(3).collect_vec(),
);
)?;

let expected = format!("{:#?}", expected.data.iter().collect_vec());
let got = format!("{:#?}", got.data.iter().collect_vec());
@@ -2337,11 +2337,11 @@ mod tests {
let expected = concatenate_record_batches(
query_handle.schema().clone(),
&expected_rows.iter().skip(i).take(3).cloned().collect_vec(),
);
)?;
let got = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.batch_iter().take(3).collect_vec(),
);
)?;

let expected = format!("{:#?}", expected.data.iter().collect_vec());
let got = format!("{:#?}", got.data.iter().collect_vec());
@@ -2375,11 +2375,11 @@ mod tests {
let expected = concatenate_record_batches(
query_handle.schema().clone(),
&expected_rows.iter().skip(i).take(3).cloned().collect_vec(),
);
)?;
let got = concatenate_record_batches(
query_handle.schema().clone(),
&query_handle.batch_iter().take(3).collect_vec(),
);
)?;

let expected = format!("{:#?}", expected.data.iter().collect_vec());
let got = format!("{:#?}", got.data.iter().collect_vec());
@@ -2682,28 +2682,4 @@ mod tests {

Ok(())
}

fn concatenate_record_batches(schema: ArrowSchema, batches: &[RecordBatch]) -> RecordBatch {
assert!(batches.iter().map(|batch| &batch.schema).all_equal());

let mut arrays = Vec::new();

if !batches.is_empty() {
for (i, _field) in schema.fields.iter().enumerate() {
let array = arrow2::compute::concatenate::concatenate(
&batches
.iter()
.map(|batch| &*batch.data[i] as &dyn ArrowArray)
.collect_vec(),
)
.unwrap();
arrays.push(array);
}
}

RecordBatch {
schema,
data: ArrowChunk::new(arrays),
}
}
}
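
The repetitive change across the hunks above is mechanical: the shared helper returns anyhow::Result, so every test call site gains a `?` where the old local helper unwrapped internally. A minimal sketch of the resulting contract, using the empty-input case (the simplest one the helper accepts):

use re_chunk::{util::concatenate_record_batches, TransportChunk};

#[test]
fn concatenation_is_fallible() -> anyhow::Result<()> {
    let schema = arrow2::datatypes::Schema::default();
    let batches: Vec<TransportChunk> = Vec::new();

    // With no input batches, the helper succeeds and returns an empty chunk
    // rather than erroring; any real failure would propagate via `?`.
    let dataframe = concatenate_record_batches(schema, &batches)?;
    assert!(dataframe.data.is_empty());

    Ok(())
}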
5 changes: 5 additions & 0 deletions crates/top/rerun/Cargo.toml
@@ -26,6 +26,7 @@ targets = ["x86_64-unknown-linux-gnu", "wasm32-unknown-unknown"]
default = [
"analytics",
"data_loaders",
"dataframe",
"demo",
"glam",
"image",
@@ -54,6 +55,9 @@ data_loaders = ["re_sdk?/data_loaders"]
## Demo helpers for examples.
demo = []

## Access to Rerun's dataframe API and related types.
dataframe = ["dep:re_dataframe"]

## Add support for some math operations using [`glam`](https://crates.io/crates/glam/).
## Only relevant if feature `sdk` is enabled.
glam = ["re_types?/glam"]
@@ -132,6 +136,7 @@ similar-asserts.workspace = true
re_analytics = { workspace = true, optional = true }
re_chunk_store = { workspace = true, optional = true }
re_data_source = { workspace = true, optional = true }
re_dataframe = { workspace = true, optional = true }
re_log_encoding = { workspace = true, optional = true, features = [
"decoder",
"encoder",
11 changes: 9 additions & 2 deletions crates/top/rerun/src/lib.rs
@@ -136,11 +136,18 @@ pub use commands::{run, CallSource};
#[cfg(feature = "sdk")]
pub use sdk::*;

/// All the types required by the dataframe API.
#[cfg(feature = "dataframe")]
pub mod dataframe {
pub use re_dataframe::*;
}

/// Everything needed to build custom `ChunkStoreSubscriber`s.
pub use re_entity_db::external::re_chunk_store::{
ChunkStore, ChunkStoreDiff, ChunkStoreDiffKind, ChunkStoreEvent, ChunkStoreGeneration,
ChunkStoreSubscriber,
ChunkStore, ChunkStoreConfig, ChunkStoreDiff, ChunkStoreDiffKind, ChunkStoreEvent,
ChunkStoreGeneration, ChunkStoreSubscriber, VersionPolicy,
};
pub use re_log_types::StoreKind;

/// To register a new external data loader, simply add an executable in your $PATH whose name
/// starts with this prefix.
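
The newly public ChunkStoreConfig and VersionPolicy hint at the intended entry point for the dataframe API: load a recording into a store, then query it. A hedged sketch; ChunkStore::from_rrd_filepath and its exact signature are assumptions not shown in this diff:

use rerun::{ChunkStore, ChunkStoreConfig, VersionPolicy};

// Assumption: a loader along these lines exists on ChunkStore; only the
// config/policy re-exports themselves are introduced by this PR.
fn load_stores(path: &str) -> anyhow::Result<()> {
    let stores =
        ChunkStore::from_rrd_filepath(&ChunkStoreConfig::DEFAULT, path, VersionPolicy::Warn)?;
    for store_id in stores.keys() {
        println!("loaded store {store_id}");
    }
    Ok(())
}

With the `dataframe` feature now in the default set, this and the `rerun::dataframe` module are available to downstream users out of the box.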