Skip to content

Commit cde77d3

Browse files
authored
Add Dataset.register_batch and wrappers for task ids (#9895)
### Related * closes rerun-io/dataplatform#689 ### What - Introduces `Dataset.register_batch()` to register multiple RRDs at once. - Introduces `Task` and `Tasks`, which are wrappers over a single task id and a collection of task ids, respectively. For now, they just have a `.wait(timeout_secs)` method. - Adds an optional `timeout_secs` argument to `Dataset.register()`.
1 parent b29dd0c commit cde77d3

File tree

8 files changed

+225
-35
lines changed

8 files changed

+225
-35
lines changed

rerun_py/rerun_bindings/rerun_bindings.pyi

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1038,8 +1038,36 @@ class Dataset(Entry):
10381038
def partition_url(self, partition_id: str) -> str:
10391039
"""Return the URL for the given partition."""
10401040

1041-
def register(self, recording_uri: str) -> None:
1042-
"""Register a RRD URI to the dataset."""
1041+
def register(self, recording_uri: str, timeout_secs: int = 60) -> None:
1042+
"""
1043+
Register a RRD URI to the dataset and wait for completion.
1044+
1045+
This method registers a single recording to the dataset and blocks until the registration is
1046+
complete, or after a timeout (in which case, a `TimeoutError` is raised).
1047+
1048+
Parameters
1049+
----------
1050+
recording_uri: str
1051+
The URI of the RRD to register
1052+
1053+
timeout_secs: int
1054+
The timeout after which this method returns.
1055+
1056+
"""
1057+
1058+
def register_batch(self, recording_uris: list[str]) -> Tasks:
1059+
"""
1060+
Register a batch of RRD URIs to the dataset and return a handle to the tasks.
1061+
1062+
This method initiates the registration of multiple recordings to the dataset, and returns
1063+
the corresponding task ids in a [`Tasks`] object.
1064+
1065+
Parameters
1066+
----------
1067+
recording_uris: list[str]
1068+
The URIs of the RRDs to register
1069+
1070+
"""
10431071

10441072
def download_partition(self, partition_id: str) -> Recording:
10451073
"""Download a partition from the dataset."""
@@ -1305,6 +1333,36 @@ class DataFusionTable:
13051333
def name(self) -> str:
13061334
"""Name of this table."""
13071335

1336+
class Task:
1337+
"""A handle on a remote task."""
1338+
1339+
@property
1340+
def id(self) -> str:
1341+
"""The task id."""
1342+
1343+
def wait(self, timeout_secs: int) -> None:
1344+
"""
1345+
Block until the task is completed or the timeout is reached.
1346+
1347+
A `TimeoutError` is raised if the timeout is reached.
1348+
"""
1349+
1350+
class Tasks:
1351+
"""A collection of [`Task`]."""
1352+
1353+
def wait(self, timeout_secs: int) -> None:
1354+
"""
1355+
Block until all tasks are completed or the timeout is reached.
1356+
1357+
A `TimeoutError` is raised if the timeout is reached.
1358+
"""
1359+
1360+
def __len__(self) -> int:
1361+
"""Return the number of tasks."""
1362+
1363+
def __getitem__(self, index: int) -> Task:
1364+
"""Return the task at the given index."""
1365+
13081366
#####################################################################################################################
13091367
## SEND_TABLE ##
13101368
#####################################################################################################################

rerun_py/rerun_sdk/rerun/catalog.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,6 @@
88
EntryId as EntryId,
99
EntryKind as EntryKind,
1010
Table as Table,
11+
Task as Task,
1112
VectorDistanceMetric as VectorDistanceMetric,
1213
)

rerun_py/src/catalog/connection_handle.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ impl ConnectionHandle {
219219
})
220220
}
221221

