Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions samples/tcp-ingress/config.capnp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using Workerd = import "/workerd/workerd.capnp";

# Sample configuration: a single worker exposed on two listening sockets, one speaking HTTP and
# one accepting raw TCP connections.
const tcpIngressExample :Workerd.Config = (
services = [
# The only service; its definition is the `worker` constant in this file.
(name = "main", worker = .worker),
],

sockets = [
# HTTP requests on port 8080 are routed to the "main" service.
( name = "http", address = "*:8080", http = (), service = "main" ),
# Raw TCP connections on port 8081 are routed to the same "main" service.
( name = "tcp", address = "*:8081", tcp = (), service = "main" )
]
);

# Worker definition used by the "main" service: a single ES module embedded from worker.js.
const worker :Workerd.Worker = (
modules = [
(name = "worker", esModule = embed "worker.js")
],
# "experimental" is listed here because connect handling checks for the experimental feature
# flag before dispatching to the worker.
compatibilityFlags = ["nodejs_compat_v2", "experimental"],
compatibilityDate = "2023-02-28",
);
11 changes: 11 additions & 0 deletions samples/tcp-ingress/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@

// Answers every HTTP request with a fixed "ok" body.
const handleFetch = async (req) => new Response("ok");

// Echo server for raw connections: everything read from the socket is written straight back.
const handleConnect = async (socket) => {
  await socket.readable.pipeTo(socket.writable);
};

