Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/workerd/api/unsafe.c++
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,16 @@ jsg::Promise<void> UnsafeModule::abortAllDurableObjects(jsg::Lock& js) {
return js.resolvedPromise();
}

jsg::Promise<void> UnsafeModule::deleteAllDurableObjects(jsg::Lock& js) {
auto& context = IoContext::current();

auto exception =
JSG_KJ_EXCEPTION(FAILED, Error, "Application called deleteAllDurableObjects().");
context.deleteAllActors(exception);

return js.resolvedPromise();
}

bool UnsafeModule::isTestAutogateEnabled() {
return util::Autogate::isEnabled(util::AutogateKey::TEST_WORKERD);
}
Expand Down
5 changes: 5 additions & 0 deletions src/workerd/api/unsafe.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,17 @@ class UnsafeModule: public jsg::Object {
UnsafeModule(jsg::Lock&, const jsg::Url&) {}
jsg::Promise<void> abortAllDurableObjects(jsg::Lock& js);

// Aborts all Durable Objects and deletes all of their underlying storage, including alarms.
// After this call, DOs can be recreated with clean state. Useful for test isolation.
jsg::Promise<void> deleteAllDurableObjects(jsg::Lock& js);

// Returns true if the TEST_WORKERD autogate is enabled.
// This is used to verify that the all-autogates test variant is working correctly.
bool isTestAutogateEnabled();

JSG_RESOURCE_TYPE(UnsafeModule) {
JSG_METHOD(abortAllDurableObjects);
JSG_METHOD(deleteAllDurableObjects);
JSG_METHOD(isTestAutogateEnabled);
}
};
Expand Down
6 changes: 6 additions & 0 deletions src/workerd/io/io-channels.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,12 @@ class IoChannelFactory {
KJ_UNIMPLEMENTED("Only implemented by single-tenant workerd runtime");
}

// Aborts all actors and deletes all underlying storage (including alarms) for all evictable
// namespaces. After this call, DOs can be recreated with clean state.
virtual void deleteAllActors(kj::Maybe<kj::Exception&> reason) {
KJ_UNIMPLEMENTED("Only implemented by single-tenant workerd runtime");
}

// Use a dynamic Worker loader binding to obtain an Worker by name. If name is null, or if the named Worker doesn't already exist, the callback will be called to fetch the source code from which the Worker should be created.
virtual kj::Own<WorkerStubChannel> loadIsolate(uint loaderChannel,
kj::Maybe<kj::String> name,
Expand Down
4 changes: 4 additions & 0 deletions src/workerd/io/io-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,10 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
getIoChannelFactory().abortAllActors(reason);
}

void deleteAllActors(kj::Maybe<kj::Exception&> reason) {
getIoChannelFactory().deleteAllActors(reason);
}

// Get an HttpClient to use for Cache API subrequests.
kj::Own<CacheClient> getCacheClient();

Expand Down
45 changes: 44 additions & 1 deletion src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1901,6 +1901,7 @@ class Server::WorkerService final: public Service,
using LinkCallback =
kj::Function<LinkedIoChannels(WorkerService&, Worker::ValidationErrorReporter&)>;
using AbortActorsCallback = kj::Function<void(kj::Maybe<const kj::Exception&> reason)>;
using DeleteActorsCallback = kj::Function<void(kj::Maybe<const kj::Exception&> reason)>;

WorkerService(ChannelTokenHandler& channelTokenHandler,
kj::Maybe<kj::StringPtr> serviceName,
Expand All @@ -1912,6 +1913,7 @@ class Server::WorkerService final: public Service,
kj::HashSet<kj::String> actorClassEntrypointsParam,
LinkCallback linkCallback,
AbortActorsCallback abortActorsCallback,
DeleteActorsCallback deleteActorsCallback,
kj::Maybe<kj::String> dockerPathParam,
kj::Maybe<kj::String> containerEgressInterceptorImageParam,
bool isDynamic)
Expand All @@ -1926,6 +1928,7 @@ class Server::WorkerService final: public Service,
actorClassEntrypoints(kj::mv(actorClassEntrypointsParam)),
waitUntilTasks(*this),
abortActorsCallback(kj::mv(abortActorsCallback)),
deleteActorsCallback(kj::mv(deleteActorsCallback)),
dockerPath(kj::mv(dockerPathParam)),
containerEgressInterceptorImage(kj::mv(containerEgressInterceptorImageParam)),
isDynamic(isDynamic) {}
Expand Down Expand Up @@ -2945,6 +2948,19 @@ class Server::WorkerService final: public Service,
actors.clear();
}