222+
/// Wait for the provided tasks to finish.
222223
pub fn wait_for_tasks(
223224
&mut self,
224225
py: Python<'_>,
@@ -251,7 +252,7 @@ impl ConnectionHandle {
251252
.decode()
252253
.map_err(to_py_err)?
253254
} else {
254-
return Err(PyValueError::new_err("no response from task"));
255+
return Err(PyConnectionError::new_err("no response from task"));
255256
};
256257

257258
// TODO(andrea): this is a bit hideous. Maybe the idea of returning a dataframe rather

rerun_py/src/catalog/dataset.rs

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ use arrow::array::{RecordBatch, StringArray};
44
use arrow::datatypes::{Field, Schema as ArrowSchema};
55
use arrow::pyarrow::PyArrowType;
66
use pyo3::{exceptions::PyRuntimeError, pyclass, pymethods, Py, PyAny, PyRef, PyResult, Python};
7-
use re_grpc_client::redap::get_chunks_response_to_chunk_and_partition_id;
87
use tokio_stream::StreamExt as _;
98

109
use re_chunk_store::{ChunkStore, ChunkStoreHandle};
1110
use re_datafusion::{PartitionTableProvider, SearchResultsTableProvider};
11+
use re_grpc_client::redap::get_chunks_response_to_chunk_and_partition_id;
1212
use re_log_encoding::codec::wire::encoder::Encode as _;
1313
use re_log_types::{StoreId, StoreInfo, StoreKind, StoreSource};
1414
use re_protos::common::v1alpha1::ext::DatasetHandle;
@@ -20,6 +20,7 @@ use re_protos::manifest_registry::v1alpha1::{
2020
};
2121
use re_sorbet::{SorbetColumnDescriptors, TimeColumnSelector};
2222

23+
use crate::catalog::task::PyTasks;
2324
use crate::catalog::{
2425
dataframe_query::PyDataframeQueryView, to_py_err, PyEntry, VectorDistanceMetricLike, VectorLike,
2526
};
@@ -94,24 +95,52 @@ impl PyDataset {
9495
.to_string()
9596
}
9697

97-
/// Register a RRD URI to the dataset.
98-
fn register(self_: PyRef<'_, Self>, recording_uri: String) -> PyResult<()> {
99-
// TODO(#9731): In order to make the `register` method appear synchronous,
100-
// we need to hard-code a max timeout for waiting for the corresponding tasks.
101-
// 60 seconds is totally arbitrary but should work for interactive uses
102-
// for now.
103-
//
104-
// A more permanent solution is to expose an asynchronous register method, and/or
105-
// the timeout directly to the caller.
106-
// See also issue #9731
107-
const MAX_REGISTER_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
98+
/// Register a RRD URI to the dataset and wait for completion.
99+
///
100+
/// This method registers a single recording to the dataset and blocks until the registration is
101+
/// complete, or after a timeout (in which case, a `TimeoutError` is raised).
102+
///
103+
/// Parameters
104+
/// ----------
105+
/// recording_uri: str
106+
/// The URI of the RRD to register
107+
///
108+
/// timeout_secs: int
109+
/// The timeout after which this method returns.
110+
#[pyo3(signature = (recording_uri, timeout_secs = 60))]
111+
fn register(self_: PyRef<'_, Self>, recording_uri: String, timeout_secs: u64) -> PyResult<()> {
112+
let register_timeout = std::time::Duration::from_secs(timeout_secs);
108113
let super_ = self_.as_super();
109114
let mut connection = super_.client.borrow(self_.py()).connection().clone();
110115
let dataset_id = super_.details.id;
111116

112117
let task_ids =
113118
connection.register_with_dataset(self_.py(), dataset_id, vec![recording_uri])?;
114-
connection.wait_for_tasks(self_.py(), &task_ids, MAX_REGISTER_TIMEOUT)
119+
120+
connection.wait_for_tasks(self_.py(), &task_ids, register_timeout)
121+
}
122+
123+
/// Register a batch of RRD URIs to the dataset and return a handle to the tasks.
124+
///
125+
/// This method initiates the registration of multiple recordings to the dataset, and returns
126+
/// the corresponding task ids in a [`Tasks`] object.
127+
///
128+
/// Parameters
129+
/// ----------
130+
/// recording_uris: list[str]
131+
/// The URIs of the RRDs to register
132+
#[allow(rustdoc::broken_intra_doc_links)]
133+
fn register_batch(self_: PyRef<'_, Self>, recording_uris: Vec<String>) -> PyResult<PyTasks> {
134+
let super_ = self_.as_super();
135+
let mut connection = super_.client.borrow(self_.py()).connection().clone();
136+
let dataset_id = super_.details.id;
137+
138+
let task_ids = connection.register_with_dataset(self_.py(), dataset_id, recording_uris)?;
139+
140+
Ok(PyTasks {
141+
client: super_.client.clone_ref(self_.py()),
142+
ids: task_ids,
143+
})
115144
}
116145

117146
/// Download a partition from the dataset.

rerun_py/src/catalog/errors.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@
1313
//! - Error type (either built-in such as [`pyo3::exceptions::PyValueError`] or custom) can always
1414
//! be used directly using, e.g. `PyValueError::new_err("message")`.
1515
16-
use pyo3::exceptions::{PyConnectionError, PyValueError};
17-
use pyo3::PyErr;
1816
use std::error::Error as _;
1917

18+
use pyo3::exceptions::{PyConnectionError, PyTimeoutError, PyValueError};
19+
use pyo3::PyErr;
20+
2021
use re_grpc_client::redap::ConnectionError;
2122
use re_protos::manifest_registry::v1alpha1::ext::GetDatasetSchemaResponseError;
23+
2224
// ---
2325

2426
/// Private error type to serve as a bridge between various external error types and the
@@ -85,17 +87,22 @@ impl From<ExternalError> for PyErr {
8587
ExternalError::ConnectionError(err) => PyConnectionError::new_err(err.to_string()),
8688

8789
ExternalError::TonicStatusError(status) => {
88-
let mut msg = format!(
89-
"tonic status error: {} (code: {}",
90-
status.message(),
91-
status.code()
92-
);
93-
if let Some(source) = status.source() {
94-
msg.push_str(&format!(", source: {source})"));
90+
if status.code() == tonic::Code::DeadlineExceeded {
91+
PyTimeoutError::new_err("Deadline expired before operation could complete")
9592
} else {
96-
msg.push(')');
93+
let mut msg = format!(
94+
"tonic status error: {} (code: {}",
95+
status.message(),
96+
status.code()
97+
);
98+
if let Some(source) = status.source() {
99+
msg.push_str(&format!(", source: {source})"));
100+
} else {
101+
msg.push(')');
102+
}
103+
104+
PyConnectionError::new_err(msg)
97105
}
98-
PyConnectionError::new_err(msg)
99106
}
100107

101108
ExternalError::UriError(err) => PyValueError::new_err(format!("Invalid URI: {err}")),

rerun_py/src/catalog/mod.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod dataset;
77
mod entry;
88
mod errors;
99
mod table;
10+
mod task;
1011

1112
use std::sync::Arc;
1213

@@ -18,12 +19,13 @@ use pyo3::{exceptions::PyRuntimeError, prelude::*, Bound, PyResult};
1819

1920
use crate::catalog::dataframe_query::PyDataframeQueryView;
2021

21-
pub use catalog_client::PyCatalogClient;
22-
pub use connection_handle::ConnectionHandle;
23-
pub use dataset::PyDataset;
24-
pub use entry::{PyEntry, PyEntryId, PyEntryKind};
25-
pub use errors::to_py_err;
26-
pub use table::PyTable;
22+
pub(crate) use catalog_client::PyCatalogClient;
23+
pub(crate) use connection_handle::ConnectionHandle;
24+
pub(crate) use dataset::PyDataset;
25+
pub(crate) use entry::{PyEntry, PyEntryId, PyEntryKind};
26+
pub(crate) use errors::to_py_err;
27+
pub(crate) use table::PyTable;
28+
pub(crate) use task::{PyTask, PyTasks};
2729

2830
/// Register the `rerun.catalog` module.
2931
pub(crate) fn register(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
@@ -34,6 +36,8 @@ pub(crate) fn register(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()>
3436
m.add_class::<PyEntry>()?;
3537
m.add_class::<PyDataset>()?;
3638
m.add_class::<PyTable>()?;
39+
m.add_class::<PyTask>()?;
40+
m.add_class::<PyTasks>()?;
3741

3842
m.add_class::<PyDataframeQueryView>()?;
3943

rerun_py/src/catalog/task.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
use pyo3::exceptions::PyIndexError;
2+
use pyo3::{pyclass, pymethods, Py, PyRef, PyResult, Python};
3+
4+
use re_protos::common::v1alpha1::TaskId;
5+
6+
use crate::catalog::PyCatalogClient;
7+
8+
/// A handle on a remote task.
9+
#[pyclass(name = "Task")]
10+
pub struct PyTask {
11+
pub client: Py<PyCatalogClient>,
12+
13+
pub id: TaskId,
14+
}
15+
16+
/// A handle on a remote task.
17+
#[pymethods]
18+
impl PyTask {
19+
/// String representation of the task, formatted as `Task(<id>)`.
20+
pub fn __repr__(&self) -> String {
21+
format!("Task({})", self.id.id)
22+
}
23+
24+
/// The task id.
25+
#[getter]
26+
pub fn id(&self) -> String {
27+
self.id.id.clone()
28+
}
29+
30+
/// Block until the task is completed or the timeout is reached.
31+
///
32+
/// A `TimeoutError` is raised if the timeout is reached.
33+
pub fn wait(&self, py: Python<'_>, timeout_secs: u64) -> PyResult<()> {
34+
let mut connection = self.client.borrow(py).connection().clone();
35+
let timeout = std::time::Duration::from_secs(timeout_secs);
36+
connection.wait_for_tasks(py, &[self.id.clone()], timeout)?;
37+
38+
Ok(())
39+
}
40+
41+
//TODO(ab): add method to poll about status
42+
}
43+
44+
/// A collection of [`Task`].
45+
#[allow(rustdoc::broken_intra_doc_links)]
46+
#[pyclass(name = "Tasks")]
47+
pub struct PyTasks {
48+
pub client: Py<PyCatalogClient>,
49+
50+
pub ids: Vec<TaskId>,
51+
}
52+
53+
#[pymethods]
54+
impl PyTasks {
55+
/// Block until all tasks are completed or the timeout is reached.
56+
///
57+
/// A `TimeoutError` is raised if the timeout is reached.
58+
pub fn wait(self_: PyRef<'_, Self>, timeout_secs: u64) -> PyResult<()> {
59+
let mut connection = self_.client.borrow(self_.py()).connection().clone();
60+
let timeout = std::time::Duration::from_secs(timeout_secs);
61+
connection.wait_for_tasks(self_.py(), &self_.ids, timeout)?;
62+
63+
Ok(())
64+
}
65+
66+
//TODO(ab): add method to poll about status (how many are done, etc.)
67+
68+
//
69+
// Sequence methods
70+
//
71+
72+
fn __len__(&self) -> usize {
73+
self.ids.len()
74+
}
75+
76+
/// Get the task at the given index.
77+
fn __getitem__(&self, py: Python<'_>, index: usize) -> PyResult<PyTask> {
78+
if index >= self.ids.len() {
79+
return Err(PyIndexError::new_err("Index out of range"));
80+
}
81+
82+
Ok(PyTask {
83+
client: self.client.clone_ref(py),
84+
id: self.ids[index].clone(),
85+
})
86+
}
87+
}

scripts/ci/python_check_signatures.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,13 @@ def __eq__(self, other: Any) -> bool:
6262
if not isinstance(other, APIDef):
6363
return NotImplemented
6464

65-
if self.name in ("__init__", "__iter__"):
66-
# Ignore the signature of __init__ and __new__ methods
65+
if self.name in ("__init__", "__iter__", "__len__"):
66+
# pyo3 has a special way to handle these methods that makes it impossible to match everything.
6767
# TODO(#7779): Remove this special case once we have a better way to handle these methods
6868
return self.name == other.name and self.signature == other.signature
69+
elif self.name in ("__getitem__"):
70+
# TODO(#7779): It's somehow even worse for these.
71+
return self.name == other.name
6972
else:
7073
return self.name == other.name and self.signature == other.signature and self.doc == other.doc
7174

0 commit comments

Comments
 (0)