Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/workerd/io/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ wd_cc_library(
":io-gate",
":trace",
"//src/workerd/jsg:exception",
"//src/workerd/util:autogate",
"//src/workerd/util:duration-exceeded-logger",
"//src/workerd/util:sqlite",
"@capnp-cpp//src/capnp:capnp-rpc",
Expand Down Expand Up @@ -425,6 +424,7 @@ kj_test(
deps = [
":actor",
":io-gate",
"//src/workerd/util:autogate",
"//src/workerd/util:test",
"//src/workerd/util:test-util",
"@sqlite3",
Expand Down
114 changes: 114 additions & 0 deletions src/workerd/io/actor-cache-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,16 @@ struct ActorCacheTest: public ActorCacheConvenienceWrappers {
gateBrokenPromise(options.monitorOutputGate ? eagerlyReportExceptions(gate.onBroken())
: kj::Promise<void>(kj::READY_NOW)) {}

// Simulates `count` counted alarm handler failures for `alarmTime`, leaving the cache
// in KnownAlarmTime{CLEAN, alarmTime} as AlarmManager would after each retry.
void simulateCountedAlarmRetries(kj::Date alarmTime, int count = 6) {
for (auto i = 0; i < count; i++) {
auto armResult = cache.armAlarmHandler(alarmTime, nullptr, kj::UNIX_EPOCH);
KJ_ASSERT(armResult.is<ActorCache::RunAlarmHandler>());
cache.cancelDeferredAlarmDeletion();
}
}

~ActorCacheTest() noexcept(false) {
// Make sure if the output gate has been broken, the exception was reported. This is important
// to report errors thrown inside flush(), since those won't otherwise propagate into the test
Expand Down Expand Up @@ -5773,5 +5783,109 @@ KJ_TEST("ActorCache can shutdown") {
});
}

KJ_TEST("ActorCache alarm cleared by abandonAlarm after max counted retry failures") {
// After the alarm scheduler calls abandonAlarm(), the cache correctly forgets the alarm.

ActorCacheTest test;
auto& ws = test.ws;
auto& mockStorage = test.mockStorage;

auto oneMs = 1 * kj::MILLISECONDS + kj::UNIX_EPOCH;

test.setAlarm(oneMs);
mockStorage->expectCall("setAlarm", ws)
.withParams(CAPNP(scheduledTimeMs = 1))
.thenReturn(CAPNP());

// Simulate ALARM_RETRY_MAX_TRIES (= 6) counted handler failures.
// cancelDeferredAlarmDeletion() preserves KnownAlarmTime{CLEAN, oneMs} on each failure
// (alarm still set from cache perspective -- correct for retries 1-5).
test.simulateCountedAlarmRetries(oneMs);

// The alarm scheduler has decided to give up. It calls abandonAlarm() on the actor,
// which clears KnownAlarmTime{CLEAN, oneMs} -> KnownAlarmTime{CLEAN, null}.
test.cache.abandonAlarm(oneMs).wait(ws);

// getAlarm() now returns null from cache (no storage read).
auto time = expectCached(test.getAlarm());
KJ_ASSERT(time == kj::none);
}

KJ_TEST("ActorCache alarm preserved after ALARM_RETRY_MAX_TRIES uncounted (internal) failures") {
// When all ALARM_RETRY_MAX_TRIES failures are uncounted (retryCountsAgainstLimit=false,
// i.e. infrastructure errors), the alarm scheduler's countedRetry never reaches the limit and
// abandonAlarm is NEVER called. The alarm must remain set throughout so that the scheduler
// can keep retrying indefinitely until the infrastructure issue resolves.

ActorCacheTest test;
auto& ws = test.ws;
auto& mockStorage = test.mockStorage;

auto oneMs = 1 * kj::MILLISECONDS + kj::UNIX_EPOCH;
auto testCurrentTime = kj::UNIX_EPOCH;

test.setAlarm(oneMs);
mockStorage->expectCall("setAlarm", ws)
.withParams(CAPNP(scheduledTimeMs = 1))
.thenReturn(CAPNP());

// Simulate uncounted failures well past ALARM_RETRY_MAX_TRIES (= 6).
// countedRetry stays at 0; AlarmManager never gives up; abandonAlarm is never called.
// We've seen alarms fail hundreds of times due to infrastructure errors in production,
// so we check both at the boundary (6) and well beyond it (100).
for (auto i = 0; i < 100; i++) {
auto armResult = test.cache.armAlarmHandler(oneMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorCache::RunAlarmHandler>());
test.cache.cancelDeferredAlarmDeletion();

// Check at the ALARM_RETRY_MAX_TRIES boundary and at the end.
if (i == 5 || i == 99) {
auto time = expectCached(test.getAlarm());
KJ_ASSERT(time == oneMs);
}
}
}

KJ_TEST("ActorCache abandonAlarm is a no-op when a newer alarm has replaced the abandoned one") {
// If the user sets a new alarm between the last retry failure and the abandonAlarm() call,
// and that new alarm has already flushed to CLEAN, abandonAlarm() must compare the time
// and leave the new alarm untouched.

ActorCacheTest test;
auto& ws = test.ws;
auto& mockStorage = test.mockStorage;

auto oneMs = 1 * kj::MILLISECONDS + kj::UNIX_EPOCH;
auto twoMs = 2 * kj::MILLISECONDS + kj::UNIX_EPOCH;

// Set the original alarm and flush it to CLEAN.
test.setAlarm(oneMs);
mockStorage->expectCall("setAlarm", ws)
.withParams(CAPNP(scheduledTimeMs = 1))
.thenReturn(CAPNP());

// Simulate ALARM_RETRY_MAX_TRIES (= 6) counted failures.
test.simulateCountedAlarmRetries(oneMs);

// Race: user sets a new alarm (twoMs) between the last failure and abandonAlarm().
// It flushes to CLEAN before abandonAlarm() arrives, leaving KnownAlarmTime{CLEAN, twoMs}.
test.setAlarm(twoMs);
mockStorage->expectCall("setAlarm", ws)
.withParams(CAPNP(scheduledTimeMs = 2))
.thenReturn(CAPNP());
// Advance the event loop to process the storage response and complete the FLUSHING→CLEAN
// transition. Without this poll, the state is still FLUSHING when abandonAlarm runs, and
// the existing status check would protect it by accident, hiding the time-check regression.
ws.poll();

// abandonAlarm() for the original oneMs alarm must be a no-op: storedTime (twoMs) !=
// scheduledTime (oneMs), so the time check prevents clearing the new alarm.
test.cache.abandonAlarm(oneMs).wait(ws);

// getAlarm() must still return twoMs -- the new alarm was NOT incorrectly cleared.
auto time = expectCached(test.getAlarm());
KJ_ASSERT(time == twoMs);
}

} // namespace
} // namespace workerd
19 changes: 19 additions & 0 deletions src/workerd/io/actor-cache.c++
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,25 @@ void ActorCache::cancelDeferredAlarmDeletion() {
}
}

