From 64db953aed4b25bc41000dd48ae3453c30f0f763 Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Sun, 1 Mar 2020 19:06:05 +0100 Subject: [PATCH 1/3] here we go --- bastion/Cargo.toml | 4 ++ bastion/src/parallel/macros.rs | 0 bastion/src/parallel/mod.rs | 1 + bastion/src/parallel/ops.rs | 0 bastion/src/parallel/process.rs | 84 +++++++++++++++++++++++++++++++++ 5 files changed, 89 insertions(+) create mode 100644 bastion/src/parallel/macros.rs create mode 100644 bastion/src/parallel/mod.rs create mode 100644 bastion/src/parallel/ops.rs create mode 100644 bastion/src/parallel/process.rs diff --git a/bastion/Cargo.toml b/bastion/Cargo.toml index 8e18c9c4..a2b7925a 100644 --- a/bastion/Cargo.toml +++ b/bastion/Cargo.toml @@ -49,6 +49,10 @@ futures-timer = "3.0.0" fxhash = "0.2" lazy_static = "1.4" log = "0.4" +ipc-channel = "0.14.0" +serde = { version = "1.0", features = ["derive"] } + + # TODO: https://github.com/cogciprocate/qutex/pull/5 # TODO: https://github.com/cogciprocate/qutex/pull/6 bastion-qutex = { version = "0.2", features = ["async_await"] } diff --git a/bastion/src/parallel/macros.rs b/bastion/src/parallel/macros.rs new file mode 100644 index 00000000..e69de29b diff --git a/bastion/src/parallel/mod.rs b/bastion/src/parallel/mod.rs new file mode 100644 index 00000000..01eafd2e --- /dev/null +++ b/bastion/src/parallel/mod.rs @@ -0,0 +1 @@ +pub mod ops; diff --git a/bastion/src/parallel/ops.rs b/bastion/src/parallel/ops.rs new file mode 100644 index 00000000..e69de29b diff --git a/bastion/src/parallel/process.rs b/bastion/src/parallel/process.rs new file mode 100644 index 00000000..55e29145 --- /dev/null +++ b/bastion/src/parallel/process.rs @@ -0,0 +1,84 @@ +use std::collections::HashMap; +use std::ffi::OsStr; +use std::ffi::OsString; +use std::process::Stdio; +use crate::callbacks::Callbacks; + +pub struct ProcessData { + callbacks: Callbacks +} + + +#[derive(Debug, Default)] +pub struct Builder { + pub(crate) stdin: Option, + pub(crate) stdout: Option, + pub(crate) stderr: Option, + pub(crate) envs: HashMap, +} + +impl Builder { + pub fn new() -> Self { + Self { + stdin: None, + stdout: None, + stderr: None, + envs: std::env::vars_os().collect(), + } + } + + /// Set an environment variable in the spawned process. Equivalent to `Command::env` + pub fn env(&mut self, key: K, val: V) -> &mut Self + where + K: AsRef, + V: AsRef, + { + self.envs + .insert(key.as_ref().to_owned(), val.as_ref().to_owned()); + self + } + + /// Set environment variables in the spawned process. Equivalent to `Command::envs` + pub fn envs(&mut self, vars: I) -> &mut Self + where + I: IntoIterator, + K: AsRef, + V: AsRef, + { + self.envs.extend( + vars.into_iter() + .map(|(k, v)| (k.as_ref().to_owned(), v.as_ref().to_owned())), + ); + self + } + + /// Removes an environment variable in the spawned process. Equivalent to `Command::env_remove` + pub fn env_remove>(&mut self, key: K) -> &mut Self { + self.envs.remove(key.as_ref()); + self + } + + /// Clears all environment variables in the spawned process. Equivalent to `Command::env_clear` + pub fn env_clear(&mut self) -> &mut Self { + self.envs.clear(); + self + } + + /// Captures the `stdin` of the spawned process, allowing you to manually send data via `JoinHandle::stdin` + pub fn stdin>(&mut self, cfg: T) -> &mut Self { + self.stdin = Some(cfg.into()); + self + } + + /// Captures the `stdout` of the spawned process, allowing you to manually receive data via `JoinHandle::stdout` + pub fn stdout>(&mut self, cfg: T) -> &mut Self { + self.stdout = Some(cfg.into()); + self + } + + /// Captures the `stderr` of the spawned process, allowing you to manually receive data via `JoinHandle::stderr` + pub fn stderr>(&mut self, cfg: T) -> &mut Self { + self.stderr = Some(cfg.into()); + self + } +} From 3efb07c059dfb6ddbe8e0d05fb4dff0fd7a7545f Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Sun, 1 Mar 2020 21:21:31 +0100 Subject: [PATCH 2/3] Callback refactor --- bastion/src/lib.rs | 2 ++ bastion/src/parallel/callbacks.rs | 27 +++++++++++++++++++ bastion/src/parallel/mod.rs | 2 ++ bastion/src/parallel/process.rs | 45 +++++++++++++++++++++++++------ 4 files changed, 68 insertions(+), 8 deletions(-) create mode 100644 bastion/src/parallel/callbacks.rs diff --git a/bastion/src/lib.rs b/bastion/src/lib.rs index bc08f12c..cafd0261 100644 --- a/bastion/src/lib.rs +++ b/bastion/src/lib.rs @@ -83,6 +83,8 @@ pub mod message; pub mod path; pub mod supervisor; +pub mod parallel; + /// /// Prelude of Bastion pub mod prelude { diff --git a/bastion/src/parallel/callbacks.rs b/bastion/src/parallel/callbacks.rs new file mode 100644 index 00000000..b3a8687b --- /dev/null +++ b/bastion/src/parallel/callbacks.rs @@ -0,0 +1,27 @@ +use std::fmt::{self, Debug, Formatter}; +use std::sync::{Arc, Mutex}; +use std::boxed::Box; +use std::io; + +pub type CallbackFunc = dyn FnMut() -> io::Result<()> + Send + Sync + 'static; +pub type SafeCallbackFunc = Arc>>; + +#[derive(Default)] +pub struct ProcessCallbacks { + pub before_start: Option, + pub before_restart: Option, + pub after_restart: Option, + pub after_stop: Option, +} + +impl Debug for ProcessCallbacks { + fn fmt(&self, fmt: &mut Formatter) -> fmt::Result { + fmt.debug_struct("ProcessCallbacks") + .field("before_start", &self.before_start.is_some()) + .field("before_restart", &self.before_start.is_some()) + .field("after_restart", &self.before_start.is_some()) + .field("after_stop", &self.before_start.is_some()) + .finish() + } +} + diff --git a/bastion/src/parallel/mod.rs b/bastion/src/parallel/mod.rs index 01eafd2e..0e6abdf4 100644 --- a/bastion/src/parallel/mod.rs +++ b/bastion/src/parallel/mod.rs @@ -1 +1,3 @@ +pub mod callbacks; pub mod ops; +pub mod process; diff --git a/bastion/src/parallel/process.rs b/bastion/src/parallel/process.rs index 55e29145..3a5bf7f7 100644 --- a/bastion/src/parallel/process.rs +++ b/bastion/src/parallel/process.rs @@ -1,11 +1,24 @@ +use std::sync::{Arc, Mutex}; use std::collections::HashMap; use std::ffi::OsStr; use std::ffi::OsString; use std::process::Stdio; -use crate::callbacks::Callbacks; +use std::io; +use super::callbacks::*; +#[derive(Debug)] pub struct ProcessData { - callbacks: Callbacks + pub(crate) callbacks: ProcessCallbacks, + pub(crate) envs: HashMap, +} + +impl Default for ProcessData { + fn default() -> Self { + Self { + callbacks: ProcessCallbacks::default(), + envs: std::env::vars_os().collect(), + } + } } @@ -14,7 +27,7 @@ pub struct Builder { pub(crate) stdin: Option, pub(crate) stdout: Option, pub(crate) stderr: Option, - pub(crate) envs: HashMap, + pub(crate) data: ProcessData, } impl Builder { @@ -23,7 +36,7 @@ impl Builder { stdin: None, stdout: None, stderr: None, - envs: std::env::vars_os().collect(), + data: ProcessData::default() } } @@ -33,7 +46,7 @@ impl Builder { K: AsRef, V: AsRef, { - self.envs + self.data.envs .insert(key.as_ref().to_owned(), val.as_ref().to_owned()); self } @@ -45,40 +58,56 @@ impl Builder { K: AsRef, V: AsRef, { - self.envs.extend( + self.data.envs.extend( vars.into_iter() .map(|(k, v)| (k.as_ref().to_owned(), v.as_ref().to_owned())), ); self } + /// /// Removes an environment variable in the spawned process. Equivalent to `Command::env_remove` pub fn env_remove>(&mut self, key: K) -> &mut Self { - self.envs.remove(key.as_ref()); + self.data.envs.remove(key.as_ref()); self } + /// /// Clears all environment variables in the spawned process. Equivalent to `Command::env_clear` pub fn env_clear(&mut self) -> &mut Self { - self.envs.clear(); + self.data.envs.clear(); self } + /// /// Captures the `stdin` of the spawned process, allowing you to manually send data via `JoinHandle::stdin` pub fn stdin>(&mut self, cfg: T) -> &mut Self { self.stdin = Some(cfg.into()); self } + /// /// Captures the `stdout` of the spawned process, allowing you to manually receive data via `JoinHandle::stdout` pub fn stdout>(&mut self, cfg: T) -> &mut Self { self.stdout = Some(cfg.into()); self } + /// /// Captures the `stderr` of the spawned process, allowing you to manually receive data via `JoinHandle::stderr` pub fn stderr>(&mut self, cfg: T) -> &mut Self { self.stderr = Some(cfg.into()); self } + + /// + /// Before start + #[cfg(unix)] + pub fn before_start(&mut self, f: F) -> &mut Self + where + F: FnMut() -> io::Result<()> + Send + Sync + 'static + { + self.data.callbacks.before_start = Some(Arc::new(Mutex::new(Box::new(f)))); + self + } } From e0ec34b27cb889677b70319bdeedbe84ce44b188 Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Sun, 1 Mar 2020 21:45:30 +0100 Subject: [PATCH 3/3] Get process data --- bastion/src/parallel/process.rs | 41 ++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/bastion/src/parallel/process.rs b/bastion/src/parallel/process.rs index 3a5bf7f7..faa66bcb 100644 --- a/bastion/src/parallel/process.rs +++ b/bastion/src/parallel/process.rs @@ -40,6 +40,12 @@ impl Builder { } } + /// Process data which has given to spawned Process + pub(crate) fn data(&mut self, proc_data: ProcessData) -> &mut Self { + self.data = proc_data; + self + } + /// Set an environment variable in the spawned process. Equivalent to `Command::env` pub fn env(&mut self, key: K, val: V) -> &mut Self where @@ -101,7 +107,7 @@ impl Builder { } /// - /// Before start + /// Process before start #[cfg(unix)] pub fn before_start(&mut self, f: F) -> &mut Self where @@ -110,4 +116,37 @@ impl Builder { self.data.callbacks.before_start = Some(Arc::new(Mutex::new(Box::new(f)))); self } + + /// + /// Process before restart + #[cfg(unix)] + pub fn before_restart(&mut self, f: F) -> &mut Self + where + F: FnMut() -> io::Result<()> + Send + Sync + 'static + { + self.data.callbacks.before_restart = Some(Arc::new(Mutex::new(Box::new(f)))); + self + } + + /// + /// Process after restart + #[cfg(unix)] + pub fn after_restart(&mut self, f: F) -> &mut Self + where + F: FnMut() -> io::Result<()> + Send + Sync + 'static + { + self.data.callbacks.after_restart = Some(Arc::new(Mutex::new(Box::new(f)))); + self + } + + /// + /// Process after stop + #[cfg(unix)] + pub fn after_stop(&mut self, f: F) -> &mut Self + where + F: FnMut() -> io::Result<()> + Send + Sync + 'static + { + self.data.callbacks.after_stop = Some(Arc::new(Mutex::new(Box::new(f)))); + self + } }