diff --git a/samples/tcp-ingress/config.capnp b/samples/tcp-ingress/config.capnp new file mode 100644 index 00000000000..418ff5931fb --- /dev/null +++ b/samples/tcp-ingress/config.capnp @@ -0,0 +1,20 @@ +using Workerd = import "/workerd/workerd.capnp"; + +const tcpIngressExample :Workerd.Config = ( + services = [ + (name = "main", worker = .worker), + ], + + sockets = [ + ( name = "http", address = "*:8080", http = (), service = "main" ), + ( name = "tcp", address = "*:8081", tcp = (), service = "main" ) + ] +); + +const worker :Workerd.Worker = ( + modules = [ + (name = "worker", esModule = embed "worker.js") + ], + compatibilityFlags = ["nodejs_compat_v2", "experimental"], + compatibilityDate = "2023-02-28", +); diff --git a/samples/tcp-ingress/worker.js b/samples/tcp-ingress/worker.js new file mode 100644 index 00000000000..32aafb0b78d --- /dev/null +++ b/samples/tcp-ingress/worker.js @@ -0,0 +1,11 @@ + +export default { + async fetch(req) { + return new Response("ok"); + }, + + async connect(socket) { + // pipe the input stream to the output + await socket.readable.pipeTo(socket.writable); + } +}; diff --git a/src/workerd/api/actor.h b/src/workerd/api/actor.h index d59e2a69c59..686dc4a34dd 100644 --- a/src/workerd/api/actor.h +++ b/src/workerd/api/actor.h @@ -114,6 +114,7 @@ class DurableObject final: public Fetcher { JSG_TS_DEFINE(interface DurableObject { fetch(request: Request): Response | Promise; + connect?(socket: Socket): void | Promise; alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; webSocketMessage?(ws: WebSocket, message: string | ArrayBuffer): void | Promise; webSocketClose?(ws: WebSocket, code: number, reason: string, wasClean: boolean): void | Promise; @@ -121,7 +122,7 @@ class DurableObject final: public Fetcher { }); JSG_TS_OVERRIDE( type DurableObjectStub = - Fetcher + Fetcher & { readonly id: DurableObjectId; readonly name?: string; diff --git a/src/workerd/api/global-scope.c++ b/src/workerd/api/global-scope.c++ index 8a234cdb9a4..dbb1d5ccde2 100644 --- a/src/workerd/api/global-scope.c++ +++ b/src/workerd/api/global-scope.c++ @@ -86,6 +86,7 @@ jsg::LenientOptional mapAddRef(jsg::Lock& js, jsg::LenientOptional& functi ExportedHandler ExportedHandler::clone(jsg::Lock& js) { return ExportedHandler{ .fetch{mapAddRef(js, fetch)}, + .connect{mapAddRef(js, connect)}, .tail{mapAddRef(js, tail)}, .trace{mapAddRef(js, trace)}, .tailStream{mapAddRef(js, tailStream)}, @@ -118,6 +119,55 @@ void ServiceWorkerGlobalScope::clear() { unhandledRejections.clear(); } +kj::Promise ServiceWorkerGlobalScope::connect(kj::String host, + const kj::HttpHeaders& headers, + kj::AsyncIoStream& connection, + kj::HttpService::ConnectResponse& response, + Worker::Lock& lock, + kj::Maybe exportedHandler) { + ExportedHandler& eh = JSG_REQUIRE_NONNULL(exportedHandler, Error, + "Connect ingress is not currently supported with Service Workers syntax."); + KJ_REQUIRE(FeatureFlags::get(lock).getWorkerdExperimental(), + "connect handling requires the experimental flag."); + + KJ_IF_SOME(handler, eh.connect) { + // Has a connect handler! + response.accept(200, "OK", headers); + + // Using neuterable stream to manage lifetime of stream promises + auto ownConnection = newNeuterableIoStream(connection); + + auto& ioContext = IoContext::current(); + jsg::Lock& js = lock; + + auto input = kj::str("fake://", host); + auto url = JSG_REQUIRE_NONNULL( + jsg::Url::tryParse(input.asPtr()), TypeError, "Specified address could not be parsed."); + auto hostName = url.getHostname(); + auto port = url.getPort(); + JSG_REQUIRE(hostName != ""_kj, TypeError, "Specified address is missing hostname."); + JSG_REQUIRE(port != ""_kj, TypeError, "Specified address is missing port."); + + // TLS support is not implemented so far. + auto nullTlsStarter = kj::heap(); + // We set isDefaultFetchPort to false here – sockets.c++ sets it for ports 443 and 8080 to + // provide a more descriptive error message for HTTP, but this is not relevant on the TCP server + // side. + jsg::Ref jsSocket = setupSocket(js, kj::mv(ownConnection), kj::mv(host), kj::none, + kj::mv(nullTlsStarter), SecureTransportKind::OFF, kj::str(hostName), false, kj::none); + // handleProxyStatus() is required to indicate that the socket was opened properly. Since the + // connection is already open at this point, exception handling is not required. + jsSocket->handleProxyStatus(js, kj::Promise>(kj::none)); + + kj::Maybe span = ioContext.makeTraceSpan("connect_handler"_kjc); + auto promise = handler(js, kj::mv(jsSocket), eh.env.addRef(js), eh.getCtx()); + return ioContext.awaitJs(js, kj::mv(promise)).attach(kj::mv(span)); + } + lock.logWarningOnce("Received a connect event but we lack a handler. " + "Did you remember to export a connect() function?"); + JSG_FAIL_REQUIRE(Error, "Handler does not export a connect() function."); +} + kj::Promise> ServiceWorkerGlobalScope::request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers, diff --git a/src/workerd/api/global-scope.h b/src/workerd/api/global-scope.h index 3f59a4f53c9..8d30c1f9f51 100644 --- a/src/workerd/api/global-scope.h +++ b/src/workerd/api/global-scope.h @@ -9,6 +9,7 @@ #include "http.h" #include "messagechannel.h" #include "performance.h" +#include "sockets.h" #include #ifdef WORKERD_FUZZILLI @@ -350,6 +351,10 @@ struct ExportedHandler { jsg::Optional> ctx); jsg::LenientOptional> fetch; + using ConnectHandler = jsg::Promise( + jsg::Ref socket, jsg::Value env, jsg::Optional> ctx); + jsg::LenientOptional> connect; + using TailHandler = kj::Promise(kj::Array> events, jsg::Value env, jsg::Optional> ctx); @@ -389,6 +394,7 @@ struct ExportedHandler { jsg::SelfRef self; JSG_STRUCT(fetch, + connect, tail, trace, tailStream, @@ -406,6 +412,7 @@ struct ExportedHandler { JSG_STRUCT_TS_DEFINE( type ExportedHandlerFetchHandler = (request: Request>, env: Env, ctx: ExecutionContext) => Response | Promise; + type ExportedHandlerConnectHandler = (socket: Socket, env: Env, ctx: ExecutionContext) => void | Promise; type ExportedHandlerTailHandler = (events: TraceItem[], env: Env, ctx: ExecutionContext) => void | Promise; type ExportedHandlerTraceHandler = (traces: TraceItem[], env: Env, ctx: ExecutionContext) => void | Promise; type ExportedHandlerTailStreamHandler = (event : TailStream.TailEvent, env: Env, ctx: ExecutionContext) => TailStream.TailEventHandlerType | Promise; @@ -416,6 +423,7 @@ struct ExportedHandler { JSG_STRUCT_TS_OVERRIDE( { email?: EmailExportedHandler; fetch?: ExportedHandlerFetchHandler; + connect?: ExportedHandlerConnectHandler; tail?: ExportedHandlerTailHandler; trace?: ExportedHandlerTraceHandler; tailStream?: ExportedHandlerTailStreamHandler; @@ -515,6 +523,14 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope { // TODO(cleanup): Factor out the shared code used between old-style event listeners vs. module // exports and move that code somewhere more appropriate. + // Received TCP/socket ingress (called from C++, not JS). + kj::Promise connect(kj::String host, + const kj::HttpHeaders& headers, + kj::AsyncIoStream& connection, + kj::HttpService::ConnectResponse& response, + Worker::Lock& lock, + kj::Maybe exportedHandler); + // Received sendTraces (called from C++, not JS). void sendTraces(kj::ArrayPtr> traces, Worker::Lock& lock, diff --git a/src/workerd/api/sockets.c++ b/src/workerd/api/sockets.c++ index 874af6dece8..6f634ce1b5c 100644 --- a/src/workerd/api/sockets.c++ +++ b/src/workerd/api/sockets.c++ @@ -318,6 +318,12 @@ jsg::Promise Socket::close(jsg::Lock& js) { }).catch_(js, [this](jsg::Lock& js, jsg::Value err) { errorHandler(js, kj::mv(err)); }); } +jsg::Promise Socket::proxyTo(jsg::Lock& js, jsg::Ref sock) { + auto pipeA = sock->readable->pipeTo(js, writable.addRef(), {}); + auto pipeB = readable->pipeTo(js, sock->writable.addRef(), {}); + return pipeA.then(js, [pipeB = kj::mv(pipeB)](jsg::Lock& js) mutable { kj::mv(pipeB); }); +} + jsg::Ref Socket::startTls(jsg::Lock& js, jsg::Optional tlsOptions) { JSG_REQUIRE( secureTransport != SecureTransportKind::ON, TypeError, "Cannot startTls on a TLS socket."); diff --git a/src/workerd/api/sockets.h b/src/workerd/api/sockets.h index 0d89d133417..cc9da13ea6a 100644 --- a/src/workerd/api/sockets.h +++ b/src/workerd/api/sockets.h @@ -126,6 +126,11 @@ class Socket: public jsg::Object { // closing. jsg::Promise close(jsg::Lock& js); + // Proxies to the other socket. Equivalent to: + // a.readable.pipeTo(b.writable); b.readable.pipeTo(a.writable); + // TODO: May want to add jsg::Optional options? + jsg::Promise proxyTo(jsg::Lock& js, jsg::Ref sock); + // Flushes write buffers then performs a TLS handshake on the current Socket connection. // The current `Socket` instance is closed and its readable/writable instances are also closed. // All new operations should be performed on the new `Socket` instance. @@ -155,6 +160,7 @@ class Socket: public jsg::Object { JSG_READONLY_PROTOTYPE_PROPERTY(secureTransport, getSecureTransport); JSG_METHOD(close); JSG_METHOD(startTls); + JSG_METHOD(proxyTo); JSG_TS_OVERRIDE({ get secureTransport(): 'on' | 'off' | 'starttls'; diff --git a/src/workerd/api/tests/BUILD.bazel b/src/workerd/api/tests/BUILD.bazel index 55dad299471..57701746760 100644 --- a/src/workerd/api/tests/BUILD.bazel +++ b/src/workerd/api/tests/BUILD.bazel @@ -19,6 +19,18 @@ wd_test( data = ["delete-all-deletes-alarm-test.js"], ) +wd_test( + src = "connect-handler-test.wd-test", + args = ["--experimental"], + data = [ + "connect-handler-test.js", + "connect-handler-test-proxy.js", + ], + # Test uses TCP sockets for ports 8081-8083 and may fail when running concurrently with other + # tests that do so. + tags = ["exclusive"], +) + wd_test( src = "actor-alarms-test.wd-test", args = ["--experimental"], @@ -42,7 +54,10 @@ wd_test( "tail-worker-test-invalid.js", "tail-worker-test-jsrpc.js", "websocket-hibernation.js", + "connect-handler-test.js", + "connect-handler-test-proxy.js", ], + tags = ["exclusive"], ) # Test to validate timing semantics for JSRPC streaming responses. @@ -327,6 +342,7 @@ wd_test( src = "js-rpc-test.wd-test", args = ["--experimental"], data = ["js-rpc-test.js"], + tags = ["exclusive"], ) wd_test( @@ -577,6 +593,7 @@ wd_test( "--no-verbose", ], data = ["js-rpc-test.js"], + tags = ["exclusive"], ) wd_test( diff --git a/src/workerd/api/tests/connect-handler-test-proxy.js b/src/workerd/api/tests/connect-handler-test-proxy.js new file mode 100644 index 00000000000..cfe49bfea80 --- /dev/null +++ b/src/workerd/api/tests/connect-handler-test-proxy.js @@ -0,0 +1,19 @@ +import { connect } from 'cloudflare:sockets'; +import { WorkerEntrypoint } from 'cloudflare:workers'; + +export class ConnectProxy extends WorkerEntrypoint { + async connect(socket) { + // proxy for ConnectEndpoint instance on port 8083. + let upstream = connect('localhost:8083'); + await socket.proxyTo(upstream); + } +} + +export class ConnectEndpoint extends WorkerEntrypoint { + async connect(socket) { + const enc = new TextEncoder(); + let writer = socket.writable.getWriter(); + await writer.write(enc.encode('hello-from-endpoint')); + await writer.close(); + } +} diff --git a/src/workerd/api/tests/connect-handler-test.js b/src/workerd/api/tests/connect-handler-test.js new file mode 100644 index 00000000000..aca350ec255 --- /dev/null +++ b/src/workerd/api/tests/connect-handler-test.js @@ -0,0 +1,45 @@ +import { connect } from 'cloudflare:sockets'; +import { ok, strictEqual } from 'assert'; + +export const connectHandler = { + async test() { + // Check that the connect handler can send a message through a socket + const socket = connect('localhost:8081'); + await socket.opened; + const dec = new TextDecoder(); + let result = ''; + for await (const chunk of socket.readable) { + result += dec.decode(chunk, { stream: true }); + } + result += dec.decode(); + strictEqual(result, 'hello'); + await socket.closed; + }, +}; + +export const connectHandlerProxy = { + async test() { + // Check that we can get a message proxied through a connect handler. This call connects us with + // an instance of Server, which serves as a proxy for an instance of OtherServer, as defined in + // connect-handler-test-proxy.js. + const socket = connect('localhost:8082'); + await socket.opened; + const dec = new TextDecoder(); + let result = ''; + for await (const chunk of socket.readable) { + result += dec.decode(chunk, { stream: true }); + } + result += dec.decode(); + strictEqual(result, 'hello-from-endpoint'); + await socket.closed; + }, +}; + +export default { + async connect(socket) { + const enc = new TextEncoder(); + let writer = socket.writable.getWriter(); + await writer.write(enc.encode('hello')); + await writer.close(); + }, +}; diff --git a/src/workerd/api/tests/connect-handler-test.wd-test b/src/workerd/api/tests/connect-handler-test.wd-test new file mode 100644 index 00000000000..98b517a861a --- /dev/null +++ b/src/workerd/api/tests/connect-handler-test.wd-test @@ -0,0 +1,36 @@ +using Workerd = import "/workerd/workerd.capnp"; + +const unitTests :Workerd.Config = ( + services = [ + ( name = "connect-handler-test", + worker = ( + modules = [ + (name = "worker", esModule = embed "connect-handler-test.js"), + ], + compatibilityFlags = ["nodejs_compat_v2", "experimental"], + ) + ), + ( name = "connect-handler-test-proxy", + worker = ( + modules = [ + (name = "worker", esModule = embed "connect-handler-test-proxy.js"), + ], + compatibilityFlags = ["nodejs_compat_v2", "experimental"], + ) + ), + ( name = "connect-handler-test-endpoint", + worker = ( + modules = [ + (name = "worker", esModule = embed "connect-handler-test-proxy.js"), + ], + compatibilityFlags = ["nodejs_compat_v2", "experimental"], + ) + ), + ( name = "internet", network = ( allow = ["private"] ) ) + ], + sockets = [ + (name = "tcp", address = "*:8081", tcp = (), service = "connect-handler-test"), + (name = "tcp", address = "*:8082", tcp = (), service = (name = "connect-handler-test-proxy", entrypoint = "ConnectProxy")), + (name = "tcp", address = "*:8083", tcp = (), service = (name = "connect-handler-test-endpoint", entrypoint = "ConnectEndpoint")) + ] +); diff --git a/src/workerd/api/tests/js-rpc-socket-test.wd-test b/src/workerd/api/tests/js-rpc-socket-test.wd-test index a13544d8cf6..4c871606ed7 100644 --- a/src/workerd/api/tests/js-rpc-socket-test.wd-test +++ b/src/workerd/api/tests/js-rpc-socket-test.wd-test @@ -72,6 +72,7 @@ const unitTests :Workerd.Config = ( http = (capnpConnectHost = "cappy") ) ), + ( name = "internet", network = ( allow = ["private"] ) ), ], sockets = [ ( name = "MyService-loop", @@ -99,6 +100,8 @@ const unitTests :Workerd.Config = ( service = (name = "js-rpc-test", entrypoint = "GreeterFactory"), http = (capnpConnectHost = "cappy") ), + # For testing connect() handler + (name = "tcp", address = "*:8081", tcp = (), service = "js-rpc-test") ], v8Flags = [ "--expose-gc" ], ); diff --git a/src/workerd/api/tests/js-rpc-test.js b/src/workerd/api/tests/js-rpc-test.js index 2f9e84ccabc..9cf38ceb622 100644 --- a/src/workerd/api/tests/js-rpc-test.js +++ b/src/workerd/api/tests/js-rpc-test.js @@ -194,6 +194,13 @@ export class MyService extends WorkerEntrypoint { return new Response('method = ' + req.method + ', url = ' + req.url); } + async connect(socket) { + const enc = new TextEncoder(); + let writer = socket.writable.getWriter(); + await writer.write(enc.encode('hello')); + await writer.close(); + } + // Define a property to test behavior of property accessors. get nonFunctionProperty() { return { foo: 123 }; @@ -631,6 +638,20 @@ export let extendingEntrypointClasses = { assert.equal(svc instanceof WorkerEntrypoint, true); }, }; +export let connectBinding = { + async test(controller, env, ctx) { + let socket = await env.MyService.connect('localhost:8081'); + await socket.opened; + const dec = new TextDecoder(); + let result = ''; + for await (const chunk of socket.readable) { + result += dec.decode(chunk, { stream: true }); + } + result += dec.decode(); + assert.strictEqual(result, 'hello'); + await socket.closed; + }, +}; export let namedServiceBinding = { async test(controller, env, ctx) { diff --git a/src/workerd/api/tests/tail-worker-test-receiver.js b/src/workerd/api/tests/tail-worker-test-receiver.js index 22767022f85..15e69bcf6a7 100644 --- a/src/workerd/api/tests/tail-worker-test-receiver.js +++ b/src/workerd/api/tests/tail-worker-test-receiver.js @@ -32,7 +32,7 @@ export const test = { // The shared tail worker we configured only produces onset and outcome events, so every trace is identical here. // Number of traces based on how often main tail worker is invoked from previous tests - let numTraces = 28; + let numTraces = 31; let basicTrace = '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"trace","traces":[]}}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}'; assert.deepStrictEqual( diff --git a/src/workerd/api/tests/tail-worker-test.js b/src/workerd/api/tests/tail-worker-test.js index c28ed4a7293..643a6746e3d 100644 --- a/src/workerd/api/tests/tail-worker-test.js +++ b/src/workerd/api/tests/tail-worker-test.js @@ -131,6 +131,10 @@ export const test = { // Test for transient objects - getCounter returns an object with methods // All transient calls happen in a single trace event, with only the entrypoint method reported '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","entrypoint":"MyService","scriptTags":[],"info":{"type":"jsrpc"}}{"type":"attributes","info":[{"name":"jsrpc.method","value":"getCounter"}]}{"type":"log","level":"log","message":["bar"]}{"type":"log","level":"log","message":["getCounter called"]}{"type":"return"}{"type":"log","level":"log","message":["increment called on transient"]}{"type":"log","level":"log","message":["getValue called on transient"]}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', + // tests/connect-handler-test.js: connect events + '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","entrypoint":"connectHandler","scriptTags":[],"info":{"type":"custom"}}{"type":"spanOpen","name":"connect","spanId":"0000000000000001"}{"type":"spanClose","outcome":"ok"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', + '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"connect"}}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', + '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","entrypoint":"connectHandlerProxy","scriptTags":[],"info":{"type":"custom"}}{"type":"spanOpen","name":"connect","spanId":"0000000000000001"}{"type":"spanClose","outcome":"ok"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', ]; assert.deepStrictEqual(response, expected); diff --git a/src/workerd/api/tests/tail-worker-test.wd-test b/src/workerd/api/tests/tail-worker-test.wd-test index f58e8f8d999..75aed0f3abe 100644 --- a/src/workerd/api/tests/tail-worker-test.wd-test +++ b/src/workerd/api/tests/tail-worker-test.wd-test @@ -32,6 +32,32 @@ const unitTests :Workerd.Config = ( (name = "alarms", worker = .alarmsWorker), (name = "hiber", worker = .hiberWorker), (name = "js-rpc-test", worker = .jsRpcWorker), + ( name = "connect-handler-test", + worker = ( + modules = [ + (name = "worker", esModule = embed "connect-handler-test.js"), + ], + compatibilityFlags = ["nodejs_compat_v2", "experimental"], + streamingTails = ["log"], + ) + ), + ( name = "connect-handler-test-proxy", + worker = ( + modules = [ + (name = "worker", esModule = embed "connect-handler-test-proxy.js"), + ], + compatibilityFlags = ["nodejs_compat_v2", "experimental"], + ) + ), + ( name = "connect-handler-test-endpoint", + worker = ( + modules = [ + (name = "worker", esModule = embed "connect-handler-test-proxy.js"), + ], + compatibilityFlags = ["nodejs_compat_v2", "experimental"], + ) + ), + ( name = "internet", network = ( allow = ["private"] ) ), (name = "TEST_TMPDIR", disk = (writable = true)), # Dummy buffered tail worker (gets traces from alarms worker and produces trace for main tracer) (name = "buffered", worker = .logBuffered, ), @@ -50,6 +76,11 @@ const unitTests :Workerd.Config = ( ), ) ], + sockets = [ + (name = "tcp", address = "*:8081", tcp = (), service = "connect-handler-test"), + (name = "tcp", address = "*:8082", tcp = (), service = (name = "connect-handler-test-proxy", entrypoint = "ConnectProxy")), + (name = "tcp", address = "*:8083", tcp = (), service = (name = "connect-handler-test-endpoint", entrypoint = "ConnectEndpoint")) + ] ); const alarmsWorker :Workerd.Worker = ( diff --git a/src/workerd/api/trace.c++ b/src/workerd/api/trace.c++ index afd4eebe016..620e01b4ebf 100644 --- a/src/workerd/api/trace.c++ +++ b/src/workerd/api/trace.c++ @@ -144,6 +144,9 @@ kj::Maybe getTraceEvent(jsg::Lock& js, const Trace& trace) KJ_CASE_ONEOF(scheduled, tracing::ScheduledEventInfo) { return kj::Maybe(js.alloc(trace, scheduled)); } + KJ_CASE_ONEOF(connect, tracing::ConnectEventInfo) { + return kj::Maybe(jsg::alloc(js, trace, connect)); + } KJ_CASE_ONEOF(alarm, tracing::AlarmEventInfo) { return kj::Maybe(js.alloc(trace, alarm)); } @@ -228,6 +231,9 @@ kj::Maybe TraceItem::getEvent(jsg::Lock& js) { KJ_CASE_ONEOF(info, jsg::Ref) { return info.addRef(); } + KJ_CASE_ONEOF(info, jsg::Ref) { + return info.addRef(); + } } KJ_UNREACHABLE; }); @@ -713,6 +719,9 @@ void TraceItem::visitForMemoryInfo(jsg::MemoryTracker& tracker) const { KJ_CASE_ONEOF(info, jsg::Ref) { tracker.trackField("eventInfo", info); } + KJ_CASE_ONEOF(info, jsg::Ref) { + tracker.trackField("eventInfo", info); + } } } for (const auto& log: logs) { @@ -761,4 +770,7 @@ void TraceItem::HibernatableWebSocketEventInfo::visitForMemoryInfo( } } +TraceItem::ConnectEventInfo::ConnectEventInfo( + jsg::Lock& js, const Trace& trace, const tracing::ConnectEventInfo& eventInfo) {} + } // namespace workerd::api diff --git a/src/workerd/api/trace.h b/src/workerd/api/trace.h index ea3742854bd..50afc156846 100644 --- a/src/workerd/api/trace.h +++ b/src/workerd/api/trace.h @@ -71,6 +71,7 @@ class TraceItem final: public jsg::Object { public: class FetchEventInfo; class JsRpcEventInfo; + class ConnectEventInfo; class ScheduledEventInfo; class AlarmEventInfo; class QueueEventInfo; @@ -83,6 +84,7 @@ class TraceItem final: public jsg::Object { using EventInfo = kj::OneOf, jsg::Ref, + jsg::Ref, jsg::Ref, jsg::Ref, jsg::Ref, @@ -282,6 +284,14 @@ class TraceItem::JsRpcEventInfo final: public jsg::Object { kj::String rpcMethod; }; +class TraceItem::ConnectEventInfo final: public jsg::Object { + public: + explicit ConnectEventInfo( + jsg::Lock& js, const Trace& trace, const tracing::ConnectEventInfo& eventInfo); + + JSG_RESOURCE_TYPE(ConnectEventInfo) {} +}; + class TraceItem::ScheduledEventInfo final: public jsg::Object { public: explicit ScheduledEventInfo(const Trace& trace, const tracing::ScheduledEventInfo& eventInfo); @@ -645,12 +655,12 @@ class TraceCustomEvent final: public WorkerInterface::CustomEvent { #define EW_TRACE_ISOLATE_TYPES \ api::ScriptVersion, api::TailEvent, api::TraceItem, api::TraceItem::AlarmEventInfo, \ - api::TraceItem::CustomEventInfo, api::TraceItem::ScheduledEventInfo, \ - api::TraceItem::QueueEventInfo, api::TraceItem::EmailEventInfo, \ - api::TraceItem::TailEventInfo, api::TraceItem::TailEventInfo::TailItem, \ - api::TraceItem::FetchEventInfo, api::TraceItem::FetchEventInfo::Request, \ - api::TraceItem::FetchEventInfo::Response, api::TraceItem::JsRpcEventInfo, \ - api::TraceItem::HibernatableWebSocketEventInfo, \ + api::TraceItem::ConnectEventInfo, api::TraceItem::CustomEventInfo, \ + api::TraceItem::ScheduledEventInfo, api::TraceItem::QueueEventInfo, \ + api::TraceItem::EmailEventInfo, api::TraceItem::TailEventInfo, \ + api::TraceItem::TailEventInfo::TailItem, api::TraceItem::FetchEventInfo, \ + api::TraceItem::FetchEventInfo::Request, api::TraceItem::FetchEventInfo::Response, \ + api::TraceItem::JsRpcEventInfo, api::TraceItem::HibernatableWebSocketEventInfo, \ api::TraceItem::HibernatableWebSocketEventInfo::Message, \ api::TraceItem::HibernatableWebSocketEventInfo::Close, \ api::TraceItem::HibernatableWebSocketEventInfo::Error, api::TraceLog, api::TraceException, \ diff --git a/src/workerd/io/trace-stream.c++ b/src/workerd/io/trace-stream.c++ index d743c3ac575..cb7dcae0dae 100644 --- a/src/workerd/io/trace-stream.c++ +++ b/src/workerd/io/trace-stream.c++ @@ -24,6 +24,7 @@ namespace { V(CFJSON, "cfJson") \ V(CLOSE, "close") \ V(CODE, "code") \ + V(CONNECT, "connect") \ V(COUNT, "count") \ V(CPUTIME, "cpuTime") \ V(CRON, "cron") \ @@ -292,6 +293,12 @@ jsg::JsValue ToJs(jsg::Lock& js, const HibernatableWebSocketEventInfo& info, Str return obj; } +jsg::JsValue ToJs(jsg::Lock& js, const ConnectEventInfo& info, StringCache& cache) { + auto obj = js.obj(); + obj.set(js, TYPE_STR, cache.get(js, CONNECT_STR)); + return obj; +} + jsg::JsValue ToJs(jsg::Lock& js, const CustomEventInfo& info, StringCache& cache) { auto obj = js.obj(); obj.set(js, TYPE_STR, cache.get(js, CUSTOM_STR)); @@ -385,6 +392,9 @@ jsg::JsValue ToJs(jsg::Lock& js, const Onset& onset, StringCache& cache) { KJ_CASE_ONEOF(hws, HibernatableWebSocketEventInfo) { obj.set(js, INFO_STR, ToJs(js, hws, cache)); } + KJ_CASE_ONEOF(connect, ConnectEventInfo) { + obj.set(js, INFO_STR, ToJs(js, connect, cache)); + } KJ_CASE_ONEOF(custom, CustomEventInfo) { obj.set(js, INFO_STR, ToJs(js, custom, cache)); } diff --git a/src/workerd/io/trace.c++ b/src/workerd/io/trace.c++ index a12e3bbd8a9..4d9d732dec6 100644 --- a/src/workerd/io/trace.c++ +++ b/src/workerd/io/trace.c++ @@ -326,6 +326,16 @@ static kj::HttpMethod validateMethod(capnp::HttpMethod method) { } // namespace +ConnectEventInfo::ConnectEventInfo() {} + +ConnectEventInfo::ConnectEventInfo(rpc::Trace::ConnectEventInfo::Reader reader) {} + +void ConnectEventInfo::copyTo(rpc::Trace::ConnectEventInfo::Builder builder) const {} + +ConnectEventInfo ConnectEventInfo::clone() const { + return ConnectEventInfo(); +} + FetchEventInfo::FetchEventInfo( kj::HttpMethod method, kj::String url, kj::String cfJson, kj::Array
headers) : method(method), @@ -774,6 +784,10 @@ void Trace::copyTo(rpc::Trace::Builder builder) const { auto jsRpcBuilder = eventInfoBuilder.initJsRpc(); jsRpc.copyTo(jsRpcBuilder); } + KJ_CASE_ONEOF(connect, tracing::ConnectEventInfo) { + auto connectBuilder = eventInfoBuilder.initConnect(); + connect.copyTo(connectBuilder); + } KJ_CASE_ONEOF(scheduled, tracing::ScheduledEventInfo) { auto scheduledBuilder = eventInfoBuilder.initScheduled(); scheduled.copyTo(scheduledBuilder); @@ -880,6 +894,9 @@ void Trace::mergeFrom(rpc::Trace::Reader reader, PipelineLogLevel pipelineLogLev case rpc::Trace::EventInfo::Which::JS_RPC: eventInfo = tracing::JsRpcEventInfo(e.getJsRpc()); break; + case rpc::Trace::EventInfo::Which::CONNECT: + eventInfo = tracing::ConnectEventInfo(e.getConnect()); + break; case rpc::Trace::EventInfo::Which::SCHEDULED: eventInfo = tracing::ScheduledEventInfo(e.getScheduled()); break; @@ -1096,6 +1113,9 @@ Onset::Info readOnsetInfo(const rpc::Trace::Onset::Info::Reader& info) { case rpc::Trace::Onset::Info::JS_RPC: { return JsRpcEventInfo(info.getJsRpc()); } + case rpc::Trace::Onset::Info::CONNECT: { + return ConnectEventInfo(info.getConnect()); + } case rpc::Trace::Onset::Info::SCHEDULED: { return ScheduledEventInfo(info.getScheduled()); } @@ -1126,6 +1146,9 @@ void writeOnsetInfo(const Onset::Info& info, rpc::Trace::Onset::Info::Builder& i KJ_CASE_ONEOF(fetch, FetchEventInfo) { fetch.copyTo(infoBuilder.initFetch()); } + KJ_CASE_ONEOF(connect, ConnectEventInfo) { + connect.copyTo(infoBuilder.initConnect()); + } KJ_CASE_ONEOF(jsrpc, JsRpcEventInfo) { jsrpc.copyTo(infoBuilder.initJsRpc()); } @@ -1278,6 +1301,9 @@ EventInfo cloneEventInfo(const EventInfo& info) { KJ_CASE_ONEOF(fetch, FetchEventInfo) { return fetch.clone(); } + KJ_CASE_ONEOF(connect, ConnectEventInfo) { + return connect.clone(); + } KJ_CASE_ONEOF(jsrpc, JsRpcEventInfo) { return jsrpc.clone(); } diff --git a/src/workerd/io/trace.h b/src/workerd/io/trace.h index dfd34b06b77..cba5936f0d0 100644 --- a/src/workerd/io/trace.h +++ b/src/workerd/io/trace.h @@ -362,6 +362,15 @@ struct JsRpcEventInfo final { kj::String toString() const; }; +class ConnectEventInfo { + public: + explicit ConnectEventInfo(); + explicit ConnectEventInfo(rpc::Trace::ConnectEventInfo::Reader reader); + + void copyTo(rpc::Trace::ConnectEventInfo::Builder builder) const; + ConnectEventInfo clone() const; +}; + // Describes a scheduled request struct ScheduledEventInfo final { explicit ScheduledEventInfo(double scheduledTime, kj::String cron); @@ -577,6 +586,7 @@ using EventInfo = kj::OneOf; EventInfo cloneEventInfo(const EventInfo& info); diff --git a/src/workerd/io/worker-entrypoint.c++ b/src/workerd/io/worker-entrypoint.c++ index 0a9458d007e..f07983ef86d 100644 --- a/src/workerd/io/worker-entrypoint.c++ +++ b/src/workerd/io/worker-entrypoint.c++ @@ -249,6 +249,30 @@ void WorkerEntrypoint::init(kj::Own worker, .attach(kj::mv(actor)); } +kj::Exception exceptionToPropagate(bool isInternalException, kj::Exception&& exception) { + if (isInternalException) { + // We've already logged it here, the only thing that matters to the client is that we failed + // due to an internal error. Note that this does not need to be labeled "remote." since jsg + // will sanitize it as an internal error. Note that we use `setDescription()` to preserve + // the exception type for `jsg::exceptionToJs(...)` downstream. + exception.setDescription(kj::str("worker_do_not_log; Request failed due to internal error")); + return kj::mv(exception); + } else { + // We do not care how many remote capnp servers this went through since we are returning + // it to the worker via jsg. + // TODO(someday) We also do this stripping when making the tunneled exception for + // `jsg::isTunneledException(...)`. It would be lovely if we could simply store some type + // instead of `loggedExceptionEarlier`. It would save use some work. + auto description = jsg::stripRemoteExceptionPrefix(exception.getDescription()); + if (!description.startsWith("remote.")) { + // If we already were annotated as remote from some other worker entrypoint, no point + // adding an additional prefix. + exception.setDescription(kj::str("remote.", description)); + } + return kj::mv(exception); + } +} + kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers, @@ -435,31 +459,6 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, } } - auto exceptionToPropagate = [&]() { - if (isInternalException) { - // We've already logged it here, the only thing that matters to the client is that we failed - // due to an internal error. Note that this does not need to be labeled "remote." since jsg - // will sanitize it as an internal error. Note that we use `setDescription()` to preserve - // the exception type for `jsg::exceptionToJs(...)` downstream. - exception.setDescription( - kj::str("worker_do_not_log; Request failed due to internal error")); - return kj::mv(exception); - } else { - // We do not care how many remote capnp servers this went through since we are returning - // it to the worker via jsg. - // TODO(someday) We also do this stripping when making the tunneled exception for - // `jsg::isTunneledException(...)`. It would be lovely if we could simply store some type - // instead of `loggedExceptionEarlier`. It would save use some work. - auto description = jsg::stripRemoteExceptionPrefix(exception.getDescription()); - if (!description.startsWith("remote.")) { - // If we already were annotated as remote from some other worker entrypoint, no point - // adding an additional prefix. - exception.setDescription(kj::str("remote.", description)); - } - return kj::mv(exception); - } - }; - if (wrappedResponse->isSent()) { // We can't fail open if the response was already sent, so set `failOpenService` null so that // that branch isn't taken below. @@ -471,7 +470,7 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, // TODO(cleanup): We'd really like to tunnel exceptions any time a worker is calling another // worker, not just for actors (and W2W below), but getting that right will require cleaning // up error handling more generally. - return exceptionToPropagate(); + return exceptionToPropagate(isInternalException, kj::mv(exception)); } else KJ_IF_SOME(service, failOpenService) { // Fall back to origin. @@ -505,7 +504,7 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, // Like with the isActor check, we want to return exceptions back to the caller. // We don't want to handle this case the same as the isActor case though, since we want // fail-open to operate normally, which means this case must happen after fail-open handling. - return exceptionToPropagate(); + return exceptionToPropagate(isInternalException, kj::mv(exception)); } else { // Return error. @@ -542,19 +541,17 @@ kj::Promise WorkerEntrypoint::connect(kj::StringPtr host, auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest, "connect() can only be called once")); this->incomingRequest = kj::none; - // Whenever we implement incoming connections over the `connect` handler we need to remember to - // add tracing `onset` and `return` events using setEventInfo()/setReturn(), as with the other - // event types here. - incomingRequest->delivered(); auto& context = incomingRequest->getContext(); + auto featureFlags = context.getWorker().getIsolate().getApi().getFeatureFlags(); - KJ_DEFER({ - // Since we called incomingRequest->delivered, we are obliged to call `drain()`. - auto promise = incomingRequest->drain().attach(kj::mv(incomingRequest)); - waitUntilTasks.add(maybeAddGcPassForTest(context, kj::mv(promise))); - }); + if (featureFlags.getConnectPassThrough()) { + incomingRequest->delivered(); - if (context.getWorker().getIsolate().getApi().getFeatureFlags().getConnectPassThrough()) { + KJ_DEFER({ + // Since we called incomingRequest->delivered, we are obliged to call `drain()`. + auto promise = incomingRequest->drain().attach(kj::mv(incomingRequest)); + waitUntilTasks.add(maybeAddGcPassForTest(context, kj::mv(promise))); + }); // connect_pass_through feature flag means we should just forward the connect request on to // the global outbound. @@ -564,9 +561,98 @@ kj::Promise WorkerEntrypoint::connect(kj::StringPtr host, // Note: Intentionally return without co_await so that the `incomingRequest` is destroyed, // because we don't have any need to keep the context around. return next->connect(host, headers, connection, response, settings); + } else if (!featureFlags.getWorkerdExperimental()) { + JSG_FAIL_REQUIRE(TypeError, "Incoming CONNECT on a worker not supported"); + } + + // Capture workerTracer, see request() for rationale. + kj::Maybe workerTracer; + + bool isActor = context.getActor() != kj::none; + + KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) { + t.setEventInfo(*incomingRequest, tracing::ConnectEventInfo()); + workerTracer = t; } + incomingRequest->delivered(); + + auto metricsForCatch = kj::addRef(incomingRequest->getMetrics()); + + return context + .run( + [this, &headers, &context, &connection, &response, entrypointName = entrypointName, + versionInfo = kj::mv(versionInfo), host = kj::str(host)](Worker::Lock& lock) mutable { + jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock); + + return lock.getGlobalScope().connect(kj::mv(host), headers, connection, response, lock, + lock.getExportedHandler( + entrypointName, kj::mv(versionInfo), kj::mv(props), context.getActor())); + }) + .then([&context, workerTracer]() { + KJ_IF_SOME(t, workerTracer) { + t.setReturn(context.now()); + } + }) + .catch_([this, &context](kj::Exception&& exception) mutable -> kj::Promise { + // Log JS exceptions to the JS console, if fiddle is attached. This also has the effect of + // logging internal errors to syslog. + loggedExceptionEarlier = true; + context.logUncaughtExceptionAsync(UncaughtExceptionSource::REQUEST_HANDLER, kj::cp(exception)); + + // Do not allow the exception to escape the isolate without waiting for the output gate to + // open. Note that in the success path, this is taken care of in `FetchEvent::respondWith()`. + return context.waitForOutputLocks().then( + [exception = kj::mv(exception)]() mutable -> kj::Promise { + return kj::mv(exception); + }); + }) + .attach(kj::defer([this, incomingRequest = kj::mv(incomingRequest), &context]() mutable { + // The request has been canceled, but allow it to continue executing in the background. + auto promise = incomingRequest->drain().attach(kj::mv(incomingRequest)); + waitUntilTasks.add(maybeAddGcPassForTest(context, kj::mv(promise))); + })) + .catch_([this, isActor, &response, metrics = kj::mv(metricsForCatch), workerTracer]( + kj::Exception&& exception) mutable -> kj::Promise { + // Don't return errors to end user. + auto isInternalException = !jsg::isTunneledException(exception.getDescription()) && + !jsg::isDoNotLogException(exception.getDescription()); + if (!loggedExceptionEarlier) { + // This exception seems to have originated during the deferred proxy task, so it was not + // logged to the IoContext earlier. + if (exception.getType() != kj::Exception::Type::DISCONNECTED && isInternalException) { + LOG_EXCEPTION("workerEntrypoint", exception); + } else { + KJ_LOG(INFO, exception); // Run with --verbose to see exception logs. + } + } - JSG_FAIL_REQUIRE(TypeError, "Incoming CONNECT on a worker not supported"); + if (isActor || tunnelExceptions) { + // We want to tunnel exceptions from actors back to the caller. + // TODO(cleanup): We'd really like to tunnel exceptions any time a worker is calling another + // worker, not just for actors (and W2W below), but getting that right will require cleaning + // up error handling more generally. + return exceptionToPropagate(isInternalException, kj::mv(exception)); + } else { + // Return error. + + // We're catching the exception and replacing it with 5xx, but metrics should still indicate + // an exception. + metrics->reportFailure(exception); + + kj::HttpHeaders headers(threadContext.getHeaderTable()); + if (exception.getType() == kj::Exception::Type::OVERLOADED) { + response.reject(503, "Service Unavailable", headers, static_cast(0)); + } else { + response.reject(500, "Internal Server Error", headers, static_cast(0)); + } + // TODO(o11y): Should we also indicate a return response code for TCP? + KJ_IF_SOME(t, workerTracer) { + t.setReturn(kj::none); + } + + return kj::READY_NOW; + } + }); } kj::Promise WorkerEntrypoint::prewarm(kj::StringPtr url) { diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index 2a4a9fb54a5..7c952458d12 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -97,6 +97,7 @@ struct Trace @0x8e8d911203762d34 { email @16 :EmailEventInfo; trace @18 :TraceEventInfo; hibernatableWebSocket @20 :HibernatableWebSocketEventInfo; + connect @28 :ConnectEventInfo; } struct FetchEventInfo { method @0 :HttpMethod; @@ -114,6 +115,9 @@ struct Trace @0x8e8d911203762d34 { methodName @0 :Text; } + struct ConnectEventInfo { + } + struct ScheduledEventInfo { scheduledTime @0 :Float64; cron @1 :Text; @@ -269,6 +273,7 @@ struct Trace @0x8e8d911203762d34 { email @5 :EmailEventInfo; trace @6 :TraceEventInfo; hibernatableWebSocket @7 :HibernatableWebSocketEventInfo; + connect @9 :ConnectEventInfo; custom @8 :CustomEventInfo; } } diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 43b84c8be80..2c7e7f6612f 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -5328,17 +5329,20 @@ class Server::HttpListener final: public kj::Refcounted { kj::AsyncIoStream& connection, ConnectResponse& response, kj::HttpConnectSettings settings) override { + TRACE_EVENT("workerd", "Connection:connect()"); KJ_IF_SOME(h, parent.rewriter->getCapnpConnectHost()) { if (h == host) { // Client is requesting to open a capnp session! response.accept(200, "OK", kj::HttpHeaders(parent.headerTable)); - return parent.acceptCapnpConnection(connection); + co_return co_await parent.acceptCapnpConnection(connection); } } - // TODO(someday): Deliver connect() event to to worker? For now we call the default - // implementation which throws an exception. - return kj::HttpService::connect(host, headers, connection, response, kj::mv(settings)); + IoChannelFactory::SubrequestMetadata metadata; + metadata.cfBlobJson = mapCopyString(cfBlobJson); + + auto worker = parent.service->startRequest(kj::mv(metadata)); + co_return co_await worker->connect(host, headers, connection, response, kj::mv(settings)); } // --------------------------------------------------------------------------- @@ -5358,6 +5362,57 @@ class Server::HttpListener final: public kj::Refcounted { }; }; +class Server::TcpListener final: public kj::Refcounted { + public: + TcpListener(Server& owner, + kj::Own listener, + kj::Own service, + kj::HttpHeaderTable& headerTable, + kj::StringPtr addrStr) + : owner(owner), + listener(kj::mv(listener)), + service(kj::mv(service)), + headerTable(headerTable), + addrStr(addrStr) {} + + kj::Promise run() { + TRACE_EVENT("workerd", "TcpListener::run"); + for (;;) { + kj::AuthenticatedStream stream = co_await listener->acceptAuthenticated(); + TRACE_EVENT("workerd", "TcpListener handle connection"); + + IoChannelFactory::SubrequestMetadata metadata; + auto req = service->startRequest(kj::mv(metadata)); + auto response = kj::heap(); + kj::HttpHeaders headers(headerTable); + owner.tasks.add(req->connect(addrStr, headers, *stream.stream, *response, {}) + .attach(kj::mv(stream.stream), kj::mv(response)) + .attach(kj::mv(req))); + } + } + + private: + Server& owner; + kj::Own listener; + kj::Own service; + kj::HttpHeaderTable& headerTable; + kj::StringPtr addrStr; + + struct ResponseWrapper final: public kj::HttpService::ConnectResponse { + void accept( + uint statusCode, kj::StringPtr statusText, const kj::HttpHeaders& headers) override { + // Ok.. we're accepting the connection... anything to do? + } + kj::Own reject(uint statusCode, + kj::StringPtr statusText, + const kj::HttpHeaders& headers, + kj::Maybe expectedBodySize = kj::none) override { + // Doh... we're rejecting the connection... anything to do? + return newNullOutputStream(); + } + }; +}; + kj::Promise Server::listenHttp(kj::Own listener, kj::Own service, kj::StringPtr physicalProtocol, @@ -5368,6 +5423,13 @@ kj::Promise Server::listenHttp(kj::Own listener, co_return co_await obj->run(); } +kj::Promise Server::listenTcp( + kj::Own listener, kj::Own service, kj::StringPtr addrStr) { + auto obj = kj::refcounted( + *this, kj::mv(listener), kj::mv(service), globalContext->headerTable, addrStr); + co_return co_await obj->run(); +} + // ======================================================================================= // Debug port for exposing all services via RPC @@ -5836,6 +5898,15 @@ kj::Promise Server::listenOnSockets(config::Config::Reader config, physicalProtocol = "https"; goto validSocket; } + case config::Socket::TCP: { + auto tcp = sock.getTcp(); + // No default port + // No physical protocol mention here. + if (tcp.hasTlsOptions()) { + tls = makeTlsContext(tcp.getTlsOptions()); + } + goto validSocket; + } } reportConfigError(kj::str("Encountered unknown socket type in \"", name, "\". Was the config compiled with a " @@ -5867,9 +5938,15 @@ kj::Promise Server::listenOnSockets(config::Config::Reader config, auto rewriter = kj::heap(httpOptions, headerTableBuilder); auto handle = kj::coCapture( - [this, service = kj::mv(service), rewriter = kj::mv(rewriter), physicalProtocol, name]( + [this, service = kj::mv(service), rewriter = kj::mv(rewriter), physicalProtocol, name, + isHttp = sock.which() != config::Socket::TCP, addrStr]( kj::Promise> promise) mutable -> kj::Promise { - TRACE_EVENT("workerd", "setup listenHttp"); + if (isHttp) { + TRACE_EVENT("workerd", "setup listenHttp"); + } else { + TRACE_EVENT("workerd", "setup listenTcp"); + } + auto listener = co_await promise; KJ_IF_SOME(stream, controlOverride) { auto message = kj::str("{\"event\":\"listen\",\"socket\":\"", name, @@ -5880,7 +5957,12 @@ kj::Promise Server::listenOnSockets(config::Config::Reader config, KJ_LOG(ERROR, e); } } - co_await listenHttp(kj::mv(listener), kj::mv(service), physicalProtocol, kj::mv(rewriter)); + + if (isHttp) { + co_await listenHttp(kj::mv(listener), kj::mv(service), physicalProtocol, kj::mv(rewriter)); + } else { + co_await listenTcp(kj::mv(listener), kj::mv(service), addrStr); + } }); tasks.add(handle(kj::mv(listener)).exclusiveJoin(forkedDrainWhen.addBranch())); } diff --git a/src/workerd/server/server.h b/src/workerd/server/server.h index 6162b1107f8..e7499053c93 100644 --- a/src/workerd/server/server.h +++ b/src/workerd/server/server.h @@ -290,6 +290,9 @@ class Server final: private kj::TaskSet::ErrorHandler, private ChannelTokenHandl kj::StringPtr physicalProtocol, kj::Own rewriter); + kj::Promise listenTcp( + kj::Own listener, kj::Own service, kj::StringPtr addrStr); + kj::Promise listenDebugPort(kj::Own listener); class InvalidConfigService; @@ -302,6 +305,7 @@ class Server final: private kj::TaskSet::ErrorHandler, private ChannelTokenHandl class WorkerEntrypointService; class WorkerdBootstrapImpl; class HttpListener; + class TcpListener; class DebugPortListener; struct ErrorReporter; diff --git a/src/workerd/server/workerd.capnp b/src/workerd/server/workerd.capnp index 9e1cb23b62a..9be9b11854b 100644 --- a/src/workerd/server/workerd.capnp +++ b/src/workerd/server/workerd.capnp @@ -143,8 +143,11 @@ struct Socket { options @3 :HttpOptions; tlsOptions @4 :TlsOptions; } + tcp :group { + tlsOptions @6 :TlsOptions; + } - # TODO(someday): TCP, TCP proxy, SMTP, Cap'n Proto, ... + # TODO(someday): TCP proxy, SMTP, Cap'n Proto, ... } service @5 :ServiceDesignator; diff --git a/types/defines/rpc.d.ts b/types/defines/rpc.d.ts index fe4b87d5378..ae73fc15b97 100644 --- a/types/defines/rpc.d.ts +++ b/types/defines/rpc.d.ts @@ -239,6 +239,7 @@ declare namespace CloudflareWorkersModule { email?(message: ForwardableEmailMessage): void | Promise; fetch?(request: Request): Response | Promise; + connect?(socket: Socket): void | Promise; queue?(batch: MessageBatch): void | Promise; scheduled?(controller: ScheduledController): void | Promise; tail?(events: TraceItem[]): void | Promise; @@ -260,6 +261,7 @@ declare namespace CloudflareWorkersModule { alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; fetch?(request: Request): Response | Promise; + connect?(socket: Socket): void | Promise; webSocketMessage?( ws: WebSocket, message: string | ArrayBuffer diff --git a/types/defines/trace.d.ts b/types/defines/trace.d.ts index 79ce6fd5e40..f316c6a3dee 100644 --- a/types/defines/trace.d.ts +++ b/types/defines/trace.d.ts @@ -74,6 +74,10 @@ interface FetchResponseInfo { readonly statusCode: number; } +interface ConnectEventInfo { + readonly type: "connect"; +} + type EventOutcome = "ok" | "canceled" | "exception" | "unknown" | "killSwitch" | "daemonDown" | "exceededCpu" | "exceededMemory" | "loadShed" | "responseStreamDisconnected" | "scriptNotFound"; @@ -95,10 +99,10 @@ interface Onset { readonly scriptName?: string; readonly scriptTags?: string[]; readonly scriptVersion?: ScriptVersion; - readonly info: FetchEventInfo | JsRpcEventInfo | ScheduledEventInfo | - AlarmEventInfo | QueueEventInfo | EmailEventInfo | - TraceEventInfo | HibernatableWebSocketEventInfo | - CustomEventInfo; + readonly info: FetchEventInfo | ConnectEventInfo | JsRpcEventInfo | + ScheduledEventInfo | AlarmEventInfo | QueueEventInfo | + EmailEventInfo | TraceEventInfo | + HibernatableWebSocketEventInfo | CustomEventInfo; } interface Outcome { diff --git a/types/generated-snapshot/experimental/index.d.ts b/types/generated-snapshot/experimental/index.d.ts index 70d3b343bcc..b7917be341a 100755 --- a/types/generated-snapshot/experimental/index.d.ts +++ b/types/generated-snapshot/experimental/index.d.ts @@ -511,6 +511,11 @@ type ExportedHandlerFetchHandler< env: Env, ctx: ExecutionContext, ) => Response | Promise; +type ExportedHandlerConnectHandler = ( + socket: Socket, + env: Env, + ctx: ExecutionContext, +) => void | Promise; type ExportedHandlerTailHandler = ( events: TraceItem[], env: Env, @@ -552,6 +557,7 @@ interface ExportedHandler< Props = unknown, > { fetch?: ExportedHandlerFetchHandler; + connect?: ExportedHandlerConnectHandler; tail?: ExportedHandlerTailHandler; trace?: ExportedHandlerTraceHandler; tailStream?: ExportedHandlerTailStreamHandler; @@ -583,6 +589,7 @@ declare abstract class ColoLocalActorNamespace { } interface DurableObject { fetch(request: Request): Response | Promise; + connect?(socket: Socket): void | Promise; alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; webSocketMessage?( ws: WebSocket, @@ -600,7 +607,7 @@ type DurableObjectStub< T extends Rpc.DurableObjectBranded | undefined = undefined, > = Fetcher< T, - "alarm" | "webSocketMessage" | "webSocketClose" | "webSocketError" + "alarm" | "connect" | "webSocketMessage" | "webSocketClose" | "webSocketError" > & { readonly id: DurableObjectId; readonly name?: string; @@ -3233,6 +3240,7 @@ interface TraceItem { | ( | TraceItemFetchEventInfo | TraceItemJsRpcEventInfo + | TraceItemConnectEventInfo | TraceItemScheduledEventInfo | TraceItemAlarmEventInfo | TraceItemQueueEventInfo @@ -3261,6 +3269,7 @@ interface TraceItem { interface TraceItemAlarmEventInfo { readonly scheduledTime: Date; } +interface TraceItemConnectEventInfo {} interface TraceItemCustomEventInfo {} interface TraceItemScheduledEventInfo { readonly scheduledTime: number; @@ -3808,6 +3817,7 @@ interface Socket { get secureTransport(): "on" | "off" | "starttls"; close(): Promise; startTls(options?: TlsOptions): Socket; + proxyTo(sock: Socket): Promise; } interface SocketOptions { secureTransport?: string; @@ -12616,6 +12626,7 @@ declare namespace CloudflareWorkersModule { constructor(ctx: ExecutionContext, env: Env); email?(message: ForwardableEmailMessage): void | Promise; fetch?(request: Request): Response | Promise; + connect?(socket: Socket): void | Promise; queue?(batch: MessageBatch): void | Promise; scheduled?(controller: ScheduledController): void | Promise; tail?(events: TraceItem[]): void | Promise; @@ -12636,6 +12647,7 @@ declare namespace CloudflareWorkersModule { constructor(ctx: DurableObjectState, env: Env); alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; fetch?(request: Request): Response | Promise; + connect?(socket: Socket): void | Promise; webSocketMessage?( ws: WebSocket, message: string | ArrayBuffer, @@ -13646,6 +13658,9 @@ declare namespace TailStream { readonly type: "fetch"; readonly statusCode: number; } + interface ConnectEventInfo { + readonly type: "connect"; + } type EventOutcome = | "ok" | "canceled" @@ -13676,6 +13691,7 @@ declare namespace TailStream { readonly scriptVersion?: ScriptVersion; readonly info: | FetchEventInfo + | ConnectEventInfo | JsRpcEventInfo | ScheduledEventInfo | AlarmEventInfo diff --git a/types/generated-snapshot/experimental/index.ts b/types/generated-snapshot/experimental/index.ts index 4fb737e9da5..ae73b1daed1 100755 --- a/types/generated-snapshot/experimental/index.ts +++ b/types/generated-snapshot/experimental/index.ts @@ -513,6 +513,11 @@ export type ExportedHandlerFetchHandler< env: Env, ctx: ExecutionContext, ) => Response | Promise; +export type ExportedHandlerConnectHandler = ( + socket: Socket, + env: Env, + ctx: ExecutionContext, +) => void | Promise; export type ExportedHandlerTailHandler = ( events: TraceItem[], env: Env, @@ -554,6 +559,7 @@ export interface ExportedHandler< Props = unknown, > { fetch?: ExportedHandlerFetchHandler; + connect?: ExportedHandlerConnectHandler; tail?: ExportedHandlerTailHandler; trace?: ExportedHandlerTraceHandler; tailStream?: ExportedHandlerTailStreamHandler; @@ -585,6 +591,7 @@ export declare abstract class ColoLocalActorNamespace { } export interface DurableObject { fetch(request: Request): Response | Promise; + connect?(socket: Socket): void | Promise; alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; webSocketMessage?( ws: WebSocket, @@ -602,7 +609,7 @@ export type DurableObjectStub< T extends Rpc.DurableObjectBranded | undefined = undefined, > = Fetcher< T, - "alarm" | "webSocketMessage" | "webSocketClose" | "webSocketError" + "alarm" | "connect" | "webSocketMessage" | "webSocketClose" | "webSocketError" > & { readonly id: DurableObjectId; readonly name?: string; @@ -3239,6 +3246,7 @@ export interface TraceItem { | ( | TraceItemFetchEventInfo | TraceItemJsRpcEventInfo + | TraceItemConnectEventInfo | TraceItemScheduledEventInfo | TraceItemAlarmEventInfo | TraceItemQueueEventInfo @@ -3267,6 +3275,7 @@ export interface TraceItem { export interface TraceItemAlarmEventInfo { readonly scheduledTime: Date; } +export interface TraceItemConnectEventInfo {} export interface TraceItemCustomEventInfo {} export interface TraceItemScheduledEventInfo { readonly scheduledTime: number; @@ -3814,6 +3823,7 @@ export interface Socket { get secureTransport(): "on" | "off" | "starttls"; close(): Promise; startTls(options?: TlsOptions): Socket; + proxyTo(sock: Socket): Promise; } export interface SocketOptions { secureTransport?: string; @@ -12581,6 +12591,7 @@ export declare namespace CloudflareWorkersModule { constructor(ctx: ExecutionContext, env: Env); email?(message: ForwardableEmailMessage): void | Promise; fetch?(request: Request): Response | Promise; + connect?(socket: Socket): void | Promise; queue?(batch: MessageBatch): void | Promise; scheduled?(controller: ScheduledController): void | Promise; tail?(events: TraceItem[]): void | Promise; @@ -12601,6 +12612,7 @@ export declare namespace CloudflareWorkersModule { constructor(ctx: DurableObjectState, env: Env); alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; fetch?(request: Request): Response | Promise; + connect?(socket: Socket): void | Promise; webSocketMessage?( ws: WebSocket, message: string | ArrayBuffer, @@ -13601,6 +13613,9 @@ export declare namespace TailStream { readonly type: "fetch"; readonly statusCode: number; } + interface ConnectEventInfo { + readonly type: "connect"; + } type EventOutcome = | "ok" | "canceled" @@ -13631,6 +13646,7 @@ export declare namespace TailStream { readonly scriptVersion?: ScriptVersion; readonly info: | FetchEventInfo + | ConnectEventInfo | JsRpcEventInfo | ScheduledEventInfo | AlarmEventInfo diff --git a/types/generated-snapshot/latest/index.d.ts b/types/generated-snapshot/latest/index.d.ts index 29541934341..3b4ce2b57cb 100755 --- a/types/generated-snapshot/latest/index.d.ts +++ b/types/generated-snapshot/latest/index.d.ts @@ -489,6 +489,11 @@ type ExportedHandlerFetchHandler< env: Env, ctx: ExecutionContext, ) => Response | Promise; +type ExportedHandlerConnectHandler = ( + socket: Socket, + env: Env, + ctx: ExecutionContext, +) => void | Promise; type ExportedHandlerTailHandler = ( events: TraceItem[], env: Env, @@ -530,6 +535,7 @@ interface ExportedHandler< Props = unknown, > { fetch?: ExportedHandlerFetchHandler; + connect?: ExportedHandlerConnectHandler; tail?: ExportedHandlerTailHandler; trace?: ExportedHandlerTraceHandler; tailStream?: ExportedHandlerTailStreamHandler; @@ -557,6 +563,7 @@ interface Cloudflare { } interface DurableObject { fetch(request: Request): Response | Promise; + connect?(socket: Socket): void | Promise; alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; webSocketMessage?( ws: WebSocket, @@ -574,7 +581,7 @@ type DurableObjectStub< T extends Rpc.DurableObjectBranded | undefined = undefined, > = Fetcher< T, - "alarm" | "webSocketMessage" | "webSocketClose" | "webSocketError" + "alarm" | "connect" | "webSocketMessage" | "webSocketClose" | "webSocketError" > & { readonly id: DurableObjectId; readonly name?: string; @@ -3120,6 +3127,7 @@ interface TraceItem { | ( | TraceItemFetchEventInfo | TraceItemJsRpcEventInfo + | TraceItemConnectEventInfo | TraceItemScheduledEventInfo | TraceItemAlarmEventInfo | TraceItemQueueEventInfo @@ -3148,6 +3156,7 @@ interface TraceItem { interface TraceItemAlarmEventInfo { readonly scheduledTime: Date; } +interface TraceItemConnectEventInfo {} interface TraceItemCustomEventInfo {} interface TraceItemScheduledEventInfo { readonly scheduledTime: number; @@ -3685,6 +3694,7 @@ interface Socket { get secureTransport(): "on" | "off" | "starttls"; close(): Promise; startTls(options?: TlsOptions): Socket; + proxyTo(sock: Socket): Promise; } interface SocketOptions { secureTransport?: string; @@ -11961,6 +11971,7 @@ declare namespace CloudflareWorkersModule { constructor(ctx: ExecutionContext, env: Env); email?(message: ForwardableEmailMessage): void | Promise; fetch?(request: Request): Response | Promise; + connect?(socket: Socket): void | Promise; queue?(batch: MessageBatch): void | Promise; scheduled?(controller: ScheduledController): void | Promise; tail?(events: TraceItem[]): void | Promise; @@ -11981,6 +11992,7 @@ declare namespace CloudflareWorkersModule { constructor(ctx: DurableObjectState, env: Env); alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; fetch?(request: Request): Response | Promise; + connect?(socket: Socket): void | Promise; webSocketMessage?( ws: WebSocket, message: string | ArrayBuffer, @@ -12991,6 +13003,9 @@ declare namespace TailStream { readonly type: "fetch"; readonly statusCode: number; } + interface ConnectEventInfo { + readonly type: "connect"; + } type EventOutcome = | "ok" | "canceled" @@ -13021,6 +13036,7 @@ declare namespace TailStream { readonly scriptVersion?: ScriptVersion; readonly info: | FetchEventInfo + | ConnectEventInfo | JsRpcEventInfo | ScheduledEventInfo | AlarmEventInfo diff --git a/types/generated-snapshot/latest/index.ts b/types/generated-snapshot/latest/index.ts index c5ae3fbe58d..89846d25ece 100755 --- a/types/generated-snapshot/latest/index.ts +++ b/types/generated-snapshot/latest/index.ts @@ -491,6 +491,11 @@ export type ExportedHandlerFetchHandler< env: Env, ctx: ExecutionContext, ) => Response | Promise; +export type ExportedHandlerConnectHandler = ( + socket: Socket, + env: Env, + ctx: ExecutionContext, +) => void | Promise; export type ExportedHandlerTailHandler = ( events: TraceItem[], env: Env, @@ -532,6 +537,7 @@ export interface ExportedHandler< Props = unknown, > { fetch?: ExportedHandlerFetchHandler; + connect?: ExportedHandlerConnectHandler; tail?: ExportedHandlerTailHandler; trace?: ExportedHandlerTraceHandler; tailStream?: ExportedHandlerTailStreamHandler; @@ -559,6 +565,7 @@ export interface Cloudflare { } export interface DurableObject { fetch(request: Request): Response | Promise; + connect?(socket: Socket): void | Promise; alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; webSocketMessage?( ws: WebSocket, @@ -576,7 +583,7 @@ export type DurableObjectStub< T extends Rpc.DurableObjectBranded | undefined = undefined, > = Fetcher< T, - "alarm" | "webSocketMessage" | "webSocketClose" | "webSocketError" + "alarm" | "connect" | "webSocketMessage" | "webSocketClose" | "webSocketError" > & { readonly id: DurableObjectId; readonly name?: string; @@ -3126,6 +3133,7 @@ export interface TraceItem { | ( | TraceItemFetchEventInfo | TraceItemJsRpcEventInfo + | TraceItemConnectEventInfo | TraceItemScheduledEventInfo | TraceItemAlarmEventInfo | TraceItemQueueEventInfo @@ -3154,6 +3162,7 @@ export interface TraceItem { export interface TraceItemAlarmEventInfo { readonly scheduledTime: Date; } +export interface TraceItemConnectEventInfo {} export interface TraceItemCustomEventInfo {} export interface TraceItemScheduledEventInfo { readonly scheduledTime: number; @@ -3691,6 +3700,7 @@ export interface Socket { get secureTransport(): "on" | "off" | "starttls"; close(): Promise; startTls(options?: TlsOptions): Socket; + proxyTo(sock: Socket): Promise; } export interface SocketOptions { secureTransport?: string; @@ -11926,6 +11936,7 @@ export declare namespace CloudflareWorkersModule { constructor(ctx: ExecutionContext, env: Env); email?(message: ForwardableEmailMessage): void | Promise; fetch?(request: Request): Response | Promise; + connect?(socket: Socket): void | Promise; queue?(batch: MessageBatch): void | Promise; scheduled?(controller: ScheduledController): void | Promise; tail?(events: TraceItem[]): void | Promise; @@ -11946,6 +11957,7 @@ export declare namespace CloudflareWorkersModule { constructor(ctx: DurableObjectState, env: Env); alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; fetch?(request: Request): Response | Promise; + connect?(socket: Socket): void | Promise; webSocketMessage?( ws: WebSocket, message: string | ArrayBuffer, @@ -12946,6 +12958,9 @@ export declare namespace TailStream { readonly type: "fetch"; readonly statusCode: number; } + interface ConnectEventInfo { + readonly type: "connect"; + } type EventOutcome = | "ok" | "canceled" @@ -12976,6 +12991,7 @@ export declare namespace TailStream { readonly scriptVersion?: ScriptVersion; readonly info: | FetchEventInfo + | ConnectEventInfo | JsRpcEventInfo | ScheduledEventInfo | AlarmEventInfo