Skip to content
This repository has been archived by the owner on Oct 15, 2022. It is now read-only.

Commit

Permalink
Merge pull request #435 from target/thread.rs-correctly-unwind-call-s…
Browse files Browse the repository at this point in the history
…tacks

Thread.rs correctly unwind call stacks
  • Loading branch information
Profpatsch authored Jun 29, 2020
2 parents 8ea8972 + 5fbfa23 commit 93d9301
Show file tree
Hide file tree
Showing 11 changed files with 209 additions and 138 deletions.
210 changes: 126 additions & 84 deletions src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use crate::build_loop::{BuildLoop, Event};
use crate::nix::options::NixOptions;
use crate::ops::error::ExitError;
use crate::project::Project;
use crate::socket::SocketPath;
use crate::NixFile;
use crossbeam_channel as chan;
Expand Down Expand Up @@ -47,13 +46,10 @@ pub struct IndicateActivity {

struct Handler {
tx: chan::Sender<()>,
_handle: std::thread::JoinHandle<()>,
}

/// Keeps all state of the running `lorri daemon` service, watches nix files and runs builds.
pub struct Daemon {
/// A thread for each `BuildLoop`, keyed by the nix files listened on.
handler_threads: HashMap<NixFile, Handler>,
/// Sending end that we pass to every `BuildLoop` the daemon controls.
// TODO: this needs to transmit information to identify the builder with
build_events_tx: chan::Sender<LoopHandlerEvent>,
Expand All @@ -72,7 +68,6 @@ impl Daemon {
let (mon_tx, mon_rx) = chan::unbounded();
(
Daemon {
handler_threads: HashMap::new(),
build_events_tx,
build_events_rx,
mon_tx,
Expand All @@ -84,102 +79,149 @@ impl Daemon {

/// Serve the daemon's RPC endpoint.
pub fn serve(
mut self,
&mut self,
socket_path: SocketPath,
gc_root_dir: PathBuf,
cas: crate::cas::ContentAddressable,
) -> Result<(), ExitError> {
let (activity_tx, activity_rx) = chan::unbounded();
let (activity_tx, activity_rx): (
chan::Sender<IndicateActivity>,
chan::Receiver<IndicateActivity>,
) = chan::unbounded();

let server = rpc::Server::new(socket_path, activity_tx, self.build_events_tx.clone())?;
let mut pool = crate::thread::Pool::new();
pool.spawn("accept-loop", move || {
server
.serve()
.expect("failed to serve daemon server endpoint");
let build_events_tx = self.build_events_tx.clone();

let server =
rpc::Server::new(socket_path.clone(), activity_tx, build_events_tx).map_err(|e| {
ExitError::temporary(format!(
"unable to bind to the server socket at {}: {:?}",
socket_path.0.display(),
e
))
})?;

pool.spawn("accept-loop", || {
server.serve().expect("varlink error");
})?;

let build_events_rx = self.build_events_rx.clone();
let mon_tx = self.mon_tx.clone();
pool.spawn("build-loop", move || {
let mut project_states: HashMap<NixFile, Event> = HashMap::new();
let mut event_listeners: Vec<chan::Sender<Event>> = Vec::new();

for msg in build_events_rx {
mon_tx
.send(msg.clone())
.expect("listener still to be there");
match &msg {
LoopHandlerEvent::BuildEvent(ev) => match ev {
Event::SectionEnd => (),
Event::Started { nix_file, .. }
| Event::Completed { nix_file, .. }
| Event::Failure { nix_file, .. } => {
project_states.insert(nix_file.clone(), ev.clone());
event_listeners.retain(|tx| {
let keep = tx.send(ev.clone()).is_ok();
debug!("Sent"; "event" => ?ev, "keep" => keep);
keep
})
}
},
LoopHandlerEvent::NewListener(tx) => {
debug!("adding listener");
let keep = project_states.values().all(|event| {
let keeping = tx.send(event.clone()).is_ok();
debug!("Sent snapshot"; "event" => ?&event, "keep" => keeping);
keeping
});
debug!("Finished snapshot"; "keep" => keep);
if keep {
event_listeners.push(tx.clone());
}
pool.spawn("build-loop", || Self::build_loop(build_events_rx, mon_tx))?;

let build_events_tx = self.build_events_tx.clone();
let extra_nix_options = self.extra_nix_options.clone();
pool.spawn("foo", || {
Self::build_instruction_handler(
build_events_tx,
extra_nix_options,
activity_rx,
gc_root_dir,
cas,
)
})?;

pool.join_all_or_panic();

Ok(())
}

fn build_loop(
build_events_rx: chan::Receiver<LoopHandlerEvent>,
mon_tx: chan::Sender<LoopHandlerEvent>,
) {
let mut project_states: HashMap<NixFile, Event> = HashMap::new();
let mut event_listeners: Vec<chan::Sender<Event>> = Vec::new();

for msg in build_events_rx {
mon_tx
.send(msg.clone())
.expect("listener still to be there");
match &msg {
LoopHandlerEvent::BuildEvent(ev) => match ev {
Event::SectionEnd => (),
Event::Started { nix_file, .. }
| Event::Completed { nix_file, .. }
| Event::Failure { nix_file, .. } => {
project_states.insert(nix_file.clone(), ev.clone());
event_listeners.retain(|tx| {
let keep = tx.send(Event::SectionEnd).is_ok();
debug!("Sent new listener sectionend"; "keep" => keep);
let keep = tx.send(ev.clone()).is_ok();
debug!("Sent"; "event" => ?ev, "keep" => keep);
keep
})
}
},
LoopHandlerEvent::NewListener(tx) => {
debug!("adding listener");
let keep = project_states.values().all(|event| {
let keeping = tx.send(event.clone()).is_ok();
debug!("Sent snapshot"; "event" => ?&event, "keep" => keeping);
keeping
});
debug!("Finished snapshot"; "keep" => keep);
if keep {
event_listeners.push(tx.clone());
}
event_listeners.retain(|tx| {
let keep = tx.send(Event::SectionEnd).is_ok();
debug!("Sent new listener sectionend"; "keep" => keep);
keep
})
}
}
})?;
pool.spawn("build-instruction-handler", move || {
// For each build instruction, add the corresponding file
// to the watch list.
for start_build in activity_rx {
let project =
crate::project::Project::new(start_build.nix_file, &gc_root_dir, cas.clone())
// TODO: the project needs to create its gc root dir
.unwrap();
self.add(project)
}
})?;
pool.join_all_or_panic();

Ok(())
}
}

/// Add nix file to the set of files this daemon watches
/// & build if they change.
pub fn add(&mut self, project: Project) {
let (tx, rx) = chan::unbounded();
let build_events_tx = self.build_events_tx.clone();
let extra_nix_options = self.extra_nix_options.clone();
fn build_instruction_handler(
// TODO: use the pool here
// pool: &mut crate::thread::Pool,
build_events_tx: chan::Sender<LoopHandlerEvent>,
extra_nix_options: NixOptions,
activity_rx: chan::Receiver<IndicateActivity>,
gc_root_dir: PathBuf,
cas: crate::cas::ContentAddressable,
) {
// A thread for each `BuildLoop`, keyed by the nix files listened on.
let mut handler_threads: HashMap<NixFile, Handler> = HashMap::new();

// For each build instruction, add the corresponding file
// to the watch list.
for start_build in activity_rx {
let project =
crate::project::Project::new(start_build.nix_file, &gc_root_dir, cas.clone())
// TODO: the project needs to create its gc root dir
.unwrap();

// Add nix file to the set of files this daemon watches
// & build if they change.
let (tx, rx) = chan::unbounded();
// cloning the tx means the daemon’s rx gets all
// messages from all builders.
let build_events_tx = build_events_tx.clone();
let extra_nix_options = extra_nix_options.clone();

handler_threads
.entry(project.nix_file.clone())
.or_insert_with(|| {
// TODO: how to use the pool here?
// We cannot just spawn new threads once messages come in,
// because then then pool objects is stuck in this loop
// and will never start to wait for joins, which means
// we don’t catch panics as they happen!
// If we can get the pool to “wait for join but also spawn new
// thread when you get a message” that could work!
// pool.spawn(format!("build_loop for {}", nix_file.display()),
let _ = std::thread::spawn(move || {
let mut build_loop = BuildLoop::new(&project, extra_nix_options);

self.handler_threads
.entry(project.nix_file.clone())
.or_insert_with(|| Handler {
tx,
_handle: std::thread::spawn(move || {
let mut build_loop = BuildLoop::new(&project, extra_nix_options);

// cloning the tx means the daemon’s rx gets all
// messages from all builders.
build_loop.forever(build_events_tx, rx);
}),
})
// Notify the handler, whether or not it was newly added
.tx
.send(())
.unwrap();
build_loop.forever(build_events_tx, rx);
});
Handler { tx }
})
// Notify the handler, whether or not it was newly added
.tx
.send(())
.unwrap();
}
}
}
6 changes: 2 additions & 4 deletions src/daemon/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use super::IndicateActivity;
use super::LoopHandlerEvent;
use crate::build_loop::Event;
use crate::error;
use crate::ops::error::ExitError;
use crate::rpc;
use crate::socket::{BindLock, SocketPath};
use crate::watch;
Expand All @@ -29,7 +28,7 @@ impl Server {
socket_path: SocketPath,
activity_tx: chan::Sender<IndicateActivity>,
build_tx: chan::Sender<LoopHandlerEvent>,
) -> Result<Server, ExitError> {
) -> Result<Server, crate::socket::BindError> {
let lock = socket_path.lock()?;
Ok(Server {
socket_path,
Expand All @@ -40,7 +39,7 @@ impl Server {
}

/// Serve the daemon endpoint.
pub fn serve(self) -> Result<(), ExitError> {
pub fn serve(self) -> Result<(), varlink::error::Error> {
let address = &self.socket_path.address();
let service = varlink::VarlinkService::new(
/* vendor */ "com.target",
Expand All @@ -59,7 +58,6 @@ impl Server {
max_worker_threads,
idle_timeout,
)
.map_err(|e| ExitError::temporary(format!("{}", e)))
}
}

Expand Down
21 changes: 15 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,20 @@ pub enum NixFile {
Services(PathBuf),
}

impl From<&NixFile> for PathBuf {
fn from(p: &NixFile) -> PathBuf {
match p {
NixFile::Shell(p) => p.to_path_buf(),
NixFile::Services(p) => p.to_path_buf(),
impl NixFile {
/// Underlying `Path`.
pub fn as_path(&self) -> &Path {
match self {
Self::Shell(ref path) => path,
Self::Services(ref path) => path,
}
}

/// Display the underlying path
pub fn display(&self) -> std::path::Display {
match self {
Self::Shell(path) => path.display(),
Self::Services(path) => path.display(),
}
}
}
Expand All @@ -72,7 +81,7 @@ impl slog::Value for NixFile {
key: slog::Key,
serializer: &mut dyn slog::Serializer,
) -> slog::Result {
serializer.emit_arguments(key, &format_args!("{}", PathBuf::from(self).display()))
serializer.emit_arguments(key, &format_args!("{}", self.as_path().display()))
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/nix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ mod tests {
"--expr",
"my-cool-expression",
]
.into_iter()
.iter()
.map(OsStr::new)
.collect();
assert_eq!(exp, nix.command_arguments());
Expand Down Expand Up @@ -565,7 +565,7 @@ mod tests {
"--",
"/my-cool-file.nix",
]
.into_iter()
.iter()
.map(OsStr::new)
.collect();
assert_eq!(exp2, nix2.command_arguments());
Expand Down
2 changes: 1 addition & 1 deletion src/ops/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub fn main(opts: crate::cli::DaemonOptions) -> OpResult {
},
};

let (daemon, build_rx) = Daemon::new(extra_nix_options);
let (mut daemon, build_rx) = Daemon::new(extra_nix_options);
let build_handle = std::thread::spawn(|| {
for msg in build_rx {
info!("build status"; "message" => ?msg);
Expand Down
4 changes: 3 additions & 1 deletion src/ops/shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ pub fn main(project: Project, opts: ShellOptions) -> OpResult {
.to_str()
.expect("lorri executable path not UTF-8 clean"),
&shell,
&PathBuf::from(&project.nix_file)
project
.nix_file
.as_path()
.to_str()
.expect("Nix file path not UTF-8 clean"),
])
Expand Down
2 changes: 1 addition & 1 deletion src/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl Project {
) -> std::io::Result<Project> {
let hash = format!(
"{:x}",
md5::compute(PathBuf::from(&nix_file).as_os_str().as_bytes())
md5::compute(nix_file.as_path().as_os_str().as_bytes())
);
let project_gc_root = gc_root_dir.join(&hash).join("gc_root");

Expand Down
9 changes: 2 additions & 7 deletions src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};

/// Small wrapper that makes sure lorri sockets are handled correctly.
pub struct SocketPath(PathBuf);
#[derive(Clone)]
pub struct SocketPath(pub PathBuf);

/// Binding to the socket failed.
#[derive(Debug)]
Expand All @@ -17,12 +18,6 @@ pub enum BindError {
Unix(nix::Error),
}

impl From<BindError> for crate::ops::error::ExitError {
fn from(e: BindError) -> crate::ops::error::ExitError {
crate::ops::error::ExitError::temporary(format!("Bind error: {:?}", e))
}
}

impl From<std::io::Error> for BindError {
fn from(e: std::io::Error) -> BindError {
BindError::Io(e)
Expand Down
Loading

0 comments on commit 93d9301

Please sign in to comment.