kj::Promise<void> ActorCache::abandonAlarm(kj::Date scheduledTime) {
// Called when AlarmManager has given up retrying an alarm after too many counted failures.
// Clear the in-memory alarm state so getAlarm() returns null instead of a stale time.
// Only clear if we still have a stale KnownAlarmTime whose time matches the abandoned alarm.
// Guards against three cases we must not clobber:
// - DIRTY/FLUSHING: the user set a new alarm that hasn't flushed yet (status check).
// - DeferredAlarmDelete: a handler is in progress (tryGet<KnownAlarmTime>() won't match).
// - CLEAN with a different time: the user set a new alarm that already flushed (time check).
KJ_IF_SOME(t, currentAlarmTime.tryGet<KnownAlarmTime>()) {
KJ_IF_SOME(storedTime, t.time) {
if (t.status == KnownAlarmTime::Status::CLEAN && storedTime == scheduledTime) {
currentAlarmTime = KnownAlarmTime{
.status = KnownAlarmTime::Status::CLEAN, .time = kj::none, .noCache = t.noCache};
}
}
}
return kj::READY_NOW;
}

kj::Maybe<kj::Promise<void>> ActorCache::getBackpressure() {
if (dirtyList.sizeInBytes() > lru.options.dirtyListByteLimit && !lru.options.neverFlush) {
// Wait for dirty entries to be flushed.
Expand Down
8 changes: 8 additions & 0 deletions src/workerd/io/actor-cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,13 @@ class ActorCacheInterface: public ActorCacheOps {

virtual void cancelDeferredAlarmDeletion() = 0;

// Called by AlarmManager when it has given up retrying an alarm after too many counted failures.
// Implementations should clear the alarm from their local state so getAlarm() reflects the
// deletion.
virtual kj::Promise<void> abandonAlarm(kj::Date scheduledTime) {
return kj::READY_NOW;
}

virtual kj::Maybe<kj::Promise<void>> onNoPendingFlush(SpanParent parentSpan) = 0;

// Implements the respective PITR API calls. The default implementations throw JSG errors saying
Expand Down Expand Up @@ -380,6 +387,7 @@ class ActorCache final: public ActorCacheInterface {
bool noCache = false,
kj::StringPtr actorId = "") override;
void cancelDeferredAlarmDeletion() override;
kj::Promise<void> abandonAlarm(kj::Date scheduledTime) override;
kj::Maybe<kj::Promise<void>> onNoPendingFlush(SpanParent parentSpan) override;
// See ActorCacheInterface

Expand Down
101 changes: 101 additions & 0 deletions src/workerd/io/actor-sqlite-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2899,5 +2899,106 @@ KJ_TEST("explicit transaction: commit failure breaks output gate even for unconf
KJ_EXPECT_THROW_MESSAGE("commit failed", promise.wait(test.ws));
}

KJ_TEST("ActorSqlite alarm cleared by abandonAlarm after max counted retry failures") {

ActorSqliteTest test;

test.setAlarm(oneMs);
test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({});
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

// Simulate ALARM_RETRY_MAX_TRIES (= 6) counted handler failures.
for (auto i = 0; i < 6 /* WorkerInterface::ALARM_RETRY_MAX_TRIES */; i++) {
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
test.actor.cancelDeferredAlarmDeletion();
// Each failure leaves alarm in SQLite (correct for retries 1-5).
test.pollAndExpectCalls({});
}

// The alarm scheduler has decided to give up. It calls abandonAlarm() on the actor:
// setAlarm(null) -> commit -> scheduleRun(none) (move-later path).
test.actor.abandonAlarm(oneMs).wait(test.ws);
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({"scheduleRun(none)"})[0]->fulfill();
test.pollAndExpectCalls({});

// getAlarm() now returns null (alarm deleted from SQLite).
KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
}

KJ_TEST("ActorSqlite alarm preserved after ALARM_RETRY_MAX_TRIES uncounted (internal) failures") {
// When all ALARM_RETRY_MAX_TRIES failures are uncounted (retryCountsAgainstLimit=false,
// i.e. infrastructure errors), the alarm scheduler's countedRetry never reaches the limit and
// abandonAlarm is NEVER called. The alarm must remain set in SQLite throughout so that
// the scheduler can keep retrying indefinitely.

ActorSqliteTest test;

test.setAlarm(oneMs);
test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({});
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

// Simulate uncounted failures well past ALARM_RETRY_MAX_TRIES (= 6).
// countedRetry stays at 0; AlarmManager never gives up; abandonAlarm is never called.
// We've seen alarms fail hundreds of times due to infrastructure errors in production,
// so we check both at the boundary (6) and well beyond it (100).
for (auto i = 0; i < 100; i++) {
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
test.actor.cancelDeferredAlarmDeletion();
test.pollAndExpectCalls({});

// Check at the ALARM_RETRY_MAX_TRIES boundary and at the end.
if (i == 5 || i == 99) {
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
}
}
}

KJ_TEST("ActorSqlite abandonAlarm is a no-op when a newer alarm has replaced the abandoned one") {
// If the user sets a new alarm between the last retry failure and the abandonAlarm() call,
// and it has already committed to SQLite, abandonAlarm() must compare the time and leave
// the new alarm untouched.

ActorSqliteTest test;

// Set the original alarm and commit it.
test.setAlarm(oneMs);
test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({});
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

// Simulate ALARM_RETRY_MAX_TRIES (= 6) counted failures.
for (auto i = 0; i < 6 /* WorkerInterface::ALARM_RETRY_MAX_TRIES */; i++) {
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
test.actor.cancelDeferredAlarmDeletion();
test.pollAndExpectCalls({});
}

// Race: user sets a new alarm (twoMs) between the last failure and abandonAlarm().
// The commit fires first; then the post-commit "move-later" logic fires scheduleRun(2ms)
// because alarmScheduledNoLaterThan (oneMs) is earlier than the newly committed twoMs.
test.setAlarm(twoMs);
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill();
test.pollAndExpectCalls({});
KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);

// abandonAlarm() for the original oneMs alarm must be a no-op: storedTime (twoMs) !=
// scheduledTime (oneMs), so the time check prevents clearing the new alarm.
test.actor.abandonAlarm(oneMs).wait(test.ws);
test.pollAndExpectCalls({}); // No commit or scheduleRun -- correct no-op.

// getAlarm() must still return twoMs.
KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);
}

} // namespace
} // namespace workerd
20 changes: 19 additions & 1 deletion src/workerd/io/actor-sqlite.c++
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "io-gate.h"

