Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions src/workerd/api/export-loopback.c++
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,23 @@

namespace workerd::api {

jsg::Ref<Fetcher> LoopbackServiceStub::call(jsg::Lock& js, Options options) {
jsg::Ref<Fetcher> LoopbackServiceStub::callImpl(jsg::Lock& js,
jsg::Optional<jsg::JsRef<jsg::JsObject>> propsMaybe,
jsg::Optional<OptionsWithVersion::Version> versionMaybe) {
Frankenvalue props;
KJ_IF_SOME(p, options.props) {
KJ_IF_SOME(p, propsMaybe) {
props = Frankenvalue::fromJs(js, p.getHandle(js));
}
kj::Maybe<IoChannelFactory::VersionRequest> versionRequest;
KJ_IF_SOME(version, kj::mv(versionMaybe)) {
versionRequest = IoChannelFactory::VersionRequest{
.cohort = kj::mv(version.cohort).orDefault(kj::none),
};
}

IoContext& ioctx = IoContext::current();
auto channelObj = ioctx.getIoChannelFactory().getSubrequestChannel(channel, kj::mv(props));
auto channelObj = ioctx.getIoChannelFactory().getSubrequestChannel(
channel, kj::mv(props), kj::mv(versionRequest));
return js.alloc<Fetcher>(ioctx.addObject(kj::mv(channelObj)));
}

Expand Down
68 changes: 56 additions & 12 deletions src/workerd/api/export-loopback.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include "actor.h"
#include "http.h"

#include <workerd/io/io-channels.h>

namespace workerd::api {

// LoopbackServiceStub is the type of a property of `ctx.exports` which points back at a stateless
Expand All @@ -26,26 +28,66 @@ class LoopbackServiceStub: public Fetcher {
JSG_STRUCT(props);
};

struct OptionsWithVersion {
struct Version {
jsg::Optional<kj::Maybe<kj::String>> cohort;

JSG_STRUCT(cohort);
};

jsg::Optional<jsg::JsRef<jsg::JsObject>> props;
jsg::Optional<Version> version;

JSG_STRUCT(props, version);
};

// Create a specialized Fetcher which can be passed over RPC.
jsg::Ref<Fetcher> call(jsg::Lock& js, Options options);
jsg::Ref<Fetcher> callImpl(jsg::Lock& js,
jsg::Optional<jsg::JsRef<jsg::JsObject>> propsMaybe,
jsg::Optional<OptionsWithVersion::Version> versionMaybe);

jsg::Ref<Fetcher> call(jsg::Lock& js, Options options) {
return callImpl(js, kj::mv(options.props), kj::none);
}

jsg::Ref<Fetcher> callWithVersion(jsg::Lock& js, OptionsWithVersion options) {
return callImpl(js, kj::mv(options.props), kj::mv(options.version));
}

// Note that `LoopbackServiceStub` is intentionally NOT serializable, unlike its parent class
// Fetcher. We want people to explicitly specialize the entrypoint with props before sending
// it off to other services.

JSG_RESOURCE_TYPE(LoopbackServiceStub) {
JSG_RESOURCE_TYPE(LoopbackServiceStub, CompatibilityFlags::Reader flags) {
JSG_INHERIT(Fetcher);
JSG_CALLABLE(call);

if (flags.getEnableVersionApi()) {
JSG_CALLABLE(callWithVersion);
} else {
JSG_CALLABLE(call);
}

JSG_TS_ROOT();
JSG_TS_OVERRIDE(
type LoopbackServiceStub<
T extends Rpc.WorkerEntrypointBranded | undefined = undefined
> = Fetcher<T> &
( T extends CloudflareWorkersModule.WorkerEntrypoint<any, infer Props>
? (opts: {props?: Props}) => Fetcher<T>
: (opts: {props?: any}) => Fetcher<T>);
);

if (flags.getEnableVersionApi()) {
JSG_TS_OVERRIDE(
type LoopbackServiceStub<
T extends Rpc.WorkerEntrypointBranded | undefined = undefined
> = Fetcher<T> &
( T extends CloudflareWorkersModule.WorkerEntrypoint<any, infer Props>
? (opts: {props?: Props, version?: { cohort?: string | null }}) => Fetcher<T>
: (opts: {props?: any, version?: { cohort?: string | null }}) => Fetcher<T>);
);
} else {
JSG_TS_OVERRIDE(
type LoopbackServiceStub<
T extends Rpc.WorkerEntrypointBranded | undefined = undefined
> = Fetcher<T> &
( T extends CloudflareWorkersModule.WorkerEntrypoint<any, infer Props>
? (opts: {props?: Props}) => Fetcher<T>
: (opts: {props?: any}) => Fetcher<T>);
);
}

// LoopbackForExport takes the type of an exported value and evaluates to the appropriate
// loopback stub for that export.
Expand Down Expand Up @@ -170,7 +212,9 @@ class LoopbackColoLocalActorNamespace: public ColoLocalActorNamespace {
};

#define EW_EXPORT_LOOPBACK_ISOLATE_TYPES \
api::LoopbackServiceStub, api::LoopbackServiceStub::Options, api::LoopbackDurableObjectClass, \
api::LoopbackServiceStub, api::LoopbackServiceStub::Options, \
api::LoopbackServiceStub::OptionsWithVersion, \
api::LoopbackServiceStub::OptionsWithVersion::Version, api::LoopbackDurableObjectClass, \
api::LoopbackDurableObjectClass::Options, api::LoopbackDurableObjectNamespace, \
api::LoopbackColoLocalActorNamespace

Expand Down
6 changes: 6 additions & 0 deletions src/workerd/io/compatibility-date.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -1420,4 +1420,10 @@ struct CompatibilityFlags @0x8f8c1b68151b6cef {
# reason string exceeds 123 bytes when UTF-8 encoded, as required by the
# WHATWG WebSocket spec and RFC 6455 Section 5.5. Previously, workerd allowed
# arbitrarily long close reasons without validation.

enableVersionApi @165 :Bool
$compatEnableFlag("enable_version_api")
$experimental;
# Enables version-related APIs. This currently only enables the `version` option in loopback
# bindings to specify a requested version. The behaviour of this flag will change in the future.
}
21 changes: 17 additions & 4 deletions src/workerd/io/io-channels.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,18 @@ class IoChannelFactory {
double startTime = dateNow();
};

// Parameters that can influence the version of a worker that is used to serve a subrequest.
struct VersionRequest {
// Request a version within the given cohort.
kj::Maybe<kj::String> cohort;

VersionRequest clone() const {
return {
.cohort = cohort.map([](const kj::String& s) { return kj::str(s); }),
};
}
};

virtual kj::Own<WorkerInterface> startSubrequest(uint channel, SubrequestMetadata metadata) = 0;

// Get a Cap'n Proto RPC capability. Various binding types are backed by capabilities.
Expand Down Expand Up @@ -194,12 +206,13 @@ class IoChannelFactory {
// The reason to use this instead is when the channel is not necessarily going to be used to
// start a subrequest immediately, but instead is going to be passed around as a capability.
//
// `props` can only be specified if this is a loopback channel (i.e. from ctx.exports). For any
// other channel, it will throw.
// `props` and `versionRequest` can only be specified if this is a loopback channel (i.e. from
// ctx.exports). For any other channel, they will throw.
//
// TODO(cleanup): Consider getting rid of `startSubrequest()` in favor of this.
virtual kj::Own<SubrequestChannel> getSubrequestChannel(
uint channel, kj::Maybe<Frankenvalue> props = kj::none) = 0;
virtual kj::Own<SubrequestChannel> getSubrequestChannel(uint channel,
kj::Maybe<Frankenvalue> props = kj::none,
kj::Maybe<VersionRequest> versionRequest = kj::none) = 0;

// Stub for a remote actor. Allows sending requests to the actor.
class ActorChannel: public SubrequestChannel {
Expand Down
42 changes: 42 additions & 0 deletions src/workerd/server/server-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -4450,6 +4450,48 @@ KJ_TEST("Server: ctx.exports self-referential bindings") {
"{}, {\"foo\":123,\"bar\":\"abc\"}, false");
}

KJ_TEST("Server: loopback binding calls accept version property") {
TestServer test(R"((
services = [
( name = "hello",
worker = (
compatibilityDate = "2025-08-01",
compatibilityFlags = ["enable_ctx_exports", "enable_version_api"],
modules = [
( name = "main.js",
esModule =
`export default {
` async fetch(request, env, ctx) {
` const serviceVersions = await Promise.all([
` ctx.exports.default({ version: {} }),
` ctx.exports.default({ version: { cohort: null } }),
` ctx.exports.default({ version: { cohort: "test" } }),
` ctx.exports.default({ props: {}, version: { cohort: "test" } }),
` ].map(service => service.version));
` if (serviceVersions.every(version => version === this.version)) {
` return new Response(serviceVersions[0]);
` }
` return new Response(null, { status: 500 });
` },
` get version() { return "constant"; },
`}
)
],
)
),
],
sockets = [
( name = "main", address = "test-addr", service = "hello" ),
]
))"_kj);

test.server.allowExperimental();
test.start();

auto conn = test.connect("test-addr");
conn.httpGet200("/", "constant");
}

// =======================================================================================

// TODO(beta): Test TLS (send and receive)
Expand Down
5 changes: 3 additions & 2 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -3291,8 +3291,9 @@ class Server::WorkerService final: public Service,
co_return;
}

kj::Own<SubrequestChannel> getSubrequestChannel(
uint channel, kj::Maybe<Frankenvalue> props) override {
kj::Own<SubrequestChannel> getSubrequestChannel(uint channel,
kj::Maybe<Frankenvalue> props,
kj::Maybe<VersionRequest> versionRequest) override {
auto& channels =
KJ_REQUIRE_NONNULL(ioChannels.tryGet<LinkedIoChannels>(), "link() has not been called");

Expand Down
5 changes: 3 additions & 2 deletions src/workerd/tests/test-fixture.c++
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ struct DummyIoChannelFactory final: public IoChannelFactory {
KJ_FAIL_ASSERT("no subrequests");
}

kj::Own<SubrequestChannel> getSubrequestChannel(
uint channel, kj::Maybe<Frankenvalue> props) override {
kj::Own<SubrequestChannel> getSubrequestChannel(uint channel,
kj::Maybe<Frankenvalue> props,
kj::Maybe<VersionRequest> versionRequest) override {
KJ_FAIL_ASSERT("no subrequests");
}

Expand Down
14 changes: 12 additions & 2 deletions types/generated-snapshot/experimental/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4099,8 +4099,18 @@ type LoopbackServiceStub<
T extends Rpc.WorkerEntrypointBranded | undefined = undefined,
> = Fetcher<T> &
(T extends CloudflareWorkersModule.WorkerEntrypoint<any, infer Props>
? (opts: { props?: Props }) => Fetcher<T>
: (opts: { props?: any }) => Fetcher<T>);
? (opts: {
props?: Props;
version?: {
cohort?: string | null;
};
}) => Fetcher<T>
: (opts: {
props?: any;
version?: {
cohort?: string | null;
};
}) => Fetcher<T>);
type LoopbackDurableObjectClass<
T extends Rpc.DurableObjectBranded | undefined = undefined,
> = DurableObjectClass<T> &
Expand Down
14 changes: 12 additions & 2 deletions types/generated-snapshot/experimental/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4108,8 +4108,18 @@ export type LoopbackServiceStub<
T extends Rpc.WorkerEntrypointBranded | undefined = undefined,
> = Fetcher<T> &
(T extends CloudflareWorkersModule.WorkerEntrypoint<any, infer Props>
? (opts: { props?: Props }) => Fetcher<T>
: (opts: { props?: any }) => Fetcher<T>);
? (opts: {
props?: Props;
version?: {
cohort?: string | null;
};
}) => Fetcher<T>
: (opts: {
props?: any;
version?: {
cohort?: string | null;
};
}) => Fetcher<T>);
export type LoopbackDurableObjectClass<
T extends Rpc.DurableObjectBranded | undefined = undefined,
> = DurableObjectClass<T> &
Expand Down