Skip to content

Commit

Permalink
Select between task and shutdown signal
Browse files Browse the repository at this point in the history
  • Loading branch information
max-lt committed Feb 25, 2024
1 parent 38b1006 commit 704ecb8
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 46 deletions.
4 changes: 3 additions & 1 deletion examples/scheduled.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ async function handleSchedule(scheduledDate) {
new Date(scheduledDate).toISOString()
);

const res = await fetch("https://example.workers.rocks/data.json");
const res = await fetch("https://echo.workers.rocks/data.json");

console.log("Done waiting!", res.status, await res.json());

return "Called deploy hook!";
}

setTimeout(() => console.log("Hello from timeout"), 2000);
24 changes: 13 additions & 11 deletions examples/scheduled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,19 @@ async fn main() -> Result<(), ()> {

debug!("js worker for {:?} started", file_path);

// wait for completion signal
match done_rx.await {
Ok(()) => debug!("js task for {file_path} completed"),
Err(err) => error!("js task for {file_path} did not complete: {err}"),
}

// wait for shutdown signal
match shutdown_rx.await {
Ok(None) => debug!("js worker for {file_path} stopped"),
Ok(Some(err)) => error!("js worker for {file_path} error: {err}"),
Err(err) => error!("js worker for {file_path} error: {err}"),
tokio::select! {
_ = tokio::signal::ctrl_c() => debug!("ctrl-c received"),
// wait for completion signal
done = done_rx => match done {
Ok(()) => debug!("js task for {file_path} completed"),
Err(err) => error!("js task for {file_path} did not complete: {err}"),
},
// wait for shutdown signal
end = shutdown_rx => match end {
Ok(None) => error!("js worker for {file_path} stopped before replying"),
Ok(Some(err)) => error!("js worker for {file_path} error: {err}"),
Err(err) => error!("js worker for {file_path} error: {err}"),
}
}

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions examples/serve.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ async function handleRequest(request) {

return new Response("Hello world");
}

setTimeout(() => console.log("Hello from timeout"), 500);
70 changes: 39 additions & 31 deletions examples/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,46 +37,54 @@ async fn handle_request(data: Data<AppState>, req: HttpRequest) -> HttpResponse
.body(Default::default())
.unwrap();

std::thread::spawn(move || Worker::new(url, shutdown_tx).exec(Task::Fetch(Some(FetchInit::new(req, response_tx)))));
let handle = std::thread::spawn(move || Worker::new(url, shutdown_tx).exec(Task::Fetch(Some(FetchInit::new(req, response_tx)))));

let url = url_clone.clone();

debug!("js worker for {:?} started", url);

// wait for shutdown signal
match shutdown_rx.await {
Ok(None) => debug!("js worker for {:?} stopped", url.path()),
Ok(Some(err)) => {
error!("js worker for {:?} error: {}", url.path(), err);
return HttpResponse::InternalServerError().body(err.to_string());
}
Err(err) => {
error!("js worker for {:?} error: {}", url.path(), err);
return HttpResponse::InternalServerError().body(err.to_string());
let response = tokio::select! {
res = response_rx => {
match res {
Ok(res) => {
let mut rb = HttpResponse::build(res.status());

for (k, v) in res.headers() {
rb.append_header((k, v));
}

rb.body(res.body().clone())
}
Err(err) => {
error!("worker fetch error: {}, ensure the worker registered a listener for the 'fetch' event", err);
HttpResponse::InternalServerError().body(err.to_string())
}
}
},
end = shutdown_rx => {
match end {
Ok(None) => {
error!("js worker for {} exited before replying", url.path());
HttpResponse::InternalServerError().body("js worker exited")
},
Ok(Some(err)) => {
error!("js worker for {} error: {err}", url.path(), );
HttpResponse::InternalServerError().body(err.to_string())
}
Err(err) => {
error!("js worker for {} error: {err}", url.path());
HttpResponse::InternalServerError().body(err.to_string())
}
}
}
}
};

match response_rx.await {
Ok(res) => {
debug!(
"worker fetch replied {} {:?}",
res.status(),
start.elapsed()
);
debug!("handle_request done {}", start.elapsed().as_millis());

let mut rb = HttpResponse::build(res.status());
// Wait for the worker to finish; we should terminate the worker if it's still running after a reply
handle.join().unwrap();

for (k, v) in res.headers() {
rb.append_header((k, v));
}

rb.body(res.body().clone())
}
Err(err) => {
error!("worker fetch error: {}, ensure the worker registered a listener for the 'fetch' event", err);
HttpResponse::InternalServerError().body(err.to_string())
}
}
response
}

fn get_path() -> String {
Expand Down
6 changes: 3 additions & 3 deletions src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,17 +150,17 @@ impl Worker {

let local = tokio::task::LocalSet::new();
match local.block_on(&runtime, future) {
Ok(_) => {
Ok(()) => {
log::debug!("worker thread finished");
self.shutdown_tx
.send(None)
.expect("failed to send shutdown signal");
.unwrap_or_else(|err| log::warn!("failed to send shutdown signal (ok) {:?}", err));
}
Err(err) => {
log::error!("worker thread failed {:?}", err);
self.shutdown_tx
.send(Some(err))
.expect("failed to send shutdown signal");
.unwrap_or_else(|err| log::warn!("failed to send shutdown signal ({:?})", err));
}
}
}
Expand Down

0 comments on commit 704ecb8

Please sign in to comment.