diff --git a/src/lib.rs b/src/lib.rs index 5c063ff..20462ae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -189,7 +189,15 @@ pub mod dev_utils { } } - pub fn construct_fake_dag() -> Dag<&'static str> { + /// A wrapper struct that separates time series and spatial tests + /// into two different DAGs + #[derive(Debug, Clone)] + pub struct ScheduleDag { + pub series: Option>, + pub spatial: Option>, + } + + pub fn construct_fake_dag() -> ScheduleDag { let mut dag: Dag<&'static str> = Dag::new(); let test6 = dag.add_node("test6"); @@ -202,17 +210,24 @@ pub mod dev_utils { let _test1 = dag.add_node_with_children("test1", vec![test2, test3]); - dag + ScheduleDag { + series: Some(dag), + spatial: None, + } } - pub fn construct_hardcoded_dag() -> Dag<&'static str> { - let mut dag: Dag<&'static str> = Dag::new(); + pub fn construct_hardcoded_dag() -> ScheduleDag { + let mut series: Dag<&'static str> = Dag::new(); + series.add_node("dip_check"); + series.add_node("step_check"); - dag.add_node("dip_check"); - dag.add_node("step_check"); - dag.add_node("buddy_check"); - dag.add_node("sct"); + let mut spatial: Dag<&'static str> = Dag::new(); + spatial.add_node("buddy_check"); + spatial.add_node("sct"); - dag + ScheduleDag { + series: Some(series), + spatial: Some(spatial), + } } } diff --git a/src/scheduler.rs b/src/scheduler.rs index adba0e1..c0fc2b2 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -1,6 +1,7 @@ use crate::{ dag::{Dag, NodeId}, data_switch::{self, DataSwitch, Polygon, SeriesCache, SpatialCache, Timerange, Timestamp}, + dev_utils::ScheduleDag, harness, // TODO: rethink this dependency? pb::{ValidateSeriesResponse, ValidateSpatialResponse}, @@ -30,20 +31,20 @@ pub enum Error { #[derive(Debug, Clone)] pub struct Scheduler<'a> { // TODO: separate DAGs for series and spatial tests? - dag: Dag<&'static str>, + dag: ScheduleDag, data_switch: DataSwitch<'a>, } impl<'a> Scheduler<'a> { /// Instantiate a new scheduler - pub fn new(dag: Dag<&'static str>, data_switch: DataSwitch<'a>) -> Self { + pub fn new(dag: ScheduleDag, data_switch: DataSwitch<'a>) -> Self { Scheduler { dag, data_switch } } /// Construct a subdag of the given dag with only the required nodes, and their /// dependencies. fn construct_subdag( - &self, + dag: &Dag<&'static str>, required_nodes: &[impl AsRef], ) -> Result, Error> { fn add_descendants( @@ -72,18 +73,17 @@ impl<'a> Scheduler<'a> { let mut nodes_visited: HashMap = HashMap::new(); for required in required_nodes.iter() { - let index = self - .dag + let index = dag .index_lookup .get(required.as_ref()) .ok_or(Error::TestNotInDag(required.as_ref().to_string()))?; if !nodes_visited.contains_key(index) { - let subdag_index = subdag.add_node(self.dag.nodes.get(*index).unwrap().elem); + let subdag_index = subdag.add_node(dag.nodes.get(*index).unwrap().elem); nodes_visited.insert(*index, subdag_index); - add_descendants(&self.dag, &mut subdag, *index, &mut nodes_visited); + add_descendants(dag, &mut subdag, *index, &mut nodes_visited); } } @@ -161,6 +161,7 @@ impl<'a> Scheduler<'a> { // the `out_stream` will not be polled after client disconnect let (tx, rx) = channel(subdag.nodes.len()); tokio::spawn(async move { + // TODO(manuel): add IntervalStream/SleepStream here? And add `interval` field to Scheduler? let mut children_completed_map: HashMap = HashMap::new(); let mut test_futures = FuturesUnordered::new(); @@ -259,7 +260,7 @@ impl<'a> Scheduler<'a> { } }; - let subdag = self.construct_subdag(tests)?; + let subdag = Scheduler::construct_subdag(self.dag.series.as_ref().unwrap(), tests)?; Ok(Scheduler::schedule_tests_series(subdag, data)) } @@ -312,7 +313,7 @@ impl<'a> Scheduler<'a> { } }; - let subdag = self.construct_subdag(tests)?; + let subdag = Scheduler::construct_subdag(self.dag.spatial.as_ref().unwrap(), tests)?; Ok(Scheduler::schedule_tests_spatial(subdag, data)) } @@ -327,13 +328,17 @@ mod tests { fn test_construct_subdag() { let rove_service = Scheduler::new(construct_fake_dag(), DataSwitch::new(HashMap::new())); - assert_eq!(rove_service.dag.count_edges(), 6); + assert_eq!(rove_service.dag.series.as_ref().unwrap().count_edges(), 6); - let subdag = rove_service.construct_subdag(&vec!["test4"]).unwrap(); + let subdag = + Scheduler::construct_subdag(rove_service.dag.series.as_ref().unwrap(), &vec!["test4"]) + .unwrap(); assert_eq!(subdag.count_edges(), 1); - let subdag = rove_service.construct_subdag(&vec!["test1"]).unwrap(); + let subdag = + Scheduler::construct_subdag(rove_service.dag.series.as_ref().unwrap(), &vec!["test1"]) + .unwrap(); assert_eq!(subdag.count_edges(), 6); } diff --git a/src/server.rs b/src/server.rs index afd6573..2a7999e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,6 +1,6 @@ use crate::{ - dag::Dag, data_switch::{DataSwitch, GeoPoint, Polygon, Timerange, Timestamp}, + dev_utils::ScheduleDag, pb::{ rove_server::{Rove, RoveServer}, ValidateSeriesRequest, ValidateSeriesResponse, ValidateSpatialRequest, @@ -161,7 +161,7 @@ impl Rove for Scheduler<'static> { async fn start_server_inner( listener: ListenerType, data_switch: DataSwitch<'static>, - dag: Dag<&'static str>, + dag: ScheduleDag, ) -> Result<(), Box> { let rove_service = Scheduler::new(dag, data_switch); @@ -192,7 +192,7 @@ async fn start_server_inner( pub async fn start_server_unix_listener( stream: UnixListenerStream, data_switch: DataSwitch<'static>, - dag: Dag<&'static str>, + dag: ScheduleDag, ) -> Result<(), Box> { start_server_inner(ListenerType::UnixListener(stream), data_switch, dag).await } @@ -205,7 +205,7 @@ pub async fn start_server_unix_listener( pub async fn start_server( addr: SocketAddr, data_switch: DataSwitch<'static>, - dag: Dag<&'static str>, + dag: ScheduleDag, ) -> Result<(), Box> { start_server_inner(ListenerType::Addr(addr), data_switch, dag).await } diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 3c0ca59..4fef72b 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -2,8 +2,8 @@ use core::future::Future; use pb::{rove_client::RoveClient, Flag, ValidateSeriesRequest, ValidateSpatialRequest}; use rove::{ data_switch::{DataConnector, DataSwitch}, - dev_utils::{construct_fake_dag, construct_hardcoded_dag, TestDataSource}, - start_server_unix_listener, Dag, + dev_utils::{construct_fake_dag, construct_hardcoded_dag, ScheduleDag, TestDataSource}, + start_server_unix_listener, }; use std::{collections::HashMap, sync::Arc}; use tempfile::NamedTempFile; @@ -21,7 +21,7 @@ const DATA_LEN_SPATIAL: usize = 1000; pub async fn set_up_rove( data_switch: DataSwitch<'static>, - dag: Dag<&'static str>, + dag: ScheduleDag, ) -> (impl Future, RoveClient) { let coordintor_socket = NamedTempFile::new().unwrap(); let coordintor_socket = Arc::new(coordintor_socket.into_temp_path());