diff --git a/examples/scheduled.js b/examples/scheduled.js index ffafe78..86e624d 100644 --- a/examples/scheduled.js +++ b/examples/scheduled.js @@ -13,9 +13,11 @@ async function handleSchedule(scheduledDate) { new Date(scheduledDate).toISOString() ); - const res = await fetch("https://example.workers.rocks/data.json"); + const res = await fetch("https://echo.workers.rocks/data.json"); console.log("Done waiting!", res.status, await res.json()); return "Called deploy hook!"; } + +setTimeout(() => console.log("Hello from timeout"), 2000); \ No newline at end of file diff --git a/examples/scheduled.rs b/examples/scheduled.rs index f354392..9c7e5d7 100644 --- a/examples/scheduled.rs +++ b/examples/scheduled.rs @@ -47,17 +47,19 @@ async fn main() -> Result<(), ()> { debug!("js worker for {:?} started", file_path); - // wait for completion signal - match done_rx.await { - Ok(()) => debug!("js task for {file_path} completed"), - Err(err) => error!("js task for {file_path} did not complete: {err}"), - } - - // wait for shutdown signal - match shutdown_rx.await { - Ok(None) => debug!("js worker for {file_path} stopped"), - Ok(Some(err)) => error!("js worker for {file_path} error: {err}"), - Err(err) => error!("js worker for {file_path} error: {err}"), + tokio::select! { + _ = tokio::signal::ctrl_c() => debug!("ctrl-c received"), + // wait for completion signal + done = done_rx => match done { + Ok(()) => debug!("js task for {file_path} completed"), + Err(err) => error!("js task for {file_path} did not complete: {err}"), + }, + // wait for shutdown signal + end = shutdown_rx => match end { + Ok(None) => error!("js worker for {file_path} stopped before replying"), + Ok(Some(err)) => error!("js worker for {file_path} error: {err}"), + Err(err) => error!("js worker for {file_path} error: {err}"), + } } Ok(()) diff --git a/examples/serve.js b/examples/serve.js index be25560..b9bcc17 100644 --- a/examples/serve.js +++ b/examples/serve.js @@ -17,3 +17,5 @@ async function handleRequest(request) { return new Response("Hello world"); } + +setTimeout(() => console.log("Hello from timeout"), 500); \ No newline at end of file diff --git a/examples/serve.rs b/examples/serve.rs index 54c44c4..7d08a5f 100644 --- a/examples/serve.rs +++ b/examples/serve.rs @@ -37,46 +37,54 @@ async fn handle_request(data: Data, req: HttpRequest) -> HttpResponse .body(Default::default()) .unwrap(); - std::thread::spawn(move || Worker::new(url, shutdown_tx).exec(Task::Fetch(Some(FetchInit::new(req, response_tx))))); + let handle = std::thread::spawn(move || Worker::new(url, shutdown_tx).exec(Task::Fetch(Some(FetchInit::new(req, response_tx))))); let url = url_clone.clone(); debug!("js worker for {:?} started", url); - // wait for shutdown signal - match shutdown_rx.await { - Ok(None) => debug!("js worker for {:?} stopped", url.path()), - Ok(Some(err)) => { - error!("js worker for {:?} error: {}", url.path(), err); - return HttpResponse::InternalServerError().body(err.to_string()); - } - Err(err) => { - error!("js worker for {:?} error: {}", url.path(), err); - return HttpResponse::InternalServerError().body(err.to_string()); + let response = tokio::select! { + res = response_rx => { + match res { + Ok(res) => { + let mut rb = HttpResponse::build(res.status()); + + for (k, v) in res.headers() { + rb.append_header((k, v)); + } + + rb.body(res.body().clone()) + } + Err(err) => { + error!("worker fetch error: {}, ensure the worker registered a listener for the 'fetch' event", err); + HttpResponse::InternalServerError().body(err.to_string()) + } + } + }, + end = shutdown_rx => { + match end { + Ok(None) => { + error!("js worker for {} exited before replying", url.path()); + HttpResponse::InternalServerError().body("js worker exited") + }, + Ok(Some(err)) => { + error!("js worker for {} error: {err}", url.path(), ); + HttpResponse::InternalServerError().body(err.to_string()) + } + Err(err) => { + error!("js worker for {} error: {err}", url.path()); + HttpResponse::InternalServerError().body(err.to_string()) + } + } } - } + }; - match response_rx.await { - Ok(res) => { - debug!( - "worker fetch replied {} {:?}", - res.status(), - start.elapsed() - ); + debug!("handle_request done {}", start.elapsed().as_millis()); - let mut rb = HttpResponse::build(res.status()); + // Wait for the worker to finish; we should terminate the worker if it's still running after a reply + handle.join().unwrap(); - for (k, v) in res.headers() { - rb.append_header((k, v)); - } - - rb.body(res.body().clone()) - } - Err(err) => { - error!("worker fetch error: {}, ensure the worker registered a listener for the 'fetch' event", err); - HttpResponse::InternalServerError().body(err.to_string()) - } - } + response } fn get_path() -> String { diff --git a/src/runtime.rs b/src/runtime.rs index 2a0500b..841566b 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -150,17 +150,17 @@ impl Worker { let local = tokio::task::LocalSet::new(); match local.block_on(&runtime, future) { - Ok(_) => { + Ok(()) => { log::debug!("worker thread finished"); self.shutdown_tx .send(None) - .expect("failed to send shutdown signal"); + .unwrap_or_else(|err| log::warn!("failed to send shutdown signal (ok) {:?}", err)); } Err(err) => { log::error!("worker thread failed {:?}", err); self.shutdown_tx .send(Some(err)) - .expect("failed to send shutdown signal"); + .unwrap_or_else(|err| log::warn!("failed to send shutdown signal ({:?})", err)); } } }