Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions src/workerd/api/worker-loader.c++
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ jsg::Ref<Fetcher> WorkerStub::getEntrypoint(jsg::Lock& js,
jsg::Optional<kj::Maybe<kj::String>> name,
jsg::Optional<EntrypointOptions> options) {
Frankenvalue props;

kj::Maybe<ResourceLimits> limits;
KJ_IF_SOME(o, options) {
KJ_IF_SOME(p, o.props) {
props = Frankenvalue::fromJs(js, p.getHandle(js));
}
limits = o.limits;
}

kj::Maybe<kj::String> entrypointName;
Expand All @@ -30,19 +31,20 @@ jsg::Ref<Fetcher> WorkerStub::getEntrypoint(jsg::Lock& js,
}
}

auto subreqChannel = channel->getEntrypoint(kj::mv(entrypointName), kj::mv(props));
auto subreqChannel = channel->getEntrypoint(kj::mv(entrypointName), kj::mv(props), limits);
return js.alloc<Fetcher>(IoContext::current().addObject(kj::mv(subreqChannel)));
}

jsg::Ref<DurableObjectClass> WorkerStub::getDurableObjectClass(jsg::Lock& js,
jsg::Optional<kj::Maybe<kj::String>> name,
jsg::Optional<EntrypointOptions> options) {
Frankenvalue props;

kj::Maybe<ResourceLimits> limits;
KJ_IF_SOME(o, options) {
KJ_IF_SOME(p, o.props) {
props = Frankenvalue::fromJs(js, p.getHandle(js));
}
limits = o.limits;
}

kj::Maybe<kj::String> entrypointName;
Expand All @@ -55,7 +57,7 @@ jsg::Ref<DurableObjectClass> WorkerStub::getDurableObjectClass(jsg::Lock& js,
}

return js.alloc<DurableObjectClass>(IoContext::current().addObject(
channel->getActorClass(kj::mv(entrypointName), kj::mv(props))));
channel->getActorClass(kj::mv(entrypointName), kj::mv(props), limits)));
}