#include <workerd/jsg/exception.h>
#include <workerd/util/autogate.h>
#include <workerd/util/sentry.h>

#include <kj/exception.h>
Expand Down Expand Up @@ -1006,6 +1005,25 @@ void ActorSqlite::cancelDeferredAlarmDeletion() {
haveDeferredDelete = false;
}

kj::Promise<void> ActorSqlite::abandonAlarm(kj::Date scheduledTime) {
// Called when AlarmManager has given up retrying an alarm after too many counted failures.
// Clear the alarm from SQLite so getAlarm() returns null instead of a stale time.
// Only clear if SQLite currently has the exact alarm being abandoned and we're not mid-handler.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How much have we thought through how this interacts with the general split-brain nature of alarms stored in sqlite vs in our AlarmManager system?

Currently, the invariant that we attempt to maintain is that the scheduledTime in the sqlite DB is >= the scheduled time in the backend AlarmManager, such that we're always guaranteed to be woken up before the scheduled time in sqlite.

But the fact that the two can get out of sync makes this look very fishy, since what's stopping a scenario where the time being abandoned is earlier than our time in sqlite, so we don't clear the time in sqlite, and then we're left with a time in sqlite but no time in the upstream AlarmManager (and thus we'd still potentially be telling callers of getAlarm() that an alarm is set when an alarm will never actually be invoked).