// Aborts all actors and deletes all underlying storage for this namespace.
void deleteAll(kj::Maybe<const kj::Exception&> reason) {
// First, abort all running actors so they release their file handles.
abortAll(reason);

// Then delete all files in the namespace's storage directory.
KJ_IF_SOME(as, actorStorage) {
for (auto& entry: as.directory->listNames()) {
as.directory->remove(kj::Path({entry}));
}
}
}

private:
kj::Own<ActorClass> actorClass;
const ActorConfig& config;
Expand Down Expand Up @@ -3238,6 +3254,7 @@ class Server::WorkerService final: public Service,
kj::HashMap<kj::StringPtr, kj::Own<ActorNamespace>> actorNamespaces;
kj::TaskSet waitUntilTasks;
AbortActorsCallback abortActorsCallback;
DeleteActorsCallback deleteActorsCallback;
kj::Maybe<kj::String> dockerPath;
kj::Maybe<kj::String> containerEgressInterceptorImage;
bool isDynamic;
Expand Down Expand Up @@ -3454,6 +3471,10 @@ class Server::WorkerService final: public Service,
abortActorsCallback(reason);
}

void deleteAllActors(kj::Maybe<kj::Exception&> reason) override {
deleteActorsCallback(reason);
}

kj::Own<WorkerStubChannel> loadIsolate(uint loaderChannel,
kj::Maybe<kj::String> name,
kj::Function<kj::Promise<DynamicWorkerSource>()> fetchSource) override;
Expand Down Expand Up @@ -3994,6 +4015,27 @@ void Server::abortAllActors(kj::Maybe<const kj::Exception&> reason) {
alarmScheduler->deleteAllAlarms();
}

void Server::deleteAllActors(kj::Maybe<const kj::Exception&> reason) {
for (auto& service: services) {
if (WorkerService* worker = dynamic_cast<WorkerService*>(&*service.value)) {
for (auto& [className, ns]: worker->getActorNamespaces()) {
bool isEvictable = true;
KJ_SWITCH_ONEOF(ns->getConfig()) {
KJ_CASE_ONEOF(c, Durable) {
isEvictable = c.isEvictable;
}
KJ_CASE_ONEOF(c, Ephemeral) {
isEvictable = c.isEvictable;
}
}
if (isEvictable) ns->deleteAll(reason);
}
}
}

alarmScheduler->deleteAllAlarms();
}
Comment on lines +4018 to +4037
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[LOW] This method duplicates the evictable-namespace iteration from abortAllActors() right above. If a new ActorConfig variant is added or the evictability logic changes, both methods need to stay in sync.

One option: extract a helper or have deleteAllActors delegate:

Suggested change
void Server::deleteAllActors(kj::Maybe<const kj::Exception&> reason) {
for (auto& service: services) {
if (WorkerService* worker = dynamic_cast<WorkerService*>(&*service.value)) {
for (auto& [className, ns]: worker->getActorNamespaces()) {
bool isEvictable = true;
KJ_SWITCH_ONEOF(ns->getConfig()) {
KJ_CASE_ONEOF(c, Durable) {
isEvictable = c.isEvictable;
}
KJ_CASE_ONEOF(c, Ephemeral) {
isEvictable = c.isEvictable;
}
}
if (isEvictable) ns->deleteAll(reason);
}
}
}
alarmScheduler->deleteAllAlarms();
}
void Server::deleteAllActors(kj::Maybe<const kj::Exception&> reason) {
// Abort all actors and delete all alarms first (same as abortAllActors).
abortAllActors(reason);
// Then delete the underlying storage for all evictable Durable namespaces.
for (auto& service: services) {
if (WorkerService* worker = dynamic_cast<WorkerService*>(&*service.value)) {
for (auto& [className, ns]: worker->getActorNamespaces()) {
KJ_IF_SOME(as, ns->getActorStorageForTesting()) {
bool isEvictable = true;
KJ_SWITCH_ONEOF(ns->getConfig()) {
KJ_CASE_ONEOF(c, Durable) {
isEvictable = c.isEvictable;
}
KJ_CASE_ONEOF(c, Ephemeral) {
isEvictable = c.isEvictable;
}
}
if (isEvictable) {
for (auto& entry: as.directory->listNames()) {
as.directory->remove(kj::Path({entry}));
}
}
}
}
}
}
}

This is just a sketch (would need an accessor for actorStorage) — but the key idea is to avoid duplicating the abort+alarm loop. Feel free to ignore if you prefer the current factoring.


