From 2885b5c512421191e189900c7c56bb02e497e685 Mon Sep 17 00:00:00 2001 From: max-lt Date: Sat, 24 Feb 2024 17:32:32 +0100 Subject: [PATCH] Add event listener for scheduled --- examples/scheduled.js | 21 +++++++++++++++++ examples/scheduled.rs | 42 +++++++++++++++++++++++++++++++++ examples/{hello.js => serve.js} | 0 examples/serve.rs | 7 +++--- src/ext/runtime.js | 22 ++++++++++++++++- src/lib.rs | 4 +++- src/runtime.rs | 37 ++++++++++++++++------------- src/task.rs | 32 +++++++++++++++++++++++++ 8 files changed, 144 insertions(+), 21 deletions(-) create mode 100644 examples/scheduled.js create mode 100644 examples/scheduled.rs rename examples/{hello.js => serve.js} (100%) create mode 100644 src/task.rs diff --git a/examples/scheduled.js b/examples/scheduled.js new file mode 100644 index 0000000..ffafe78 --- /dev/null +++ b/examples/scheduled.js @@ -0,0 +1,21 @@ +function wait(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +addEventListener("scheduled", (event) => { + event.waitUntil(handleSchedule(event.scheduledTime)); +}); + +async function handleSchedule(scheduledDate) { + console.log( + "Called scheduled event:", + scheduledDate, + new Date(scheduledDate).toISOString() + ); + + const res = await fetch("https://example.workers.rocks/data.json"); + + console.log("Done waiting!", res.status, await res.json()); + + return "Called deploy hook!"; +} diff --git a/examples/scheduled.rs b/examples/scheduled.rs new file mode 100644 index 0000000..35221ac --- /dev/null +++ b/examples/scheduled.rs @@ -0,0 +1,42 @@ +use log::debug; +use log::error; +use openworkers_runtime::run_js; +use openworkers_runtime::AnyError; +use openworkers_runtime::Task; +use tokio::sync::oneshot; + +#[tokio::main] +async fn main() -> Result<(), ()> { + if !std::env::var("RUST_LOG").is_ok() { + std::env::set_var("RUST_LOG", "debug"); + } + + env_logger::init(); + + debug!("start main"); + + let file_path = String::from("examples/scheduled.js"); + + let (shutdown_tx, shutdown_rx) = oneshot::channel::>(); + + let _res = { + let file_path = file_path.clone(); + + std::thread::spawn(move || run_js(file_path.as_str(), Task::Scheduled, shutdown_tx)) + }; + + debug!("js worker for {:?} started", file_path); + + // wait for shutdown signal + match shutdown_rx.await { + Ok(None) => debug!("js worker for {:?} stopped", file_path), + Ok(Some(err)) => { + error!("js worker for {:?} error: {}", file_path, err); + } + Err(err) => { + error!("js worker for {:?} error: {}", file_path, err); + } + } + + Ok(()) +} diff --git a/examples/hello.js b/examples/serve.js similarity index 100% rename from examples/hello.js rename to examples/serve.js diff --git a/examples/serve.rs b/examples/serve.rs index 15975d5..57efd15 100644 --- a/examples/serve.rs +++ b/examples/serve.rs @@ -5,6 +5,7 @@ use log::error; use openworkers_runtime::run_js; use openworkers_runtime::AnyError; use openworkers_runtime::FetchInit; +use openworkers_runtime::Task; use tokio::sync::oneshot; @@ -32,7 +33,7 @@ async fn handle_request(data: Data, req: HttpRequest) -> HttpResponse let _res = { let file_path = file_path.clone(); - let evt = Some(FetchInit { + let task = Task::Fetch(FetchInit { req: http_v02::Request::builder() .uri(req.uri()) .body(Default::default()) @@ -40,7 +41,7 @@ async fn handle_request(data: Data, req: HttpRequest) -> HttpResponse res_tx: Some(response_tx), }); - std::thread::spawn(move || run_js(file_path.as_str(), evt, shutdown_tx)) + std::thread::spawn(move || run_js(file_path.as_str(), task, shutdown_tx)) }; debug!("js worker for {:?} started", file_path); @@ -84,7 +85,7 @@ async fn handle_request(data: Data, req: HttpRequest) -> HttpResponse fn get_path() -> String { std::env::args() .nth(1) - .unwrap_or_else(|| String::from("examples/hello.js")) + .unwrap_or_else(|| String::from("examples/serve.js")) } #[actix_web::main] diff --git a/src/ext/runtime.js b/src/ext/runtime.js index 4168af0..6419afa 100644 --- a/src/ext/runtime.js +++ b/src/ext/runtime.js @@ -142,6 +142,7 @@ import * as eventSource from "ext:deno_fetch/27_eventsource.js"; } let fetchEventListener; + let scheduledEventListener; function addEventListener(type, listener) { if (typeof type !== "string") { @@ -156,11 +157,29 @@ import * as eventSource from "ext:deno_fetch/27_eventsource.js"; case "fetch": fetchEventListener = listener; break; + case "scheduled": + scheduledEventListener = listener; + break; default: throw new Error(`Unsupported event type: ${type}`); } } + function triggerScheduledEvent(scheduledTime) { + if (!scheduledEventListener) { + throw new Error("No scheduled event listener registered"); + } + + scheduledEventListener({ + scheduledTime, + waitUntil: () => { + // TODO: it works for now because we are waiting for + // the worker to terminate but we should return a promise + // and look for this promise to be resolved + }, + }); + } + function triggerFetchEvent() { if (!fetchEventListener) { throw new Error("No fetch event listener registered"); @@ -341,7 +360,8 @@ import * as eventSource from "ext:deno_fetch/27_eventsource.js"; // deno_fetch - 27 - eventsource EventSource: nonEnumerable(eventSource.EventSource), - // fetch event + // Events + triggerScheduledEvent: nonEnumerable(triggerScheduledEvent), triggerFetchEvent: nonEnumerable(triggerFetchEvent), addEventListener: nonEnumerable(addEventListener), diff --git a/src/lib.rs b/src/lib.rs index 880f5aa..99964c8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,9 @@ mod ext; - mod runtime; +mod task; pub use runtime::run_js; pub use ext::FetchInit; pub use deno_core::error::AnyError; +pub use task::Task; +pub use task::TaskType; diff --git a/src/runtime.rs b/src/runtime.rs index 648838a..d11b257 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -2,8 +2,9 @@ use crate::ext::fetch_init_ext; use crate::ext::runtime_ext; use crate::ext::permissions_ext; -use crate::ext::FetchInit; use crate::ext::Permissions; +use crate::task::TaskType; +use crate::Task; use std::rc::Rc; @@ -13,11 +14,7 @@ use tokio::sync::oneshot; use log::{debug, error}; -pub fn run_js( - path_str: &str, - evt: Option, - shutdown_tx: oneshot::Sender>, -) { +pub fn run_js(path_str: &str, task: Task, shutdown_tx: oneshot::Sender>) { let current_dir = std::env::current_dir().unwrap(); let current_dir = current_dir.as_path(); let main_module = deno_core::resolve_path(path_str, current_dir).unwrap(); @@ -62,17 +59,22 @@ pub fn run_js( .unwrap(); } - // Set fetch request + let task_type = task.task_type(); + { - debug!("set fetch request"); + match task { + Task::Fetch(evt) => { + debug!("set fetch request"); - let op_state_rc = js_runtime.op_state(); - let mut op_state = op_state_rc.borrow_mut(); + let op_state_rc = js_runtime.op_state(); + let mut op_state = op_state_rc.borrow_mut(); - if let Some(evt) = evt { - op_state.put(evt); + op_state.put(evt); + } + Task::Scheduled => {} + Task::None => {} } - }; + } let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -83,12 +85,15 @@ pub fn run_js( let mod_id = js_runtime.load_main_module(&main_module, None).await?; let result = js_runtime.mod_evaluate(mod_id); - { - // Trigger fetch event + if !task_type.is_none() { js_runtime .execute_script( deno_core::located_script_name!(), - deno_core::ModuleCodeString::from(format!("globalThis.triggerFetchEvent()")), + deno_core::ModuleCodeString::from(match task_type { + TaskType::Fetch => format!("globalThis.triggerFetchEvent()"), + TaskType::Scheduled => format!("globalThis.triggerScheduledEvent(Date.now())"), + TaskType::None => unreachable!(), + }), ) .unwrap(); } diff --git a/src/task.rs b/src/task.rs new file mode 100644 index 0000000..32b3c59 --- /dev/null +++ b/src/task.rs @@ -0,0 +1,32 @@ +use crate::FetchInit; + +pub enum TaskType { + Fetch, + Scheduled, + None, +} + +impl TaskType { + pub fn is_none(&self) -> bool { + match self { + TaskType::None => true, + _ => false, + } + } +} + +pub enum Task { + Fetch(FetchInit), + Scheduled, + None, +} + +impl Task { + pub fn task_type(&self) -> TaskType { + match self { + Task::Fetch(_) => TaskType::Fetch, + Task::Scheduled => TaskType::Scheduled, + Task::None => TaskType::None, + } + } +}