export default { fetch: handleFetch, connect: handleConnect };
3 changes: 2 additions & 1 deletion src/workerd/api/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,15 @@ class DurableObject final: public Fetcher {

JSG_TS_DEFINE(interface DurableObject {
fetch(request: Request): Response | Promise<Response>;
connect?(socket: Socket): void | Promise<void>;
alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise<void>;
webSocketMessage?(ws: WebSocket, message: string | ArrayBuffer): void | Promise<void>;
webSocketClose?(ws: WebSocket, code: number, reason: string, wasClean: boolean): void | Promise<void>;
webSocketError?(ws: WebSocket, error: unknown): void | Promise<void>;
});
JSG_TS_OVERRIDE(
type DurableObjectStub<T extends Rpc.DurableObjectBranded | undefined = undefined> =
Fetcher<T, "alarm" | "webSocketMessage" | "webSocketClose" | "webSocketError">
Fetcher<T, "alarm" | "connect" | "webSocketMessage" | "webSocketClose" | "webSocketError">
& {
readonly id: DurableObjectId;
readonly name?: string;
Expand Down
50 changes: 50 additions & 0 deletions src/workerd/api/global-scope.c++
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ jsg::LenientOptional<T> mapAddRef(jsg::Lock& js, jsg::LenientOptional<T>& functi
ExportedHandler ExportedHandler::clone(jsg::Lock& js) {
return ExportedHandler{
.fetch{mapAddRef(js, fetch)},
.connect{mapAddRef(js, connect)},
.tail{mapAddRef(js, tail)},
.trace{mapAddRef(js, trace)},
.tailStream{mapAddRef(js, tailStream)},
Expand Down Expand Up @@ -118,6 +119,55 @@ void ServiceWorkerGlobalScope::clear() {
unhandledRejections.clear();
}

// Handles an incoming connect (TCP ingress) event by dispatching it to the connect() function of
// the exported handler, if one is present.
//
//   host            - address of the connection; parsed below as the authority of a placeholder
//                     "fake://" URL and required to yield both a hostname and a port.
//   headers         - passed through unchanged to response.accept().
//   connection      - the underlying stream; wrapped in a neuterable stream before being handed
//                     to the JS-visible Socket.
//   response        - accepted with 200 "OK" only when a connect handler exists.
//   lock            - the worker lock; also used as the jsg::Lock for all JS interaction here.
//   exportedHandler - must be non-null (see the error message below).
//
// Returns the handler's JS promise awaited through the current IoContext, with the
// "connect_handler" trace span attached to it. When the handler exports no connect() function, a
// warning is logged once and the call fails with an Error.
kj::Promise<void> ServiceWorkerGlobalScope::connect(kj::String host,
const kj::HttpHeaders& headers,
kj::AsyncIoStream& connection,
kj::HttpService::ConnectResponse& response,
Worker::Lock& lock,
kj::Maybe<ExportedHandler&> exportedHandler) {
ExportedHandler& eh = JSG_REQUIRE_NONNULL(exportedHandler, Error,
"Connect ingress is not currently supported with Service Workers syntax.");
// The feature is gated on the experimental flag of the worker holding `lock`.
KJ_REQUIRE(FeatureFlags::get(lock).getWorkerdExperimental(),
"connect handling requires the experimental flag.");

KJ_IF_SOME(handler, eh.connect) {
// Has a connect handler!
// NOTE(review): the connection is accepted before `host` is validated below, so a malformed
// host fails only after the accept — confirm this ordering is intended. Also confirm that
// passing the incoming `headers` to accept() is the desired set of headers.
response.accept(200, "OK", headers);

// Using neuterable stream to manage lifetime of stream promises
auto ownConnection = newNeuterableIoStream(connection);

auto& ioContext = IoContext::current();
jsg::Lock& js = lock;

// `host` carries no scheme, so a placeholder one is prepended purely so that the URL parser can
// split it into hostname and port.
auto input = kj::str("fake://", host);
auto url = JSG_REQUIRE_NONNULL(
jsg::Url::tryParse(input.asPtr()), TypeError, "Specified address could not be parsed.");
auto hostName = url.getHostname();
auto port = url.getPort();
JSG_REQUIRE(hostName != ""_kj, TypeError, "Specified address is missing hostname.");
JSG_REQUIRE(port != ""_kj, TypeError, "Specified address is missing port.");

// TLS support is not implemented so far.
auto nullTlsStarter = kj::heap<kj::TlsStarterCallback>();
// We set isDefaultFetchPort to false here – sockets.c++ sets it for ports 443 and 8080 to
// provide a more descriptive error message for HTTP, but this is not relevant on the TCP server
// side.
jsg::Ref<Socket> jsSocket = setupSocket(js, kj::mv(ownConnection), kj::mv(host), kj::none,
kj::mv(nullTlsStarter), SecureTransportKind::OFF, kj::str(hostName), false, kj::none);
// handleProxyStatus() is required to indicate that the socket was opened properly. Since the
// connection is already open at this point, exception handling is not required.
jsSocket->handleProxyStatus(js, kj::Promise<kj::Maybe<kj::Exception>>(kj::none));

// The span is attached to the returned promise so that it stays alive until that promise is
// destroyed.
kj::Maybe<SpanBuilder> span = ioContext.makeTraceSpan("connect_handler"_kjc);
auto promise = handler(js, kj::mv(jsSocket), eh.env.addRef(js), eh.getCtx());
return ioContext.awaitJs(js, kj::mv(promise)).attach(kj::mv(span));
}
// No connect() export: warn once, then fail the event.
lock.logWarningOnce("Received a connect event but we lack a handler. "
"Did you remember to export a connect() function?");
JSG_FAIL_REQUIRE(Error, "Handler does not export a connect() function.");
}

kj::Promise<DeferredProxy<void>> ServiceWorkerGlobalScope::request(kj::HttpMethod method,
kj::StringPtr url,
const kj::HttpHeaders& headers,
Expand Down
16 changes: 16 additions & 0 deletions src/workerd/api/global-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "http.h"
#include "messagechannel.h"
#include "performance.h"
#include "sockets.h"

#include <workerd/api/hibernation-event-params.h>
#ifdef WORKERD_FUZZILLI
Expand Down Expand Up @@ -350,6 +351,10 @@ struct ExportedHandler {
jsg::Optional<jsg::Ref<ExecutionContext>> ctx);
jsg::LenientOptional<jsg::Function<FetchHandler>> fetch;

using ConnectHandler = jsg::Promise<void>(
jsg::Ref<Socket> socket, jsg::Value env, jsg::Optional<jsg::Ref<ExecutionContext>> ctx);
jsg::LenientOptional<jsg::Function<ConnectHandler>> connect;

using TailHandler = kj::Promise<void>(kj::Array<jsg::Ref<TraceItem>> events,
jsg::Value env,
jsg::Optional<jsg::Ref<ExecutionContext>> ctx);
Expand Down Expand Up @@ -389,6 +394,7 @@ struct ExportedHandler {
jsg::SelfRef self;

JSG_STRUCT(fetch,
connect,
tail,
trace,
tailStream,
Expand All @@ -406,6 +412,7 @@ struct ExportedHandler {

JSG_STRUCT_TS_DEFINE(
type ExportedHandlerFetchHandler<Env = unknown, CfHostMetadata = unknown, Props = unknown> = (request: Request<CfHostMetadata, IncomingRequestCfProperties<CfHostMetadata>>, env: Env, ctx: ExecutionContext<Props>) => Response | Promise<Response>;
type ExportedHandlerConnectHandler<Env = unknown, Props = unknown> = (socket: Socket, env: Env, ctx: ExecutionContext<Props>) => void | Promise<void>;
type ExportedHandlerTailHandler<Env = unknown, Props = unknown> = (events: TraceItem[], env: Env, ctx: ExecutionContext<Props>) => void | Promise<void>;
type ExportedHandlerTraceHandler<Env = unknown, Props = unknown> = (traces: TraceItem[], env: Env, ctx: ExecutionContext<Props>) => void | Promise<void>;
type ExportedHandlerTailStreamHandler<Env = unknown, Props = unknown> = (event : TailStream.TailEvent<TailStream.Onset>, env: Env, ctx: ExecutionContext<Props>) => TailStream.TailEventHandlerType | Promise<TailStream.TailEventHandlerType>;
Expand All @@ -416,6 +423,7 @@ struct ExportedHandler {
JSG_STRUCT_TS_OVERRIDE(<Env = unknown, QueueHandlerMessage = unknown, CfHostMetadata = unknown, Props = unknown> {
email?: EmailExportedHandler<Env, Props>;
fetch?: ExportedHandlerFetchHandler<Env, CfHostMetadata, Props>;
connect?: ExportedHandlerConnectHandler<Env, Props>;
tail?: ExportedHandlerTailHandler<Env, Props>;
trace?: ExportedHandlerTraceHandler<Env, Props>;
tailStream?: ExportedHandlerTailStreamHandler<Env, Props>;
Expand Down Expand Up @@ -515,6 +523,14 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope {
// TODO(cleanup): Factor out the shared code used between old-style event listeners vs. module
// exports and move that code somewhere more appropriate.

// Received TCP/socket ingress (called from C++, not JS).
kj::Promise<void> connect(kj::String host,
const kj::HttpHeaders& headers,
kj::AsyncIoStream& connection,
kj::HttpService::ConnectResponse& response,
Worker::Lock& lock,
kj::Maybe<ExportedHandler&> exportedHandler);

// Received sendTraces (called from C++, not JS).
void sendTraces(kj::ArrayPtr<kj::Own<Trace>> traces,
Worker::Lock& lock,
Expand Down
6 changes: 6 additions & 0 deletions src/workerd/api/sockets.c++
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,12 @@ jsg::Promise<void> Socket::close(jsg::Lock& js) {
}).catch_(js, [this](jsg::Lock& js, jsg::Value err) { errorHandler(js, kj::mv(err)); });
}

// Proxies data bidirectionally between this socket and `sock`: bytes read from `sock` are piped
// into this socket's writable side, and bytes read from this socket are piped into `sock`'s
// writable side. Both pipes are started here, before either one is awaited, so data flows in both
// directions concurrently.
//
// The returned promise resolves once both pipes have completed.
// NOTE(review): if the first pipe rejects, the continuation never runs and the second pipe's
// promise is dropped with it — confirm whether its rejection needs explicit handling.
jsg::Promise<void> Socket::proxyTo(jsg::Lock& js, jsg::Ref<Socket> sock) {
  auto pipeA = sock->readable->pipeTo(js, writable.addRef(), {});
  auto pipeB = readable->pipeTo(js, sock->writable.addRef(), {});
  // The continuation must *return* pipeB so that the resulting promise chains onto it. Merely
  // naming it (`kj::mv(pipeB);`) discards the promise and resolves as soon as pipeA is done.
  return pipeA.then(
      js, [pipeB = kj::mv(pipeB)](jsg::Lock& js) mutable { return kj::mv(pipeB); });
}

jsg::Ref<Socket> Socket::startTls(jsg::Lock& js, jsg::Optional<TlsOptions> tlsOptions) {
JSG_REQUIRE(
secureTransport != SecureTransportKind::ON, TypeError, "Cannot startTls on a TLS socket.");
Expand Down
6 changes: 6 additions & 0 deletions src/workerd/api/sockets.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ class Socket: public jsg::Object {
// closing.
jsg::Promise<void> close(jsg::Lock& js);

// Proxies to the other socket. Equivalent to:
// a.readable.pipeTo(b.writable); b.readable.pipeTo(a.writable);
// TODO: May want to add jsg::Optional<PipeToOptions> options?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to think about this – one issue with taking options is that we can't apply them to both streams since the AbortSignal field can't be copied/shared between the two streams(?), having two options parameters instead seems highly inelegant.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't the single AbortSignal be used to abort both directions? I think that's what people would want.

That said, I don't think we necessarily need to support the options. People can always call pipeTo() manually if they want options.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should absolutely be just a single AbortSignal. Users should not have to pass two and there really should not be a reason to.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I agree, there is no reason to pass PipeToOptions. AbortSignal is really the only option in this case that matters.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we accept an AbortSignal it should still be part of an options struct.

If we're accepting an options struct then I think it might as well be PipeToOptions, and we might as well pass all the options into both pipeTo()s.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: The doc comment says this is equivalent to a.readable.pipeTo(b.writable); b.readable.pipeTo(a.writable); (which reads as concurrent), but the implementation chains them sequentially. If the implementation is intentionally sequential, the comment should say so; if concurrent, the implementation needs fixing (see comment on the .c++ file).

jsg::Promise<void> proxyTo(jsg::Lock& js, jsg::Ref<Socket> sock);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not convinced this should be a method on Socket itself (which requires that we also add it to the pending standard draft, https://sockets-api.proposal.wintertc.org/). Instead, I think I'd prefer an API on the cloudflare:sockets module namespace like:

import { connect, proxy } from 'cloudflare:sockets';

// ...

export default {
  async connect(socket) {
    proxy(socket, connect('...'));
  }
}

Notice I also dropped the To off of that... Since the proxy is bidirectional, I'm not sure if proxyTo is the ideal name here when just proxy is descriptive enough.

Having proxyTo be a method on the Socket itself sets up a bit of an awkward question: if I have socketA or socketB, should I use socketA.proxyTo(socketB) or socketB.proxyTo(socketA)? But with a proxy export the semantic just becomes proxy(socketA, socketB) or proxy(socketB, socketA) where both are exactly equivalent and the order does not matter.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's OK to add methods without standardizing them first. In fact that's how it's supposed to work -- somebody implements the new feature first, then if desired it gets standardized later.

That said I'm not necessarily against making proxy a freestanding function. I think it's less convenient but it's not a hill I'd die on.

I don't think the a.proxyTo(b) vs. b.proxyTo(a) thing is that confusing -- they are in fact equivalent.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't the spec also cover whatever methods are in the sockets module? I'd expect it to be added to the spec eventually too even if it isn't a method on Socket.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't the spec also cover whatever methods are in the sockets module?

Not necessarily. The spec currently only covers the connect and Socket; helper/convenience functions are not necessarily in scope.


// Flushes write buffers then performs a TLS handshake on the current Socket connection.
// The current `Socket` instance is closed and its readable/writable instances are also closed.
// All new operations should be performed on the new `Socket` instance.
Expand Down Expand Up @@ -155,6 +160,7 @@ class Socket: public jsg::Object {
JSG_READONLY_PROTOTYPE_PROPERTY(secureTransport, getSecureTransport);
JSG_METHOD(close);
JSG_METHOD(startTls);
JSG_METHOD(proxyTo);

JSG_TS_OVERRIDE({
get secureTransport(): 'on' | 'off' | 'starttls';
Expand Down
17 changes: 17 additions & 0 deletions src/workerd/api/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ wd_test(
data = ["delete-all-deletes-alarm-test.js"],
)

# Tests for the connect() handler, covering both a worker that answers a connection directly and
# one that proxies the connection to a second worker.
wd_test(
src = "connect-handler-test.wd-test",
args = ["--experimental"],
data = [
"connect-handler-test.js",
"connect-handler-test-proxy.js",
],
# Test uses TCP sockets for ports 8081-8083 and may fail when running concurrently with other
# tests that do so.
tags = ["exclusive"],
)

wd_test(
src = "actor-alarms-test.wd-test",
args = ["--experimental"],
Expand All @@ -42,7 +54,10 @@ wd_test(
"tail-worker-test-invalid.js",
"tail-worker-test-jsrpc.js",
"websocket-hibernation.js",
"connect-handler-test.js",
"connect-handler-test-proxy.js",
],
tags = ["exclusive"],
)

# Test to validate timing semantics for JSRPC streaming responses.
Expand Down Expand Up @@ -327,6 +342,7 @@ wd_test(
src = "js-rpc-test.wd-test",
args = ["--experimental"],
data = ["js-rpc-test.js"],
tags = ["exclusive"],
)

wd_test(
Expand Down Expand Up @@ -577,6 +593,7 @@ wd_test(
"--no-verbose",
],
data = ["js-rpc-test.js"],
tags = ["exclusive"],
)

wd_test(
Expand Down
19 changes: 19 additions & 0 deletions src/workerd/api/tests/connect-handler-test-proxy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { connect } from 'cloudflare:sockets';
import { WorkerEntrypoint } from 'cloudflare:workers';

export class ConnectProxy extends WorkerEntrypoint {
async connect(socket) {
// proxy for ConnectEndpoint instance on port 8083.
let upstream = connect('localhost:8083');
await socket.proxyTo(upstream);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern with the await will make this difficult to optimize using deferred proxying. More specifically, it ends up requiring that the isolate be kept around for the entire lifetime of the proxy pipeline, which is not what we want.

If a worker is just connecting two sockets together and not doing anything else, we need to be able to make the connection at the kj streams level, allow the IoContext to go away while the proxy pipeline is still flowing, just like we would do if we had something like return fetch('whatever') in a regular handler.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this is arguably an advantage of the version of the connect handler that returns a socket instead of receiving it as a parameter, and if we can't figure out a way to make this work, we may have to switch back to that approach. Deferred proxying is very important here.

That said, I wonder if there's any way we can dig through the promise chain and discover if we're waiting on a promise that is actually just a KJ promise.

This could benefit the return fetch(req) use case too. Currently, we cannot start deferred proxying for HTTP until fetch() returns (i.e. response headers are received), because we have to await the fetch promise in JS land. Once we get a response, then we can deferred-proxy its body. But it would be really nice if we could start deferred proxying much earlier, by figuring out that the promise we're waiting on is just a fetch promise and therefore we don't need to keep running JS.

Or another idea: Maybe proxyTo() just shouldn't return a promise. We could document that there's no way to wait for it to complete. The connection has been handed off to the runtime to proxy, and it'll push bytes until there's no more bytes to push, then close.

The latter idea is obviously easier to implement and seems pretty OK to me for now (though it would still be useful to explore promise-unwrapping in the future).

Copy link
Collaborator

@jasnell jasnell Mar 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm... that might be a bit tricky. Imagine a case like:

// In the fetch handler
const responsePromise = fetch('...');
responsePromise.then((response) => {
  // This is guaranteed to run before our internal handling of the response
  // and may have observable side effects
});
return responsePromise;

I think the only way for it to work would be also determining that there are no other reactions on the promise that could trigger observable side effects.

Maybe proxyTo() just shouldn't return a promise.

I'm leaning this direction also. So long as there's a way to optionally cancel the proxy (e.g. proxyTo(A, B, { signal: abortSignal }) then I don't actually think there's a really strong reason this MUST return a promise at all.

But then again, there's a certain simple elegance if we could just return the Socket we're wiring up and have the runtime set it up... But this raises a question: do we want to support cases like:

export default {
  fetch() {
    // Intentionally using fetch here, not connect
    const socketA = connect('...');
    const socketB = connect('...');
    socketA.proxyTo(socketB);
    // ...
  }
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And to be clear, I know that latter pattern is already possible with the current API... I just don't know if we want to encourage it by making it easier and more ergonomic. I don't have an immediate good reason to say no, just being thorough in thinking through the new API.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should support proxying between any two sockets.

Seems like just returning void from proxyTo() is the best option here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM

}
}

export class ConnectEndpoint extends WorkerEntrypoint {
  // Writes a fixed greeting to every incoming connection, then closes the writable side.
  async connect(socket) {
    const greeting = new TextEncoder().encode('hello-from-endpoint');
    const sink = socket.writable.getWriter();
    await sink.write(greeting);
    await sink.close();
  }
}
45 changes: 45 additions & 0 deletions src/workerd/api/tests/connect-handler-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { connect } from 'cloudflare:sockets';
import { ok, strictEqual } from 'assert';

export const connectHandler = {
  // Connects to port 8081, drains the socket until the peer closes it, and verifies that the
  // connect handler on the other end sent exactly 'hello'.
  async test() {
    const client = connect('localhost:8081');
    await client.opened;

    const decoder = new TextDecoder();
    const pieces = [];
    for await (const bytes of client.readable) {
      pieces.push(decoder.decode(bytes, { stream: true }));
    }
    // Flush whatever the streaming decoder is still holding on to.
    pieces.push(decoder.decode());

    strictEqual(pieces.join(''), 'hello');
    await client.closed;
  },
};

export const connectHandlerProxy = {
  // Verifies that a message survives being proxied through a connect handler. Port 8082 is
  // served by ConnectProxy, which forwards the connection to ConnectEndpoint on port 8083; both
  // are defined in connect-handler-test-proxy.js.
  async test() {
    const client = connect('localhost:8082');
    await client.opened;

    const decoder = new TextDecoder();
    const pieces = [];
    for await (const bytes of client.readable) {
      pieces.push(decoder.decode(bytes, { stream: true }));
    }
    // Flush whatever the streaming decoder is still holding on to.
    pieces.push(decoder.decode());

    strictEqual(pieces.join(''), 'hello-from-endpoint');
    await client.closed;
  },
};

export default {
  // Connect handler under test: sends 'hello' to the peer and then closes the writable side.
  async connect(socket) {
    const payload = new TextEncoder().encode('hello');
    const sink = socket.writable.getWriter();
    await sink.write(payload);
    await sink.close();
  },
};
36 changes: 36 additions & 0 deletions src/workerd/api/tests/connect-handler-test.wd-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using Workerd = import "/workerd/workerd.capnp";

# Test configuration for the connect() handler. Three workers are each exposed on their own TCP
# socket:
#   - port 8081: connect-handler-test (default export), which answers connections directly;
#   - port 8082: the ConnectProxy entrypoint, which forwards connections to port 8083;
#   - port 8083: the ConnectEndpoint entrypoint, the upstream that the proxy connects to.
const unitTests :Workerd.Config = (
  services = [
    ( name = "connect-handler-test",
      worker = (
        modules = [
          (name = "worker", esModule = embed "connect-handler-test.js"),
        ],
        compatibilityFlags = ["nodejs_compat_v2", "experimental"],
      )
    ),
    # The proxy and endpoint services embed the same module; the sockets below pick which
    # entrypoint of it each one serves.
    ( name = "connect-handler-test-proxy",
      worker = (
        modules = [
          (name = "worker", esModule = embed "connect-handler-test-proxy.js"),
        ],
        compatibilityFlags = ["nodejs_compat_v2", "experimental"],
      )
    ),
    ( name = "connect-handler-test-endpoint",
      worker = (
        modules = [
          (name = "worker", esModule = embed "connect-handler-test-proxy.js"),
        ],
        compatibilityFlags = ["nodejs_compat_v2", "experimental"],
      )
    ),
    # Outbound connections from the workers target localhost, so private addresses are allowed.
    ( name = "internet", network = ( allow = ["private"] ) )
  ],
  sockets = [
    # Socket names are kept distinct so that each socket can be addressed individually (for
    # example when overriding its address by name).
    (name = "tcp-direct", address = "*:8081", tcp = (), service = "connect-handler-test"),
    (name = "tcp-proxy", address = "*:8082", tcp = (), service = (name = "connect-handler-test-proxy", entrypoint = "ConnectProxy")),
    (name = "tcp-endpoint", address = "*:8083", tcp = (), service = (name = "connect-handler-test-endpoint", entrypoint = "ConnectEndpoint"))
  ]
);
3 changes: 3 additions & 0 deletions src/workerd/api/tests/js-rpc-socket-test.wd-test
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const unitTests :Workerd.Config = (
http = (capnpConnectHost = "cappy")
)
),
( name = "internet", network = ( allow = ["private"] ) ),
],
sockets = [
( name = "MyService-loop",
Expand Down Expand Up @@ -99,6 +100,8 @@ const unitTests :Workerd.Config = (
service = (name = "js-rpc-test", entrypoint = "GreeterFactory"),
http = (capnpConnectHost = "cappy")
),
# For testing connect() handler
(name = "tcp", address = "*:8081", tcp = (), service = "js-rpc-test")
],
v8Flags = [ "--expose-gc" ],
);
Loading
Loading