From ea7ff18edcc52993e9e35ee29dbccd46fe48c6d8 Mon Sep 17 00:00:00 2001 From: Toon Willems Date: Sat, 15 Mar 2025 18:50:50 +0100 Subject: [PATCH] feat(EmailWorkers): Implement email worker functionality --- worker-build/src/js/shim.js | 61 +++++++++++++------------ worker-build/src/main.rs | 1 + worker-macros/src/event.rs | 41 ++++++++++++++++- worker-sys/src/types.rs | 2 + worker-sys/src/types/email.rs | 48 ++++++++++++++++++++ worker/src/email.rs | 85 +++++++++++++++++++++++++++++++++++ worker/src/lib.rs | 2 + 7 files changed, 211 insertions(+), 29 deletions(-) create mode 100644 worker-sys/src/types/email.rs create mode 100644 worker/src/email.rs diff --git a/worker-build/src/js/shim.js b/worker-build/src/js/shim.js index ed4c9fd1e..3013c4b1b 100644 --- a/worker-build/src/js/shim.js +++ b/worker-build/src/js/shim.js @@ -4,7 +4,7 @@ import wasmModule from "./index.wasm"; import { WorkerEntrypoint } from "cloudflare:workers"; const instance = new WebAssembly.Instance(wasmModule, { - "./index_bg.js": imports, + "./index_bg.js": imports, }); imports.__wbg_set_wasm(instance.exports); @@ -15,39 +15,44 @@ instance.exports.__wbindgen_start?.(); export { wasmModule }; class Entrypoint extends WorkerEntrypoint { - async fetch(request) { - let response = imports.fetch(request, this.env, this.ctx); - $WAIT_UNTIL_RESPONSE; - return await response; - } - - async queue(batch) { - return await imports.queue(batch, this.env, this.ctx); - } - - async scheduled(event) { - return await imports.scheduled(event, this.env, this.ctx); - } + async fetch(request) { + let response = imports.fetch(request, this.env, this.ctx); + $WAIT_UNTIL_RESPONSE; + return await response; + } + + async queue(batch) { + return await imports.queue(batch, this.env, this.ctx); + } + + async scheduled(event) { + return await imports.scheduled(event, this.env, this.ctx); + } + + async email(message) { + return await imports.email(message, this.env, this.ctx); + } } const EXCLUDE_EXPORT = [ - "IntoUnderlyingByteSource", - "IntoUnderlyingSink", - "IntoUnderlyingSource", - "MinifyConfig", - "PolishConfig", - "R2Range", - "RequestRedirect", - "fetch", - "queue", - "scheduled", - "getMemory", + "IntoUnderlyingByteSource", + "IntoUnderlyingSink", + "IntoUnderlyingSource", + "MinifyConfig", + "PolishConfig", + "R2Range", + "RequestRedirect", + "fetch", + "queue", + "scheduled", + "email", + "getMemory", ]; Object.keys(imports).map((k) => { - if (!(EXCLUDE_EXPORT.includes(k) | k.startsWith("__"))) { - Entrypoint.prototype[k] = imports[k]; - } + if (!(EXCLUDE_EXPORT.includes(k) | k.startsWith("__"))) { + Entrypoint.prototype[k] = imports[k]; + } }); export default Entrypoint; diff --git a/worker-build/src/main.rs b/worker-build/src/main.rs index f96f72622..4188d893f 100644 --- a/worker-build/src/main.rs +++ b/worker-build/src/main.rs @@ -220,6 +220,7 @@ fn bundle(esbuild_path: &Path) -> Result<()> { let mut command = Command::new(esbuild_path); command.args([ "--external:./index.wasm", + "--external:cloudflare:email", "--external:cloudflare:sockets", "--external:cloudflare:workers", "--format=esm", diff --git a/worker-macros/src/event.rs b/worker-macros/src/event.rs index 07d7f22c5..c513300d1 100644 --- a/worker-macros/src/event.rs +++ b/worker-macros/src/event.rs @@ -12,6 +12,7 @@ pub fn expand_macro(attr: TokenStream, item: TokenStream) -> TokenStream { Start, #[cfg(feature = "queue")] Queue, + Email, } use HandlerType::*; @@ -25,6 +26,7 @@ pub fn expand_macro(attr: TokenStream, item: TokenStream) -> TokenStream { "start" => handler_type = Some(Start), #[cfg(feature = "queue")] "queue" => handler_type = Some(Queue), + "email" => handler_type = Some(Email), "respond_with_errors" => { respond_with_errors = true; } @@ -32,7 +34,7 @@ pub fn expand_macro(attr: TokenStream, item: TokenStream) -> TokenStream { } } let handler_type = handler_type.expect( - "must have either 'fetch', 'scheduled', 'queue' or 'start' attribute, e.g. #[event(fetch)]", + "must have either 'fetch', 'scheduled', 'queue', 'email', or 'start' attribute, e.g. #[event(fetch)]", ); // create new var using syn item of the attributed fn @@ -215,5 +217,42 @@ pub fn expand_macro(attr: TokenStream, item: TokenStream) -> TokenStream { TokenStream::from(output) } + Email => { + let input_fn_ident = Ident::new( + &(input_fn.sig.ident.to_string() + "_email_glue"), + input_fn.sig.ident.span(), + ); + let wrapper_fn_ident = Ident::new("email", input_fn.sig.ident.span()); + // rename the original attributed fn + input_fn.sig.ident = input_fn_ident.clone(); + + let wrapper_fn = quote! { + pub async fn #wrapper_fn_ident(message: ::worker::worker_sys::EmailMessage, env: ::worker::Env, ctx: ::worker::worker_sys::Context) { + // call the original fn + let ctx = worker::Context::new(ctx); + match #input_fn_ident(::worker::EmailMessage { inner: message }, env, ctx).await { + Ok(()) => {}, + Err(e) => { + ::worker::console_log!("{}", &e); + panic!("{}", e); + } + } + } + }; + let wasm_bindgen_code = + wasm_bindgen_macro_support::expand(TokenStream::new().into(), wrapper_fn) + .expect("wasm_bindgen macro failed to expand"); + + let output = quote! { + #input_fn + + mod _worker_email { + use ::worker::{wasm_bindgen, wasm_bindgen_futures}; + use super::#input_fn_ident; + #wasm_bindgen_code + } + }; + TokenStream::from(output) + } } } diff --git a/worker-sys/src/types.rs b/worker-sys/src/types.rs index 76630bbbe..69752c49b 100644 --- a/worker-sys/src/types.rs +++ b/worker-sys/src/types.rs @@ -6,6 +6,7 @@ mod crypto; mod d1; mod durable_object; mod dynamic_dispatcher; +mod email; mod fetcher; mod fixed_length_stream; mod hyperdrive; @@ -28,6 +29,7 @@ pub use crypto::*; pub use d1::*; pub use durable_object::*; pub use dynamic_dispatcher::*; +pub use email::*; pub use fetcher::*; pub use fixed_length_stream::*; pub use hyperdrive::*; diff --git a/worker-sys/src/types/email.rs b/worker-sys/src/types/email.rs new file mode 100644 index 000000000..28dc7b738 --- /dev/null +++ b/worker-sys/src/types/email.rs @@ -0,0 +1,48 @@ +use wasm_bindgen::prelude::*; +use web_sys::ReadableStream; + +#[wasm_bindgen(module = "cloudflare:email")] +extern "C" { + #[wasm_bindgen(extends=js_sys::Object)] + #[derive(Debug, Clone, PartialEq, Eq)] + pub type EmailMessage; + + #[wasm_bindgen(constructor, catch)] + pub fn new(from: &str, to: &str, raw: &str) -> Result; + + #[wasm_bindgen(constructor, catch)] + pub fn new_from_stream( + from: &str, + to: &str, + raw: &ReadableStream, + ) -> Result; + + #[wasm_bindgen(method, catch, getter)] + pub fn from(this: &EmailMessage) -> Result; + + #[wasm_bindgen(method, catch, getter)] + pub fn to(this: &EmailMessage) -> Result; + + #[wasm_bindgen(method, catch, getter)] + pub fn headers(this: &EmailMessage) -> Result; + + #[wasm_bindgen(method, catch, getter)] + pub fn raw(this: &EmailMessage) -> Result; + + #[wasm_bindgen(method, catch, getter, js_name=rawSize)] + pub fn raw_size(this: &EmailMessage) -> Result; + + #[wasm_bindgen(method, catch, js_name=setReject)] + pub fn set_reject(this: &EmailMessage, reason: js_sys::JsString) -> Result<(), JsValue>; + + #[wasm_bindgen(method, catch)] + pub fn forward( + this: &EmailMessage, + recipient: js_sys::JsString, + headers: Option, + ) -> Result; + + #[wasm_bindgen(method, catch)] + pub fn reply(this: &EmailMessage, message: EmailMessage) -> Result; + +} diff --git a/worker/src/email.rs b/worker/src/email.rs new file mode 100644 index 000000000..08a586429 --- /dev/null +++ b/worker/src/email.rs @@ -0,0 +1,85 @@ +use futures_util::TryStreamExt; +use wasm_bindgen_futures::JsFuture; +use web_sys::ReadableStream; +use worker_sys::EmailMessage as EmailMessageSys; + +use crate::{send::SendFuture, ByteStream, Headers, Result}; + +pub struct EmailMessage { + pub inner: EmailMessageSys, +} + +impl EmailMessage { + /// construct a new email message + pub fn new(from: &str, to: &str, raw: &str) -> Result { + Ok(EmailMessage { + inner: EmailMessageSys::new(from, to, raw)?, + }) + } + + /// construct a new email message for a ReadableStream + pub fn new_from_stream(from: &str, to: &str, raw: &ReadableStream) -> Result { + Ok(EmailMessage { + inner: EmailMessageSys::new_from_stream(from, to, raw)?, + }) + } + + /// the from field of the email message + pub fn from_email(&self) -> String { + self.inner.from().unwrap().into() + } + + /// the to field of the email message + pub fn to_email(&self) -> String { + self.inner.to().unwrap().into() + } + + /// the headers field of the email message + pub fn headers(&self) -> Headers { + Headers(self.inner.headers().unwrap()) + } + + /// the raw email message + pub fn raw(&self) -> Result { + self.inner.raw().map_err(Into::into).map(|rs| ByteStream { + inner: wasm_streams::ReadableStream::from_raw(rs).into_stream(), + }) + } + + pub async fn raw_bytes(&self) -> Result> { + self.raw()? + .try_fold(Vec::new(), |mut bytes, mut chunk| async move { + bytes.append(&mut chunk); + Ok(bytes) + }) + .await + } + + /// the raw size of the message + pub fn raw_size(&self) -> f64 { + self.inner.raw_size().unwrap().into() + } + + /// reject message with reason + pub fn reject(&self, reason: String) { + self.inner.set_reject(reason.into()).unwrap() + } + + /// forward message to recipient + pub async fn forward(&self, recipient: String, headers: Option) -> Result<()> { + let promise = self.inner.forward(recipient.into(), headers.map(|h| h.0))?; + + let fut = SendFuture::new(JsFuture::from(promise)); + fut.await?; + Ok(()) + } + + /// reply with email message to recipient + pub async fn reply(&self, message: EmailMessage) -> Result<()> { + let promise = self.inner.reply(message.inner)?; + + let fut = SendFuture::new(JsFuture::from(promise)); + fut.await?; + Ok(()) + } +} diff --git a/worker/src/lib.rs b/worker/src/lib.rs index e9a4b767e..3571f7e31 100644 --- a/worker/src/lib.rs +++ b/worker/src/lib.rs @@ -174,6 +174,7 @@ pub use crate::date::{Date, DateInit}; pub use crate::delay::Delay; pub use crate::durable::*; pub use crate::dynamic_dispatch::*; +pub use crate::email::*; pub use crate::env::{Env, EnvBinding, Secret, Var}; pub use crate::error::Error; pub use crate::fetcher::Fetcher; @@ -212,6 +213,7 @@ mod date; mod delay; pub mod durable; mod dynamic_dispatch; +mod email; mod env; mod error; mod fetcher;