jsg::Ref<WorkerStub> WorkerLoader::get(
Expand Down Expand Up @@ -157,6 +159,7 @@ DynamicWorkerSource WorkerLoader::toDynamicWorkerSource(jsg::Lock& js,

return {.source = kj::mv(extractedSource),
.compatibilityFlags = compatFlags,
.limits = code.limits,
.env = kj::mv(env),
.globalOutbound = kj::mv(globalOutbound),
.tails = kj::mv(tailChannels),
Expand Down
8 changes: 6 additions & 2 deletions src/workerd/api/worker-loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ class WorkerStub: public jsg::Object {

struct EntrypointOptions {
jsg::Optional<jsg::JsRef<jsg::JsObject>> props;
jsg::Optional<ResourceLimits> limits;

JSG_STRUCT(props);
JSG_STRUCT(props, limits);
};

jsg::Ref<Fetcher> getEntrypoint(jsg::Lock& js,
Expand Down Expand Up @@ -90,6 +91,8 @@ class WorkerLoader: public jsg::Object {
jsg::Optional<kj::Array<kj::String>> compatibilityFlags;
jsg::Optional<bool> allowExperimental = false;

jsg::Optional<ResourceLimits> limits;

kj::String mainModule;

// Modules are specified as an object mapping names to content. If the content is just a
Expand All @@ -116,6 +119,7 @@ class WorkerLoader: public jsg::Object {
JSG_STRUCT(compatibilityDate,
compatibilityFlags,
allowExperimental,
limits,
mainModule,
modules,
env,
Expand Down Expand Up @@ -156,6 +160,6 @@ class WorkerLoader: public jsg::Object {

#define EW_WORKER_LOADER_ISOLATE_TYPES \
api::WorkerStub, api::WorkerStub::EntrypointOptions, api::WorkerLoader, \
api::WorkerLoader::Module, api::WorkerLoader::WorkerCode
api::WorkerLoader::Module, api::WorkerLoader::WorkerCode, workerd::ResourceLimits

} // namespace workerd::api
20 changes: 18 additions & 2 deletions src/workerd/io/io-channels.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,17 +297,30 @@ class IoChannelFactory {
ChannelTokenUsage usage, kj::ArrayPtr<const byte> token);
};

// ResourceLimits provides a means to control the resource allocation for a worker stage via a
// set of optionally overridden parameters.
struct ResourceLimits {
jsg::Optional<uint32_t> cpuMs;
jsg::Optional<uint32_t> subRequests;

JSG_STRUCT(cpuMs, subRequests);

ResourceLimits clone() const {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably don't need a clone() method since the struct is trivially copyable anyway.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'd actually shifted the struct out of edgeworker and this method was being used in quite a few places so I let it be as-is for consistency

return {cpuMs, subRequests};
}
};

// Represents a dynamically-loaded Worker to which requests can be sent.
//
// This object is returned before the Worker actually loads, so if any errors occur while loading,
// any requests sent to the Worker will fail, propagating the exception.
class WorkerStubChannel {
public:
virtual kj::Own<IoChannelFactory::SubrequestChannel> getEntrypoint(
kj::Maybe<kj::String> name, Frankenvalue props) = 0;
kj::Maybe<kj::String> name, Frankenvalue props, kj::Maybe<ResourceLimits> limits) = 0;

virtual kj::Own<IoChannelFactory::ActorClassChannel> getActorClass(
kj::Maybe<kj::String> name, Frankenvalue props) = 0;
kj::Maybe<kj::String> name, Frankenvalue props, kj::Maybe<ResourceLimits> limits) = 0;

// TODO(someday): Allow caller to enumerate entrypoints?
};
Expand All @@ -317,6 +330,8 @@ struct DynamicWorkerSource {
WorkerSource source;
CompatibilityFlags::Reader compatibilityFlags;

kj::Maybe<ResourceLimits> limits;

// `env` object to pass to the loaded worker. Can contain anything that can be serialized to
// a `Frankenvalue` (which should eventually include all binding types, RPC stubs, etc.).
Frankenvalue env;
Expand Down Expand Up @@ -347,6 +362,7 @@ struct DynamicWorkerSource {
return {
.source = source.clone(),
.compatibilityFlags = compatibilityFlags,
.limits = limits.map([](auto& limits) { return limits.clone(); }),
.env = env.clone(),
.globalOutbound = mapAddRef(globalOutbound),
.tails = KJ_MAP(t, tails) { return kj::addRef(*t); },
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -4135,12 +4135,12 @@ class Server::WorkerLoaderNamespace: public kj::Refcounted {
}

kj::Own<IoChannelFactory::SubrequestChannel> getEntrypoint(
kj::Maybe<kj::String> name, Frankenvalue props) override {
kj::Maybe<kj::String> name, Frankenvalue props, kj::Maybe<ResourceLimits> limits) override {
return kj::refcounted<SubrequestChannelImpl>(addRefToThis(), kj::mv(name), kj::mv(props));
}

kj::Own<IoChannelFactory::ActorClassChannel> getActorClass(
kj::Maybe<kj::String> name, Frankenvalue props) override {
kj::Maybe<kj::String> name, Frankenvalue props, kj::Maybe<ResourceLimits> limits) override {
return kj::refcounted<ActorClassImpl>(addRefToThis(), kj::mv(name), kj::mv(props));
}

Expand Down
6 changes: 6 additions & 0 deletions types/generated-snapshot/experimental/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4249,6 +4249,7 @@ interface WorkerStub {
}
interface WorkerStubEntrypointOptions {
props?: any;
limits?: workerdResourceLimits;
}
interface WorkerLoader {
get(
Expand All @@ -4270,13 +4271,18 @@ interface WorkerLoaderWorkerCode {
compatibilityDate: string;
compatibilityFlags?: string[];
allowExperimental?: boolean;
limits?: workerdResourceLimits;
mainModule: string;
modules: Record<string, WorkerLoaderModule | string>;
env?: any;
globalOutbound?: Fetcher | null;
tails?: Fetcher[];
streamingTails?: Fetcher[];
}
interface workerdResourceLimits {
cpuMs?: number;
subRequests?: number;
}
/**
* The Workers runtime supports a subset of the Performance API, used to measure timing and performance,
* as well as timing of subrequests and other operations.
Expand Down
6 changes: 6 additions & 0 deletions types/generated-snapshot/experimental/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4255,6 +4255,7 @@ export interface WorkerStub {
}
export interface WorkerStubEntrypointOptions {
props?: any;
limits?: workerdResourceLimits;
}
export interface WorkerLoader {
get(
Expand All @@ -4276,13 +4277,18 @@ export interface WorkerLoaderWorkerCode {
compatibilityDate: string;
compatibilityFlags?: string[];
allowExperimental?: boolean;
limits?: workerdResourceLimits;
mainModule: string;
modules: Record<string, WorkerLoaderModule | string>;
env?: any;
globalOutbound?: Fetcher | null;
tails?: Fetcher[];
streamingTails?: Fetcher[];
}
export interface workerdResourceLimits {
cpuMs?: number;
subRequests?: number;
}
/**
* The Workers runtime supports a subset of the Performance API, used to measure timing and performance,
* as well as timing of subrequests and other operations.
Expand Down
6 changes: 6 additions & 0 deletions types/generated-snapshot/latest/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3882,6 +3882,7 @@ interface WorkerStub {
}
interface WorkerStubEntrypointOptions {
props?: any;
limits?: workerdResourceLimits;
}
interface WorkerLoader {
get(
Expand All @@ -3903,13 +3904,18 @@ interface WorkerLoaderWorkerCode {
compatibilityDate: string;
compatibilityFlags?: string[];
allowExperimental?: boolean;
limits?: workerdResourceLimits;
mainModule: string;
modules: Record<string, WorkerLoaderModule | string>;
env?: any;
globalOutbound?: Fetcher | null;
tails?: Fetcher[];
streamingTails?: Fetcher[];
}
interface workerdResourceLimits {
cpuMs?: number;
subRequests?: number;
}
/**
* The Workers runtime supports a subset of the Performance API, used to measure timing and performance,
* as well as timing of subrequests and other operations.
Expand Down
6 changes: 6 additions & 0 deletions types/generated-snapshot/latest/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3888,6 +3888,7 @@ export interface WorkerStub {
}
export interface WorkerStubEntrypointOptions {
props?: any;
limits?: workerdResourceLimits;
}
export interface WorkerLoader {
get(
Expand All @@ -3909,13 +3910,18 @@ export interface WorkerLoaderWorkerCode {
compatibilityDate: string;
compatibilityFlags?: string[];
allowExperimental?: boolean;
limits?: workerdResourceLimits;
mainModule: string;
modules: Record<string, WorkerLoaderModule | string>;
env?: any;
globalOutbound?: Fetcher | null;
tails?: Fetcher[];
streamingTails?: Fetcher[];
}
export interface workerdResourceLimits {
cpuMs?: number;
subRequests?: number;
}
/**
* The Workers runtime supports a subset of the Performance API, used to measure timing and performance,
* as well as timing of subrequests and other operations.
Expand Down
Loading