Skip to content

Commit

Permalink
fix exec hanging for end of output
Browse files Browse the repository at this point in the history
  • Loading branch information
kzdnk committed Jan 14, 2025
1 parent f0242f8 commit 5b34795
Showing 1 changed file with 89 additions and 7 deletions.
96 changes: 89 additions & 7 deletions tools/libcrowtty/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,7 @@ impl Crowtty {
// # connect to port N - stdio
// stty -icanon -echo && ncat 127.0.0.1 $PORT
// ```
for i in [
WellKnown::Loopback.into(),
WellKnown::HelloWorld.into(),
WellKnown::ForthShell0.into(),
]
.into_iter()
{
for i in [WellKnown::Loopback.into(), WellKnown::HelloWorld.into()].into_iter() {
let (inp_send, inp_recv) = channel();
let (out_send, out_recv) = channel();

Expand Down Expand Up @@ -222,6 +216,93 @@ impl Crowtty {
manager.workers.insert(i, handle);
}

{
let i = WellKnown::ForthShell0.into();
let (inp_send, inp_recv) = channel();
let (out_send, out_recv) = channel();

let socket =
std::net::TcpListener::bind(dbg!(format!("127.0.0.1:{}", tcp_port_base + i)))
.unwrap();

let work = TcpWorker {
out: out_recv,
inp: inp_send,
socket,
port: i,
};
let tag = tag.port(i);
let thread_hdl = spawn(move || {
let mux = " MUX".if_supports_color(Stream::Stdout, |s| s.cyan());
let dmux = "DMUX".if_supports_color(Stream::Stdout, |s| s.bright_purple());
let err = "ERR!".if_supports_color(Stream::Stdout, |err| err.red());
'incoming: for skt in work.socket.incoming() {
let mut skt = match skt {
Ok(skt) => skt,
Err(e) => {
panic!(
"{tag} CONN failed to accept host connection to port {} (:{}): {e}",
tcp_port_base + work.port,
work.port
);
}
};

println!(
"{tag} CONN host connected to port {} (:{})",
tcp_port_base + work.port,
work.port
);

skt.set_read_timeout(Some(Duration::from_millis(10))).ok();
// skt.set_nonblocking(true).ok();
// skt.set_nodelay(true).ok();

// let mut last = Instant::now();

let mut input = Vec::new();

'inner: loop {
let mut buf = [0u8; 128];
match skt.read(&mut buf) {
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
Err(_) => {
skt.shutdown(std::net::Shutdown::Both).ok();
continue 'incoming;
}
Ok(0) => {
break 'inner;
}
Ok(n) => {
tag.if_verbose(format_args!("{mux} {n}B <- :{}", work.port));
input.extend_from_slice(&buf[..n]);
}
}
}

work.inp.send(input).ok();

if let Ok(msg) = work.out.recv_timeout(Duration::from_secs(1)) {
match skt.write_all(&msg) {
Ok(_) => {}
Err(e) => {
println!("{tag} {dmux} {err} write error: {e}");
}
}
continue 'incoming;
} else {
continue 'incoming;
}
}
});
let handle = WorkerHandle {
out: out_send,
inp: inp_recv,
_thread_hdl: thread_hdl,
};

manager.workers.insert(i, handle);
}
// spawn tracing listener
let trace_port = WellKnown::BinaryTracing as u16;
let trace_handle = {
Expand Down Expand Up @@ -428,6 +509,7 @@ impl Exec {
eprintln!("[stderr] connected to crowtty on {:?}", stream.peer_addr());

stream.write_all(&cmd).into_diagnostic()?;
stream.shutdown(std::net::Shutdown::Write).unwrap();

let mut response = Vec::new();
stream.read_to_end(&mut response).into_diagnostic()?;
Expand Down

0 comments on commit 5b34795

Please sign in to comment.