Skip to content

Commit

Permalink
Add event listener for scheduled
Browse files Browse the repository at this point in the history
  • Loading branch information
max-lt committed Feb 24, 2024
1 parent ba777a7 commit 2885b5c
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 21 deletions.
21 changes: 21 additions & 0 deletions examples/scheduled.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
function wait(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

addEventListener("scheduled", (event) => {
event.waitUntil(handleSchedule(event.scheduledTime));
});

async function handleSchedule(scheduledDate) {
console.log(
"Called scheduled event:",
scheduledDate,
new Date(scheduledDate).toISOString()
);

const res = await fetch("https://example.workers.rocks/data.json");

console.log("Done waiting!", res.status, await res.json());

return "Called deploy hook!";
}
42 changes: 42 additions & 0 deletions examples/scheduled.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use log::debug;
use log::error;
use openworkers_runtime::run_js;
use openworkers_runtime::AnyError;
use openworkers_runtime::Task;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() -> Result<(), ()> {
if !std::env::var("RUST_LOG").is_ok() {
std::env::set_var("RUST_LOG", "debug");
}

env_logger::init();

debug!("start main");

let file_path = String::from("examples/scheduled.js");

let (shutdown_tx, shutdown_rx) = oneshot::channel::<Option<AnyError>>();

let _res = {
let file_path = file_path.clone();

std::thread::spawn(move || run_js(file_path.as_str(), Task::Scheduled, shutdown_tx))
};

debug!("js worker for {:?} started", file_path);

// wait for shutdown signal
match shutdown_rx.await {
Ok(None) => debug!("js worker for {:?} stopped", file_path),
Ok(Some(err)) => {
error!("js worker for {:?} error: {}", file_path, err);
}
Err(err) => {
error!("js worker for {:?} error: {}", file_path, err);
}
}

Ok(())
}
File renamed without changes.
7 changes: 4 additions & 3 deletions examples/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use log::error;
use openworkers_runtime::run_js;
use openworkers_runtime::AnyError;
use openworkers_runtime::FetchInit;
use openworkers_runtime::Task;

use tokio::sync::oneshot;

Expand Down Expand Up @@ -32,15 +33,15 @@ async fn handle_request(data: Data<AppState>, req: HttpRequest) -> HttpResponse
let _res = {
let file_path = file_path.clone();

let evt = Some(FetchInit {
let task = Task::Fetch(FetchInit {
req: http_v02::Request::builder()
.uri(req.uri())
.body(Default::default())
.unwrap(),
res_tx: Some(response_tx),
});

std::thread::spawn(move || run_js(file_path.as_str(), evt, shutdown_tx))
std::thread::spawn(move || run_js(file_path.as_str(), task, shutdown_tx))
};

debug!("js worker for {:?} started", file_path);
Expand Down Expand Up @@ -84,7 +85,7 @@ async fn handle_request(data: Data<AppState>, req: HttpRequest) -> HttpResponse
fn get_path() -> String {
std::env::args()
.nth(1)
.unwrap_or_else(|| String::from("examples/hello.js"))
.unwrap_or_else(|| String::from("examples/serve.js"))
}

#[actix_web::main]
Expand Down
22 changes: 21 additions & 1 deletion src/ext/runtime.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ import * as eventSource from "ext:deno_fetch/27_eventsource.js";
}

let fetchEventListener;
let scheduledEventListener;

function addEventListener(type, listener) {
if (typeof type !== "string") {
Expand All @@ -156,11 +157,29 @@ import * as eventSource from "ext:deno_fetch/27_eventsource.js";
case "fetch":
fetchEventListener = listener;
break;
case "scheduled":
scheduledEventListener = listener;
break;
default:
throw new Error(`Unsupported event type: ${type}`);
}
}

function triggerScheduledEvent(scheduledTime) {
if (!scheduledEventListener) {
throw new Error("No scheduled event listener registered");
}

scheduledEventListener({
scheduledTime,
waitUntil: () => {
// TODO: it works for now because we are waiting for
// the worker to terminate but we should return a promise
// and look for this promise to be resolved
},
});
}

function triggerFetchEvent() {
if (!fetchEventListener) {
throw new Error("No fetch event listener registered");
Expand Down Expand Up @@ -341,7 +360,8 @@ import * as eventSource from "ext:deno_fetch/27_eventsource.js";
// deno_fetch - 27 - eventsource
EventSource: nonEnumerable(eventSource.EventSource),

// fetch event
// Events
triggerScheduledEvent: nonEnumerable(triggerScheduledEvent),
triggerFetchEvent: nonEnumerable(triggerFetchEvent),
addEventListener: nonEnumerable(addEventListener),

Expand Down
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
mod ext;

mod runtime;
mod task;

pub use runtime::run_js;
pub use ext::FetchInit;
pub use deno_core::error::AnyError;
pub use task::Task;
pub use task::TaskType;
37 changes: 21 additions & 16 deletions src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use crate::ext::fetch_init_ext;
use crate::ext::runtime_ext;

use crate::ext::permissions_ext;
use crate::ext::FetchInit;
use crate::ext::Permissions;
use crate::task::TaskType;
use crate::Task;

use std::rc::Rc;

Expand All @@ -13,11 +14,7 @@ use tokio::sync::oneshot;

use log::{debug, error};

pub fn run_js(
path_str: &str,
evt: Option<FetchInit>,
shutdown_tx: oneshot::Sender<Option<AnyError>>,
) {
pub fn run_js(path_str: &str, task: Task, shutdown_tx: oneshot::Sender<Option<AnyError>>) {
let current_dir = std::env::current_dir().unwrap();
let current_dir = current_dir.as_path();
let main_module = deno_core::resolve_path(path_str, current_dir).unwrap();
Expand Down Expand Up @@ -62,17 +59,22 @@ pub fn run_js(
.unwrap();
}

// Set fetch request
let task_type = task.task_type();

{
debug!("set fetch request");
match task {
Task::Fetch(evt) => {
debug!("set fetch request");

let op_state_rc = js_runtime.op_state();
let mut op_state = op_state_rc.borrow_mut();
let op_state_rc = js_runtime.op_state();
let mut op_state = op_state_rc.borrow_mut();

if let Some(evt) = evt {
op_state.put(evt);
op_state.put(evt);
}
Task::Scheduled => {}
Task::None => {}
}
};
}

let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
Expand All @@ -83,12 +85,15 @@ pub fn run_js(
let mod_id = js_runtime.load_main_module(&main_module, None).await?;
let result = js_runtime.mod_evaluate(mod_id);

{
// Trigger fetch event
if !task_type.is_none() {
js_runtime
.execute_script(
deno_core::located_script_name!(),
deno_core::ModuleCodeString::from(format!("globalThis.triggerFetchEvent()")),
deno_core::ModuleCodeString::from(match task_type {
TaskType::Fetch => format!("globalThis.triggerFetchEvent()"),
TaskType::Scheduled => format!("globalThis.triggerScheduledEvent(Date.now())"),
TaskType::None => unreachable!(),
}),
)
.unwrap();
}
Expand Down
32 changes: 32 additions & 0 deletions src/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use crate::FetchInit;

pub enum TaskType {
Fetch,
Scheduled,
None,
}

impl TaskType {
pub fn is_none(&self) -> bool {
match self {
TaskType::None => true,
_ => false,
}
}
}

pub enum Task {
Fetch(FetchInit),
Scheduled,
None,
}

impl Task {
pub fn task_type(&self) -> TaskType {
match self {
Task::Fetch(_) => TaskType::Fetch,
Task::Scheduled => TaskType::Scheduled,
Task::None => TaskType::None,
}
}
}

0 comments on commit 2885b5c

Please sign in to comment.