diff --git a/.bazelrc b/.bazelrc index 84bdf679165..e5f34963cce 100644 --- a/.bazelrc +++ b/.bazelrc @@ -357,7 +357,12 @@ build:windows --extra_execution_platforms=//:x64_windows-clang-cl # workaround. build:windows_no_dbg -c opt -build:windows_no_dbg --copt='/Od' +# build:windows_no_dbg --copt='/Od' +# build:windows_no_dbg --per_file_copt=src/workerd/api@/Od +build:windows_no_dbg --per_file_copt=src/workerd/io/worker-interface@/Od +# build:windows_no_dbg --per_file_copt=src/workerd/jsg@/Od +# build:windows_no_dbg --per_file_copt=src/workerd/util@/Od +# build:windows_no_dbg --per_file_copt=src/workerd/server@/Od build:windows_no_dbg --linkopt='/INCREMENTAL:NO' build:windows_no_dbg --features=-smaller_binary @@ -431,7 +436,9 @@ build:release_windows --copt="/clang:-O3" build:release_windows --copt="-fstrict-aliasing" # This file breaks our CI windows release builds when compiled using O2/O3 # Ref: https://github.com/llvm/llvm-project/issues/136481 -build:release_windows --per_file_copt=.*capnp/rpc\.c++@/clang:-O1 +build:windows --per_file_copt=.*capnp/rpc\.c++@"/O2,/Gy,/Gw,-fstrict-aliasing,/clang:-O3" +build:windows --per_file_copt="src/workerd/server/server"@/O2,/Gy,/Gw,-fstrict-aliasing,/clang:-O3,/clang:-fno-inline +build:windows --per_file_copt="src/workerd/server/container-client"@/O2,/Gy,/Gw,-fstrict-aliasing,/clang:-O3,/clang:-fno-inline build:windows --cxxopt='/std:c++23preview' --host_cxxopt='/std:c++23preview' build:windows --copt='/D_CRT_USE_BUILTIN_OFFSETOF' --host_copt='/D_CRT_USE_BUILTIN_OFFSETOF' diff --git a/.github/workflows/_bazel.yml b/.github/workflows/_bazel.yml index 41a5ccf6136..4bc4f1a5139 100644 --- a/.github/workflows/_bazel.yml +++ b/.github/workflows/_bazel.yml @@ -126,7 +126,7 @@ jobs: bazel --nowindows_enable_symlinks build ${{ inputs.extra_bazel_args }} --config=ci --profile build-win-workaround.bazel-profile.gz --remote_cache=https://bazel:${{ secrets.BAZEL_CACHE_KEY }}@bazel-remote-cache.devprod.cloudflare.dev 
//src/wpt:wpt-all@tsproject //src/node:node@tsproject //src/pyodide:pyodide_static@tsproject - name: Bazel build run: | - bazel build --remote_cache=https://bazel:${{ secrets.BAZEL_CACHE_KEY }}@bazel-remote-cache.devprod.cloudflare.dev --config=ci ${{ inputs.extra_bazel_args }} //... + bazel build --remote_cache=https://bazel:${{ secrets.BAZEL_CACHE_KEY }}@bazel-remote-cache.devprod.cloudflare.dev --config=ci ${{ inputs.extra_bazel_args }} //src/workerd/api/tests/... //src/workerd/server:server-test@ //src/workerd/server:server-test@all-autogates - name: Configure Docker daemon for IPv6 if: inputs.build_container_images run: | @@ -145,7 +145,7 @@ jobs: - name: Bazel test if: inputs.run_tests run: | - bazel test --remote_cache=https://bazel:${{ secrets.BAZEL_CACHE_KEY }}@bazel-remote-cache.devprod.cloudflare.dev --config=ci ${{ inputs.extra_bazel_args }} ${{ inputs.test_target }} + bazel test --remote_cache=https://bazel:${{ secrets.BAZEL_CACHE_KEY }}@bazel-remote-cache.devprod.cloudflare.dev --config=ci ${{ inputs.extra_bazel_args }} //src/workerd/api/tests/... //src/workerd/server:server-test@ //src/workerd/server:server-test@all-autogates - name: Bazel coverage if: inputs.run_coverage run: | diff --git a/samples/tcp-ingress/README.md b/samples/tcp-ingress/README.md deleted file mode 100644 index fe332ffe8e1..00000000000 --- a/samples/tcp-ingress/README.md +++ /dev/null @@ -1,10 +0,0 @@ -# REPL Server - -This sample contains a simple TCP server based on the connect() handler. When a connection gets -established on port 8081, it simply pipes the input stream to the output. - -## How to use -``` -./bazel-bin/src/workerd/server/workerd serve samples/tcp-ingress/config.capnp --experimental -echo "Hello World!" 
| nc localhost 8081 -``` diff --git a/samples/tcp-ingress/config.capnp b/samples/tcp-ingress/config.capnp deleted file mode 100644 index ab33c6ca405..00000000000 --- a/samples/tcp-ingress/config.capnp +++ /dev/null @@ -1,20 +0,0 @@ -using Workerd = import "/workerd/workerd.capnp"; - -const tcpIngressExample :Workerd.Config = ( - services = [ - (name = "main", worker = .worker), - ], - - sockets = [ - ( name = "http", address = "*:8080", http = (), service = "main" ), - ( name = "tcp", address = "*:8081", tcp = (), service = "main" ) - ] -); - -const worker :Workerd.Worker = ( - modules = [ - (name = "worker", esModule = embed "worker.js") - ], - compatibilityFlags = ["nodejs_compat_v2", "experimental"], - compatibilityDate = "2026-03-01", -); diff --git a/samples/tcp-ingress/worker.js b/samples/tcp-ingress/worker.js deleted file mode 100644 index 32aafb0b78d..00000000000 --- a/samples/tcp-ingress/worker.js +++ /dev/null @@ -1,11 +0,0 @@ - -export default { - async fetch(req) { - return new Response("ok"); - }, - - async connect(socket) { - // pipe the input stream to the output - await socket.readable.pipeTo(socket.writable); - } -}; diff --git a/src/workerd/api/actor.h b/src/workerd/api/actor.h index 7b2976e749e..39a6862268e 100644 --- a/src/workerd/api/actor.h +++ b/src/workerd/api/actor.h @@ -108,7 +108,6 @@ class DurableObject final: public Fetcher { JSG_TS_DEFINE(interface DurableObject { fetch(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; webSocketMessage?(ws: WebSocket, message: string | ArrayBuffer): void | Promise; webSocketClose?(ws: WebSocket, code: number, reason: string, wasClean: boolean): void | Promise; @@ -116,7 +115,7 @@ class DurableObject final: public Fetcher { }); JSG_TS_OVERRIDE( type DurableObjectStub = - Fetcher + Fetcher & { readonly id: DurableObjectId; readonly name?: string; diff --git a/src/workerd/api/container.c++ 
b/src/workerd/api/container.c++ index 3176e922f06..31a76beae01 100644 --- a/src/workerd/api/container.c++ +++ b/src/workerd/api/container.c++ @@ -8,32 +8,10 @@ #include #include -#include - #include namespace workerd::api { -namespace { - -kj::Maybe parseRestorePath(kj::StringPtr path) { - JSG_REQUIRE(path.size() > 0 && path[0] == '/', TypeError, - "Directory snapshot restore path must be absolute. Got: ", path); - - try { - auto parsed = kj::Path::parse(path.slice(1)); - if (parsed.size() == 0) { - return kj::none; - } - return kj::mv(parsed); - } catch (kj::Exception&) { - JSG_FAIL_REQUIRE( - TypeError, "Directory snapshot restore path contains invalid components: ", path); - } -} - -} // namespace - // ======================================================================================= // Basic lifecycle methods @@ -108,14 +86,6 @@ void Container::start(jsg::Lock& js, jsg::Optional maybeOptions) auto entry = list[i]; auto& restore = snapshots[i]; auto& snap = restore.snapshot; - auto effectiveRestoreDir = snap.dir.asPtr(); - KJ_IF_SOME(mp, restore.mountPoint) { - effectiveRestoreDir = mp.asPtr(); - } - - JSG_REQUIRE_NONNULL(parseRestorePath(effectiveRestoreDir), Error, - "Directory snapshot cannot be restored to root directory."); - double size = snap.size; JSG_REQUIRE(std::isfinite(size) && size >= 0 && size <= static_cast(jsg::MAX_SAFE_INTEGER) && std::floor(size) == size, diff --git a/src/workerd/api/global-scope.c++ b/src/workerd/api/global-scope.c++ index ba4cb0c9f8e..28909d5609d 100644 --- a/src/workerd/api/global-scope.c++ +++ b/src/workerd/api/global-scope.c++ @@ -15,7 +15,6 @@ #endif #include #include -#include #include #include #include @@ -87,7 +86,6 @@ jsg::LenientOptional mapAddRef(jsg::Lock& js, jsg::LenientOptional& functi ExportedHandler ExportedHandler::clone(jsg::Lock& js) { return ExportedHandler{ .fetch{mapAddRef(js, fetch)}, - .connect{mapAddRef(js, connect)}, .tail{mapAddRef(js, tail)}, .trace{mapAddRef(js, trace)}, 
.tailStream{mapAddRef(js, tailStream)}, @@ -120,49 +118,6 @@ void ServiceWorkerGlobalScope::clear() { unhandledRejections.clear(); } -kj::Promise ServiceWorkerGlobalScope::connect(kj::String host, - const kj::HttpHeaders& headers, - kj::AsyncIoStream& connection, - kj::HttpService::ConnectResponse& response, - Worker::Lock& lock, - kj::Maybe exportedHandler) { - ExportedHandler& eh = JSG_REQUIRE_NONNULL(exportedHandler, Error, - "Connect ingress is not currently supported with Service Workers syntax."); - KJ_REQUIRE(FeatureFlags::get(lock).getWorkerdExperimental(), - "connect handling requires the experimental flag."); - - KJ_IF_SOME(handler, eh.connect) { - // Has a connect handler! - response.accept(200, "OK", headers); - - // Using neuterable stream to manage lifetime of stream promises - auto ownConnection = newNeuterableIoStream(connection); - - auto& ioContext = IoContext::current(); - jsg::Lock& js = lock; - - // TLS support is not implemented so far. Note that setupSocket() expects the domain parameter - // to be set to the expected host name using startTLS, so that it can be provided to the TLS - // callback, so we'd need to change that or figure out a way to get the host domain. - auto nullTlsStarter = kj::heap(); - // We set isDefaultFetchPort to false here – sockets.c++ sets it for ports 443 and 8080 to - // provide a more descriptive error message for HTTP, but this is not relevant on the TCP server - // side. - jsg::Ref jsSocket = setupSocket(js, kj::mv(ownConnection), kj::none, kj::none, - kj::mv(nullTlsStarter), SecureTransportKind::OFF, kj::none, false, kj::none); - // handleProxyStatus() is required to indicate that the socket was opened properly. Since the - // connection is already open at this point, exception handling is not required. 
- jsSocket->handleProxyStatus(js, kj::Promise>(kj::none)); - - kj::Maybe span = ioContext.makeTraceSpan("connect_handler"_kjc); - auto promise = handler(js, kj::mv(jsSocket), eh.env.addRef(js), eh.getCtx()); - return ioContext.awaitJs(js, kj::mv(promise)).attach(kj::mv(span)); - } - lock.logWarningOnce("Received a connect event but we lack a handler. " - "Did you remember to export a connect() function?"); - JSG_FAIL_REQUIRE(Error, "Handler does not export a connect() function."); -} - kj::Promise> ServiceWorkerGlobalScope::request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers, diff --git a/src/workerd/api/global-scope.h b/src/workerd/api/global-scope.h index ffaf66fac9e..a25fb487818 100644 --- a/src/workerd/api/global-scope.h +++ b/src/workerd/api/global-scope.h @@ -357,10 +357,6 @@ struct ExportedHandler { jsg::Optional> ctx); jsg::LenientOptional> fetch; - using ConnectHandler = jsg::Promise( - jsg::Ref socket, jsg::Value env, jsg::Optional> ctx); - jsg::LenientOptional> connect; - using TailHandler = kj::Promise(kj::Array> events, jsg::Value env, jsg::Optional> ctx); @@ -400,7 +396,6 @@ struct ExportedHandler { jsg::SelfRef self; JSG_STRUCT(fetch, - connect, tail, trace, tailStream, @@ -418,7 +413,6 @@ struct ExportedHandler { JSG_STRUCT_TS_DEFINE( type ExportedHandlerFetchHandler = (request: Request>, env: Env, ctx: ExecutionContext) => Response | Promise; - type ExportedHandlerConnectHandler = (socket: Socket, env: Env, ctx: ExecutionContext) => void | Promise; type ExportedHandlerTailHandler = (events: TraceItem[], env: Env, ctx: ExecutionContext) => void | Promise; type ExportedHandlerTraceHandler = (traces: TraceItem[], env: Env, ctx: ExecutionContext) => void | Promise; type ExportedHandlerTailStreamHandler = (event : TailStream.TailEvent, env: Env, ctx: ExecutionContext) => TailStream.TailEventHandlerType | Promise; @@ -429,7 +423,6 @@ struct ExportedHandler { JSG_STRUCT_TS_OVERRIDE( { email?: EmailExportedHandler; fetch?: 
ExportedHandlerFetchHandler; - connect?: ExportedHandlerConnectHandler; tail?: ExportedHandlerTailHandler; trace?: ExportedHandlerTraceHandler; tailStream?: ExportedHandlerTailStreamHandler; @@ -529,14 +522,6 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope { // TODO(cleanup): Factor out the shared code used between old-style event listeners vs. module // exports and move that code somewhere more appropriate. - // Received TCP/socket ingress (called from C++, not JS). - kj::Promise connect(kj::String host, - const kj::HttpHeaders& headers, - kj::AsyncIoStream& connection, - kj::HttpService::ConnectResponse& response, - Worker::Lock& lock, - kj::Maybe exportedHandler); - // Received sendTraces (called from C++, not JS). void sendTraces(kj::ArrayPtr> traces, Worker::Lock& lock, diff --git a/src/workerd/api/sockets.c++ b/src/workerd/api/sockets.c++ index ebfcd6b4ccb..874af6dece8 100644 --- a/src/workerd/api/sockets.c++ +++ b/src/workerd/api/sockets.c++ @@ -84,11 +84,11 @@ class StreamWorkerInterface; jsg::Ref setupSocket(jsg::Lock& js, kj::Own connection, - kj::Maybe remoteAddress, + kj::String remoteAddress, jsg::Optional options, kj::Own tlsStarter, SecureTransportKind secureTransport, - kj::Maybe domain, + kj::String domain, bool isDefaultFetchPort, kj::Maybe> maybeOpenedPrPair) { auto& ioContext = IoContext::current(); @@ -323,10 +323,10 @@ jsg::Ref Socket::startTls(jsg::Lock& js, jsg::Optional tlsOp secureTransport != SecureTransportKind::ON, TypeError, "Cannot startTls on a TLS socket."); JSG_REQUIRE(connectionData != kj::none, TypeError, "The connection was closed before startTls could be started."); + JSG_REQUIRE(domain != nullptr, TypeError, "startTls can only be called once."); auto invalidOptKindMsg = "The `secureTransport` socket option must be set to 'starttls' for startTls to be used."; JSG_REQUIRE(secureTransport == SecureTransportKind::STARTTLS, TypeError, invalidOptKindMsg); - JSG_REQUIRE(domain != kj::none, TypeError, "startTls can only be 
called once."); // The current socket's writable buffers need to be flushed. The socket's WritableStream is backed // by an AsyncIoStream which doesn't implement any buffering, so we don't need to worry about @@ -346,10 +346,10 @@ jsg::Ref Socket::startTls(jsg::Lock& js, jsg::Optional tlsOp // flush to complete. While it is unlikely to be GC'd while we are waiting because // the user code *likely* is holding a active reference to it at this point, we // don't want to take any chances. This prevents a possible UAF. - JSG_VISITABLE_LAMBDA((self = JSG_THIS, domain = kj::heapString(KJ_ASSERT_NONNULL(domain)), - tlsOptions = kj::mv(tlsOptions), - openedResolver = openedPrPair.resolver.addRef(js), - remoteAddress = mapCopyString(remoteAddress)), + JSG_VISITABLE_LAMBDA( + (self = JSG_THIS, domain = kj::heapString(domain), tlsOptions = kj::mv(tlsOptions), + openedResolver = openedPrPair.resolver.addRef(js), + remoteAddress = kj::str(remoteAddress)), (self, openedResolver), (jsg::Lock & js) mutable { auto& context = IoContext::current(); @@ -381,7 +381,7 @@ jsg::Ref Socket::startTls(jsg::Lock& js, jsg::Optional tlsOp // Fork the starter promise because we need to create two separate things waiting // on it below. The first is resolving the openedResolver with a JS promise that - // wraps one branch, the second is the kj::Promise that we use to resolve the + // wraps one branch, the second is the kj::Promise that we use to resolve the // secureStream for the promised stream. This keeps us from having to bounce in and // out of the JS isolate lock. auto forkedPromise = KJ_ASSERT_NONNULL(*tlsStarter)(acceptedHostname).fork(); @@ -410,9 +410,9 @@ jsg::Ref Socket::startTls(jsg::Lock& js, jsg::Optional tlsOp // The existing tlsStarter gets consumed and we won't need it again. Pass in an empty tlsStarter // to `setupSocket`. 
auto newTlsStarter = kj::heap(); - return setupSocket(js, kj::newPromisedStream(kj::mv(secureStreamPromise)), - mapCopyString(remoteAddress), kj::mv(options), kj::mv(newTlsStarter), SecureTransportKind::ON, - kj::mv(domain), isDefaultFetchPort, kj::mv(openedPrPair)); + return setupSocket(js, kj::newPromisedStream(kj::mv(secureStreamPromise)), kj::str(remoteAddress), + kj::mv(options), kj::mv(newTlsStarter), SecureTransportKind::ON, kj::mv(domain), + isDefaultFetchPort, kj::mv(openedPrPair)); } void Socket::handleProxyStatus( @@ -436,7 +436,7 @@ void Socket::handleProxyStatus( if (isDefaultFetchPort) { msg = kj::str(msg, ". It looks like you might be trying to connect to a HTTP-based service", " — consider using fetch instead"); - } else if (remoteAddress.orDefault(kj::String()).contains(".hyperdrive.local"_kj)) { + } else if (remoteAddress.contains(".hyperdrive.local"_kj)) { // No attempts to connect to Hyperdrive should end up here, since they go through the other // version of handleProxyStatus. If they end up here somehow, log about it to get some // context that can aid in debugging. @@ -450,7 +450,7 @@ void Socket::handleProxyStatus( // because there's no useful value we can provide. 
openedResolver.resolve(js, SocketInfo{ - .remoteAddress = mapCopyString(remoteAddress), + .remoteAddress = kj::str(remoteAddress), .localAddress = kj::none, }); } @@ -478,7 +478,7 @@ void Socket::handleProxyStatus(jsg::Lock& js, kj::Promise>> connectionStream, - kj::Maybe remoteAddress, + kj::String remoteAddress, jsg::Ref readableParam, jsg::Ref writable, jsg::PromiseResolverPair closedPrPair, @@ -68,7 +68,7 @@ class Socket: public jsg::Object { jsg::Optional options, kj::Own tlsStarter, SecureTransportKind secureTransport, - kj::Maybe domain, + kj::String domain, bool isDefaultFetchPort, jsg::PromiseResolverPair openedPrPair) : connectionData(context.addObject(kj::heap( @@ -200,12 +200,12 @@ class Socket: public jsg::Object { // Memoized copy that is returned by the `closed` attribute. jsg::MemoizedIdentity> closedPromise; jsg::Optional options; - kj::Maybe remoteAddress; + kj::String remoteAddress; // Set to true when the socket is upgraded to a secure one. bool upgraded = false; SecureTransportKind secureTransport; // The domain/ip this socket is connected to. Used for startTls. - kj::Maybe domain; + kj::String domain; // Whether the port this socket connected to is 80/443. Used for nicer errors. bool isDefaultFetchPort; // This fulfiller is used to resolve the `openedPromise` below. 
@@ -245,11 +245,11 @@ class Socket: public jsg::Object { jsg::Ref setupSocket(jsg::Lock& js, kj::Own connection, - kj::Maybe remoteAddress, + kj::String remoteAddress, jsg::Optional options, kj::Own tlsStarter, SecureTransportKind secureTransport, - kj::Maybe domain, + kj::String domain, bool isDefaultFetchPort, kj::Maybe> maybeOpenedPrPair); diff --git a/src/workerd/api/tests/BUILD.bazel b/src/workerd/api/tests/BUILD.bazel index 853f12049cd..b1be96654d5 100644 --- a/src/workerd/api/tests/BUILD.bazel +++ b/src/workerd/api/tests/BUILD.bazel @@ -31,18 +31,6 @@ wd_test( data = ["delete-all-deletes-alarm-test.js"], ) -wd_test( - src = "connect-handler-test.wd-test", - args = ["--experimental"], - data = [ - "connect-handler-test.js", - "connect-handler-test-proxy.js", - ], - # Test uses TCP sockets for ports 8081-8083 and may fail when running concurrently with other - # tests that do so. - tags = ["exclusive"], -) - wd_test( src = "actor-alarms-test.wd-test", args = ["--experimental"], @@ -66,10 +54,7 @@ wd_test( "tail-worker-test-invalid.js", "tail-worker-test-jsrpc.js", "websocket-hibernation.js", - "connect-handler-test.js", - "connect-handler-test-proxy.js", ], - tags = ["exclusive"], ) # Test to validate timing semantics for JSRPC streaming responses. @@ -380,7 +365,6 @@ wd_test( src = "js-rpc-test.wd-test", args = ["--experimental"], data = ["js-rpc-test.js"], - tags = ["exclusive"], ) wd_test( @@ -631,7 +615,6 @@ wd_test( "--no-verbose", ], data = ["js-rpc-test.js"], - tags = ["exclusive"], ) wd_test( diff --git a/src/workerd/api/tests/connect-handler-test-proxy.js b/src/workerd/api/tests/connect-handler-test-proxy.js deleted file mode 100644 index af23f67b995..00000000000 --- a/src/workerd/api/tests/connect-handler-test-proxy.js +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright (c) 2026 Cloudflare, Inc. 
-// Licensed under the Apache 2.0 license found in the LICENSE file or at: -// https://opensource.org/licenses/Apache-2.0 -import { connect } from 'cloudflare:sockets'; -import { WorkerEntrypoint } from 'cloudflare:workers'; - -export class ConnectProxy extends WorkerEntrypoint { - async connect(socket) { - // proxy for ConnectEndpoint instance on port 8083. - let upstream = connect('localhost:8083'); - await Promise.all([ - socket.readable.pipeTo(upstream.writable), - upstream.readable.pipeTo(socket.writable), - ]); - } -} - -export class ConnectEndpoint extends WorkerEntrypoint { - async connect(socket) { - const enc = new TextEncoder(); - let writer = socket.writable.getWriter(); - await writer.write(enc.encode('hello-from-endpoint')); - await writer.close(); - } -} diff --git a/src/workerd/api/tests/connect-handler-test.js b/src/workerd/api/tests/connect-handler-test.js deleted file mode 100644 index f21a2930144..00000000000 --- a/src/workerd/api/tests/connect-handler-test.js +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright (c) 2026 Cloudflare, Inc. -// Licensed under the Apache 2.0 license found in the LICENSE file or at: -// https://opensource.org/licenses/Apache-2.0 -import { connect } from 'cloudflare:sockets'; -import { strictEqual } from 'assert'; - -export const connectHandler = { - async test() { - // Check that the connect handler can send a message through a socket - const socket = connect('localhost:8081'); - await socket.opened; - const dec = new TextDecoder(); - let result = ''; - for await (const chunk of socket.readable) { - result += dec.decode(chunk, { stream: true }); - } - result += dec.decode(); - strictEqual(result, 'hello'); - await socket.closed; - }, -}; - -export const connectHandlerProxy = { - async test() { - // Check that we can get a message proxied through a connect handler. This call connects us with - // an instance of Server, which serves as a proxy for an instance of OtherServer, as defined in - // connect-handler-test-proxy.js. 
- const socket = connect('localhost:8082'); - await socket.opened; - const dec = new TextDecoder(); - let result = ''; - for await (const chunk of socket.readable) { - result += dec.decode(chunk, { stream: true }); - } - result += dec.decode(); - strictEqual(result, 'hello-from-endpoint'); - await socket.closed; - }, -}; - -export default { - async connect(socket) { - const enc = new TextEncoder(); - let writer = socket.writable.getWriter(); - await writer.write(enc.encode('hello')); - await writer.close(); - }, -}; diff --git a/src/workerd/api/tests/connect-handler-test.wd-test b/src/workerd/api/tests/connect-handler-test.wd-test deleted file mode 100644 index 98b517a861a..00000000000 --- a/src/workerd/api/tests/connect-handler-test.wd-test +++ /dev/null @@ -1,36 +0,0 @@ -using Workerd = import "/workerd/workerd.capnp"; - -const unitTests :Workerd.Config = ( - services = [ - ( name = "connect-handler-test", - worker = ( - modules = [ - (name = "worker", esModule = embed "connect-handler-test.js"), - ], - compatibilityFlags = ["nodejs_compat_v2", "experimental"], - ) - ), - ( name = "connect-handler-test-proxy", - worker = ( - modules = [ - (name = "worker", esModule = embed "connect-handler-test-proxy.js"), - ], - compatibilityFlags = ["nodejs_compat_v2", "experimental"], - ) - ), - ( name = "connect-handler-test-endpoint", - worker = ( - modules = [ - (name = "worker", esModule = embed "connect-handler-test-proxy.js"), - ], - compatibilityFlags = ["nodejs_compat_v2", "experimental"], - ) - ), - ( name = "internet", network = ( allow = ["private"] ) ) - ], - sockets = [ - (name = "tcp", address = "*:8081", tcp = (), service = "connect-handler-test"), - (name = "tcp", address = "*:8082", tcp = (), service = (name = "connect-handler-test-proxy", entrypoint = "ConnectProxy")), - (name = "tcp", address = "*:8083", tcp = (), service = (name = "connect-handler-test-endpoint", entrypoint = "ConnectEndpoint")) - ] -); diff --git 
a/src/workerd/api/tests/js-rpc-socket-test.wd-test b/src/workerd/api/tests/js-rpc-socket-test.wd-test index 4c871606ed7..a13544d8cf6 100644 --- a/src/workerd/api/tests/js-rpc-socket-test.wd-test +++ b/src/workerd/api/tests/js-rpc-socket-test.wd-test @@ -72,7 +72,6 @@ const unitTests :Workerd.Config = ( http = (capnpConnectHost = "cappy") ) ), - ( name = "internet", network = ( allow = ["private"] ) ), ], sockets = [ ( name = "MyService-loop", @@ -100,8 +99,6 @@ const unitTests :Workerd.Config = ( service = (name = "js-rpc-test", entrypoint = "GreeterFactory"), http = (capnpConnectHost = "cappy") ), - # For testing connect() handler - (name = "tcp", address = "*:8081", tcp = (), service = "js-rpc-test") ], v8Flags = [ "--expose-gc" ], ); diff --git a/src/workerd/api/tests/js-rpc-test.js b/src/workerd/api/tests/js-rpc-test.js index 4558cd517fa..a58506b249a 100644 --- a/src/workerd/api/tests/js-rpc-test.js +++ b/src/workerd/api/tests/js-rpc-test.js @@ -197,13 +197,6 @@ export class MyService extends WorkerEntrypoint { return new Response('method = ' + req.method + ', url = ' + req.url); } - async connect(socket) { - const enc = new TextEncoder(); - let writer = socket.writable.getWriter(); - await writer.write(enc.encode('hello')); - await writer.close(); - } - // Define a property to test behavior of property accessors. 
get nonFunctionProperty() { return { foo: 123 }; @@ -641,20 +634,6 @@ export let extendingEntrypointClasses = { assert.equal(svc instanceof WorkerEntrypoint, true); }, }; -export let connectBinding = { - async test(controller, env, ctx) { - let socket = await env.MyService.connect('localhost:8081'); - await socket.opened; - const dec = new TextDecoder(); - let result = ''; - for await (const chunk of socket.readable) { - result += dec.decode(chunk, { stream: true }); - } - result += dec.decode(); - assert.strictEqual(result, 'hello'); - await socket.closed; - }, -}; export let namedServiceBinding = { async test(controller, env, ctx) { diff --git a/src/workerd/api/tests/tail-worker-test-receiver.js b/src/workerd/api/tests/tail-worker-test-receiver.js index 7cbf5082685..a71d5dfdeca 100644 --- a/src/workerd/api/tests/tail-worker-test-receiver.js +++ b/src/workerd/api/tests/tail-worker-test-receiver.js @@ -32,7 +32,7 @@ export const test = { // The shared tail worker we configured only produces onset and outcome events, so every trace is identical here. 
// Number of traces based on how often main tail worker is invoked from previous tests - let numTraces = 32; + let numTraces = 29; let basicTrace = '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"trace","traces":[]}}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}'; assert.deepStrictEqual( diff --git a/src/workerd/api/tests/tail-worker-test.js b/src/workerd/api/tests/tail-worker-test.js index 434d8357db0..a99f08bb12b 100644 --- a/src/workerd/api/tests/tail-worker-test.js +++ b/src/workerd/api/tests/tail-worker-test.js @@ -132,10 +132,6 @@ export const test = { // Test for transient objects - getCounter returns an object with methods // All transient calls happen in a single trace event, with only the entrypoint method reported '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","entrypoint":"MyService","scriptTags":[],"info":{"type":"jsrpc"}}{"type":"attributes","info":[{"name":"jsrpc.method","value":"getCounter"}]}{"type":"log","level":"log","message":["bar"]}{"type":"log","level":"log","message":["getCounter called"]}{"type":"return"}{"type":"log","level":"log","message":["increment called on transient"]}{"type":"log","level":"log","message":["getValue called on transient"]}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', - // tests/connect-handler-test.js: connect events - '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","entrypoint":"connectHandler","scriptTags":[],"info":{"type":"custom"}}{"type":"spanOpen","name":"connect","spanId":"0000000000000001"}{"type":"spanClose","outcome":"ok"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', - '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"connect"}}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', - 
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","entrypoint":"connectHandlerProxy","scriptTags":[],"info":{"type":"custom"}}{"type":"spanOpen","name":"connect","spanId":"0000000000000001"}{"type":"spanClose","outcome":"ok"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', ]; assert.deepStrictEqual(response, expected); diff --git a/src/workerd/api/tests/tail-worker-test.wd-test b/src/workerd/api/tests/tail-worker-test.wd-test index 75aed0f3abe..f58e8f8d999 100644 --- a/src/workerd/api/tests/tail-worker-test.wd-test +++ b/src/workerd/api/tests/tail-worker-test.wd-test @@ -32,32 +32,6 @@ const unitTests :Workerd.Config = ( (name = "alarms", worker = .alarmsWorker), (name = "hiber", worker = .hiberWorker), (name = "js-rpc-test", worker = .jsRpcWorker), - ( name = "connect-handler-test", - worker = ( - modules = [ - (name = "worker", esModule = embed "connect-handler-test.js"), - ], - compatibilityFlags = ["nodejs_compat_v2", "experimental"], - streamingTails = ["log"], - ) - ), - ( name = "connect-handler-test-proxy", - worker = ( - modules = [ - (name = "worker", esModule = embed "connect-handler-test-proxy.js"), - ], - compatibilityFlags = ["nodejs_compat_v2", "experimental"], - ) - ), - ( name = "connect-handler-test-endpoint", - worker = ( - modules = [ - (name = "worker", esModule = embed "connect-handler-test-proxy.js"), - ], - compatibilityFlags = ["nodejs_compat_v2", "experimental"], - ) - ), - ( name = "internet", network = ( allow = ["private"] ) ), (name = "TEST_TMPDIR", disk = (writable = true)), # Dummy buffered tail worker (gets traces from alarms worker and produces trace for main tracer) (name = "buffered", worker = .logBuffered, ), @@ -76,11 +50,6 @@ const unitTests :Workerd.Config = ( ), ) ], - sockets = [ - (name = "tcp", address = "*:8081", tcp = (), service = "connect-handler-test"), - (name = "tcp", address = "*:8082", tcp = (), service = (name = "connect-handler-test-proxy", entrypoint = "ConnectProxy")), - 
(name = "tcp", address = "*:8083", tcp = (), service = (name = "connect-handler-test-endpoint", entrypoint = "ConnectEndpoint")) - ] ); const alarmsWorker :Workerd.Worker = ( diff --git a/src/workerd/api/trace.c++ b/src/workerd/api/trace.c++ index 0885754f7d9..53cbd8f14fe 100644 --- a/src/workerd/api/trace.c++ +++ b/src/workerd/api/trace.c++ @@ -164,9 +164,6 @@ kj::Maybe getTraceEvent(jsg::Lock& js, const Trace& trace) KJ_CASE_ONEOF(scheduled, tracing::ScheduledEventInfo) { return kj::Maybe(js.alloc(trace, scheduled)); } - KJ_CASE_ONEOF(connect, tracing::ConnectEventInfo) { - return kj::Maybe(jsg::alloc(js, trace, connect)); - } KJ_CASE_ONEOF(alarm, tracing::AlarmEventInfo) { return kj::Maybe(js.alloc(trace, alarm)); } @@ -253,9 +250,6 @@ kj::Maybe TraceItem::getEvent(jsg::Lock& js) { KJ_CASE_ONEOF(info, jsg::Ref) { return info.addRef(); } - KJ_CASE_ONEOF(info, jsg::Ref) { - return info.addRef(); - } } KJ_UNREACHABLE; }); @@ -755,9 +749,6 @@ void TraceItem::visitForMemoryInfo(jsg::MemoryTracker& tracker) const { KJ_CASE_ONEOF(info, jsg::Ref) { tracker.trackField("eventInfo", info); } - KJ_CASE_ONEOF(info, jsg::Ref) { - tracker.trackField("eventInfo", info); - } } } for (const auto& log: logs) { @@ -816,7 +807,4 @@ void TraceItem::HibernatableWebSocketEventInfo::visitForMemoryInfo( } } -TraceItem::ConnectEventInfo::ConnectEventInfo( - jsg::Lock& js, const Trace& trace, const tracing::ConnectEventInfo& eventInfo) {} - } // namespace workerd::api diff --git a/src/workerd/api/trace.h b/src/workerd/api/trace.h index 1fb7f17e3b7..7f2a8c537a0 100644 --- a/src/workerd/api/trace.h +++ b/src/workerd/api/trace.h @@ -73,7 +73,6 @@ class TraceItem final: public jsg::Object { class FetchEventInfo; class JsRpcEventInfo; - class ConnectEventInfo; class ScheduledEventInfo; class AlarmEventInfo; class QueueEventInfo; @@ -86,7 +85,6 @@ class TraceItem final: public jsg::Object { using EventInfo = kj::OneOf, jsg::Ref, - jsg::Ref, jsg::Ref, jsg::Ref, jsg::Ref, @@ -289,14 +287,6 @@ 
class TraceItem::JsRpcEventInfo final: public jsg::Object { kj::String rpcMethod; }; -class TraceItem::ConnectEventInfo final: public jsg::Object { - public: - explicit ConnectEventInfo( - jsg::Lock& js, const Trace& trace, const tracing::ConnectEventInfo& eventInfo); - - JSG_RESOURCE_TYPE(ConnectEventInfo) {} -}; - class TraceItem::ScheduledEventInfo final: public jsg::Object { public: explicit ScheduledEventInfo(const Trace& trace, const tracing::ScheduledEventInfo& eventInfo); @@ -660,12 +650,12 @@ class TraceCustomEvent final: public WorkerInterface::CustomEvent { #define EW_TRACE_ISOLATE_TYPES \ api::ScriptVersion, api::TailEvent, api::TraceItem, api::TraceItem::AlarmEventInfo, \ - api::TraceItem::ConnectEventInfo, api::TraceItem::CustomEventInfo, \ - api::TraceItem::ScheduledEventInfo, api::TraceItem::QueueEventInfo, \ - api::TraceItem::EmailEventInfo, api::TraceItem::TailEventInfo, \ - api::TraceItem::TailEventInfo::TailItem, api::TraceItem::FetchEventInfo, \ - api::TraceItem::FetchEventInfo::Request, api::TraceItem::FetchEventInfo::Response, \ - api::TraceItem::JsRpcEventInfo, api::TraceItem::HibernatableWebSocketEventInfo, \ + api::TraceItem::CustomEventInfo, api::TraceItem::ScheduledEventInfo, \ + api::TraceItem::QueueEventInfo, api::TraceItem::EmailEventInfo, \ + api::TraceItem::TailEventInfo, api::TraceItem::TailEventInfo::TailItem, \ + api::TraceItem::FetchEventInfo, api::TraceItem::FetchEventInfo::Request, \ + api::TraceItem::FetchEventInfo::Response, api::TraceItem::JsRpcEventInfo, \ + api::TraceItem::HibernatableWebSocketEventInfo, \ api::TraceItem::HibernatableWebSocketEventInfo::Message, \ api::TraceItem::HibernatableWebSocketEventInfo::Close, \ api::TraceItem::HibernatableWebSocketEventInfo::Error, api::TraceLog, api::TraceException, \ diff --git a/src/workerd/io/trace-stream.c++ b/src/workerd/io/trace-stream.c++ index cb7dcae0dae..d743c3ac575 100644 --- a/src/workerd/io/trace-stream.c++ +++ b/src/workerd/io/trace-stream.c++ @@ -24,7 +24,6 @@ 
namespace { V(CFJSON, "cfJson") \ V(CLOSE, "close") \ V(CODE, "code") \ - V(CONNECT, "connect") \ V(COUNT, "count") \ V(CPUTIME, "cpuTime") \ V(CRON, "cron") \ @@ -293,12 +292,6 @@ jsg::JsValue ToJs(jsg::Lock& js, const HibernatableWebSocketEventInfo& info, Str return obj; } -jsg::JsValue ToJs(jsg::Lock& js, const ConnectEventInfo& info, StringCache& cache) { - auto obj = js.obj(); - obj.set(js, TYPE_STR, cache.get(js, CONNECT_STR)); - return obj; -} - jsg::JsValue ToJs(jsg::Lock& js, const CustomEventInfo& info, StringCache& cache) { auto obj = js.obj(); obj.set(js, TYPE_STR, cache.get(js, CUSTOM_STR)); @@ -392,9 +385,6 @@ jsg::JsValue ToJs(jsg::Lock& js, const Onset& onset, StringCache& cache) { KJ_CASE_ONEOF(hws, HibernatableWebSocketEventInfo) { obj.set(js, INFO_STR, ToJs(js, hws, cache)); } - KJ_CASE_ONEOF(connect, ConnectEventInfo) { - obj.set(js, INFO_STR, ToJs(js, connect, cache)); - } KJ_CASE_ONEOF(custom, CustomEventInfo) { obj.set(js, INFO_STR, ToJs(js, custom, cache)); } diff --git a/src/workerd/io/trace.c++ b/src/workerd/io/trace.c++ index 32f72e026b6..08f2734ba7d 100644 --- a/src/workerd/io/trace.c++ +++ b/src/workerd/io/trace.c++ @@ -326,16 +326,6 @@ static kj::HttpMethod validateMethod(capnp::HttpMethod method) { } // namespace -ConnectEventInfo::ConnectEventInfo() {} - -ConnectEventInfo::ConnectEventInfo(rpc::Trace::ConnectEventInfo::Reader reader) {} - -void ConnectEventInfo::copyTo(rpc::Trace::ConnectEventInfo::Builder builder) const {} - -ConnectEventInfo ConnectEventInfo::clone() const { - return ConnectEventInfo(); -} - FetchEventInfo::FetchEventInfo( kj::HttpMethod method, kj::String url, kj::String cfJson, kj::Array
headers) : method(method), @@ -791,10 +781,6 @@ void Trace::copyTo(rpc::Trace::Builder builder) const { auto jsRpcBuilder = eventInfoBuilder.initJsRpc(); jsRpc.copyTo(jsRpcBuilder); } - KJ_CASE_ONEOF(connect, tracing::ConnectEventInfo) { - auto connectBuilder = eventInfoBuilder.initConnect(); - connect.copyTo(connectBuilder); - } KJ_CASE_ONEOF(scheduled, tracing::ScheduledEventInfo) { auto scheduledBuilder = eventInfoBuilder.initScheduled(); scheduled.copyTo(scheduledBuilder); @@ -905,9 +891,6 @@ void Trace::mergeFrom(rpc::Trace::Reader reader, PipelineLogLevel pipelineLogLev case rpc::Trace::EventInfo::Which::JS_RPC: eventInfo = tracing::JsRpcEventInfo(e.getJsRpc()); break; - case rpc::Trace::EventInfo::Which::CONNECT: - eventInfo = tracing::ConnectEventInfo(e.getConnect()); - break; case rpc::Trace::EventInfo::Which::SCHEDULED: eventInfo = tracing::ScheduledEventInfo(e.getScheduled()); break; @@ -1124,9 +1107,6 @@ Onset::Info readOnsetInfo(const rpc::Trace::Onset::Info::Reader& info) { case rpc::Trace::Onset::Info::JS_RPC: { return JsRpcEventInfo(info.getJsRpc()); } - case rpc::Trace::Onset::Info::CONNECT: { - return ConnectEventInfo(info.getConnect()); - } case rpc::Trace::Onset::Info::SCHEDULED: { return ScheduledEventInfo(info.getScheduled()); } @@ -1157,9 +1137,6 @@ void writeOnsetInfo(const Onset::Info& info, rpc::Trace::Onset::Info::Builder& i KJ_CASE_ONEOF(fetch, FetchEventInfo) { fetch.copyTo(infoBuilder.initFetch()); } - KJ_CASE_ONEOF(connect, ConnectEventInfo) { - connect.copyTo(infoBuilder.initConnect()); - } KJ_CASE_ONEOF(jsrpc, JsRpcEventInfo) { jsrpc.copyTo(infoBuilder.initJsRpc()); } @@ -1312,9 +1289,6 @@ EventInfo cloneEventInfo(const EventInfo& info) { KJ_CASE_ONEOF(fetch, FetchEventInfo) { return fetch.clone(); } - KJ_CASE_ONEOF(connect, ConnectEventInfo) { - return connect.clone(); - } KJ_CASE_ONEOF(jsrpc, JsRpcEventInfo) { return jsrpc.clone(); } diff --git a/src/workerd/io/trace.h b/src/workerd/io/trace.h index e6377dc6e70..75aac486182 100644 
--- a/src/workerd/io/trace.h +++ b/src/workerd/io/trace.h @@ -362,15 +362,6 @@ struct JsRpcEventInfo final { kj::String toString() const; }; -class ConnectEventInfo { - public: - explicit ConnectEventInfo(); - explicit ConnectEventInfo(rpc::Trace::ConnectEventInfo::Reader reader); - - void copyTo(rpc::Trace::ConnectEventInfo::Builder builder) const; - ConnectEventInfo clone() const; -}; - // Describes a scheduled request struct ScheduledEventInfo final { explicit ScheduledEventInfo(double scheduledTime, kj::String cron); @@ -586,7 +577,6 @@ using EventInfo = kj::OneOf; EventInfo cloneEventInfo(const EventInfo& info); diff --git a/src/workerd/io/worker-entrypoint.c++ b/src/workerd/io/worker-entrypoint.c++ index 17fc33247a7..d83937f30a0 100644 --- a/src/workerd/io/worker-entrypoint.c++ +++ b/src/workerd/io/worker-entrypoint.c++ @@ -249,30 +249,6 @@ void WorkerEntrypoint::init(kj::Own worker, .attach(kj::mv(actor)); } -kj::Exception exceptionToPropagate(bool isInternalException, kj::Exception&& exception) { - if (isInternalException) { - // We've already logged it here, the only thing that matters to the client is that we failed - // due to an internal error. Note that this does not need to be labeled "remote." since jsg - // will sanitize it as an internal error. Note that we use `setDescription()` to preserve - // the exception type for `jsg::exceptionToJs(...)` downstream. - exception.setDescription(kj::str("worker_do_not_log; Request failed due to internal error")); - return kj::mv(exception); - } else { - // We do not care how many remote capnp servers this went through since we are returning - // it to the worker via jsg. - // TODO(someday) We also do this stripping when making the tunneled exception for - // `jsg::isTunneledException(...)`. It would be lovely if we could simply store some type - // instead of `loggedExceptionEarlier`. It would save use some work. 
- auto description = jsg::stripRemoteExceptionPrefix(exception.getDescription()); - if (!description.startsWith("remote.")) { - // If we already were annotated as remote from some other worker entrypoint, no point - // adding an additional prefix. - exception.setDescription(kj::str("remote.", description)); - } - return kj::mv(exception); - } -} - kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers, @@ -459,6 +435,31 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, } } + auto exceptionToPropagate = [&]() { + if (isInternalException) { + // We've already logged it here, the only thing that matters to the client is that we failed + // due to an internal error. Note that this does not need to be labeled "remote." since jsg + // will sanitize it as an internal error. Note that we use `setDescription()` to preserve + // the exception type for `jsg::exceptionToJs(...)` downstream. + exception.setDescription( + kj::str("worker_do_not_log; Request failed due to internal error")); + return kj::mv(exception); + } else { + // We do not care how many remote capnp servers this went through since we are returning + // it to the worker via jsg. + // TODO(someday) We also do this stripping when making the tunneled exception for + // `jsg::isTunneledException(...)`. It would be lovely if we could simply store some type + // instead of `loggedExceptionEarlier`. It would save use some work. + auto description = jsg::stripRemoteExceptionPrefix(exception.getDescription()); + if (!description.startsWith("remote.")) { + // If we already were annotated as remote from some other worker entrypoint, no point + // adding an additional prefix. + exception.setDescription(kj::str("remote.", description)); + } + return kj::mv(exception); + } + }; + if (wrappedResponse->isSent()) { // We can't fail open if the response was already sent, so set `failOpenService` null so that // that branch isn't taken below. 
@@ -470,7 +471,7 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, // TODO(cleanup): We'd really like to tunnel exceptions any time a worker is calling another // worker, not just for actors (and W2W below), but getting that right will require cleaning // up error handling more generally. - return exceptionToPropagate(isInternalException, kj::mv(exception)); + return exceptionToPropagate(); } else KJ_IF_SOME(service, failOpenService) { // Fall back to origin. @@ -504,7 +505,7 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, // Like with the isActor check, we want to return exceptions back to the caller. // We don't want to handle this case the same as the isActor case though, since we want // fail-open to operate normally, which means this case must happen after fail-open handling. - return exceptionToPropagate(isInternalException, kj::mv(exception)); + return exceptionToPropagate(); } else { // Return error. @@ -541,17 +542,19 @@ kj::Promise WorkerEntrypoint::connect(kj::StringPtr host, auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest, "connect() can only be called once")); this->incomingRequest = kj::none; + // Whenever we implement incoming connections over the `connect` handler we need to remember to + // add tracing `onset` and `return` events using setEventInfo()/setReturn(), as with the other + // event types here. + incomingRequest->delivered(); auto& context = incomingRequest->getContext(); - auto featureFlags = context.getWorker().getIsolate().getApi().getFeatureFlags(); - if (featureFlags.getConnectPassThrough()) { - incomingRequest->delivered(); + KJ_DEFER({ + // Since we called incomingRequest->delivered, we are obliged to call `drain()`. + auto promise = incomingRequest->drain().attach(kj::mv(incomingRequest)); + waitUntilTasks.add(maybeAddGcPassForTest(context, kj::mv(promise))); + }); - KJ_DEFER({ - // Since we called incomingRequest->delivered, we are obliged to call `drain()`. 
- auto promise = incomingRequest->drain().attach(kj::mv(incomingRequest)); - waitUntilTasks.add(maybeAddGcPassForTest(context, kj::mv(promise))); - }); + if (context.getWorker().getIsolate().getApi().getFeatureFlags().getConnectPassThrough()) { // connect_pass_through feature flag means we should just forward the connect request on to // the global outbound. @@ -561,100 +564,9 @@ kj::Promise WorkerEntrypoint::connect(kj::StringPtr host, // Note: Intentionally return without co_await so that the `incomingRequest` is destroyed, // because we don't have any need to keep the context around. return next->connect(host, headers, connection, response, settings); - } else if (!featureFlags.getWorkerdExperimental()) { - JSG_FAIL_REQUIRE(TypeError, "Incoming CONNECT on a worker not supported"); - } - - // TODO(soon): Implement basic TLS support for connect handler. - JSG_REQUIRE(!settings.useTls, Error, "Incoming CONNECT with TLS not supported"); - // Capture workerTracer, see request() for rationale. 
- kj::Maybe workerTracer; - - bool isActor = context.getActor() != kj::none; - - KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) { - t.setEventInfo(*incomingRequest, tracing::ConnectEventInfo()); - workerTracer = t; } - incomingRequest->delivered(); - - auto metricsForCatch = kj::addRef(incomingRequest->getMetrics()); - - return context - .run( - [this, &headers, &context, &connection, &response, entrypointName = entrypointName, - versionInfo = kj::mv(versionInfo), host = kj::str(host)](Worker::Lock& lock) mutable { - jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock); - - return lock.getGlobalScope().connect(kj::mv(host), headers, connection, response, lock, - lock.getExportedHandler( - entrypointName, kj::mv(versionInfo), kj::mv(props), context.getActor())); - }) - .then([&context, workerTracer]() { - KJ_IF_SOME(t, workerTracer) { - t.setReturn(context.now()); - } - }) - .catch_([this, &context](kj::Exception&& exception) mutable -> kj::Promise { - // Log JS exceptions to the JS console, if fiddle is attached. This also has the effect of - // logging internal errors to syslog. - loggedExceptionEarlier = true; - context.logUncaughtExceptionAsync(UncaughtExceptionSource::REQUEST_HANDLER, kj::cp(exception)); - - // Do not allow the exception to escape the isolate without waiting for the output gate to - // open. Note that in the success path, this is taken care of in `FetchEvent::respondWith()`. - return context.waitForOutputLocks().then( - [exception = kj::mv(exception)]() mutable -> kj::Promise { - return kj::mv(exception); - }); - }) - .attach(kj::defer([this, incomingRequest = kj::mv(incomingRequest), &context]() mutable { - // The request has been canceled, but allow it to continue executing in the background. 
- auto promise = incomingRequest->drain().attach(kj::mv(incomingRequest)); - waitUntilTasks.add(maybeAddGcPassForTest(context, kj::mv(promise))); - })) - .catch_([this, isActor, &response, metrics = kj::mv(metricsForCatch), workerTracer]( - kj::Exception&& exception) mutable -> kj::Promise { - // Don't return errors to end user. - auto isInternalException = !jsg::isTunneledException(exception.getDescription()) && - !jsg::isDoNotLogException(exception.getDescription()); - if (!loggedExceptionEarlier) { - // This exception seems to have originated during the deferred proxy task, so it was not - // logged to the IoContext earlier. - if (exception.getType() != kj::Exception::Type::DISCONNECTED && isInternalException) { - LOG_EXCEPTION("workerEntrypoint", exception); - } else { - KJ_LOG(INFO, exception); // Run with --verbose to see exception logs. - } - } - if (isActor || tunnelExceptions) { - // We want to tunnel exceptions from actors back to the caller. - // TODO(cleanup): We'd really like to tunnel exceptions any time a worker is calling another - // worker, not just for actors (and W2W below), but getting that right will require cleaning - // up error handling more generally. - return exceptionToPropagate(isInternalException, kj::mv(exception)); - } else { - // Return error. - - // We're catching the exception and replacing it with 5xx, but metrics should still indicate - // an exception. - metrics->reportFailure(exception); - - kj::HttpHeaders headers(threadContext.getHeaderTable()); - if (exception.getType() == kj::Exception::Type::OVERLOADED) { - response.reject(503, "Service Unavailable", headers, static_cast(0)); - } else { - response.reject(500, "Internal Server Error", headers, static_cast(0)); - } - // TODO(o11y): Should we also indicate a return response code for TCP? 
- KJ_IF_SOME(t, workerTracer) { - t.setReturn(kj::none); - } - - return kj::READY_NOW; - } - }); + JSG_FAIL_REQUIRE(TypeError, "Incoming CONNECT on a worker not supported"); } kj::Promise WorkerEntrypoint::prewarm(kj::StringPtr url) { diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index 6d70a4a90e5..05d88c0b91c 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -97,7 +97,6 @@ struct Trace @0x8e8d911203762d34 { email @16 :EmailEventInfo; trace @18 :TraceEventInfo; hibernatableWebSocket @20 :HibernatableWebSocketEventInfo; - connect @29 :ConnectEventInfo; } struct FetchEventInfo { method @0 :HttpMethod; @@ -115,9 +114,6 @@ struct Trace @0x8e8d911203762d34 { methodName @0 :Text; } - struct ConnectEventInfo { - } - struct ScheduledEventInfo { scheduledTime @0 :Float64; cron @1 :Text; @@ -274,7 +270,6 @@ struct Trace @0x8e8d911203762d34 { email @5 :EmailEventInfo; trace @6 :TraceEventInfo; hibernatableWebSocket @7 :HibernatableWebSocketEventInfo; - connect @9 :ConnectEventInfo; custom @8 :CustomEventInfo; } } diff --git a/src/workerd/server/alarm-scheduler.c++ b/src/workerd/server/alarm-scheduler.c++ index 47fd97377f1..bc5b2f384fe 100644 --- a/src/workerd/server/alarm-scheduler.c++ +++ b/src/workerd/server/alarm-scheduler.c++ @@ -28,15 +28,11 @@ std::default_random_engine makeSeededRandomEngine() { } // namespace -AlarmScheduler::AlarmScheduler(const kj::Clock& clock, - kj::Timer& timer, - const SqliteDatabase::Vfs& vfs, - kj::Path path, - GetActorFn getActor) +AlarmScheduler::AlarmScheduler( + const kj::Clock& clock, kj::Timer& timer, const SqliteDatabase::Vfs& vfs, kj::Path path) : clock(clock), timer(timer), random(makeSeededRandomEngine()), - getActor(kj::mv(getActor)), db([&] { auto db = kj::heap(vfs, kj::mv(path), kj::WriteMode::CREATE | kj::WriteMode::MODIFY | kj::WriteMode::CREATE_PARENT); @@ -53,8 +49,10 @@ void AlarmScheduler::ensureInitialized(SqliteDatabase& db) { 
db.run(R"( CREATE TABLE IF NOT EXISTS _cf_ALARM ( - actor_id TEXT PRIMARY KEY, - scheduled_time INTEGER + actor_unique_key TEXT, + actor_id TEXT, + scheduled_time INTEGER, + PRIMARY KEY (actor_unique_key, actor_id) ) WITHOUT ROWID; )"); } @@ -65,22 +63,27 @@ void AlarmScheduler::loadAlarmsFromDb() { // TODO(someday): don't maintain the entire alarm set in memory -- right now for the usecase of // local development, doing so is sufficient. auto query = db->run(R"( - SELECT actor_id, scheduled_time FROM _cf_ALARM; + SELECT actor_unique_key, actor_id, scheduled_time FROM _cf_ALARM; )"); while (!query.isDone()) { - auto date = kj::UNIX_EPOCH + (kj::NANOSECONDS * query.getInt64(1)); + auto date = kj::UNIX_EPOCH + (kj::NANOSECONDS * query.getInt64(2)); - auto ownActorId = kj::str(query.getText(0)); - auto actor = kj::attachVal(ActorKey{.actorId = ownActorId}, kj::mv(ownActorId)); - auto& actorRef = *actor; + auto ownUniqueKey = kj::str(query.getText(0)); + auto ownActorId = kj::str(query.getText(1)); + auto actor = kj::attachVal(ActorKey{.uniqueKey = ownUniqueKey, .actorId = ownActorId}, + kj::mv(ownUniqueKey), kj::mv(ownActorId)); - alarms.insert(actorRef, scheduleAlarm(now, kj::mv(actor), date)); + alarms.insert(*actor, scheduleAlarm(now, kj::mv(actor), date)); query.nextRow(); } } +void AlarmScheduler::registerNamespace(kj::StringPtr uniqueKey, GetActorFn getActor) { + namespaces.insert(uniqueKey, Namespace{.getActor = kj::mv(getActor)}); +} + kj::Maybe AlarmScheduler::getAlarm(ActorKey actor) { // TODO(someday): Might be able to simplify AlarmScheduler somewhat, now that ActorSqlite no // longer relies on it for getAlarm()? 
@@ -100,14 +103,16 @@ kj::Maybe AlarmScheduler::getAlarm(ActorKey actor) { bool AlarmScheduler::setAlarm(ActorKey actor, kj::Date scheduledTime) { int64_t scheduledTimeNs = (scheduledTime - kj::UNIX_EPOCH) / kj::NANOSECONDS; - auto query = stmtSetAlarm.run(actor.actorId, scheduledTimeNs); + auto query = stmtSetAlarm.run(actor.uniqueKey, actor.actorId, scheduledTimeNs); bool existing = true; auto& entry = alarms.findOrCreate(actor, [&]() { existing = false; + auto ownUniqueKey = kj::str(actor.uniqueKey); auto ownActorId = kj::str(actor.actorId); - auto ownActor = kj::attachVal(ActorKey{.actorId = ownActorId}, kj::mv(ownActorId)); + auto ownActor = kj::attachVal(ActorKey{.uniqueKey = ownUniqueKey, .actorId = ownActorId}, + kj::mv(ownUniqueKey), kj::mv(ownActorId)); return decltype(alarms)::Entry{ *ownActor, scheduleAlarm(clock.now(), kj::mv(ownActor), scheduledTime)}; @@ -127,7 +132,7 @@ bool AlarmScheduler::setAlarm(ActorKey actor, kj::Date scheduledTime) { } bool AlarmScheduler::deleteAlarm(ActorKey actor) { - auto query = stmtDeleteAlarm.run(actor.actorId); + auto query = stmtDeleteAlarm.run(actor.uniqueKey, actor.actorId); KJ_IF_SOME(entry, alarms.findEntry(actor)) { KJ_IF_SOME(queued, entry.value.queuedAlarm) { @@ -150,10 +155,14 @@ bool AlarmScheduler::deleteAlarm(ActorKey actor) { kj::Promise AlarmScheduler::runAlarm( const ActorKey& actor, kj::Date scheduledTime, uint32_t retryCount) { - auto result = co_await getActor(kj::str(actor.actorId))->runAlarm(scheduledTime, retryCount); + KJ_IF_SOME(ns, namespaces.find(actor.uniqueKey)) { + auto result = co_await ns.getActor(kj::str(actor.actorId))->runAlarm(scheduledTime, retryCount); - co_return RetryInfo{.retry = result.outcome != EventOutcome::OK && result.retry, - .retryCountsAgainstLimit = result.retryCountsAgainstLimit}; + co_return RetryInfo{.retry = result.outcome != EventOutcome::OK && result.retry, + .retryCountsAgainstLimit = result.retryCountsAgainstLimit}; + } else { + throw KJ_EXCEPTION(FAILED, 
"uniqueKey for stored alarm was not registered?"); + } } AlarmScheduler::ScheduledAlarm AlarmScheduler::scheduleAlarm( diff --git a/src/workerd/server/alarm-scheduler.h b/src/workerd/server/alarm-scheduler.h index 6b330894891..9c9dc996d1a 100644 --- a/src/workerd/server/alarm-scheduler.h +++ b/src/workerd/server/alarm-scheduler.h @@ -17,22 +17,27 @@ namespace workerd::server { +using byte = kj::byte; + struct ActorKey { + kj::StringPtr uniqueKey; kj::StringPtr actorId; bool operator==(const ActorKey& other) const { - return actorId == other.actorId; + return uniqueKey == other.uniqueKey && actorId == other.actorId; } kj::Own clone() const { + auto ownUniqueKey = kj::str(uniqueKey); auto ownActorId = kj::str(actorId); - return kj::attachVal(ActorKey{.actorId = ownActorId}, kj::mv(ownActorId)); + return kj::attachVal(ActorKey{.uniqueKey = ownUniqueKey, .actorId = ownActorId}, + kj::mv(ownUniqueKey), kj::mv(ownActorId)); } }; inline uint KJ_HASHCODE(const ActorKey& k) { - return kj::hashCode(k.actorId); + return kj::hashCode(k.uniqueKey, k.actorId); } // Allows scheduling alarm executions at specific times, returning a promise representing @@ -55,22 +60,25 @@ class AlarmScheduler final: kj::TaskSet::ErrorHandler { using GetActorFn = kj::Function(kj::String)>; - AlarmScheduler(const kj::Clock& clock, - kj::Timer& timer, - const SqliteDatabase::Vfs& vfs, - kj::Path path, - GetActorFn getActor); + AlarmScheduler( + const kj::Clock& clock, kj::Timer& timer, const SqliteDatabase::Vfs& vfs, kj::Path path); kj::Maybe getAlarm(ActorKey actor); bool setAlarm(ActorKey actor, kj::Date scheduledTime); bool deleteAlarm(ActorKey actor); + void registerNamespace(kj::StringPtr uniqueKey, GetActorFn getActor); + private: enum class AlarmStatus { WAITING, STARTED, FINISHED }; const kj::Clock& clock; kj::Timer& timer; std::default_random_engine random; - GetActorFn getActor; + + struct Namespace { + GetActorFn getActor; + }; + kj::HashMap namespaces; kj::Own db; kj::TaskSet tasks; @@ 
-104,6 +112,8 @@ class AlarmScheduler final: kj::TaskSet::ErrorHandler { kj::Promise runAlarm( const ActorKey& actor, kj::Date scheduledTime, uint32_t retryCount); + void setAlarmInMemory(kj::Own actor, kj::Date scheduledTime); + ScheduledAlarm scheduleAlarm(kj::Date now, kj::Own actor, kj::Date scheduledTime); kj::Promise makeAlarmTask( @@ -112,11 +122,11 @@ class AlarmScheduler final: kj::TaskSet::ErrorHandler { kj::Promise checkTimestamp(kj::Duration delay, kj::Date scheduledTime); SqliteDatabase::Statement stmtSetAlarm = db->prepare(R"( - INSERT INTO _cf_ALARM VALUES(?, ?) + INSERT INTO _cf_ALARM VALUES(?, ?, ?) ON CONFLICT DO UPDATE SET scheduled_time = excluded.scheduled_time; )"); SqliteDatabase::Statement stmtDeleteAlarm = db->prepare(R"( - DELETE FROM _cf_ALARM WHERE actor_id = ? + DELETE FROM _cf_ALARM WHERE actor_unique_key = ? AND actor_id = ? )"); void taskFailed(kj::Exception&& exception) override; diff --git a/src/workerd/server/container-client-test.c++ b/src/workerd/server/container-client-test.c++ index c9b4e968429..122e4185c24 100644 --- a/src/workerd/server/container-client-test.c++ +++ b/src/workerd/server/container-client-test.c++ @@ -134,38 +134,5 @@ KJ_TEST("decodeJsonResponse ContainerMonitorResponse - non-zero exit") { KJ_EXPECT(root.getStatusCode() == 137); } -KJ_TEST("ContainerCreateRequest encodes structured mounts with NoCopy") { - capnp::JsonCodec codec; - codec.handleByAnnotation(); - - capnp::MallocMessageBuilder message; - auto root = message.initRoot(); - root.setImage("test-image"); - - auto mounts = root.initHostConfig().initMounts(1); - auto mount = mounts[0]; - mount.setType("volume"); - mount.setSource("snapshot-clone-volume"); - mount.setTarget("/app/data"); - mount.initVolumeOptions().setNoCopy(true); - - auto json = codec.encode(root); - auto jsonText = json.asPtr(); - - KJ_EXPECT(jsonText.contains("\"Mounts\"")); - KJ_EXPECT(jsonText.contains("\"VolumeOptions\"")); - KJ_EXPECT(jsonText.contains("\"NoCopy\":true")); - - 
auto decoded = decodeJsonResponse(jsonText); - auto decodedRoot = decoded->getRoot(); - auto decodedMounts = decodedRoot.getHostConfig().getMounts(); - - KJ_REQUIRE(decodedMounts.size() == 1); - KJ_EXPECT(decodedMounts[0].getType() == "volume"); - KJ_EXPECT(decodedMounts[0].getSource() == "snapshot-clone-volume"); - KJ_EXPECT(decodedMounts[0].getTarget() == "/app/data"); - KJ_EXPECT(decodedMounts[0].getVolumeOptions().getNoCopy()); -} - } // namespace } // namespace workerd::server diff --git a/src/workerd/server/container-client.c++ b/src/workerd/server/container-client.c++ index ac143ccaa46..fb6358b2c9f 100644 --- a/src/workerd/server/container-client.c++ +++ b/src/workerd/server/container-client.c++ @@ -37,7 +37,6 @@ constexpr uint16_t SIDECAR_INGRESS_PORT = 39001; constexpr uint64_t MAX_JSON_RESPONSE_SIZE = 16ULL * 1024 * 1024; constexpr kj::StringPtr SNAPSHOT_VOLUME_PREFIX = "workerd-snap-"_kj; -constexpr kj::StringPtr SNAPSHOT_CLONE_VOLUME_PREFIX = "workerd-snap-clone-"_kj; constexpr kj::StringPtr SNAPSHOT_VOLUME_CREATED_AT_LABEL = "dev.workerd.snapshot-created-at"_kj; constexpr auto SNAPSHOT_STALE_AGE = 30 * kj::DAYS; @@ -52,19 +51,32 @@ constexpr size_t MAX_TAR_CONTENT_SIZE = 8ull * 1024 * 1024 * 1024; // Ensures the stale-volume check runs at most once per process. std::atomic_bool staleSnapshotVolumeCheckScheduled = false; -// Validates an absolute path for snapshot use and returns the parsed component path. +// Strip trailing slashes from a path, preserving bare "/". +kj::String normalizePath(kj::String path) { + auto len = path.size(); + while (len > 1 && path[len - 1] == '/') { + --len; + } + if (len == path.size()) return kj::mv(path); + return kj::str(path.first(len)); +} + +// Validate an absolute path for snapshot use (both creation dir and restore mount point). // Rejects relative paths, embedded null bytes, and path traversal components (".."). -kj::Path parseAbsolutePath(kj::StringPtr path) { +// Accepts "/" as a valid path. 
+void validateAbsolutePath(kj::StringPtr path) { JSG_REQUIRE( path.size() > 0 && path[0] == '/', Error, "Snapshot path must be absolute, got: ", path); JSG_REQUIRE(path.findFirst('\0') == kj::none, Error, "Snapshot path must not contain null bytes"); - try { - return kj::Path::parse(path.slice(1)); - } catch (kj::Exception& e) { - JSG_FAIL_REQUIRE( - Error, "Snapshot path contains invalid components: ", path, "; ", e.getDescription()); + // "/" is valid (root). For longer paths, kj::Path::parse rejects ".." and other + // dangerous components, but throws a raw KJ exception. Wrap it for a user-friendly error. + if (path.size() > 1) { + KJ_IF_SOME(exc, kj::runCatchingExceptions([&]() { (void)kj::Path::parse(path.slice(1)); })) { + JSG_FAIL_REQUIRE( + Error, "Snapshot path contains invalid components: ", path, "; ", exc.getDescription()); + } } } @@ -411,51 +423,6 @@ kj::Promise dockerApiBinaryRequest(kj::Network& network, bodyBytes, "application/x-tar"_kj, maxResponseSize); } -kj::Promise deleteVolume(kj::Network& network, kj::String dockerPath, kj::String volumeName) { - auto response = co_await dockerApiRequest( - network, kj::mv(dockerPath), kj::HttpMethod::DELETE, kj::str("/volumes/", volumeName)); - if (response.statusCode != 204 && response.statusCode != 404) { - KJ_LOG(WARNING, "failed to delete volume", volumeName, response.statusCode, response.body); - } -} - -kj::Promise deleteVolumes( - kj::Network& network, kj::String dockerPath, kj::Array snapshotCloneVolumes) { - kj::Vector> volumeDeletes; - volumeDeletes.reserve(snapshotCloneVolumes.size()); - for (auto& volumeName: snapshotCloneVolumes) { - auto logName = kj::str(volumeName); - volumeDeletes.add(deleteVolume(network, kj::str(dockerPath), kj::mv(volumeName)) - .catch_([logName = kj::mv(logName)](kj::Exception&& e) { - KJ_LOG(WARNING, "failed to delete volume", logName, e); - })); - } - co_await kj::joinPromises(volumeDeletes.releaseAsArray()); -} - -kj::Promise removeContainer( - kj::Network& network, 
kj::String dockerPath, kj::String containerName, bool wait = true) { - auto endpoint = kj::str("/containers/", containerName, "?force=true"); - auto response = co_await dockerApiRequest( - network, kj::str(dockerPath), kj::HttpMethod::DELETE, kj::mv(endpoint)); - // 204 means the container was removed. - // 404 means it was already gone. - // 409 means removal is already in progress, which is fine for our teardown paths. - KJ_REQUIRE(response.statusCode == 204 || response.statusCode == 404 || response.statusCode == 409, - "Removing a container failed with: ", response.body); - - // If removal succeeded or is already in progress, wait for Docker to report the container as - // fully removed before proceeding with any follow-up cleanup like deleting mounted volumes. - if (wait && (response.statusCode == 204 || response.statusCode == 409)) { - response = co_await dockerApiRequest(network, kj::mv(dockerPath), kj::HttpMethod::POST, - kj::str("/containers/", containerName, "/wait?condition=removed")); - // 200 means Docker observed the removal. 404 means the container disappeared before the wait - // request was processed, which is also fine. - KJ_REQUIRE(response.statusCode == 200 || response.statusCode == 404, - "Waiting for container removal failed with: ", response.statusCode, response.body); - } -} - kj::String currentSnapshotVolumeTimestamp() { return kj::str((kj::systemPreciseCalendarClock().now() - kj::UNIX_EPOCH) / kj::SECONDS); } @@ -514,62 +481,8 @@ kj::Promise warnAboutStaleSnapshotVolumes(kj::Network& network, kj::String } } -// Returns the gateway IP on Linux for direct container access. -// Returns kj::none on macOS where Docker Desktop routes host-gateway to host loopback. 
-kj::Maybe gatewayForPlatform(kj::String gateway) { -#ifdef __APPLE__ - return kj::none; -#else - return kj::mv(gateway); -#endif -} - -kj::Maybe tryParsePublishedHostPort(capnp::json::Value::Reader portMappingValue) { - if (portMappingValue.isNull()) { - return kj::none; - } - - JSG_REQUIRE( - portMappingValue.isArray(), Error, "Malformed ContainerInspect port mapping response"); - auto bindings = portMappingValue.getArray(); - if (bindings.size() == 0) { - return kj::none; - } - - auto binding = bindings[0]; - JSG_REQUIRE(binding.isObject(), Error, "Malformed ContainerInspect port binding response"); - for (auto field: binding.getObject()) { - if (field.getName() == "HostPort") { - auto value = field.getValue(); - JSG_REQUIRE(value.isString(), Error, "Malformed ContainerInspect port binding response"); - kj::StringPtr hostPort = value.getString(); - return KJ_REQUIRE_NONNULL( - hostPort.tryParseAs(), "Malformed ContainerInspect host port"); - } - } - - KJ_FAIL_REQUIRE("Malformed ContainerInspect port binding response: missing HostPort"); -} - } // namespace -// Represents a parsed egress mapping. IP/CIDR mappings match destination IPs, -// while hostnameGlob mappings match either HTTP hostnames or TLS SNI depending on `tls`. -// Defined here (not in the header) to avoid pulling kj::OneOf, kj::CidrRange, and -// kj::Vector into server.c++ which includes container-client.h. -struct ContainerClient::EgressMapping { - kj::OneOf destination; - uint16_t port; // 0 means match all ports - bool tls; - kj::Own channel; -}; - -// Holds all egress mapping state. Stored via kj::Own in ContainerClient -// so that the EgressMapping type is not visible in container-client.h. 
-struct ContainerClient::EgressState { - kj::Vector mappings; -}; - ContainerClient::ContainerClient(capnp::ByteStreamFactory& byteStreamFactory, kj::Timer& timer, kj::Network& network, @@ -592,8 +505,7 @@ ContainerClient::ContainerClient(capnp::ByteStreamFactory& byteStreamFactory, waitUntilTasks(waitUntilTasks), pendingCleanup(kj::mv(pendingCleanup).fork()), cleanupCallback(kj::mv(cleanupCallback)), - channelTokenHandler(channelTokenHandler), - egressState(kj::heap()) { + channelTokenHandler(channelTokenHandler) { if (!staleSnapshotVolumeCheckScheduled.exchange(true)) { waitUntilTasks.add(warnAboutStaleSnapshotVolumes(network, kj::str(this->dockerPath)) .catch_([](kj::Exception&& e) { @@ -606,18 +518,15 @@ ContainerClient::~ContainerClient() noexcept(false) { stopEgressListener(); // Best-effort cleanup for both containers. - auto sidecarCleanup = - removeContainer(network, kj::str(dockerPath), kj::str(sidecarContainerName), false) - .catch_([](kj::Exception&&) {}); - - // Also try to delete any cloned snapshot volumes. - auto volumes = snapshotClones.releaseAsArray(); - auto mainCleanup = removeContainer(network, kj::str(dockerPath), kj::str(containerName)) - .catch_([](kj::Exception&&) {}) - .then([&network = network, dockerPath = kj::str(dockerPath), - volumes = kj::mv(volumes)]() mutable { - return deleteVolumes(network, kj::mv(dockerPath), kj::mv(volumes)); - }).catch_([](kj::Exception&&) {}); + auto sidecarCleanup = dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::DELETE, + kj::str("/containers/", sidecarContainerName, "?force=true")) + .ignoreResult() + .catch_([](kj::Exception&&) {}); + + auto mainCleanup = dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::DELETE, + kj::str("/containers/", containerName, "?force=true")) + .ignoreResult() + .catch_([](kj::Exception&&) {}); // Pass the joined cleanup promise to the callback. 
The callback wraps it with the // canceler (so a future client creation can cancel it), stores it so the next @@ -907,6 +816,43 @@ kj::Promise ContainerClient::isDaemonIpv6Enabled() { co_return false; } +// Returns the gateway IP on Linux for direct container access. +// Returns kj::none on macOS where Docker Desktop routes host-gateway to host loopback. +static kj::Maybe gatewayForPlatform(kj::String gateway) { +#ifdef __APPLE__ + return kj::none; +#else + return kj::mv(gateway); +#endif +} + +kj::Maybe tryParsePublishedHostPort(capnp::json::Value::Reader portMappingValue) { + if (portMappingValue.isNull()) { + return kj::none; + } + + JSG_REQUIRE( + portMappingValue.isArray(), Error, "Malformed ContainerInspect port mapping response"); + auto bindings = portMappingValue.getArray(); + if (bindings.size() == 0) { + return kj::none; + } + + auto binding = bindings[0]; + JSG_REQUIRE(binding.isObject(), Error, "Malformed ContainerInspect port binding response"); + for (auto field: binding.getObject()) { + if (field.getName() == "HostPort") { + auto value = field.getValue(); + JSG_REQUIRE(value.isString(), Error, "Malformed ContainerInspect port binding response"); + kj::StringPtr hostPort = value.getString(); + return KJ_REQUIRE_NONNULL( + hostPort.tryParseAs(), "Malformed ContainerInspect host port"); + } + } + + KJ_FAIL_REQUIRE("Malformed ContainerInspect port binding response: missing HostPort"); +} + kj::Promise ContainerClient::startEgressListener( kj::String listenAddress, uint16_t port) { auto service = kj::heap(*this, headerTable); @@ -1134,7 +1080,6 @@ kj::Promise ContainerClient::updateSidecarEgressConfig( kj::Promise ContainerClient::createContainer( kj::Maybe::Reader> entrypoint, kj::Maybe::Reader> environment, - kj::ArrayPtr restoreMounts, rpc::Container::StartParams::Reader params) { capnp::JsonCodec codec; codec.handleByAnnotation(); @@ -1185,18 +1130,6 @@ kj::Promise ContainerClient::createContainer( hostConfig.setPidMode("host"); } - if 
(restoreMounts.size() > 0) { - auto mounts = hostConfig.initMounts(restoreMounts.size()); - for (auto i: kj::indices(restoreMounts)) { - auto mount = mounts[i]; - auto& restoreMount = restoreMounts[i]; - mount.setType("volume"); - mount.setSource(restoreMount.cloneVolume); - mount.setTarget(restoreMount.restorePath.toString(true)); - mount.initVolumeOptions().setNoCopy(true); - } - } - auto response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST, kj::str("/containers/create?name=", containerName), codec.encode(jsonRoot)); @@ -1207,7 +1140,7 @@ kj::Promise ContainerClient::createContainer( constexpr auto RETRY_DELAY = 100 * kj::MILLISECONDS; for (int attempt = 0; response.statusCode == 409 && attempt < MAX_RETRIES; ++attempt) { - co_await removeContainer(network, kj::str(dockerPath), kj::str(containerName)); + co_await destroyContainer(); co_await timer.afterDelay(RETRY_DELAY); response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST, kj::str("/containers/create?name=", containerName), codec.encode(jsonRoot)); @@ -1258,8 +1191,24 @@ kj::Promise ContainerClient::killContainer(uint32_t signal) { // No-op when the container does not exist. // Wait for the container to actually be stopped and removed when it exists. kj::Promise ContainerClient::destroyContainer() { - co_await removeContainer(network, kj::str(dockerPath), kj::str(containerName)); - co_await deleteVolumes(network, kj::str(dockerPath), snapshotClones.releaseAsArray()); + auto endpoint = kj::str("/containers/", containerName, "?force=true"); + auto response = co_await dockerApiRequest( + network, kj::str(dockerPath), kj::HttpMethod::DELETE, kj::mv(endpoint)); + // statusCode 204 refers to "no error" + // statusCode 404 refers to "no such container" + // statusCode 409 refers to "removal already in progress" (race between concurrent destroys) + // All of which are fine for us since we're tearing down the container anyway. 
+ JSG_REQUIRE( + response.statusCode == 204 || response.statusCode == 404 || response.statusCode == 409, Error, + "Removing a container failed with: ", response.body); + // Do not send a wait request if container doesn't exist. This avoids sending an + // unnecessary request. + if (response.statusCode == 204 || response.statusCode == 409) { + response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST, + kj::str("/containers/", containerName, "/wait?condition=removed")); + JSG_REQUIRE(response.statusCode == 200 || response.statusCode == 404, Error, + "Waiting for container removal failed with: ", response.statusCode, response.body); + } } // Creates the sidecar container that owns the shared network namespace. @@ -1335,7 +1284,21 @@ kj::Promise ContainerClient::startSidecarContainer() { } kj::Promise ContainerClient::destroySidecarContainer() { - co_await removeContainer(network, kj::str(dockerPath), kj::str(sidecarContainerName)); + auto endpoint = kj::str("/containers/", sidecarContainerName, "?force=true"); + auto responseDestroy = co_await dockerApiRequest( + network, kj::str(dockerPath), kj::HttpMethod::DELETE, kj::mv(endpoint)); + // statusCode 204 refers to "no error" + // statusCode 404 refers to "no such container" + // statusCode 409 refers to "removal already in progress" (race between concurrent destroys) + // All of which are fine for us since we're tearing down the sidecar + JSG_REQUIRE(responseDestroy.statusCode == 204 || responseDestroy.statusCode == 404 || + responseDestroy.statusCode == 409, + Error, "Destroying network sidecar container failed with: ", responseDestroy.statusCode, + responseDestroy.body); + auto response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST, + kj::str("/containers/", sidecarContainerName, "/wait?condition=removed")); + JSG_REQUIRE(response.statusCode == 200 || response.statusCode == 404, Error, + "Destroying docker network sidecar container failed: ", 
response.statusCode, response.body); } kj::Promise ContainerClient::createDockerVolume(kj::StringPtr volumeName) { @@ -1387,76 +1350,6 @@ kj::Promise ContainerClient::createTempContainerWithVolume( co_return kj::str(respRoot.getId()); } -kj::Promise ContainerClient::cloneSnapshot(SnapshotRestoreMount& snapshot) { - co_await createDockerVolume(snapshot.cloneVolume); - - bool cloneCommitted = false; - KJ_DEFER(if (!cloneCommitted) { - waitUntilTasks.add( - deleteDockerVolume(kj::str(snapshot.cloneVolume)).catch_([](kj::Exception&&) { - }).attach(addRef())); - }); - - capnp::JsonCodec codec; - codec.handleByAnnotation(); - capnp::MallocMessageBuilder message; - auto jsonRoot = message.initRoot(); - jsonRoot.setImage(containerEgressInterceptorImage); - jsonRoot.setEntrypoint("/bin/cp"); - - // Run `/bin/cp -a /src/. /dst/` so the clone volume gets the snapshot contents directly. - auto cmd = jsonRoot.initCmd(3); - cmd.set(0, "-a"); - cmd.set(1, "/src/."); - cmd.set(2, "/dst/"); - - auto hostConfig = jsonRoot.initHostConfig(); - auto binds = hostConfig.initBinds(2); - binds.set(0, kj::str(snapshot.sourceVolume, ":/src:ro")); - binds.set(1, kj::str(snapshot.cloneVolume, ":/dst")); - - auto createResponse = co_await dockerApiRequest(network, kj::str(dockerPath), - kj::HttpMethod::POST, kj::str("/containers/create"), codec.encode(jsonRoot)); - JSG_REQUIRE(createResponse.statusCode == 201, Error, - "Failed to create snapshot clone helper container for volume '", snapshot.sourceVolume, - "': ", createResponse.statusCode, " ", createResponse.body); - - auto createMessage = - decodeJsonResponse(createResponse.body); - auto createRoot = createMessage->getRoot(); - auto helperContainerId = kj::str(createRoot.getId()); - bool helperDeleted = false; - KJ_DEFER(if (!helperDeleted) { - waitUntilTasks.add( - deleteTempContainer(kj::str(helperContainerId)).catch_([](kj::Exception&&) { - }).attach(addRef())); - }); - - auto startResponse = co_await dockerApiRequest(network, 
kj::str(dockerPath), kj::HttpMethod::POST, - kj::str("/containers/", helperContainerId, "/start"), kj::str("")); - JSG_REQUIRE(startResponse.statusCode == 204, Error, - "Failed to start snapshot clone helper container '", helperContainerId, - "': ", startResponse.statusCode, " ", startResponse.body); - - auto waitResponse = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST, - kj::str("/containers/", helperContainerId, "/wait?condition=not-running")); - JSG_REQUIRE(waitResponse.statusCode == 200, Error, - "Failed waiting for snapshot clone helper container '", helperContainerId, - "': ", waitResponse.statusCode, " ", waitResponse.body); - - auto waitMessage = - decodeJsonResponse(waitResponse.body); - auto waitRoot = waitMessage->getRoot(); - // A non-zero exit means the copy failed and the clone volume contents are incomplete. - JSG_REQUIRE(waitRoot.getStatusCode() == 0, Error, "Snapshot clone helper container '", - helperContainerId, "' exited with status ", waitRoot.getStatusCode()); - - co_await deleteTempContainer(kj::str(helperContainerId)); - helperDeleted = true; - cloneCommitted = true; - snapshotClones.add(kj::str(snapshot.cloneVolume)); -} - kj::Promise ContainerClient::deleteTempContainer(kj::String tempContainerId) { auto response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::DELETE, kj::str("/containers/", tempContainerId, "?force=true")); @@ -1524,16 +1417,37 @@ kj::Promise ContainerClient::start(StartContext context) { internetEnabled = params.getEnableInternet(); - // If startup fails after we clone any snapshot volumes, tear down the app container first and - // then delete those clone volumes so we don't leave mounted Docker volumes behind. 
+ co_await ensureEgressListenerStarted(); + containerSidecarStarted = false; + co_await ensureSidecarStarted(); + + caCertInjected.store(false, std::memory_order_release); + co_await createContainer(entrypoint, environment, params); + + // If anything after container creation fails (CA cert injection, snapshot + // restore, startContainer), destroy the half-created Docker container so we + // don't leave a zombie in "Created" state that would cause monitor() to hang. + // Attach addRef(*this) so the ContainerClient stays alive until the coroutine + // completes — without it, ContainerClient could be freed before destroyContainer() + // resumes after its first co_await, causing a use-after-free / segfault. KJ_DEFER(if (!containerStarted.load(std::memory_order_acquire)) { waitUntilTasks.add(destroyContainer().attach(addRef())); }); - kj::Vector restoreMounts; + bool hasTlsMappings = false; + for (auto& mapping: egressMappings) { + if (mapping.tls) { + hasTlsMappings = true; + break; + } + } + + if (hasTlsMappings) { + co_await injectCACert(); + } + if (params.hasSnapshots()) { auto snapshotList = params.getSnapshots(); - restoreMounts.reserve(snapshotList.size()); for (auto i: kj::zeroTo(snapshotList.size())) { auto entry = snapshotList[i]; auto snapshot = entry.getSnapshot(); @@ -1548,36 +1462,53 @@ kj::Promise ContainerClient::start(StartContext context) { } const auto mountPointText = entry.getMountPoint(); - auto restorePath = - parseAbsolutePath(mountPointText.size() > 0 ? mountPointText : snapshot.getDir()); + const auto restoreDir = + normalizePath(mountPointText.size() > 0 ? kj::str(mountPointText) : kj::str(dir)); + validateAbsolutePath(restoreDir); - auto sourceVolume = kj::str(SNAPSHOT_VOLUME_PREFIX, snapshotId); + auto volumeName = kj::str(SNAPSHOT_VOLUME_PREFIX, snapshotId); + // Docker auto-creates named volumes on container create, so we must + // explicitly verify the snapshot volume exists before using it. 
auto inspectResp = co_await dockerApiRequest( - network, kj::str(dockerPath), kj::HttpMethod::GET, kj::str("/volumes/", sourceVolume)); + network, kj::str(dockerPath), kj::HttpMethod::GET, kj::str("/volumes/", volumeName)); JSG_REQUIRE(inspectResp.statusCode == 200, Error, "Snapshot '", snapshotId, - "' not found (volume '", sourceVolume, "' does not exist)"); - - restoreMounts.add(SnapshotRestoreMount{kj::mv(restorePath), kj::mv(sourceVolume), - kj::str(SNAPSHOT_CLONE_VOLUME_PREFIX, randomUUID(kj::none))}); - } + "' not found (volume '", volumeName, "' does not exist)"); - for (auto& restoreMount: restoreMounts) { - co_await cloneSnapshot(restoreMount); - } - } + // The volume stores raw directory contents (no directory wrapper) at /mnt. + // Mount it at /mnt{restoreDir} so Docker creates the target directory hierarchy, + // then GET from the first path component to obtain a tar with the full path. + // Special case: restoreDir == "/" → mount at /mnt, GET with path=/mnt/. to + // avoid the extra mnt/ wrapper. + auto mountPath = restoreDir == "/" ? 
kj::str("/mnt") : kj::str("/mnt", restoreDir); - co_await ensureEgressListenerStarted(); - containerSidecarStarted.store(false, std::memory_order_release); - co_await ensureSidecarStarted(); + auto tempId = co_await createTempContainerWithVolume(volumeName, mountPath); + KJ_DEFER(waitUntilTasks.add(deleteTempContainer(kj::str(tempId)).attach(addRef()))); - caCertInjected.store(false, std::memory_order_release); - co_await createContainer(entrypoint, environment, restoreMounts.asPtr(), params); + kj::String archiveGetPath; + if (restoreDir == "/") { + archiveGetPath = kj::str("/containers/", tempId, "/archive?path=/mnt/."); + } else { + auto firstSeparator = restoreDir.slice(1).findFirst('/'); + auto archiveRoot = + firstSeparator.map([&](size_t pos) { + return kj::str(restoreDir.slice(1, pos + 1)); + }).orDefault(kj::str(restoreDir.slice(1))); + archiveGetPath = kj::str( + "/containers/", tempId, "/archive?path=/mnt/", kj::encodeUriComponent(archiveRoot)); + } - for (auto& mapping: egressState->mappings) { - if (mapping.tls) { - co_await injectCACert(); - break; + auto tarResponse = co_await dockerApiBinaryRequest(network, kj::str(dockerPath), + kj::HttpMethod::GET, kj::mv(archiveGetPath), kj::none, MAX_SNAPSHOT_TAR_SIZE); + JSG_REQUIRE(tarResponse.statusCode == 200, Error, "Failed to read snapshot '", snapshotId, + "' from volume '", volumeName, "': ", tarResponse.statusCode); + + auto putResponse = + co_await dockerApiBinaryRequest(network, kj::str(dockerPath), kj::HttpMethod::PUT, + kj::str("/containers/", containerName, "/archive?path=%2F&noOverwriteDirNonDir=true"), + kj::mv(tarResponse.body), MAX_JSON_RESPONSE_SIZE); + JSG_REQUIRE(putResponse.statusCode == 200, Error, "Failed to restore snapshot '", snapshotId, + "' to '", restoreDir, "': ", putResponse.statusCode); } } @@ -1658,7 +1589,8 @@ kj::Promise ContainerClient::snapshotDirectory(SnapshotDirectoryContext co const auto params = context.getParams(); - const auto dir = 
parseAbsolutePath(params.getDir()).toString(true); + const auto dir = normalizePath(kj::str(params.getDir())); + validateAbsolutePath(dir); auto name = params.hasName() && params.getName().size() > 0 ? kj::Maybe(kj::str(params.getName())) @@ -1702,7 +1634,7 @@ kj::Promise ContainerClient::snapshotDirectory(SnapshotDirectoryContext co }); // Store the contents tar in the volume via a temp container mounted at /mnt. - auto tempId = co_await createTempContainerWithVolume(volumeName, "/mnt"); + auto tempId = co_await createTempContainerWithVolume(volumeName); KJ_DEFER(waitUntilTasks.add(deleteTempContainer(kj::str(tempId)).attach(addRef()))); auto putResponse = co_await dockerApiBinaryRequest(network, kj::str(dockerPath), @@ -1740,7 +1672,7 @@ kj::Promise ContainerClient::listenTcp(ListenTcpContext context) { } void ContainerClient::upsertEgressMapping(EgressMapping mapping) { - for (auto& m: egressState->mappings) { + for (auto& m: egressMappings) { // If the mapping differs in port or needing TLS, we skip it as it's // not the same. if (m.port != mapping.port || m.tls != mapping.tls) { @@ -1767,14 +1699,14 @@ void ContainerClient::upsertEgressMapping(EgressMapping mapping) { } } - egressState->mappings.add(kj::mv(mapping)); + egressMappings.add(kj::mv(mapping)); } kj::Vector ContainerClient::getDnsAllowHostnames() const { - // result N can be at most size of egressState->mappings. + // result N can be at most size of egressMappings. kj::Vector result; - for (auto& mapping: egressState->mappings) { + for (auto& mapping: egressMappings) { KJ_SWITCH_ONEOF(mapping.destination) { KJ_CASE_ONEOF(_, kj::CidrRange) { result.add(kj::str("*")); @@ -1810,7 +1742,7 @@ kj::Maybe> ContainerClient normalizedHostname = normalizeHostname(hostnameValue); } - for (auto& mapping: egressState->mappings) { + for (auto& mapping: egressMappings) { // Mappings can differ in port, whether to do tls and the cidr/hostname. // Users can specify things like google.com:7070, or 0.0.0.0:7070. 
On top of that, // they might want TLS interception. diff --git a/src/workerd/server/container-client.h b/src/workerd/server/container-client.h index 637a0565900..9e8e666ad2b 100644 --- a/src/workerd/server/container-client.h +++ b/src/workerd/server/container-client.h @@ -15,8 +15,8 @@ #include #include #include +#include #include -#include #include #include #include @@ -126,19 +126,12 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte uint16_t ingressHostPort; }; - struct SnapshotRestoreMount { - kj::Path restorePath; - kj::String sourceVolume; - kj::String cloneVolume; - }; - kj::Promise inspectContainer(); kj::Promise updateSidecarEgressPort(uint16_t ingressHostPort, uint16_t egressPort); kj::Promise updateSidecarEgressConfig(uint16_t ingressHostPort, uint16_t egressPort); kj::Promise createContainer(kj::Maybe::Reader> entrypoint, kj::Maybe::Reader> environment, - kj::ArrayPtr restoreMounts, rpc::Container::StartParams::Reader params); kj::Promise startContainer(); kj::Promise stopContainer(); @@ -149,11 +142,7 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte kj::Promise createDockerVolume(kj::StringPtr volumeName); kj::Promise deleteDockerVolume(kj::String volumeName); kj::Promise createTempContainerWithVolume( - kj::StringPtr volumeName, kj::StringPtr mountPath); - // Creates a writable clone volume by copying an existing snapshot volume through a - // short-lived helper container. The caller mounts the returned clone into the app - // container with NoCopy=true so the restored path masks any image contents there. 
- kj::Promise cloneSnapshot(SnapshotRestoreMount& snapshot); + kj::StringPtr volumeName, kj::StringPtr mountPath = "/mnt"_kj); kj::Promise deleteTempContainer(kj::String tempContainerId); // Sidecar container management (for egress proxy) @@ -172,14 +161,19 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte // For redeeming channel tokens received via setEgressHttp / setEgressHttps. ChannelTokenHandler& channelTokenHandler; - // Opaque implementation struct holding egress mappings. Defined in container-client.c++ to - // avoid pulling heavy types (kj::OneOf, kj::CidrRange, kj::Vector) into server.c++ which - // includes this header. - struct EgressState; - kj::Own egressState; + // Represents a parsed egress mapping. IP/CIDR mappings match destination IPs, + // while hostnameGlob mappings match either HTTP hostnames or TLS SNI depending on `tls`. + struct EgressMapping { + kj::OneOf destination; + uint16_t port; // 0 means match all ports + bool tls; + kj::Own channel; + }; - // Insert or replace an egress mapping. - struct EgressMapping; + kj::Vector egressMappings; + + // Insert or replace an egress mapping. If a mapping with the same destination, port, and TLS + // mode already exists, its channel is replaced; otherwise a new mapping is added. void upsertEgressMapping(EgressMapping mapping); kj::Vector getDnsAllowHostnames() const; @@ -203,10 +197,6 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte std::atomic_bool egressListenerStarted = false; std::atomic_bool caCertInjected = false; - // Writable clone volumes currently owned by the app container, or by an in-flight start() - // that still needs failure cleanup. - kj::Vector snapshotClones; - // CA cert read from the sidecar after it starts. 
kj::Maybe caCert; diff --git a/src/workerd/server/docker-api.capnp b/src/workerd/server/docker-api.capnp index fe8f6f1468b..b6b0db39735 100644 --- a/src/workerd/server/docker-api.capnp +++ b/src/workerd/server/docker-api.capnp @@ -105,18 +105,6 @@ struct Docker { # Networking configuration # networkingConfig @22 :NetworkingConfig $Json.name("NetworkingConfig"); - struct Mount { - type @0 :Text $Json.name("Type"); - source @1 :Text $Json.name("Source"); - target @2 :Text $Json.name("Target"); - readOnly @3 :Bool = false $Json.name("ReadOnly"); - volumeOptions @4 :VolumeOptions $Json.name("VolumeOptions"); - - struct VolumeOptions { - noCopy @0 :Bool = false $Json.name("NoCopy"); - } - } - struct HostConfig { # Container configuration that depends on the host binds @0 :List(Text) $Json.name("Binds"); # Volume bindings @@ -170,7 +158,6 @@ struct Docker { volumeDriver @48 :Text $Json.name("VolumeDriver"); shmSize @49 :UInt32 $Json.name("ShmSize"); extraHosts @50 :List(Text) $Json.name("ExtraHosts"); # --add-host entries in "host:ip" format - mounts @51 :List(Mount) $Json.name("Mounts"); } } diff --git a/src/workerd/server/server-test.c++ b/src/workerd/server/server-test.c++ index 1edc8047138..d25a05600f8 100644 --- a/src/workerd/server/server-test.c++ +++ b/src/workerd/server/server-test.c++ @@ -1998,9 +1998,9 @@ KJ_TEST("Server: Durable Objects (in memory)") { ` throw new Error("durable ID should be type DurableObjectId, " + ` `got: ${this.id.constructor.name}`); ` } - ` if (typeof this.id.name !== "string" || this.id.name.length === 0) { - ` throw new Error("ctx.id.name should be a non-empty string for " + - ` `named DOs, got: ${JSON.stringify(this.id.name)}`); + ` if (this.id.name) { + ` throw new Error("ctx.id for Durable Object should not have a .name " + + ` `property, got: ${this.id.name}`); ` } ` } ` async fetch(request) { @@ -2047,151 +2047,6 @@ KJ_TEST("Server: Durable Objects (in memory)") { "/bar", 
"02b496f65dd35cbac90e3e72dc5a398ee93926ea4a3821e26677082d2e6f9b79: http://foo/bar 2"); } -KJ_TEST("Server: Durable Objects keep ctx.id.name undefined for unique IDs") { - TestServer test(R"(( - services = [ - ( name = "hello", - worker = ( - compatibilityDate = "2022-08-17", - modules = [ - ( name = "main.js", - esModule = - `export default { - ` async fetch(request, env) { - ` let actor = env.ns.get(env.ns.newUniqueId()) - ` return await actor.fetch(request) - ` } - `} - `export class MyActorClass { - ` constructor(state, env) { - ` this.name = state.id.name; - ` } - ` async fetch(request) { - ` return new Response(String(this.name)); - ` } - `} - ) - ], - bindings = [(name = "ns", durableObjectNamespace = "MyActorClass")], - durableObjectNamespaces = [ - ( className = "MyActorClass", - uniqueKey = "mykey", - ) - ], - durableObjectStorage = (inMemory = void) - ) - ), - ], - sockets = [ - ( name = "main", - address = "test-addr", - service = "hello" - ) - ] - ))"_kj); - - test.start(); - auto conn = test.connect("test-addr"); - conn.httpGet200("/", "undefined"); -} - -KJ_TEST("Server: Durable Objects retain ctx.id.name for short names") { - TestServer test(R"(( - services = [ - ( name = "hello", - worker = ( - compatibilityDate = "2022-08-17", - modules = [ - ( name = "main.js", - esModule = - `const name = "retained-name-123" - `export default { - ` async fetch(request, env) { - ` let actor = env.ns.get(env.ns.idFromName(name)) - ` return await actor.fetch(request) - ` } - `} - `export class MyActorClass { - ` constructor(state, env) { - ` this.name = state.id.name; - ` } - ` async fetch(request) { - ` return new Response(String(this.name)); - ` } - `} - ) - ], - bindings = [(name = "ns", durableObjectNamespace = "MyActorClass")], - durableObjectNamespaces = [ - ( className = "MyActorClass", - uniqueKey = "mykey", - ) - ], - durableObjectStorage = (inMemory = void) - ) - ), - ], - sockets = [ - ( name = "main", - address = "test-addr", - service = "hello" - ) - ] 
- ))"_kj); - - test.start(); - auto conn = test.connect("test-addr"); - conn.httpGet200("/", "retained-name-123"); -} - -KJ_TEST("Server: Durable Objects drop ctx.id.name for long names") { - TestServer test(R"(( - services = [ - ( name = "hello", - worker = ( - compatibilityDate = "2022-08-17", - modules = [ - ( name = "main.js", - esModule = - `export default { - ` async fetch(request, env) { - ` let actor = env.ns.get(env.ns.idFromName("a".repeat(1025))) - ` return await actor.fetch(request) - ` } - `} - `export class MyActorClass { - ` constructor(state, env) { - ` this.name = state.id.name; - ` } - ` async fetch(request) { - ` return new Response(String(this.name)); - ` } - `} - ) - ], - bindings = [(name = "ns", durableObjectNamespace = "MyActorClass")], - durableObjectNamespaces = [ - ( className = "MyActorClass", - uniqueKey = "mykey", - ) - ], - durableObjectStorage = (inMemory = void) - ) - ), - ], - sockets = [ - ( name = "main", - address = "test-addr", - service = "hello" - ) - ] - ))"_kj); - - test.start(); - auto conn = test.connect("test-addr"); - conn.httpGet200("/", "undefined"); -} - KJ_TEST("Server: Simultaneous requests to a DO that hasn't started don't cause split brain") { TestServer test(R"(( services = [ @@ -2392,13 +2247,12 @@ KJ_TEST("Server: Durable Objects (on disk)") { conn.httpGet200("/bar", "02b496f65dd35cbac90e3e72dc5a398ee93926ea4a3821e26677082d2e6f9b79: http://foo/bar 2"); - // The storage directory contains .sqlite and .sqlite-wal files for both objects, plus the - // per-namespace metadata.sqlite (alarm scheduler) and its WAL file. Note that the `-shm` - // files are missing because SQLite doesn't actually tell the VFS to create these as separate - // files, it leaves it up to the VFS to decide how shared memory works, and our KJ-wrapping - // VFS currently doesn't put this in SHM files. If we were using a real disk directory, - // though, they would be there. 
- KJ_EXPECT(dir->openSubdir(kj::Path({"mykey"}))->listNames().size() == 6); + // The storage directory contains .sqlite and .sqlite-wal files for both objects. Note that + // the `-shm` files are missing because SQLite doesn't actually tell the VFS to create these + // as separate files, it leaves it up to the VFS to decide how shared memory works, and our + // KJ-wrapping VFS currently doesn't put this in SHM files. If we were using a real disk + // directory, though, they would be there. + KJ_EXPECT(dir->openSubdir(kj::Path({"mykey"}))->listNames().size() == 4); KJ_EXPECT(dir->exists(kj::Path( {"mykey", "02b496f65dd35cbac90e3e72dc5a398ee93926ea4a3821e26677082d2e6f9b79.sqlite"}))); KJ_EXPECT(dir->exists(kj::Path( @@ -2407,12 +2261,10 @@ KJ_TEST("Server: Durable Objects (on disk)") { {"mykey", "59002eb8cf872e541722977a258a12d6a93bbe8192b502e1c0cb250aa91af234.sqlite"}))); KJ_EXPECT(dir->exists(kj::Path( {"mykey", "59002eb8cf872e541722977a258a12d6a93bbe8192b502e1c0cb250aa91af234.sqlite-wal"}))); - KJ_EXPECT(dir->exists(kj::Path({"mykey", "metadata.sqlite"}))); - KJ_EXPECT(dir->exists(kj::Path({"mykey", "metadata.sqlite-wal"}))); } // Having torn everything down, the WAL files should be gone. 
- KJ_EXPECT(dir->openSubdir(kj::Path({"mykey"}))->listNames().size() == 3); + KJ_EXPECT(dir->openSubdir(kj::Path({"mykey"}))->listNames().size() == 2); KJ_EXPECT(dir->exists(kj::Path( {"mykey", "02b496f65dd35cbac90e3e72dc5a398ee93926ea4a3821e26677082d2e6f9b79.sqlite"}))); KJ_EXPECT(dir->exists(kj::Path( @@ -2438,102 +2290,6 @@ KJ_TEST("Server: Durable Objects (on disk)") { } } -KJ_TEST("Server: Durable Object alarm persistence (on disk)") { - kj::StringPtr config = R"(( - services = [ - ( name = "hello", - worker = ( - compatibilityDate = "2024-01-01", - modules = [ - ( name = "main.js", - esModule = - `export default { - ` async fetch(request, env) { - ` let id = env.ns.idFromName("alarm-actor") - ` let actor = env.ns.get(id) - ` return await actor.fetch(request) - ` } - `} - `export class MyActorClass { - ` constructor(state, env) { - ` this.storage = state.storage; - ` } - ` async fetch(request) { - ` let url = new URL(request.url); - ` if (url.pathname === "/set") { - ` let time = parseInt(url.searchParams.get("t")); - ` await this.storage.setAlarm(time); - ` return new Response("alarm set to " + time); - ` } else if (url.pathname === "/get") { - ` let alarm = await this.storage.getAlarm(); - ` return new Response("alarm=" + alarm); - ` } else { - ` return new Response("unknown path", {status: 404}); - ` } - ` } - ` async alarm() {} - `} - ) - ], - bindings = [(name = "ns", durableObjectNamespace = "MyActorClass")], - durableObjectNamespaces = [ - ( className = "MyActorClass", - uniqueKey = "alarmkey", - ) - ], - durableObjectStorage = (localDisk = "my-disk") - ) - ), - ( name = "my-disk", - disk = ( - path = "../../var/do-storage", - writable = true, - ) - ), - ], - sockets = [ - ( name = "main", - address = "test-addr", - service = "hello" - ) - ] - ))"_kj; - - auto dir = kj::newInMemoryDirectory(kj::nullClock()); - - // A far-future alarm time (won't fire during the test). 
- kj::StringPtr alarmTime = "4102444800000"; - - { - TestServer test(config); - test.root->transfer(kj::Path({"var"_kj, "do-storage"_kj}), - kj::WriteMode::CREATE | kj::WriteMode::CREATE_PARENT, *dir, nullptr, - kj::TransferMode::LINK); - - test.start(); - auto conn = test.connect("test-addr"); - - conn.httpGet200(kj::str("/set?t=", alarmTime), kj::str("alarm set to ", alarmTime)); - conn.httpGet200("/get", kj::str("alarm=", alarmTime)); - } - - // Verify metadata.sqlite exists on disk in the namespace directory. - KJ_EXPECT(dir->exists(kj::Path({"alarmkey", "metadata.sqlite"}))); - - // Start a new server and verify the alarm is still there. - { - TestServer test(config); - test.root->transfer(kj::Path({"var"_kj, "do-storage"_kj}), - kj::WriteMode::CREATE | kj::WriteMode::CREATE_PARENT, *dir, nullptr, - kj::TransferMode::LINK); - - test.start(); - auto conn = test.connect("test-addr"); - - conn.httpGet200("/get", kj::str("alarm=", alarmTime)); - } -} - KJ_TEST("Server: Ephemeral Objects") { TestServer test(R"(( services = [ @@ -5218,11 +4974,9 @@ KJ_TEST("Server: Durable Object facets") { kj::Path({"3652ef6221834806dc8df802d1d216e27b7d07e0a6b7adf6cfdaeec90f06459a.5.sqlite"}))); // We didn't create any other durable objects in the namespace. All files in the namespace should - // be prefixed with our one DO ID, except for metadata.sqlite (the per-namespace alarm scheduler). + // be prefixed with our one DO ID. 
for (auto& name: nsDir->listNames()) { - KJ_EXPECT( - name.startsWith("3652ef6221834806dc8df802d1d216e27b7d07e0a6b7adf6cfdaeec90f06459a.") || - name.startsWith("metadata.sqlite"), + KJ_EXPECT(name.startsWith("3652ef6221834806dc8df802d1d216e27b7d07e0a6b7adf6cfdaeec90f06459a."), "unexpected file found in namespace storage", name); } diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index c1c9581b566..608c3a9c72d 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -4,7 +4,6 @@ #include "server.h" -#include "alarm-scheduler.h" #include "container-client.h" #include "pyodide.h" #include "workerd-api.h" @@ -34,7 +33,6 @@ #include #include #include -#include #include #include #include @@ -1894,6 +1892,7 @@ class Server::WorkerService final: public Service, kj::Array> actorClass; kj::Maybe> cache; kj::Maybe actorStorage; + AlarmScheduler& alarmScheduler; kj::Array> tails; kj::Array> streamingTails; kj::Array> workerLoaders; @@ -1950,9 +1949,8 @@ class Server::WorkerService final: public Service, auto actorClass = kj::refcounted(*this, entry.key, Frankenvalue()); auto ns = kj::heap(kj::mv(actorClass), entry.value, - kj::systemPreciseCalendarClock(), threadContext.getUnsafeTimer(), - threadContext.getByteStreamFactory(), channelTokenHandler, network, dockerPath, - containerEgressInterceptorImage, waitUntilTasks); + threadContext.getUnsafeTimer(), threadContext.getByteStreamFactory(), channelTokenHandler, + network, dockerPath, containerEgressInterceptorImage, waitUntilTasks); actorNamespaces.insert(entry.key, kj::mv(ns)); } } @@ -2056,7 +2054,7 @@ class Server::WorkerService final: public Service, auto linked = callback(*this, errorReporter); for (auto& ns: actorNamespaces) { - ns.value->link(linked.actorStorage); + ns.value->link(linked.actorStorage, linked.alarmScheduler); } ioChannels = kj::mv(linked); @@ -2216,7 +2214,6 @@ class Server::WorkerService final: public Service, public: ActorNamespace(kj::Own actorClass, 
const ActorConfig& config, - const kj::Clock& clock, kj::Timer& timer, capnp::ByteStreamFactory& byteStreamFactory, ChannelTokenHandler& channelTokenHandler, @@ -2226,7 +2223,6 @@ class Server::WorkerService final: public Service, kj::TaskSet& waitUntilTasks) : actorClass(kj::mv(actorClass)), config(config), - clock(clock), timer(timer), byteStreamFactory(byteStreamFactory), channelTokenHandler(channelTokenHandler), @@ -2235,40 +2231,18 @@ class Server::WorkerService final: public Service, containerEgressInterceptorImage(containerEgressInterceptorImage), waitUntilTasks(waitUntilTasks) {} - void link(kj::Maybe serviceActorStorage) { + // Called at link time to provide needed resources. + void link(kj::Maybe serviceActorStorage, + kj::Maybe alarmScheduler) { KJ_IF_SOME(dir, serviceActorStorage) { KJ_IF_SOME(d, config.tryGet()) { + // Create a subdirectory for this namespace based on the unique key. this->actorStorage.emplace(dir.openSubdir( kj::Path({d.uniqueKey}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY)); } } - KJ_IF_SOME(d, config.tryGet()) { - auto idFactory = kj::heap(d.uniqueKey); - AlarmScheduler::GetActorFn getActor = - [this, idFactory = kj::mv(idFactory)]( - kj::String idStr) mutable -> kj::Own { - Worker::Actor::Id id = idFactory->idFromString(kj::mv(idStr)); - auto actorContainer = this->getActorContainer(kj::mv(id)); - return newPromisedWorkerInterface(actorContainer->startRequest({})); - }; - - KJ_IF_SOME(as, this->actorStorage) { - // Create per-namespace alarm scheduler backed by on-disk storage in the - // namespace directory, alongside the per-actor .sqlite files. - this->ownAlarmScheduler = kj::heap( - clock, timer, as.vfs, kj::Path({"metadata.sqlite"}), kj::mv(getActor)); - } else { - // No on-disk storage -- create an in-memory alarm scheduler. 
- auto memDir = kj::newInMemoryDirectory(clock); - auto vfs = kj::heap(*memDir); - this->ownAlarmScheduler = kj::heap( - clock, timer, *vfs, kj::Path({"metadata.sqlite"}), kj::mv(getActor)) - .attach(kj::mv(vfs), kj::mv(memDir)); - } - - this->alarmScheduler = *KJ_ASSERT_NONNULL(ownAlarmScheduler); - } + this->alarmScheduler = alarmScheduler; } const ActorConfig& getConfig() { @@ -2277,14 +2251,11 @@ class Server::WorkerService final: public Service, kj::Own getActorChannel(Worker::Actor::Id id) { KJ_IF_SOME(doId, id.tryGet>()) { - KJ_IF_SOME(name, doId->getName()) { - // To emulate production, we preserve the name on the id, but only if it's <= 1024 bytes. - if (name.size() > 1024) { - auto* idImpl = dynamic_cast(doId.get()); - KJ_ASSERT(idImpl != nullptr, "Unexpected ActorId type?"); - idImpl->clearName(); - } - } + // To emulate production, we have to recreate this ID. + ActorIdFactoryImpl::ActorIdImpl* idImpl = + dynamic_cast(doId.get()); + KJ_ASSERT(idImpl != nullptr, "Unexpected ActorId type?"); + idImpl->clearName(); } return kj::refcounted(getActorContainer(kj::mv(id))); @@ -2772,7 +2743,8 @@ class Server::WorkerService final: public Service, kj::Own sqliteHooks; if (parent == kj::none) { KJ_IF_SOME(a, ns.alarmScheduler) { - sqliteHooks = kj::heap(a, ActorKey{.actorId = key}); + sqliteHooks = kj::heap( + a, ActorKey{.uniqueKey = d.uniqueKey, .actorId = key}); } else { // No alarm scheduler available, use default hooks instance. sqliteHooks = fakeOwn(ActorSqlite::Hooks::getDefaultHooks()); @@ -2976,7 +2948,6 @@ class Server::WorkerService final: public Service, private: kj::Own actorClass; const ActorConfig& config; - const kj::Clock& clock; struct ActorStorage { kj::Own directory; @@ -2987,10 +2958,9 @@ class Server::WorkerService final: public Service, vfs(*directory) {} }; - // Note: The Vfs, actorStorage, and ownAlarmScheduler must not be torn down until all actors - // have been torn down, so we declare them before `actors`. 
+ // Note: The Vfs must not be torn down until all actors have been torn down, so we have to + // declare `actorStorage` before `actors`. kj::Maybe actorStorage; - kj::Maybe> ownAlarmScheduler; // Tracks the canceler and cleanup promise for a Docker container's lifecycle cleanup. // Useful to await on async calls of a ContainerClient destructor when the new @@ -4703,7 +4673,7 @@ kj::Promise> Server::makeWorkerImpl(kj::StringPtr auto linkCallback = [this, def = kj::mv(def), totalActorChannels](WorkerService& workerService, Worker::ValidationErrorReporter& errorReporter) mutable { - WorkerService::LinkedIoChannels result; + WorkerService::LinkedIoChannels result{.alarmScheduler = *alarmScheduler}; auto entrypointNames = workerService.getEntrypointNames(); auto actorClassNames = workerService.getActorClassNames(); @@ -4808,6 +4778,24 @@ kj::Promise> Server::makeWorkerImpl(kj::StringPtr } } + kj::HashMap durableNamespacesByUniqueKey; + for (auto& [className, ns]: workerService.getActorNamespaces()) { + KJ_IF_SOME(config, ns->getConfig().tryGet()) { + auto& actorNs = + ns; // clangd gets confused trying to use ns directly in the capture below?? 
+ + auto idFactory = kj::heap(config.uniqueKey); + + alarmScheduler->registerNamespace(config.uniqueKey, + [&actorNs, idFactory = kj::mv(idFactory)]( + kj::String idStr) mutable -> kj::Own { + Worker::Actor::Id id = idFactory->idFromString(kj::mv(idStr)); + auto actorContainer = actorNs->getActorContainer(kj::mv(id)); + return newPromisedWorkerInterface(actorContainer->startRequest({})); + }); + } + } + result.tails = KJ_MAP(tail, def.tails) { return kj::mv(tail).lookup(*this); }; result.streamingTails = KJ_MAP(tail, def.streamingTails) { return kj::mv(tail).lookup(*this); }; @@ -5350,20 +5338,17 @@ class Server::HttpListener final: public kj::Refcounted { kj::AsyncIoStream& connection, ConnectResponse& response, kj::HttpConnectSettings settings) override { - TRACE_EVENT("workerd", "Connection:connect()"); KJ_IF_SOME(h, parent.rewriter->getCapnpConnectHost()) { if (h == host) { // Client is requesting to open a capnp session! response.accept(200, "OK", kj::HttpHeaders(parent.headerTable)); - co_return co_await parent.acceptCapnpConnection(connection); + return parent.acceptCapnpConnection(connection); } } - IoChannelFactory::SubrequestMetadata metadata; - metadata.cfBlobJson = mapCopyString(cfBlobJson); - - auto worker = parent.service->startRequest(kj::mv(metadata)); - co_return co_await worker->connect(host, headers, connection, response, kj::mv(settings)); + // TODO(someday): Deliver connect() event to to worker? For now we call the default + // implementation which throws an exception. 
+ return kj::HttpService::connect(host, headers, connection, response, kj::mv(settings)); } // --------------------------------------------------------------------------- @@ -5383,57 +5368,6 @@ class Server::HttpListener final: public kj::Refcounted { }; }; -class Server::TcpListener final: public kj::Refcounted { - public: - TcpListener(Server& owner, - kj::Own listener, - kj::Own service, - kj::HttpHeaderTable& headerTable, - kj::StringPtr addrStr) - : owner(owner), - listener(kj::mv(listener)), - service(kj::mv(service)), - headerTable(headerTable), - addrStr(addrStr) {} - - kj::Promise run() { - TRACE_EVENT("workerd", "TcpListener::run"); - for (;;) { - kj::AuthenticatedStream stream = co_await listener->acceptAuthenticated(); - TRACE_EVENT("workerd", "TcpListener handle connection"); - - IoChannelFactory::SubrequestMetadata metadata; - auto req = service->startRequest(kj::mv(metadata)); - auto response = kj::heap(); - kj::HttpHeaders headers(headerTable); - owner.tasks.add(req->connect(addrStr, headers, *stream.stream, *response, {}) - .attach(kj::mv(stream.stream), kj::mv(response)) - .attach(kj::mv(req))); - } - } - - private: - Server& owner; - kj::Own listener; - kj::Own service; - kj::HttpHeaderTable& headerTable; - kj::StringPtr addrStr; - - struct ResponseWrapper final: public kj::HttpService::ConnectResponse { - void accept( - uint statusCode, kj::StringPtr statusText, const kj::HttpHeaders& headers) override { - // Ok.. we're accepting the connection... anything to do? - } - kj::Own reject(uint statusCode, - kj::StringPtr statusText, - const kj::HttpHeaders& headers, - kj::Maybe expectedBodySize = kj::none) override { - // Doh... we're rejecting the connection... anything to do? 
- return newNullOutputStream(); - } - }; -}; - kj::Promise Server::listenHttp(kj::Own listener, kj::Own service, kj::StringPtr physicalProtocol, @@ -5444,13 +5378,6 @@ kj::Promise Server::listenHttp(kj::Own listener, co_return co_await obj->run(); } -kj::Promise Server::listenTcp( - kj::Own listener, kj::Own service, kj::StringPtr addrStr) { - auto obj = kj::refcounted( - *this, kj::mv(listener), kj::mv(service), globalContext->headerTable, addrStr); - co_return co_await obj->run(); -} - // ======================================================================================= // Debug port for exposing all services via RPC @@ -5646,6 +5573,17 @@ kj::Promise Server::run( co_return co_await listenPromise.exclusiveJoin(kj::mv(fatalPromise)); } +void Server::startAlarmScheduler(config::Config::Reader config) { + auto& clock = kj::systemPreciseCalendarClock(); + auto dir = kj::newInMemoryDirectory(clock); + auto vfs = kj::heap(*dir).attach(kj::mv(dir)); + + // TODO(someday): support persistent storage for alarms + + alarmScheduler = + kj::heap(clock, timer, *vfs, kj::Path({"alarms.sqlite"})).attach(kj::mv(vfs)); +} + // Configure and start the inspector socket, returning the port the socket started on. uint startInspector( kj::StringPtr inspectorAddress, Server::InspectorServiceIsolateRegistrar& registrar) { @@ -5844,6 +5782,9 @@ kj::Promise Server::startServices(jsg::V8System& v8System, return decltype(services)::Entry{kj::str("internet"_kj), kj::mv(service)}; }); + // Start the alarm scheduler before linking services + startAlarmScheduler(config); + // Third pass: Cross-link services. 
for (auto& service: services) { ConfigErrorReporter errorReporter(*this, service.key); @@ -5851,39 +5792,6 @@ kj::Promise Server::startServices(jsg::V8System& v8System, } } -kj::Maybe Server::parseSocketType( - config::Socket::Reader sock, kj::StringPtr name) { - switch (sock.which()) { - case config::Socket::HTTP: { - SocketTypeConfig result; - result.defaultPort = 80; - result.httpOptions = sock.getHttp(); - result.physicalProtocol = "http"; - return kj::mv(result); - } - case config::Socket::HTTPS: { - auto https = sock.getHttps(); - SocketTypeConfig result; - result.defaultPort = 443; - result.httpOptions = https.getOptions(); - result.tls = makeTlsContext(https.getTlsOptions()); - result.physicalProtocol = "https"; - return kj::mv(result); - } - case config::Socket::TCP: { - auto tcp = sock.getTcp(); - SocketTypeConfig result; - if (tcp.hasTlsOptions()) { - result.tls = makeTlsContext(tcp.getTlsOptions()); - } - return kj::mv(result); - } - } - reportConfigError(kj::str("Encountered unknown socket type in \"", name, - "\". 
Was the config compiled with a newer version of the schema?")); - return kj::none; -} - kj::Promise Server::listenOnSockets(config::Config::Reader config, kj::HttpHeaderTable::Builder& headerTableBuilder, kj::ForkedPromise& forkedDrainWhen, @@ -5920,10 +5828,31 @@ kj::Promise Server::listenOnSockets(config::Config::Reader config, continue; } - auto maybeSocketConfig = parseSocketType(sock, name); - if (maybeSocketConfig == kj::none) continue; - auto& socketConfig = KJ_ASSERT_NONNULL(maybeSocketConfig); + uint defaultPort = 0; + config::HttpOptions::Reader httpOptions; + kj::Maybe> tls; + kj::StringPtr physicalProtocol; + switch (sock.which()) { + case config::Socket::HTTP: + defaultPort = 80; + httpOptions = sock.getHttp(); + physicalProtocol = "http"; + goto validSocket; + case config::Socket::HTTPS: { + auto https = sock.getHttps(); + defaultPort = 443; + httpOptions = https.getOptions(); + tls = makeTlsContext(https.getTlsOptions()); + physicalProtocol = "https"; + goto validSocket; + } + } + reportConfigError(kj::str("Encountered unknown socket type in \"", name, + "\". Was the config compiled with a " + "newer version of the schema?")); + continue; + validSocket: using PromisedReceived = kj::Promise>; PromisedReceived listener = nullptr; KJ_IF_SOME(l, listenerOverride) { @@ -5932,10 +5861,10 @@ kj::Promise Server::listenOnSockets(config::Config::Reader config, listener = ([](kj::Promise> promise) -> PromisedReceived { auto parsed = co_await promise; co_return parsed->listen(); - })(network.parseAddress(addrStr, socketConfig.defaultPort)); + })(network.parseAddress(addrStr, defaultPort)); } - KJ_IF_SOME(t, socketConfig.tls) { + KJ_IF_SOME(t, tls) { listener = ([](kj::Promise> promise, kj::Own tls) -> PromisedReceived { auto port = co_await promise; @@ -5945,19 +5874,12 @@ kj::Promise Server::listenOnSockets(config::Config::Reader config, // Need to create rewriter before waiting on anything since `headerTableBuilder` will no longer // be available later. 
- auto rewriter = kj::heap(socketConfig.httpOptions, headerTableBuilder); + auto rewriter = kj::heap(httpOptions, headerTableBuilder); auto handle = kj::coCapture( - [this, service = kj::mv(service), rewriter = kj::mv(rewriter), - physicalProtocol = socketConfig.physicalProtocol, name, - isHttp = sock.which() != config::Socket::TCP, addrStr]( + [this, service = kj::mv(service), rewriter = kj::mv(rewriter), physicalProtocol, name]( kj::Promise> promise) mutable -> kj::Promise { - if (isHttp) { - TRACE_EVENT("workerd", "setup listenHttp"); - } else { - TRACE_EVENT("workerd", "setup listenTcp"); - } - + TRACE_EVENT("workerd", "setup listenHttp"); auto listener = co_await promise; KJ_IF_SOME(stream, controlOverride) { auto message = kj::str("{\"event\":\"listen\",\"socket\":\"", name, @@ -5968,12 +5890,7 @@ kj::Promise Server::listenOnSockets(config::Config::Reader config, KJ_LOG(ERROR, e); } } - - if (isHttp) { - co_await listenHttp(kj::mv(listener), kj::mv(service), physicalProtocol, kj::mv(rewriter)); - } else { - co_await listenTcp(kj::mv(listener), kj::mv(service), addrStr); - } + co_await listenHttp(kj::mv(listener), kj::mv(service), physicalProtocol, kj::mv(rewriter)); }); tasks.add(handle(kj::mv(listener)).exclusiveJoin(forkedDrainWhen.addBranch())); } diff --git a/src/workerd/server/server.h b/src/workerd/server/server.h index 7f17abdd66f..6162b1107f8 100644 --- a/src/workerd/server/server.h +++ b/src/workerd/server/server.h @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -195,6 +196,9 @@ class Server final: private kj::TaskSet::ErrorHandler, private ChannelTokenHandl kj::Own> fatalFulfiller; + // Initialized in startAlarmScheduler(). + kj::Own alarmScheduler; + // An HttpServer object maintained in a linked list. 
struct ListedHttpServer { Server& owner; @@ -286,9 +290,6 @@ class Server final: private kj::TaskSet::ErrorHandler, private ChannelTokenHandl kj::StringPtr physicalProtocol, kj::Own rewriter); - kj::Promise listenTcp( - kj::Own listener, kj::Own service, kj::StringPtr addrStr); - kj::Promise listenDebugPort(kj::Own listener); class InvalidConfigService; @@ -301,7 +302,6 @@ class Server final: private kj::TaskSet::ErrorHandler, private ChannelTokenHandl class WorkerEntrypointService; class WorkerdBootstrapImpl; class HttpListener; - class TcpListener; class DebugPortListener; struct ErrorReporter; @@ -318,21 +318,14 @@ class Server final: private kj::TaskSet::ErrorHandler, private ChannelTokenHandl kj::HttpHeaderTable::Builder& headerTableBuilder, kj::ForkedPromise& forkedDrainWhen); + // Must be called after startServices! + void startAlarmScheduler(config::Config::Reader config); + kj::Promise listenOnSockets(config::Config::Reader config, kj::HttpHeaderTable::Builder& headerTableBuilder, kj::ForkedPromise& forkedDrainWhen, bool forTest = false); - // Parsed socket protocol/TLS config. Extracted from the switch in listenOnSockets() to avoid - // goto-over-initialization inside a coroutine, which triggers a clang optimizer crash. 
- struct SocketTypeConfig { - uint defaultPort = 0; - config::HttpOptions::Reader httpOptions; - kj::Maybe> tls; - kj::StringPtr physicalProtocol; - }; - kj::Maybe parseSocketType(config::Socket::Reader sock, kj::StringPtr name); - void unlinkWorkerLoaders(); kj::Promise preloadPython( diff --git a/src/workerd/server/tests/container-client/test.js b/src/workerd/server/tests/container-client/test.js index bc2f5472a6f..782b863fefc 100644 --- a/src/workerd/server/tests/container-client/test.js +++ b/src/workerd/server/tests/container-client/test.js @@ -1201,7 +1201,7 @@ export class DurableObjectExample extends DurableObject { await monitor2; } - async testSnapshotOverlappingMounts() { + async testSnapshotRestoreToRoot() { const container = this.ctx.container; if (container.running) { const monitor = container.monitor().catch((_err) => {}); @@ -1209,240 +1209,44 @@ export class DurableObjectExample extends DurableObject { await monitor; } + assert.strictEqual(container.running, false); + container.start({ enableInternet: true }); const monitor = container.monitor().catch((_err) => {}); await this.waitUntilContainerIsHealthy(); - await container - .getTcpPort(8080) - .fetch('http://foo/write-file?path=/tmp/parent-src/root.txt', { - method: 'POST', - body: 'parent-root', - signal: AbortSignal.timeout(DEFAULT_TIMEOUT_DURATION), - }); - await container - .getTcpPort(8080) - .fetch( - 'http://foo/write-file?path=/tmp/parent-src/child/from-parent.txt', - { - method: 'POST', - body: 'masked-by-child-mount', - signal: AbortSignal.timeout(DEFAULT_TIMEOUT_DURATION), - } - ); - await container + const writeResp = await container .getTcpPort(8080) - .fetch('http://foo/write-file?path=/tmp/child-src/from-child.txt', { + .fetch('http://foo/write-file?path=/app/data/root-test.txt', { method: 'POST', - body: 'child-wins', + body: 'restore-to-root', signal: AbortSignal.timeout(DEFAULT_TIMEOUT_DURATION), }); + assert.equal(writeResp.status, 200); - const parentSnapshot = await 
container.snapshotDirectory({ - dir: '/tmp/parent-src', - }); - const childSnapshot = await container.snapshotDirectory({ - dir: '/tmp/child-src', - }); + const snapshot = await container.snapshotDirectory({ dir: '/app/data' }); await container.destroy(); await monitor; - const assertRestoredTree = async () => { - const rootResp = await container - .getTcpPort(8080) - .fetch('http://foo/read-file?path=/tmp/restored/root.txt', { - signal: AbortSignal.timeout(DEFAULT_TIMEOUT_DURATION), - }); - assert.equal(rootResp.status, 200); - assert.strictEqual(await rootResp.text(), 'parent-root'); - - const childResp = await container - .getTcpPort(8080) - .fetch('http://foo/read-file?path=/tmp/restored/child/from-child.txt', { - signal: AbortSignal.timeout(DEFAULT_TIMEOUT_DURATION), - }); - assert.equal(childResp.status, 200); - assert.strictEqual(await childResp.text(), 'child-wins'); - - const maskedResp = await container - .getTcpPort(8080) - .fetch( - 'http://foo/read-file?path=/tmp/restored/child/from-parent.txt', - { - signal: AbortSignal.timeout(DEFAULT_TIMEOUT_DURATION), - } - ); - assert.equal(maskedResp.status, 404); - }; - - const startAndAssert = async (snapshots) => { - container.start({ enableInternet: true, snapshots }); - const restoreMonitor = container.monitor().catch((_err) => {}); - await this.waitUntilContainerIsHealthy(); - await assertRestoredTree(); - await container.destroy(); - await restoreMonitor; - }; - - await startAndAssert([ - { snapshot: childSnapshot, mountPoint: '/tmp/restored/child' }, - { snapshot: parentSnapshot, mountPoint: '/tmp/restored' }, - ]); - - await startAndAssert([ - { snapshot: parentSnapshot, mountPoint: '/tmp/restored' }, - { snapshot: childSnapshot, mountPoint: '/tmp/restored/child' }, - ]); - } - - async testSnapshotDuplicateRestoreDirsRejected() { - const container = this.ctx.container; - if (container.running) { - const monitor = container.monitor().catch((_err) => {}); - await container.destroy(); - await monitor; - } - 
- container.start({ enableInternet: true }); - const monitor = container.monitor().catch((_err) => {}); + // Restoring to "/" places snapshot contents directly at the filesystem root + container.start({ + enableInternet: true, + snapshots: [{ snapshot, mountPoint: '/' }], + }); + const monitor2 = container.monitor().catch((_err) => {}); await this.waitUntilContainerIsHealthy(); - await container - .getTcpPort(8080) - .fetch('http://foo/write-file?path=/tmp/dup-a/file-a.txt', { - method: 'POST', - body: 'dup-a', - signal: AbortSignal.timeout(DEFAULT_TIMEOUT_DURATION), - }); - await container + const readResp = await container .getTcpPort(8080) - .fetch('http://foo/write-file?path=/tmp/dup-b/file-b.txt', { - method: 'POST', - body: 'dup-b', + .fetch('http://foo/read-file?path=/root-test.txt', { signal: AbortSignal.timeout(DEFAULT_TIMEOUT_DURATION), }); - - const firstSnapshot = await container.snapshotDirectory({ - dir: '/tmp/dup-a', - }); - const secondSnapshot = await container.snapshotDirectory({ - dir: '/tmp/dup-b', - }); + assert.equal(readResp.status, 200); + assert.strictEqual(await readResp.text(), 'restore-to-root'); await container.destroy(); - await monitor; - - await assert.rejects( - () => - new Promise((resolve, reject) => { - try { - container.start({ - enableInternet: true, - snapshots: [ - { snapshot: firstSnapshot, mountPoint: '/tmp/duplicate' }, - { snapshot: secondSnapshot, mountPoint: '/tmp/duplicate/' }, - ], - }); - } catch (err) { - return reject(err); - } - container.monitor().then(resolve).catch(reject); - }), - (err) => { - assert.strictEqual(err.message, 'Container failed to start'); - return true; - } - ); - - assert.strictEqual(container.running, false); - } - - async testSnapshotRestoreToRoot() { - const container = this.ctx.container; - if (container.running) { - const monitor = container.monitor().catch((_err) => {}); - await container.destroy(); - await monitor; - } - - assert.strictEqual(container.running, false); - - const 
fakeSnapshot = { - id: '01234567-89ab-cdef-0123-456789abcdef', - size: 1024, - dir: '/app/data', - }; - - assert.throws( - () => - container.start({ - enableInternet: true, - snapshots: [{ snapshot: fakeSnapshot, mountPoint: '/' }], - }), - { message: /Directory snapshot cannot be restored to root directory\./ } - ); - - assert.strictEqual(container.running, false); - } - - async testSnapshotRestoreImplicitRootRejected() { - const container = this.ctx.container; - if (container.running) { - const monitor = container.monitor().catch((_err) => {}); - await container.destroy(); - await monitor; - } - - assert.strictEqual(container.running, false); - - const fakeSnapshot = { - id: '11111111-2222-3333-4444-555555555555', - size: 1024, - dir: '/', - }; - - assert.throws( - () => - container.start({ - enableInternet: true, - snapshots: [{ snapshot: fakeSnapshot }], - }), - { message: /Directory snapshot cannot be restored to root directory\./ } - ); - - assert.strictEqual(container.running, false); - } - - async testSnapshotRestoreRelativeMountPointRejected() { - const container = this.ctx.container; - if (container.running) { - const monitor = container.monitor().catch((_err) => {}); - await container.destroy(); - await monitor; - } - - assert.strictEqual(container.running, false); - - const fakeSnapshot = { - id: 'aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee', - size: 1024, - dir: '/app/data', - }; - - assert.throws( - () => - container.start({ - enableInternet: true, - snapshots: [{ snapshot: fakeSnapshot, mountPoint: 'tmp/restored' }], - }), - { - message: - /Directory snapshot restore path must be absolute\. Got: tmp\/restored/, - } - ); - - assert.strictEqual(container.running, false); + await monitor2; } async testSnapshotStoppedContainer() { @@ -1904,7 +1708,7 @@ export const testSnapshotCustomMountPoint = { }, }; -// Test that start() rejects an explicit root restore mount point. 
+// Test restoring a snapshot to / (root), exercising the restoreDir == "/" special case export const testSnapshotRestoreToRoot = { async test(_ctrl, env) { const id = env.MY_CONTAINER.idFromName( @@ -1915,52 +1719,6 @@ export const testSnapshotRestoreToRoot = { }, }; -// Test that overlapping restore paths work regardless of the user-supplied mount order. -export const testSnapshotOverlappingMounts = { - async test(_ctrl, env) { - const id = env.MY_CONTAINER.idFromName( - getRandomDurableObjectName('testSnapshotOverlappingMounts') - ); - const stub = env.MY_CONTAINER.get(id); - await stub.testSnapshotOverlappingMounts(); - }, -}; - -// Test that duplicate effective restore paths are rejected after normalization. -export const testSnapshotDuplicateRestoreDirsRejected = { - async test(_ctrl, env) { - const id = env.MY_CONTAINER.idFromName( - getRandomDurableObjectName('testSnapshotDuplicateRestoreDirsRejected') - ); - const stub = env.MY_CONTAINER.get(id); - await stub.testSnapshotDuplicateRestoreDirsRejected(); - }, -}; - -// Test that start() also rejects implicit root restore via snapshot.dir. -export const testSnapshotRestoreImplicitRootRejected = { - async test(_ctrl, env) { - const id = env.MY_CONTAINER.idFromName( - getRandomDurableObjectName('testSnapshotRestoreImplicitRootRejected') - ); - const stub = env.MY_CONTAINER.get(id); - await stub.testSnapshotRestoreImplicitRootRejected(); - }, -}; - -// Test that start() rejects relative restore mount points at the API boundary. 
-export const testSnapshotRestoreRelativeMountPointRejected = { - async test(_ctrl, env) { - const id = env.MY_CONTAINER.idFromName( - getRandomDurableObjectName( - 'testSnapshotRestoreRelativeMountPointRejected' - ) - ); - const stub = env.MY_CONTAINER.get(id); - await stub.testSnapshotRestoreRelativeMountPointRejected(); - }, -}; - // Test that snapshotDirectory() on a stopped container gives a clear error export const testSnapshotStoppedContainer = { async test(_ctrl, env) { diff --git a/src/workerd/server/workerd.capnp b/src/workerd/server/workerd.capnp index 9be9b11854b..9e1cb23b62a 100644 --- a/src/workerd/server/workerd.capnp +++ b/src/workerd/server/workerd.capnp @@ -143,11 +143,8 @@ struct Socket { options @3 :HttpOptions; tlsOptions @4 :TlsOptions; } - tcp :group { - tlsOptions @6 :TlsOptions; - } - # TODO(someday): TCP proxy, SMTP, Cap'n Proto, ... + # TODO(someday): TCP, TCP proxy, SMTP, Cap'n Proto, ... } service @5 :ServiceDesignator; diff --git a/types/defines/rpc.d.ts b/types/defines/rpc.d.ts index ae73fc15b97..fe4b87d5378 100644 --- a/types/defines/rpc.d.ts +++ b/types/defines/rpc.d.ts @@ -239,7 +239,6 @@ declare namespace CloudflareWorkersModule { email?(message: ForwardableEmailMessage): void | Promise; fetch?(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; queue?(batch: MessageBatch): void | Promise; scheduled?(controller: ScheduledController): void | Promise; tail?(events: TraceItem[]): void | Promise; @@ -261,7 +260,6 @@ declare namespace CloudflareWorkersModule { alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; fetch?(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; webSocketMessage?( ws: WebSocket, message: string | ArrayBuffer diff --git a/types/defines/trace.d.ts b/types/defines/trace.d.ts index f316c6a3dee..79ce6fd5e40 100644 --- a/types/defines/trace.d.ts +++ b/types/defines/trace.d.ts @@ -74,10 +74,6 @@ interface FetchResponseInfo { readonly statusCode: number; 
} -interface ConnectEventInfo { - readonly type: "connect"; -} - type EventOutcome = "ok" | "canceled" | "exception" | "unknown" | "killSwitch" | "daemonDown" | "exceededCpu" | "exceededMemory" | "loadShed" | "responseStreamDisconnected" | "scriptNotFound"; @@ -99,10 +95,10 @@ interface Onset { readonly scriptName?: string; readonly scriptTags?: string[]; readonly scriptVersion?: ScriptVersion; - readonly info: FetchEventInfo | ConnectEventInfo | JsRpcEventInfo | - ScheduledEventInfo | AlarmEventInfo | QueueEventInfo | - EmailEventInfo | TraceEventInfo | - HibernatableWebSocketEventInfo | CustomEventInfo; + readonly info: FetchEventInfo | JsRpcEventInfo | ScheduledEventInfo | + AlarmEventInfo | QueueEventInfo | EmailEventInfo | + TraceEventInfo | HibernatableWebSocketEventInfo | + CustomEventInfo; } interface Outcome { diff --git a/types/generated-snapshot/experimental/index.d.ts b/types/generated-snapshot/experimental/index.d.ts index 87d7dc1376f..b42c699ec90 100755 --- a/types/generated-snapshot/experimental/index.d.ts +++ b/types/generated-snapshot/experimental/index.d.ts @@ -511,11 +511,6 @@ type ExportedHandlerFetchHandler< env: Env, ctx: ExecutionContext, ) => Response | Promise; -type ExportedHandlerConnectHandler = ( - socket: Socket, - env: Env, - ctx: ExecutionContext, -) => void | Promise; type ExportedHandlerTailHandler = ( events: TraceItem[], env: Env, @@ -557,7 +552,6 @@ interface ExportedHandler< Props = unknown, > { fetch?: ExportedHandlerFetchHandler; - connect?: ExportedHandlerConnectHandler; tail?: ExportedHandlerTailHandler; trace?: ExportedHandlerTraceHandler; tailStream?: ExportedHandlerTailStreamHandler; @@ -590,7 +584,6 @@ declare abstract class ColoLocalActorNamespace { } interface DurableObject { fetch(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; webSocketMessage?( ws: WebSocket, @@ -608,7 +601,7 @@ type DurableObjectStub< T extends 
Rpc.DurableObjectBranded | undefined = undefined, > = Fetcher< T, - "alarm" | "connect" | "webSocketMessage" | "webSocketClose" | "webSocketError" + "alarm" | "webSocketMessage" | "webSocketClose" | "webSocketError" > & { readonly id: DurableObjectId; readonly name?: string; @@ -3280,7 +3273,6 @@ interface TraceItem { | ( | TraceItemFetchEventInfo | TraceItemJsRpcEventInfo - | TraceItemConnectEventInfo | TraceItemScheduledEventInfo | TraceItemAlarmEventInfo | TraceItemQueueEventInfo @@ -3310,7 +3302,6 @@ interface TraceItem { interface TraceItemAlarmEventInfo { readonly scheduledTime: Date; } -interface TraceItemConnectEventInfo {} interface TraceItemCustomEventInfo {} interface TraceItemScheduledEventInfo { readonly scheduledTime: number; @@ -13262,7 +13253,6 @@ declare namespace CloudflareWorkersModule { constructor(ctx: ExecutionContext, env: Env); email?(message: ForwardableEmailMessage): void | Promise; fetch?(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; queue?(batch: MessageBatch): void | Promise; scheduled?(controller: ScheduledController): void | Promise; tail?(events: TraceItem[]): void | Promise; @@ -13283,7 +13273,6 @@ declare namespace CloudflareWorkersModule { constructor(ctx: DurableObjectState, env: Env); alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; fetch?(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; webSocketMessage?( ws: WebSocket, message: string | ArrayBuffer, @@ -14283,9 +14272,6 @@ declare namespace TailStream { readonly type: "fetch"; readonly statusCode: number; } - interface ConnectEventInfo { - readonly type: "connect"; - } type EventOutcome = | "ok" | "canceled" @@ -14316,7 +14302,6 @@ declare namespace TailStream { readonly scriptVersion?: ScriptVersion; readonly info: | FetchEventInfo - | ConnectEventInfo | JsRpcEventInfo | ScheduledEventInfo | AlarmEventInfo diff --git a/types/generated-snapshot/experimental/index.ts 
b/types/generated-snapshot/experimental/index.ts index d52a5c56a9b..646e4246f84 100755 --- a/types/generated-snapshot/experimental/index.ts +++ b/types/generated-snapshot/experimental/index.ts @@ -513,11 +513,6 @@ export type ExportedHandlerFetchHandler< env: Env, ctx: ExecutionContext, ) => Response | Promise; -export type ExportedHandlerConnectHandler = ( - socket: Socket, - env: Env, - ctx: ExecutionContext, -) => void | Promise; export type ExportedHandlerTailHandler = ( events: TraceItem[], env: Env, @@ -559,7 +554,6 @@ export interface ExportedHandler< Props = unknown, > { fetch?: ExportedHandlerFetchHandler; - connect?: ExportedHandlerConnectHandler; tail?: ExportedHandlerTailHandler; trace?: ExportedHandlerTraceHandler; tailStream?: ExportedHandlerTailStreamHandler; @@ -592,7 +586,6 @@ export declare abstract class ColoLocalActorNamespace { } export interface DurableObject { fetch(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; webSocketMessage?( ws: WebSocket, @@ -610,7 +603,7 @@ export type DurableObjectStub< T extends Rpc.DurableObjectBranded | undefined = undefined, > = Fetcher< T, - "alarm" | "connect" | "webSocketMessage" | "webSocketClose" | "webSocketError" + "alarm" | "webSocketMessage" | "webSocketClose" | "webSocketError" > & { readonly id: DurableObjectId; readonly name?: string; @@ -3286,7 +3279,6 @@ export interface TraceItem { | ( | TraceItemFetchEventInfo | TraceItemJsRpcEventInfo - | TraceItemConnectEventInfo | TraceItemScheduledEventInfo | TraceItemAlarmEventInfo | TraceItemQueueEventInfo @@ -3316,7 +3308,6 @@ export interface TraceItem { export interface TraceItemAlarmEventInfo { readonly scheduledTime: Date; } -export interface TraceItemConnectEventInfo {} export interface TraceItemCustomEventInfo {} export interface TraceItemScheduledEventInfo { readonly scheduledTime: number; @@ -13229,7 +13220,6 @@ export declare namespace 
CloudflareWorkersModule { constructor(ctx: ExecutionContext, env: Env); email?(message: ForwardableEmailMessage): void | Promise; fetch?(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; queue?(batch: MessageBatch): void | Promise; scheduled?(controller: ScheduledController): void | Promise; tail?(events: TraceItem[]): void | Promise; @@ -13250,7 +13240,6 @@ export declare namespace CloudflareWorkersModule { constructor(ctx: DurableObjectState, env: Env); alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; fetch?(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; webSocketMessage?( ws: WebSocket, message: string | ArrayBuffer, @@ -14240,9 +14229,6 @@ export declare namespace TailStream { readonly type: "fetch"; readonly statusCode: number; } - interface ConnectEventInfo { - readonly type: "connect"; - } type EventOutcome = | "ok" | "canceled" @@ -14273,7 +14259,6 @@ export declare namespace TailStream { readonly scriptVersion?: ScriptVersion; readonly info: | FetchEventInfo - | ConnectEventInfo | JsRpcEventInfo | ScheduledEventInfo | AlarmEventInfo diff --git a/types/generated-snapshot/latest/index.d.ts b/types/generated-snapshot/latest/index.d.ts index 8e6b76d564d..65c259fb2b3 100755 --- a/types/generated-snapshot/latest/index.d.ts +++ b/types/generated-snapshot/latest/index.d.ts @@ -489,11 +489,6 @@ type ExportedHandlerFetchHandler< env: Env, ctx: ExecutionContext, ) => Response | Promise; -type ExportedHandlerConnectHandler = ( - socket: Socket, - env: Env, - ctx: ExecutionContext, -) => void | Promise; type ExportedHandlerTailHandler = ( events: TraceItem[], env: Env, @@ -535,7 +530,6 @@ interface ExportedHandler< Props = unknown, > { fetch?: ExportedHandlerFetchHandler; - connect?: ExportedHandlerConnectHandler; tail?: ExportedHandlerTailHandler; trace?: ExportedHandlerTraceHandler; tailStream?: ExportedHandlerTailStreamHandler; @@ -564,7 +558,6 @@ interface Cloudflare { } interface 
DurableObject { fetch(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; webSocketMessage?( ws: WebSocket, @@ -582,7 +575,7 @@ type DurableObjectStub< T extends Rpc.DurableObjectBranded | undefined = undefined, > = Fetcher< T, - "alarm" | "connect" | "webSocketMessage" | "webSocketClose" | "webSocketError" + "alarm" | "webSocketMessage" | "webSocketClose" | "webSocketError" > & { readonly id: DurableObjectId; readonly name?: string; @@ -3129,7 +3122,6 @@ interface TraceItem { | ( | TraceItemFetchEventInfo | TraceItemJsRpcEventInfo - | TraceItemConnectEventInfo | TraceItemScheduledEventInfo | TraceItemAlarmEventInfo | TraceItemQueueEventInfo @@ -3159,7 +3151,6 @@ interface TraceItem { interface TraceItemAlarmEventInfo { readonly scheduledTime: Date; } -interface TraceItemConnectEventInfo {} interface TraceItemCustomEventInfo {} interface TraceItemScheduledEventInfo { readonly scheduledTime: number; @@ -12549,7 +12540,6 @@ declare namespace CloudflareWorkersModule { constructor(ctx: ExecutionContext, env: Env); email?(message: ForwardableEmailMessage): void | Promise; fetch?(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; queue?(batch: MessageBatch): void | Promise; scheduled?(controller: ScheduledController): void | Promise; tail?(events: TraceItem[]): void | Promise; @@ -12570,7 +12560,6 @@ declare namespace CloudflareWorkersModule { constructor(ctx: DurableObjectState, env: Env); alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; fetch?(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; webSocketMessage?( ws: WebSocket, message: string | ArrayBuffer, @@ -13570,9 +13559,6 @@ declare namespace TailStream { readonly type: "fetch"; readonly statusCode: number; } - interface ConnectEventInfo { - readonly type: "connect"; - } type EventOutcome = | "ok" | "canceled" @@ -13603,7 +13589,6 @@ declare namespace 
TailStream { readonly scriptVersion?: ScriptVersion; readonly info: | FetchEventInfo - | ConnectEventInfo | JsRpcEventInfo | ScheduledEventInfo | AlarmEventInfo diff --git a/types/generated-snapshot/latest/index.ts b/types/generated-snapshot/latest/index.ts index f99255aea9e..223e1c0539a 100755 --- a/types/generated-snapshot/latest/index.ts +++ b/types/generated-snapshot/latest/index.ts @@ -491,11 +491,6 @@ export type ExportedHandlerFetchHandler< env: Env, ctx: ExecutionContext, ) => Response | Promise; -export type ExportedHandlerConnectHandler = ( - socket: Socket, - env: Env, - ctx: ExecutionContext, -) => void | Promise; export type ExportedHandlerTailHandler = ( events: TraceItem[], env: Env, @@ -537,7 +532,6 @@ export interface ExportedHandler< Props = unknown, > { fetch?: ExportedHandlerFetchHandler; - connect?: ExportedHandlerConnectHandler; tail?: ExportedHandlerTailHandler; trace?: ExportedHandlerTraceHandler; tailStream?: ExportedHandlerTailStreamHandler; @@ -566,7 +560,6 @@ export interface Cloudflare { } export interface DurableObject { fetch(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; webSocketMessage?( ws: WebSocket, @@ -584,7 +577,7 @@ export type DurableObjectStub< T extends Rpc.DurableObjectBranded | undefined = undefined, > = Fetcher< T, - "alarm" | "connect" | "webSocketMessage" | "webSocketClose" | "webSocketError" + "alarm" | "webSocketMessage" | "webSocketClose" | "webSocketError" > & { readonly id: DurableObjectId; readonly name?: string; @@ -3135,7 +3128,6 @@ export interface TraceItem { | ( | TraceItemFetchEventInfo | TraceItemJsRpcEventInfo - | TraceItemConnectEventInfo | TraceItemScheduledEventInfo | TraceItemAlarmEventInfo | TraceItemQueueEventInfo @@ -3165,7 +3157,6 @@ export interface TraceItem { export interface TraceItemAlarmEventInfo { readonly scheduledTime: Date; } -export interface TraceItemConnectEventInfo {} export interface 
TraceItemCustomEventInfo {} export interface TraceItemScheduledEventInfo { readonly scheduledTime: number; @@ -12516,7 +12507,6 @@ export declare namespace CloudflareWorkersModule { constructor(ctx: ExecutionContext, env: Env); email?(message: ForwardableEmailMessage): void | Promise; fetch?(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; queue?(batch: MessageBatch): void | Promise; scheduled?(controller: ScheduledController): void | Promise; tail?(events: TraceItem[]): void | Promise; @@ -12537,7 +12527,6 @@ export declare namespace CloudflareWorkersModule { constructor(ctx: DurableObjectState, env: Env); alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; fetch?(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; webSocketMessage?( ws: WebSocket, message: string | ArrayBuffer, @@ -13527,9 +13516,6 @@ export declare namespace TailStream { readonly type: "fetch"; readonly statusCode: number; } - interface ConnectEventInfo { - readonly type: "connect"; - } type EventOutcome = | "ok" | "canceled" @@ -13560,7 +13546,6 @@ export declare namespace TailStream { readonly scriptVersion?: ScriptVersion; readonly info: | FetchEventInfo - | ConnectEventInfo | JsRpcEventInfo | ScheduledEventInfo | AlarmEventInfo