I think it means this is an incomplete fix that will still allow some DOs to get stuck in the state that we're attempting to fix with this change. But I'll dig in a bit more to try to confirm.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, opus 4.6 agrees this is a problem: https://share.opencode.cloudflare.dev/share/ad9z88O5

Its analysis (in the second message) looks correct to me. This is a real problem, at least in the case where sqlite's persisted alarm time is in the past. But it didn't give a great idea for a fix. Its proposal to clear the alarm if sqlite's scheduledTime is less than the current time is pretty good (maybe good enough?), although still not perfect.

Alternatively, we could try returning sqlite's scheduled time back in the response to the abandonAlarm RPC such that AlarmManager can update its stored scheduled time if appropriate (i.e. if it hadn't already been updated via a separate concurrent call from setAlarm). A bit more context about that idea is discussed in the fourth message of that chat session.

What do you think?

// The time check guards against the race where the user set a new alarm (which always has a
// time >= now() > scheduledTime due to past-time clamping in setAlarm) before this call arrived.
if (inAlarmHandler) {
// Shouldn't happen -- AlarmManager shouldn't call abandonAlarm while a handler is running.
LOG_WARNING_ONCE("abandonAlarm called while alarm handler is still running");
return kj::READY_NOW;
}
KJ_IF_SOME(storedTime, metadata.getAlarm()) {
if (storedTime == scheduledTime) {
setAlarm(kj::none, {}, nullptr);
}
}
return kj::READY_NOW;
}

