From 7a1fc9c813c0330037d5de35494a6c38e88b6a9f Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Sun, 19 Jan 2020 20:42:22 +0100 Subject: [PATCH 1/2] Wiring development --- Cargo.toml | 2 +- examples/single_directed.rs | 22 +++++++++++++++++++++ src/lib.rs | 2 +- src/stream/flow/map.rs | 33 +++++++++++++++++++++++++++++--- src/stream/sink/ignore.rs | 26 ++++++++++++++++++++++--- src/stream/source/single.rs | 32 ++++++++++++++++++++++++++----- src/stream/stage/graph.rs | 12 ++++++------ src/stream/stage/mod.rs | 2 +- src/stream/topology/architect.rs | 33 +++++++++++++++++++++----------- src/stream/topology/container.rs | 7 +++++++ src/stream/topology/mod.rs | 3 ++- 11 files changed, 142 insertions(+), 32 deletions(-) create mode 100644 examples/single_directed.rs create mode 100644 src/stream/topology/container.rs diff --git a/Cargo.toml b/Cargo.toml index 445699b..df84d06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ pin-utils = "0.1.0-alpha.4" slab = "0.4.2" streaming-iterator = "0.1.4" objekt-clonable = "0.2.2" -multiqueue = "0.3.2" +multiqueue2 = "0.1.6" crossbeam-channel = "0.3.9" bus = "2.2.2" diff --git a/examples/single_directed.rs b/examples/single_directed.rs new file mode 100644 index 0000000..8f0f38b --- /dev/null +++ b/examples/single_directed.rs @@ -0,0 +1,22 @@ +use bastion_streams::stream::source::single::Single; +use bastion_streams::stream::flow::map::Map; +use bastion_streams::stream::sink::ignore::Ignore; +use bastion_streams::stream::topology::architect::Architect; +use bastion_streams::stream::stage::prelude::*; +use bastion_streams::stream::topology::container::Container; + +fn main() { + let single = Single::::new(999); + let mapper = Map::::new(Box::new(|x: i32| { + x+1 + })); + let sink = Ignore::::new(); + + let stages = vec![ + Container::Source(Box::new(single)), + Container::Transform(Box::new(mapper)), + Container::Sink(Box::new(sink)) + ]; + let mut architect = Architect::graph(stages); + architect.visit_stages(); +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 8dd59d6..d71bd41 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -#![feature(clone_closures)] +//#![feature(clone_closures)] #[macro_use] pub mod stream; diff --git a/src/stream/flow/map.rs b/src/stream/flow/map.rs index 0004312..6e53cdd 100644 --- a/src/stream/flow/map.rs +++ b/src/stream/flow/map.rs @@ -2,11 +2,16 @@ use crate::stream::stage::prelude::*; use crossbeam_channel::{unbounded, Receiver, Sender}; use futures::io::Error; use objekt_clonable::clonable; +use std::mem::MaybeUninit; + #[clonable] pub trait MapClosure: Fn(I) -> O + Clone + Send + Sync + 'static {} +impl MapClosure for T where T: Fn(I) -> O + Clone + Send + Sync + 'static {} + type MapFn = Box>; + pub struct Map { pub shape: FlowShape<'static, I, O>, pub stage_id: usize, @@ -21,6 +26,28 @@ pub struct Map { pub logic: GraphStageLogic, } +impl Map +where + I: Clone, + O: Clone, +{ + pub fn new(map_fn: MapFn) -> Self { + Self { + shape: unsafe { MaybeUninit::uninit().assume_init() }, + stage_id: unsafe { MaybeUninit::uninit().assume_init() }, + + map_fn, + + demand_rx: unsafe { MaybeUninit::uninit().assume_init() }, + demand_tx: unsafe { MaybeUninit::uninit().assume_init() }, + + in_handler: unsafe { MaybeUninit::uninit().assume_init() }, + out_handler: unsafe { MaybeUninit::uninit().assume_init() }, + logic: unsafe { MaybeUninit::uninit().assume_init() }, + } + } +} + ///// Map handler /////////////////////////// #[derive(Clone)] @@ -111,7 +138,7 @@ impl Default for MapHandler { } } -impl<'a, I, O> GraphStage<'a> for Map +impl GraphStage for Map where I: Clone + Send + Sync + 'static, O: Clone + Send + Sync + 'static, @@ -126,7 +153,7 @@ where }; } - fn build_demand(&'a mut self, tx: BroadcastSender, rx: BroadcastReceiver) { + fn build_demand(&mut self, tx: BroadcastSender, rx: BroadcastReceiver) { self.demand_tx = tx; self.demand_rx = rx; } @@ -160,7 +187,7 @@ where gsl } - fn get_shape(&'a self) -> ShapeType { + fn get_shape(&self) -> ShapeType { self.shape.shape_type() } } diff --git a/src/stream/sink/ignore.rs b/src/stream/sink/ignore.rs index 4784537..9668420 100644 --- a/src/stream/sink/ignore.rs +++ b/src/stream/sink/ignore.rs @@ -1,6 +1,7 @@ use crate::stream::stage::prelude::*; use crossbeam_channel::{unbounded, Receiver, Sender}; use futures::io::Error; +use std::mem::MaybeUninit; pub struct Ignore { pub shape: SinkShape<'static, I>, @@ -14,6 +15,25 @@ pub struct Ignore { pub logic: GraphStageLogic, } +impl Ignore +where + I: Clone +{ + pub fn new() -> Self { + Self { + shape: unsafe { MaybeUninit::uninit().assume_init() }, + stage_id: unsafe { MaybeUninit::uninit().assume_init() }, + + demand_rx: unsafe { MaybeUninit::uninit().assume_init() }, + demand_tx: unsafe { MaybeUninit::uninit().assume_init() }, + + in_handler: unsafe { MaybeUninit::uninit().assume_init() }, + out_handler: unsafe { MaybeUninit::uninit().assume_init() }, + logic: unsafe { MaybeUninit::uninit().assume_init() }, + } + } +} + #[derive(Clone)] struct IgnoreHandler { pub stage_id: usize, @@ -56,7 +76,7 @@ impl InHandler for IgnoreHandler } } -impl<'a, I> GraphStage<'a> for Ignore +impl GraphStage for Ignore where I: Clone + 'static, { @@ -67,7 +87,7 @@ impl<'a, I> GraphStage<'a> for Ignore }; } - fn build_demand(&'a mut self, tx: BroadcastSender, rx: BroadcastReceiver) { + fn build_demand(&mut self, tx: BroadcastSender, rx: BroadcastReceiver) { self.demand_tx = tx; self.demand_rx = rx; } @@ -93,7 +113,7 @@ impl<'a, I> GraphStage<'a> for Ignore gsl } - fn get_shape(&'a self) -> ShapeType { + fn get_shape(&self) -> ShapeType { let shape: &dyn Shape = &self.shape; shape.shape_type() } diff --git a/src/stream/source/single.rs b/src/stream/source/single.rs index 698f056..3b443cb 100644 --- a/src/stream/source/single.rs +++ b/src/stream/source/single.rs @@ -2,11 +2,13 @@ use crate::stream::stage::prelude::*; use crossbeam_channel::{unbounded, Receiver, Sender}; use futures::io::Error; +use std::mem::MaybeUninit; + pub struct Single { pub shape: SourceShape<'static, O>, - pub elem: O, + pub element: O, pub demand_rx: BroadcastReceiver, pub demand_tx: BroadcastSender, @@ -16,6 +18,26 @@ pub struct Single { pub logic: GraphStageLogic, } +impl Single +where + O: Clone +{ + pub fn new(element: O) -> Self { + Self { + shape: unsafe { MaybeUninit::uninit().assume_init() }, + + element, + + demand_rx: unsafe { MaybeUninit::uninit().assume_init() }, + demand_tx: unsafe { MaybeUninit::uninit().assume_init() }, + + in_handler: unsafe { MaybeUninit::uninit().assume_init() }, + out_handler: unsafe { MaybeUninit::uninit().assume_init() }, + logic: unsafe { MaybeUninit::uninit().assume_init() }, + } + } +} + #[derive(Clone, Debug)] struct SingleHandler { elem: O, @@ -45,7 +67,7 @@ impl OutHandler for SingleHandler } } -impl<'a, O> GraphStage<'a> for Single +impl GraphStage for Single where O: Clone + 'static, { @@ -56,7 +78,7 @@ where }; } - fn build_demand(&'a mut self, tx: BroadcastSender, rx: BroadcastReceiver) { + fn build_demand(&mut self, tx: BroadcastSender, rx: BroadcastReceiver) { self.demand_tx = tx; self.demand_rx = rx; } @@ -67,7 +89,7 @@ where let (tx, rx) = unbounded(); self.out_handler = Box::new(SingleHandler { - elem: self.elem.clone(), + elem: self.element.clone(), tx, rx, }); @@ -80,7 +102,7 @@ where gsl } - fn get_shape(&'a self) -> ShapeType { + fn get_shape(&self) -> ShapeType { let shape: &dyn Shape = &self.shape; shape.shape_type() } diff --git a/src/stream/stage/graph.rs b/src/stream/stage/graph.rs index c21d5b6..c9cdcd2 100644 --- a/src/stream/stage/graph.rs +++ b/src/stream/stage/graph.rs @@ -6,14 +6,14 @@ use crate::stream::stage::lets::{Inlet, Outlet}; use crate::stream::stage::shape::{Shape, ShapeType}; use crate::stream::stage::demand::{Demand}; -use multiqueue::{BroadcastSender, BroadcastReceiver}; +use multiqueue2::{BroadcastSender, BroadcastReceiver}; -pub trait GraphStage<'a> { - fn build_shape(&'a mut self); - fn build_demand(&'a mut self, tx: BroadcastSender, rx: BroadcastReceiver); - fn create_logic(&'a mut self, attributes: Attributes) -> GraphStageLogic; +pub trait GraphStage { + fn build_shape(&mut self); + fn build_demand(&mut self, tx: BroadcastSender, rx: BroadcastReceiver); + fn create_logic(&mut self, attributes: Attributes) -> GraphStageLogic; - fn get_shape(&'a self) -> ShapeType; + fn get_shape(&self) -> ShapeType; } /////////////// diff --git a/src/stream/stage/mod.rs b/src/stream/stage/mod.rs index 24abccc..7799f78 100644 --- a/src/stream/stage/mod.rs +++ b/src/stream/stage/mod.rs @@ -8,7 +8,7 @@ pub mod demand; pub mod error; pub mod prelude { - pub use multiqueue::{BroadcastReceiver, BroadcastSender}; + pub use multiqueue2::{BroadcastReceiver, BroadcastSender}; pub use super::attributes::*; pub use super::graph::*; pub use super::handlers::*; diff --git a/src/stream/topology/architect.rs b/src/stream/topology/architect.rs index c7dcfd8..cef827f 100644 --- a/src/stream/topology/architect.rs +++ b/src/stream/topology/architect.rs @@ -2,19 +2,20 @@ use crate::stream::stage::demand::{Demand}; use crate::stream::stage::graph::GraphStage; use crate::stream::stage::shape::ShapeType; -use multiqueue::{broadcast_queue, BroadcastReceiver, BroadcastSender}; +use multiqueue2::{broadcast_queue, BroadcastReceiver, BroadcastSender}; +use crate::stream::topology::container::Container; -pub struct Architect<'a> { +pub struct Architect { demand_tx: BroadcastSender, demand_rx: BroadcastReceiver, - stages: Vec>> + stages: Vec } -impl<'a> Architect<'a> { - pub fn graph(stages: Vec>>) -> Architect { - let stage_count = stages.len(); +impl Architect { + pub fn graph(stages: Vec) -> Architect { + let stage_count = stages.len() * 2; let (demand_tx, demand_rx) = broadcast_queue(stage_count as u64); @@ -29,20 +30,30 @@ impl<'a> Architect<'a> { unimplemented!() } - fn check_bounds(&'a self) { + pub fn check_bounds(&self) { if let Some(root) = self.stages.first() { - if root.get_shape() != ShapeType::Source { - unimplemented!() + use Container::*; + + match root { + Transform(_) | Sink(_) => { + panic!("Stage traversal failed. Stream graphs start with a Source.") + }, + _ => () } } } - fn visit_stages(&'a mut self) { + pub fn visit_stages(&mut self) { let tx = self.demand_tx.clone(); let rx = self.demand_rx.add_stream(); self.stages.iter_mut().for_each(|stage| { - stage.build_demand(tx.clone(), rx.clone()) + use Container::*; + + match stage { + Source(s) | Transform(s) | Sink(s) => + s.build_demand(tx.clone(), rx.clone()) + } }); } } diff --git a/src/stream/topology/container.rs b/src/stream/topology/container.rs new file mode 100644 index 0000000..8eee811 --- /dev/null +++ b/src/stream/topology/container.rs @@ -0,0 +1,7 @@ +use crate::stream::stage::graph::GraphStage; + +pub enum Container { + Source(Box), + Transform(Box), + Sink(Box), +} \ No newline at end of file diff --git a/src/stream/topology/mod.rs b/src/stream/topology/mod.rs index 6917e6c..6898d26 100644 --- a/src/stream/topology/mod.rs +++ b/src/stream/topology/mod.rs @@ -1 +1,2 @@ -pub mod architect; +pub mod container; +pub mod architect; \ No newline at end of file From afe0ef5e2069ddd785014b91c246f89421e61ae7 Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Mon, 20 Jan 2020 11:02:18 +0100 Subject: [PATCH 2/2] Wiring & topology development --- examples/single_directed.rs | 15 ++--- src/stream/flow/map.rs | 98 ++++++++++++++++++-------------- src/stream/sink/ignore.rs | 84 +++++++++++++++------------ src/stream/source/single.rs | 75 ++++++++++++++---------- src/stream/stage/demand.rs | 14 +++-- src/stream/stage/error.rs | 1 + src/stream/stage/graph.rs | 10 ++-- src/stream/stage/handlers.rs | 4 -- src/stream/stage/mod.rs | 10 ++-- src/stream/stage/shape.rs | 3 +- src/stream/stage/types.rs | 3 - src/stream/topology/architect.rs | 84 +++++++++++++++++++++------ src/stream/topology/container.rs | 2 +- src/stream/topology/macros.rs | 1 + src/stream/topology/mod.rs | 4 +- tests/test_graph_stage_logic.rs | 5 +- 16 files changed, 251 insertions(+), 162 deletions(-) create mode 100644 src/stream/topology/macros.rs diff --git a/examples/single_directed.rs b/examples/single_directed.rs index 8f0f38b..7351736 100644 --- a/examples/single_directed.rs +++ b/examples/single_directed.rs @@ -1,22 +1,23 @@ -use bastion_streams::stream::source::single::Single; use bastion_streams::stream::flow::map::Map; use bastion_streams::stream::sink::ignore::Ignore; +use bastion_streams::stream::source::single::Single; + use bastion_streams::stream::topology::architect::Architect; -use bastion_streams::stream::stage::prelude::*; use bastion_streams::stream::topology::container::Container; fn main() { let single = Single::::new(999); - let mapper = Map::::new(Box::new(|x: i32| { - x+1 - })); + let mapper = Map::::new(Box::new(|x: i32| x + 1)); let sink = Ignore::::new(); let stages = vec![ Container::Source(Box::new(single)), Container::Transform(Box::new(mapper)), - Container::Sink(Box::new(sink)) + Container::Sink(Box::new(sink)), ]; + let mut architect = Architect::graph(stages); + architect.check_bounds(); architect.visit_stages(); -} \ No newline at end of file + architect.run(); +} diff --git a/src/stream/flow/map.rs b/src/stream/flow/map.rs index 6e53cdd..bd1f6d8 100644 --- a/src/stream/flow/map.rs +++ b/src/stream/flow/map.rs @@ -2,8 +2,6 @@ use crate::stream::stage::prelude::*; use crossbeam_channel::{unbounded, Receiver, Sender}; use futures::io::Error; use objekt_clonable::clonable; -use std::mem::MaybeUninit; - #[clonable] pub trait MapClosure: Fn(I) -> O + Clone + Send + Sync + 'static {} @@ -11,19 +9,18 @@ impl MapClosure for T where T: Fn(I) -> O + Clone + Send + Sync + type MapFn = Box>; - pub struct Map { - pub shape: FlowShape<'static, I, O>, - pub stage_id: usize, + pub shape: Option>, + pub stage_id: Option, pub map_fn: MapFn, - pub demand_rx: BroadcastReceiver, - pub demand_tx: BroadcastSender, + pub demand_rx: Option>, + pub demand_tx: Option>, - pub in_handler: Box, - pub out_handler: Box, - pub logic: GraphStageLogic, + pub in_handler: Option>, + pub out_handler: Option>, + pub logic: Option, } impl Map @@ -33,17 +30,17 @@ where { pub fn new(map_fn: MapFn) -> Self { Self { - shape: unsafe { MaybeUninit::uninit().assume_init() }, - stage_id: unsafe { MaybeUninit::uninit().assume_init() }, + shape: None, + stage_id: None, map_fn, - demand_rx: unsafe { MaybeUninit::uninit().assume_init() }, - demand_tx: unsafe { MaybeUninit::uninit().assume_init() }, + demand_rx: None, + demand_tx: None, - in_handler: unsafe { MaybeUninit::uninit().assume_init() }, - out_handler: unsafe { MaybeUninit::uninit().assume_init() }, - logic: unsafe { MaybeUninit::uninit().assume_init() }, + in_handler: None, + out_handler: None, + logic: None, } } } @@ -66,16 +63,16 @@ struct MapHandler { } impl OutHandler for MapHandler - where - I: Clone + Send + Sync + 'static, - O: Clone + Send + Sync + 'static, +where + I: Clone + Send + Sync + 'static, + O: Clone + Send + Sync + 'static, { fn name(&self) -> String { String::from("map-flow-out") } fn on_pull(&self) { - unimplemented!() + if let Ok(_elem) = self.out_rx.as_ref().unwrap().try_recv() {} } fn on_downstream_finish(&self) { @@ -88,9 +85,9 @@ impl OutHandler for MapHandler } impl InHandler for MapHandler - where - I: Clone + Send + Sync, - O: Clone + Send + Sync, +where + I: Clone + Send + Sync, + O: Clone + Send + Sync, { fn name(&self) -> String { String::from("map-flow-in") @@ -105,7 +102,7 @@ impl InHandler for MapHandler // todo: on_pull make demand from the upper let demand = Demand { stage_id: self.stage_id, - style: DemandStyle::DemandFull(100) + style: DemandStyle::DemandFull(100), }; self.demand_tx.as_ref().unwrap().try_send(demand).unwrap(); } @@ -147,47 +144,62 @@ where let map_flow_inlet = Inlet::::new(0, "Map.in"); let map_flow_outlet = Outlet::::new(0, "Map.out"); - self.shape = FlowShape { + self.shape = Some(FlowShape { inlet: map_flow_inlet, outlet: map_flow_outlet, - }; + }); } fn build_demand(&mut self, tx: BroadcastSender, rx: BroadcastReceiver) { - self.demand_tx = tx; - self.demand_rx = rx; + self.demand_tx = Some(tx); + self.demand_rx = Some(rx); } - fn create_logic(&mut self, _attributes: Attributes) -> GraphStageLogic { + fn create_logic(&mut self, stage_id: usize, _attributes: Attributes) { self.build_shape(); let (in_tx, in_rx) = unbounded::(); let (out_tx, out_rx) = unbounded::(); - let handler = Box::new(MapHandler { + let handler = Some(Box::new(MapHandler { map_fn: Some(self.map_fn.clone()), in_tx: Some(in_tx), in_rx: Some(in_rx), out_rx: Some(out_rx), out_tx: Some(out_tx), - demand_rx: Some(self.demand_rx.clone()), - demand_tx: Some(self.demand_tx.clone()), - stage_id: self.stage_id - }); + demand_rx: Some(self.demand_rx.as_ref().unwrap().clone()), + demand_tx: Some(self.demand_tx.as_ref().unwrap().clone()), + stage_id, + })); + + self.stage_id = Some(stage_id); - self.in_handler = handler.clone(); - self.out_handler = handler.clone(); + self.in_handler = Some(handler.as_ref().unwrap().clone()); + self.out_handler = Some(handler.as_ref().unwrap().clone()); - let shape = Box::new(self.shape.clone()); + let shape = Box::new(self.shape.as_ref().unwrap().clone()); let mut gsl = GraphStageLogic::from_shape::(shape); - gsl.set_inlet_handler(self.shape.inlet.clone(), self.in_handler.clone()); - gsl.set_outlet_handler(self.shape.outlet.clone(), self.out_handler.clone()); - self.logic = gsl.clone(); - gsl + gsl.set_inlet_handler( + self.shape.as_ref().unwrap().inlet.clone(), + self.in_handler.as_ref().unwrap().clone(), + ); + gsl.set_outlet_handler( + self.shape.as_ref().unwrap().outlet.clone(), + self.out_handler.as_ref().unwrap().clone(), + ); + self.logic = Some(gsl); } fn get_shape(&self) -> ShapeType { - self.shape.shape_type() + self.shape.as_ref().unwrap().shape_type() + } + + fn get_stage_id(&self) -> usize { + self.stage_id.unwrap() + } + + fn get_logic(&self) -> &GraphStageLogic { + self.logic.as_ref().unwrap() } } diff --git a/src/stream/sink/ignore.rs b/src/stream/sink/ignore.rs index 9668420..7183d04 100644 --- a/src/stream/sink/ignore.rs +++ b/src/stream/sink/ignore.rs @@ -1,35 +1,34 @@ use crate::stream::stage::prelude::*; use crossbeam_channel::{unbounded, Receiver, Sender}; use futures::io::Error; -use std::mem::MaybeUninit; pub struct Ignore { - pub shape: SinkShape<'static, I>, - pub stage_id: usize, + pub shape: Option>, + pub stage_id: Option, - pub demand_rx: BroadcastReceiver, - pub demand_tx: BroadcastSender, + pub demand_rx: Option>, + pub demand_tx: Option>, - pub in_handler: Box, - pub out_handler: Box, - pub logic: GraphStageLogic, + pub in_handler: Option>, + pub out_handler: Option>, + pub logic: Option, } impl Ignore where - I: Clone + I: Clone, { pub fn new() -> Self { Self { - shape: unsafe { MaybeUninit::uninit().assume_init() }, - stage_id: unsafe { MaybeUninit::uninit().assume_init() }, + shape: None, + stage_id: None, - demand_rx: unsafe { MaybeUninit::uninit().assume_init() }, - demand_tx: unsafe { MaybeUninit::uninit().assume_init() }, + demand_rx: None, + demand_tx: None, - in_handler: unsafe { MaybeUninit::uninit().assume_init() }, - out_handler: unsafe { MaybeUninit::uninit().assume_init() }, - logic: unsafe { MaybeUninit::uninit().assume_init() }, + in_handler: None, + out_handler: None, + logic: None, } } } @@ -46,8 +45,8 @@ struct IgnoreHandler { } impl InHandler for IgnoreHandler - where - I: Clone + 'static, +where + I: Clone + 'static, { fn name(&self) -> String { String::from("ignore-sink-in") @@ -57,11 +56,12 @@ impl InHandler for IgnoreHandler if let Ok(_elem) = self.in_rx.as_ref().unwrap().try_recv() { println!("Ignored"); } else { + println!("Demanding"); // todo: handle error case of try_recv // todo: on_pull make demand from the upper let demand = Demand { stage_id: self.stage_id, - style: DemandStyle::DemandFull(100) + style: DemandStyle::DemandFull(100), }; self.demand_tx.as_ref().unwrap().try_send(demand).unwrap(); } @@ -77,44 +77,56 @@ impl InHandler for IgnoreHandler } impl GraphStage for Ignore - where - I: Clone + 'static, +where + I: Clone + 'static, { fn build_shape(&mut self) { let ignore_sink_inlet = Inlet::::new(0, "Sink.out"); - self.shape = SinkShape { + self.shape = Some(SinkShape { inlet: ignore_sink_inlet, - }; + }); } fn build_demand(&mut self, tx: BroadcastSender, rx: BroadcastReceiver) { - self.demand_tx = tx; - self.demand_rx = rx; + self.demand_tx = Some(tx); + self.demand_rx = Some(rx); } - fn create_logic(&mut self, _attributes: Attributes) -> GraphStageLogic { + fn create_logic(&mut self, stage_id: usize, _attributes: Attributes) { self.build_shape(); let (tx, rx) = unbounded::(); - self.in_handler = Box::new(IgnoreHandler { + self.in_handler = Some(Box::new(IgnoreHandler { in_tx: Some(tx), in_rx: Some(rx), - demand_rx: Some(self.demand_rx.clone()), - demand_tx: Some(self.demand_tx.clone()), - stage_id: self.stage_id - }); + demand_rx: Some(self.demand_rx.as_ref().unwrap().clone()), + demand_tx: Some(self.demand_tx.as_ref().unwrap().clone()), + stage_id, + })); + + self.stage_id = Some(stage_id); - let shape = Box::new(self.shape.clone()); + let shape = Box::new(self.shape.as_ref().unwrap().clone()); let mut gsl = GraphStageLogic::from_shape::(shape); - gsl.set_inlet_handler(self.shape.inlet.clone(), self.in_handler.clone()); - self.logic = gsl.clone(); - gsl + gsl.set_inlet_handler( + self.shape.as_ref().unwrap().inlet.clone(), + self.in_handler.as_ref().unwrap().clone(), + ); + self.logic = Some(gsl); } fn get_shape(&self) -> ShapeType { - let shape: &dyn Shape = &self.shape; + let shape: &dyn Shape = self.shape.as_ref().unwrap(); shape.shape_type() } + + fn get_stage_id(&self) -> usize { + self.stage_id.unwrap() + } + + fn get_logic(&self) -> &GraphStageLogic { + self.logic.as_ref().unwrap() + } } diff --git a/src/stream/source/single.rs b/src/stream/source/single.rs index 3b443cb..b6617f7 100644 --- a/src/stream/source/single.rs +++ b/src/stream/source/single.rs @@ -1,53 +1,53 @@ - use crate::stream::stage::prelude::*; use crossbeam_channel::{unbounded, Receiver, Sender}; use futures::io::Error; -use std::mem::MaybeUninit; - pub struct Single { - pub shape: SourceShape<'static, O>, + pub shape: Option>, + pub stage_id: Option, pub element: O, - pub demand_rx: BroadcastReceiver, - pub demand_tx: BroadcastSender, + pub demand_rx: Option>, + pub demand_tx: Option>, - pub in_handler: Box, - pub out_handler: Box, - pub logic: GraphStageLogic, + pub in_handler: Option>, + pub out_handler: Option>, + pub logic: Option, } impl Single where - O: Clone + O: Clone, { pub fn new(element: O) -> Self { Self { - shape: unsafe { MaybeUninit::uninit().assume_init() }, + shape: None, + stage_id: None, element, - demand_rx: unsafe { MaybeUninit::uninit().assume_init() }, - demand_tx: unsafe { MaybeUninit::uninit().assume_init() }, + demand_rx: None, + demand_tx: None, - in_handler: unsafe { MaybeUninit::uninit().assume_init() }, - out_handler: unsafe { MaybeUninit::uninit().assume_init() }, - logic: unsafe { MaybeUninit::uninit().assume_init() }, + in_handler: None, + out_handler: None, + logic: None, } } } #[derive(Clone, Debug)] struct SingleHandler { + pub stage_id: usize, elem: O, pub rx: Receiver, pub tx: Sender, } impl OutHandler for SingleHandler - where - O: Clone + 'static, +where + O: Clone + 'static, { fn name(&self) -> String { String::from("single-source-out") @@ -69,41 +69,54 @@ impl OutHandler for SingleHandler impl GraphStage for Single where - O: Clone + 'static, + O: Clone + 'static, { fn build_shape(&mut self) { let single_source_outlet = Outlet::::new(0, "Single.out"); - self.shape = SourceShape { + self.shape = Some(SourceShape { outlet: single_source_outlet, - }; + }); } fn build_demand(&mut self, tx: BroadcastSender, rx: BroadcastReceiver) { - self.demand_tx = tx; - self.demand_rx = rx; + self.demand_tx = Some(tx); + self.demand_rx = Some(rx); } - fn create_logic(&mut self, _attributes: Attributes) -> GraphStageLogic { + fn create_logic(&mut self, stage_id: usize, _attributes: Attributes) { self.build_shape(); let (tx, rx) = unbounded(); - self.out_handler = Box::new(SingleHandler { + self.out_handler = Some(Box::new(SingleHandler { elem: self.element.clone(), tx, rx, - }); + stage_id, + })); - let shape = Box::new(self.shape.clone()); + self.stage_id = Some(stage_id); + + let shape = Box::new(self.shape.as_ref().unwrap().clone()); let mut gsl = GraphStageLogic::from_shape::(shape); - gsl.set_outlet_handler(self.shape.outlet.clone(), self.out_handler.clone()); - self.logic = gsl.clone(); - gsl + gsl.set_outlet_handler( + self.shape.as_ref().unwrap().outlet.clone(), + self.out_handler.as_ref().unwrap().clone(), + ); + self.logic = Some(gsl); } fn get_shape(&self) -> ShapeType { - let shape: &dyn Shape = &self.shape; + let shape: &dyn Shape = self.shape.as_ref().unwrap(); shape.shape_type() } + + fn get_stage_id(&self) -> usize { + self.stage_id.unwrap() + } + + fn get_logic(&self) -> &GraphStageLogic { + self.logic.as_ref().unwrap() + } } diff --git a/src/stream/stage/demand.rs b/src/stream/stage/demand.rs index 78d7cce..2227ada 100644 --- a/src/stream/stage/demand.rs +++ b/src/stream/stage/demand.rs @@ -1,20 +1,26 @@ -use crossbeam_channel::{Sender, Receiver}; +use crossbeam_channel::{Receiver, Sender}; #[derive(Clone, Debug)] pub enum DemandStyle { DemandFull(usize), - DemandPartial(usize, usize) + DemandPartial(usize, usize), } #[derive(Clone, Debug)] pub struct Demand { pub stage_id: usize, - pub style: DemandStyle + pub style: DemandStyle, +} + +impl Demand { + pub fn new(stage_id: usize, style: DemandStyle) -> Self { + Demand { stage_id, style } + } } // Demand endpoint struct #[derive(Clone, Debug)] pub struct Demander { pub tx: Sender, - pub rx: Receiver + pub rx: Receiver, } diff --git a/src/stream/stage/error.rs b/src/stream/stage/error.rs index e69de29..8b13789 100644 --- a/src/stream/stage/error.rs +++ b/src/stream/stage/error.rs @@ -0,0 +1 @@ + diff --git a/src/stream/stage/graph.rs b/src/stream/stage/graph.rs index c9cdcd2..5312fe1 100644 --- a/src/stream/stage/graph.rs +++ b/src/stream/stage/graph.rs @@ -1,19 +1,19 @@ - - use crate::stream::stage::attributes::Attributes; use crate::stream::stage::handlers::*; use crate::stream::stage::lets::{Inlet, Outlet}; use crate::stream::stage::shape::{Shape, ShapeType}; -use crate::stream::stage::demand::{Demand}; -use multiqueue2::{BroadcastSender, BroadcastReceiver}; +use crate::stream::stage::demand::Demand; +use multiqueue2::{BroadcastReceiver, BroadcastSender}; pub trait GraphStage { fn build_shape(&mut self); fn build_demand(&mut self, tx: BroadcastSender, rx: BroadcastReceiver); - fn create_logic(&mut self, attributes: Attributes) -> GraphStageLogic; + fn create_logic(&mut self, stage_id: usize, attributes: Attributes); fn get_shape(&self) -> ShapeType; + fn get_stage_id(&self) -> usize; + fn get_logic(&self) -> &GraphStageLogic; } /////////////// diff --git a/src/stream/stage/handlers.rs b/src/stream/stage/handlers.rs index d562ffc..a1ea901 100644 --- a/src/stream/stage/handlers.rs +++ b/src/stream/stage/handlers.rs @@ -1,10 +1,6 @@ use futures::io; use objekt_clonable::*; - - - - //#[clonable] //pub trait Handler: Clone {} ////impl Handler for T where T: Clone {} diff --git a/src/stream/stage/mod.rs b/src/stream/stage/mod.rs index 7799f78..c93ca37 100644 --- a/src/stream/stage/mod.rs +++ b/src/stream/stage/mod.rs @@ -1,20 +1,20 @@ pub mod attributes; +pub mod demand; +pub mod error; pub mod graph; pub mod handlers; pub mod lets; pub mod shape; pub mod types; -pub mod demand; -pub mod error; pub mod prelude { - pub use multiqueue2::{BroadcastReceiver, BroadcastSender}; pub use super::attributes::*; + pub use super::demand::*; + pub use super::error::*; pub use super::graph::*; pub use super::handlers::*; pub use super::lets::*; pub use super::shape::*; pub use super::types::*; - pub use super::demand::*; - pub use super::error::*; + pub use multiqueue2::{BroadcastReceiver, BroadcastSender}; } diff --git a/src/stream/stage/shape.rs b/src/stream/stage/shape.rs index 0fe2943..c8329a2 100644 --- a/src/stream/stage/shape.rs +++ b/src/stream/stage/shape.rs @@ -4,10 +4,9 @@ use crate::stream::stage::lets::{Inlet, Outlet}; pub enum ShapeType { Source, Flow, - Sink + Sink, } - pub trait Shape<'a, I, O> { fn shape_type(&self) -> ShapeType; fn inlets(&self) -> Vec>; diff --git a/src/stream/stage/types.rs b/src/stream/stage/types.rs index a1f1de2..4b702c5 100644 --- a/src/stream/stage/types.rs +++ b/src/stream/stage/types.rs @@ -1,7 +1,4 @@ - - pub struct NotUsed(); /// Exhaust pub(crate) struct Exhaust(); - diff --git a/src/stream/topology/architect.rs b/src/stream/topology/architect.rs index cef827f..1eb106a 100644 --- a/src/stream/topology/architect.rs +++ b/src/stream/topology/architect.rs @@ -1,44 +1,77 @@ -use crate::stream::stage::demand::{Demand}; +use crate::stream::stage::demand::{Demand, DemandStyle}; +use crate::stream::stage::attributes::Attributes; use crate::stream::stage::graph::GraphStage; -use crate::stream::stage::shape::ShapeType; -use multiqueue2::{broadcast_queue, BroadcastReceiver, BroadcastSender}; + use crate::stream::topology::container::Container; +use multiqueue2::{broadcast_queue, BroadcastReceiver, BroadcastSender}; pub struct Architect { demand_tx: BroadcastSender, demand_rx: BroadcastReceiver, - stages: Vec + stages: Vec, } - impl Architect { pub fn graph(stages: Vec) -> Architect { let stage_count = stages.len() * 2; - let (demand_tx, demand_rx) = - broadcast_queue(stage_count as u64); + let (demand_tx, demand_rx) = broadcast_queue(stage_count as u64); Architect { demand_rx, demand_tx, - stages + stages, } } - pub fn run() { - unimplemented!() + pub fn run(&mut self) { + if let Some(Container::Sink(sink)) = self.stages.last() { + let stage_id = (**sink).get_stage_id(); + let demand = Demand::new(stage_id, DemandStyle::DemandFull(100)); + self.demand_tx.try_send(demand); + self.stages.iter_mut().rev().for_each(|c| { + use Container::*; + + match c { + Source(s) | Transform(s) | Sink(s) => { + for x in (**s).get_logic().in_handlers.iter() { + dbg!("on_push"); + x.on_push(); + } + + for x in (**s).get_logic().out_handlers.iter() { + dbg!("on_pull"); + x.on_pull(); + } + } + } + }); + } else { + panic!("Run failed"); + } } pub fn check_bounds(&self) { - if let Some(root) = self.stages.first() { + if let Some(first_stage) = self.stages.first() { use Container::*; - match root { + match first_stage { Transform(_) | Sink(_) => { - panic!("Stage traversal failed. Stream graphs start with a Source.") - }, - _ => () + panic!("Stage traversal failed. Stream graph starts with a Source.") + } + _ => (), + } + } + + if let Some(last_stage) = self.stages.last() { + use Container::*; + + match last_stage { + Source(_) | Transform(_) => { + panic!("Stage traversal failed. Stream graph ends with a Sink.") + } + _ => (), } } } @@ -51,9 +84,26 @@ impl Architect { use Container::*; match stage { - Source(s) | Transform(s) | Sink(s) => - s.build_demand(tx.clone(), rx.clone()) + Source(s) | Transform(s) | Sink(s) => { + s.build_demand(tx.clone(), rx.clone()); + } + _ => (), } }); + + self.stages + .iter_mut() + .rev() + .enumerate() + .for_each(|(stage_id, stage)| { + use Container::*; + + match stage { + Source(s) | Transform(s) | Sink(s) => { + s.create_logic(stage_id, Attributes {}); + } + _ => (), + } + }); } } diff --git a/src/stream/topology/container.rs b/src/stream/topology/container.rs index 8eee811..745b400 100644 --- a/src/stream/topology/container.rs +++ b/src/stream/topology/container.rs @@ -4,4 +4,4 @@ pub enum Container { Source(Box), Transform(Box), Sink(Box), -} \ No newline at end of file +} diff --git a/src/stream/topology/macros.rs b/src/stream/topology/macros.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/stream/topology/macros.rs @@ -0,0 +1 @@ + diff --git a/src/stream/topology/mod.rs b/src/stream/topology/mod.rs index 6898d26..23c9b59 100644 --- a/src/stream/topology/mod.rs +++ b/src/stream/topology/mod.rs @@ -1,2 +1,4 @@ +pub mod architect; pub mod container; -pub mod architect; \ No newline at end of file +#[macro_use] +pub mod macros; diff --git a/tests/test_graph_stage_logic.rs b/tests/test_graph_stage_logic.rs index dcacd31..7d45d3e 100644 --- a/tests/test_graph_stage_logic.rs +++ b/tests/test_graph_stage_logic.rs @@ -2,12 +2,11 @@ mod tests { use bastion_streams::stream::stage::graph::GraphStageLogic; use bastion_streams::stream::stage::handlers::{InHandler, OutHandler}; - use bastion_streams::stream::stage::lets::{Outlet}; + use bastion_streams::stream::stage::lets::Outlet; use bastion_streams::stream::stage::shape::SourceShape; - + use bastion_streams::stream::stage::types::NotUsed; use futures::io::Error; - // let inlet0 = Inlet::::new(0, "in0"); // let inlet1 = Inlet::::new(1, "in1");