// WorkerDef is an intermediate representation of everything from `config::Worker::Reader` that
// `Server::makeWorkerImpl()` needs. Similar to `WorkerSource`, we factor out this intermediate
// representation so that we can potentially build it dynamically from input that isn't a
Expand Down Expand Up @@ -4853,7 +4895,8 @@ kj::Promise<kj::Own<Server::WorkerService>> Server::makeWorkerImpl(kj::StringPtr
kj::refcounted<WorkerService>(channelTokenHandler, serviceName, globalContext->threadContext,
monotonicClock, kj::mv(worker), kj::mv(errorReporter.defaultEntrypoint),
kj::mv(errorReporter.namedEntrypoints), kj::mv(errorReporter.actorClasses),
kj::mv(linkCallback), KJ_BIND_METHOD(*this, abortAllActors), kj::mv(dockerPath),
kj::mv(linkCallback), KJ_BIND_METHOD(*this, abortAllActors),
KJ_BIND_METHOD(*this, deleteAllActors), kj::mv(dockerPath),
kj::mv(containerEgressInterceptorImage), def.isDynamic);
result->initActorNamespaces(def.localActorConfigs, network);
co_return result;
Expand Down
4 changes: 4 additions & 0 deletions src/workerd/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ class Server final: private kj::TaskSet::ErrorHandler, private ChannelTokenHandl
// Aborts all actors in this server except those in namespaces marked with `preventEviction`.
void abortAllActors(kj::Maybe<const kj::Exception&> reason);

// Aborts all actors, deletes all alarms, and deletes all underlying storage for evictable
// namespaces. After this, DOs can be recreated with clean state.
void deleteAllActors(kj::Maybe<const kj::Exception&> reason);

// Can only be called in the link stage.
//
// May return a new object or may return a fake-own around a long-lived object.
Expand Down
62 changes: 62 additions & 0 deletions src/workerd/server/tests/unsafe-module/unsafe-module-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@ export const TestEphemeralObjectPreventEviction = createTestObject(
'ephemeral-prevent-eviction'
);

export class StorageObject extends DurableObject {
async getValue() {
return (await this.ctx.storage.get('key')) ?? null;
}
async setValue(value) {
await this.ctx.storage.put('key', value);
}
}

let alarmTriggers = 0;
export class AlarmObject extends DurableObject {
get scheduledTime() {
Expand Down Expand Up @@ -120,3 +129,56 @@ export const test_abort_all_durable_objects_alarms = {
assert.strictEqual(alarmTriggers, 1); // (same as before)
},
};

export const test_delete_all_durable_objects = {
async test(ctrl, env, ctx) {
// Write some data to a durable object.
const id = env.STORAGE.idFromName('test-delete');
let stub = env.STORAGE.get(id);
await stub.setValue('hello');
assert.strictEqual(await stub.getValue(), 'hello');

// Delete all durable objects.
await unsafe.deleteAllDurableObjects();

// Old stub should be broken.
await assert.rejects(() => stub.getValue(), {
name: 'Error',
message: 'Application called deleteAllDurableObjects().',
});

// Recreate the stub — storage should be gone.
stub = env.STORAGE.get(id);
assert.strictEqual(await stub.getValue(), null);
},
};

export const test_delete_all_durable_objects_alarms = {
async test(ctrl, env, ctx) {
const id = env.ALARM.newUniqueId();
const stub = env.ALARM.get(id);

const alarmsBefore = alarmTriggers;
await stub.scheduleIn(500);
assert.notStrictEqual(await stub.scheduledTime, null);

// Delete everything — alarms included.
await unsafe.deleteAllDurableObjects();
await scheduler.wait(1000);
assert.strictEqual(alarmTriggers, alarmsBefore); // alarm did not fire
},
};

export const test_delete_all_durable_objects_respects_prevent_eviction = {
async test(ctrl, env, ctx) {
const id = env.DURABLE_PREVENT_EVICTION.newUniqueId();
const stub = env.DURABLE_PREVENT_EVICTION.get(id);
const res1 = await (await stub.fetch('http://x')).text();

await unsafe.deleteAllDurableObjects();

// preventEviction namespace should be untouched — same response (same instance).
const res2 = await (await stub.fetch('http://x')).text();
assert.strictEqual(res1, res2);
},
};
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const unitTests :Workerd.Config = (
( className = "TestEphemeralObject", ephemeralLocal = void ),
( className = "TestEphemeralObjectPreventEviction", ephemeralLocal = void, preventEviction = true ),
( className = "AlarmObject", uniqueKey = "alarm" ),
( className = "StorageObject", uniqueKey = "storage" ),
],
durableObjectStorage = ( localDisk = "TEST_TMPDIR" ),
bindings = [
Expand All @@ -23,6 +24,7 @@ const unitTests :Workerd.Config = (
( name = "EPHEMERAL", durableObjectNamespace = "TestEphemeralObject" ),
( name = "EPHEMERAL_PREVENT_EVICTION", durableObjectNamespace = "TestEphemeralObjectPreventEviction" ),
( name = "ALARM", durableObjectNamespace = "AlarmObject" ),
( name = "STORAGE", durableObjectNamespace = "StorageObject" ),
],
)
),
Expand Down
Loading