kj::Maybe<kj::Promise<void>> ActorSqlite::onNoPendingFlush(SpanParent parentSpan) {
// This implements sync().
//
Expand Down
1 change: 1 addition & 0 deletions src/workerd/io/actor-sqlite.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH
bool noCache = false,
kj::StringPtr actorId = "") override;
void cancelDeferredAlarmDeletion() override;
kj::Promise<void> abandonAlarm(kj::Date scheduledTime) override;
kj::Maybe<kj::Promise<void>> onNoPendingFlush(SpanParent parentSpan) override;
kj::Promise<kj::String> getCurrentBookmark(SpanParent parentSpan) override;
kj::Promise<void> waitForBookmark(kj::StringPtr bookmark, SpanParent parentSpan) override;
Expand Down
15 changes: 15 additions & 0 deletions src/workerd/io/worker-entrypoint.c++
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class WorkerEntrypoint final: public WorkerInterface {
kj::Promise<void> prewarm(kj::StringPtr url) override;
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override;
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, uint32_t retryCount) override;
kj::Promise<void> abandonAlarm(kj::Date scheduledTime) override;
kj::Promise<bool> test() override;
kj::Promise<CustomEvent::Result> customEvent(kj::Own<CustomEvent> event) override;

Expand Down Expand Up @@ -845,6 +846,20 @@ kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarm(
co_return result;
}

kj::Promise<void> WorkerEntrypoint::abandonAlarm(kj::Date scheduledTime) {
TRACE_EVENT("workerd", "WorkerEntrypoint::abandonAlarm()");
// This does not require running the user's alarm handler -- it's a pure actor-state cleanup.
// Access the actor directly from the IoContext without going through the JS dispatch machinery.
KJ_IF_SOME(req, incomingRequest) {
auto& actor = KJ_REQUIRE_NONNULL(
req->getContext().getActor(), "abandonAlarm() should only work with actors");
KJ_IF_SOME(persistent, actor.getPersistent()) {
return persistent.abandonAlarm(scheduledTime);
}
}
return kj::READY_NOW;
}

kj::Promise<bool> WorkerEntrypoint::test() {
TRACE_EVENT("workerd", "WorkerEntrypoint::test()");
auto incomingRequest =
Expand Down
15 changes: 15 additions & 0 deletions src/workerd/io/worker-interface.c++
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ class PromisedWorkerInterface final: public WorkerInterface {
}
}

kj::Promise<void> abandonAlarm(kj::Date scheduledTime) override {
KJ_IF_SOME(w, worker) {
co_return co_await w->abandonAlarm(scheduledTime);
} else {
co_await promise;
co_return co_await KJ_ASSERT_NONNULL(worker)->abandonAlarm(scheduledTime);
}
}

kj::Promise<CustomEvent::Result> customEvent(kj::Own<CustomEvent> event) override {
KJ_IF_SOME(w, worker) {
co_return co_await w->customEvent(kj::mv(event));
Expand Down Expand Up @@ -417,6 +426,12 @@ kj::Promise<WorkerInterface::AlarmResult> RpcWorkerInterface::runAlarm(
});
}

kj::Promise<void> RpcWorkerInterface::abandonAlarm(kj::Date scheduledTime) {
auto req = dispatcher.abandonAlarmRequest();
req.setScheduledTimeMs((scheduledTime - kj::UNIX_EPOCH) / kj::MILLISECONDS);
return req.send().ignoreResult();
}

kj::Promise<WorkerInterface::CustomEvent::Result> RpcWorkerInterface::customEvent(
kj::Own<CustomEvent> event) {
return event->sendRpc(httpOverCapnpFactory, byteStreamFactory, dispatcher).attach(kj::mv(event));
Expand Down
Loading
Loading