Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions src/workerd/io/actor-sqlite-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1143,6 +1143,56 @@ KJ_TEST("rejected move-later alarm scheduling request does not break gate") {
test.pollAndExpectCalls({"commit"})[0]->fulfill();
}

KJ_TEST("rapid move-later alarm changes coalesce into bounded scheduleRun calls") {
// When many commits each move the alarm time later while a scheduleRun is already in-flight,
// the scheduleLaterAlarm mechanism should coalesce them into at most one pending request,
// rather than chaining N promises (one per commit).
ActorSqliteTest test;

// Initialize alarm state to 1ms.
test.setAlarm(oneMs);
test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({});
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

// Move alarm to 2ms. The db commit completes, triggering a post-commit scheduleRun(2ms)
// since the alarm moved later.
test.setAlarm(twoMs);
test.pollAndExpectCalls({"commit"})[0]->fulfill();
KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);
// The first move-later scheduleRun starts.
auto fulfiller2Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]);

// While 2ms scheduleRun is in-flight, move alarm to 3ms, 4ms, 5ms in rapid succession.
// Each commit completes immediately but the scheduleRun for 2ms is still pending.
// Only the final value (5ms) should be scheduled after the 2ms scheduleRun completes.
test.setAlarm(threeMs);
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({}); // No new scheduleRun -- coalesced into pending.
KJ_ASSERT(expectSync(test.getAlarm()) == threeMs);

test.setAlarm(fourMs);
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({}); // No new scheduleRun -- coalesced into pending.
KJ_ASSERT(expectSync(test.getAlarm()) == fourMs);

test.setAlarm(fiveMs);
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({}); // No new scheduleRun -- coalesced into pending.
KJ_ASSERT(expectSync(test.getAlarm()) == fiveMs);

// Now fulfill the 2ms scheduleRun. The coalesced pending time (5ms) should be scheduled next.
fulfiller2Ms->fulfill();
auto fulfiller5Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(5ms)"})[0]);
// Importantly, there is exactly one scheduleRun(5ms), not three separate calls for 3ms, 4ms, 5ms.

fulfiller5Ms->fulfill();
test.pollAndExpectCalls({});

KJ_ASSERT(expectSync(test.getAlarm()) == fiveMs);
}

KJ_TEST("an exception thrown during merged commits does not hang") {
ActorSqliteTest test({.monitorOutputGate = false});

Expand Down
95 changes: 60 additions & 35 deletions src/workerd/io/actor-sqlite.c++
Original file line number Diff line number Diff line change
Expand Up @@ -383,19 +383,54 @@ kj::Promise<void> ActorSqlite::requestScheduledAlarm(
});
}

void ActorSqlite::scheduleLaterAlarm(
kj::Maybe<kj::Date> newAlarmTime, SpanParent parentSpan) {
if (alarmLaterIsInFlight) {
// There's already a move-later request in-flight. Just store the desired time; the in-flight
// request's completion handler will pick it up and start a new request. This overwrites any
// previously pending time, which is fine -- only the latest value matters.
pendingLaterAlarmTime = newAlarmTime;
return;
}

alarmLaterIsInFlight = true;
alarmLaterInFlight =
requestScheduledAlarm(newAlarmTime, alarmLaterInFlight.addBranch())
.attach(parentSpan.newChild("actor_sqlite_alarm_sync"_kjc))
.catch_([](kj::Exception&& e) {
// If an exception occurs when scheduling the alarm later, it's OK -- the alarm will
// eventually fire at the earlier time, and the rescheduling will be retried.
// We catch here to prevent the chain from breaking on errors.
LOG_WARNING_PERIODICALLY("NOSENTRY SQLite reschedule later alarm failed", e);
})
.fork();

commitTasks.add(alarmLaterInFlight.addBranch().then([this]() {
alarmLaterIsInFlight = false;
KJ_IF_SOME(nextTime, pendingLaterAlarmTime) {
pendingLaterAlarmTime = kj::none;
scheduleLaterAlarm(nextTime, nullptr);
}
}).catch_([](kj::Exception&& e) {
// Move-later alarm failures are non-fatal; catch here to prevent taskFailed() from
// breaking the output gate.
LOG_WARNING_PERIODICALLY("NOSENTRY SQLite reschedule later alarm drain failed", e);
}));
}

ActorSqlite::PrecommitAlarmState ActorSqlite::startPrecommitAlarmScheduling() {
PrecommitAlarmState state;
if (pendingCommit == kj::none &&
willFireEarlier(metadata.getAlarm(), alarmScheduledNoLaterThan)) {
// We must wait on the `alarmLaterChain` here, otherwise, if there is a pending "move later"
// alarm task and it fails, our "move earlier" alarm might interleave, succeed, and be followed
// by a retry of the "move later" alarm. This happens because "move later" alarms complete after
// we commit to local SQLite.
// We must wait on the `alarmLaterInFlight` promise here, otherwise, if there is a pending
// "move later" alarm task and it fails, our "move earlier" alarm might interleave, succeed,
// and be followed by a retry of the "move later" alarm. This happens because "move later"
// alarms complete after we commit to local SQLite.
//
// By waiting on any pending "move later" alarm, we correctly serialize our `scheduleRun()`
// By waiting on any in-flight "move later" alarm, we correctly serialize our `scheduleRun()`
// calls to the alarm manager.
state.schedulingPromise =
requestScheduledAlarm(metadata.getAlarm(), alarmLaterChain.addBranch());
requestScheduledAlarm(metadata.getAlarm(), alarmLaterInFlight.addBranch());
}
return kj::mv(state);
}
Expand Down Expand Up @@ -466,17 +501,18 @@ kj::Promise<void> ActorSqlite::commitImpl(
KJ_LOG(WARNING, "NOSENTRY DEBUG_ALARM: Move earlier loop iteration", syncIterations,
logDate(currentAlarmState), logDate(alarmScheduledNoLaterThan), alarmVersion);
}
// Note that we do not pass alarmLaterChain here. We don't need to for the following reasons:
// Note that we do not pass alarmLaterInFlight here. We don't need to for the following
// reasons:
//
// 1. We already waited for the chain in the precommitAlarmState promise above.
// 1. We already waited for it in the precommitAlarmState promise above.
// 2. We set the `pendingCommit` prior to yielding to the event loop earlier, so any subsequent
// commits have to wait for us to fulfill the pendingCommit promise. In short, no one could
// have added another "move-later" alarm to the chain, not until we finish.
// have started another "move-later" alarm, not until we finish.
//
// While we *could* pass the alarmLaterChain promise (it wouldn't be incorrect), when calling
// addBranch() on a resolved ForkedPromise, the continuation would be evaluated on a future turn
// of the event loop. That means we're going to suspend, even if the promise is ready, which
// means we'd take a performance hit.
// While we *could* pass the alarmLaterInFlight promise (it wouldn't be incorrect), when
// calling addBranch() on a resolved ForkedPromise, the continuation would be evaluated on a
// future turn of the event loop. That means we're going to suspend, even if the promise is
// ready, which means we'd take a performance hit.
co_await requestScheduledAlarm(metadata.getAlarm(), kj::READY_NOW);
syncIterations++;
}
Expand Down Expand Up @@ -535,20 +571,7 @@ kj::Promise<void> ActorSqlite::commitImpl(
KJ_LOG(WARNING, "NOSENTRY DEBUG_ALARM: Moving alarm later", "sqlite_has",
logDate(alarmStateForCommit), logDate(alarmScheduledNoLaterThan), alarmVersion);
}
// We need to extend our alarmLaterChain now that we're adding a new "move-later" alarm task.
//
// Technically, we don't need serialize our "move-later" alarms since SQLite has the later
// time committed locally. We could just set the `alarmLaterChain` and pass a `kj::READY_NOW`
// to requestScheduledAlarm, and so if we have a partial failure we would just recover when
// the alarm runs early. That said, it doesn't hurt to serialize on the client-side.
alarmLaterChain = requestScheduledAlarm(alarmStateForCommit, alarmLaterChain.addBranch())
.attach(commitSpan.newChild("actor_sqlite_alarm_sync"_kjc))
.catch_([](kj::Exception&& e) {
// If an exception occurs when scheduling the alarm later, it's OK -- the alarm will
// eventually fire at the earlier time, and the rescheduling will be retried.
// We catch here to prevent the chain from breaking on errors.
LOG_WARNING_PERIODICALLY("NOSENTRY SQLite reschedule later alarm failed", e);
}).fork();
scheduleLaterAlarm(alarmStateForCommit, SpanParent(commitSpan));
}
}
}
Expand Down Expand Up @@ -945,14 +968,16 @@ kj::OneOf<ActorSqlite::CancelAlarmHandler, ActorSqlite::RunAlarmHandler> ActorSq
"NOSENTRY SQLite alarm handler canceled with requestScheduledAlarm.", scheduledTime,
localAlarmState.orDefault(kj::UNIX_EPOCH), actorId);

// Since we're requesting to move the alarm time to later, we need to add to our
// `alarmLaterChain`. Note that for the chain, we want to make sure any scheduling failure
// does not break us, but for the `CancelAlarmHandler`, we want the caller to receive the
// exception normally, so we do not consume the exception.
// Since we're requesting to move the alarm time to later, we need to update the
// alarmLaterInFlight promise. We issue a single requestScheduledAlarm call, fork it,
// and use one branch for tracking (with error catching) and the other for the caller
// (which propagates errors). Note that we directly update alarmLaterInFlight here
// rather than using scheduleLaterAlarm(), because we need a separate un-caught branch
// of the promise for the CancelAlarmHandler return value.
auto schedulingPromise =
requestScheduledAlarm(localAlarmState, alarmLaterChain.addBranch()).fork();
alarmLaterChain = schedulingPromise.addBranch()
.catch_([](kj::Exception&& e) {
requestScheduledAlarm(localAlarmState, alarmLaterInFlight.addBranch()).fork();
alarmLaterInFlight = schedulingPromise.addBranch()
.catch_([](kj::Exception&& e) {
// If an exception occurs when scheduling the alarm later, it's OK -- the alarm will
// eventually fire at the earlier time, and the rescheduling will be retried.
// We catch here to prevent the chain from breaking on errors.
Expand All @@ -978,7 +1003,7 @@ kj::OneOf<ActorSqlite::CancelAlarmHandler, ActorSqlite::RunAlarmHandler> ActorSq
// handler invocation.
//
// We pass kj::READY_NOW because being in this branch (SQLite is ahead of the alarm manager)
// means there's no recent move-later operation to wait for, so no need for alarmLaterChain.
// means there's no recent move-later operation to wait for, so no need for alarmLaterInFlight.
return CancelAlarmHandler{
.waitBeforeCancel = requestScheduledAlarm(localAlarmState, kj::READY_NOW)};
}
Expand Down
23 changes: 19 additions & 4 deletions src/workerd/io/actor-sqlite.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH
// Makes a request to the alarm manager to run the alarm handler at the given time, returning
// a promise that resolves when the scheduling has succeeded. `priorTask` is any work we must
// wait on prior to scheduling the new request, as of this writing, this would be the
// alarmLaterChain, which holds promises to move the alarm time "later" than is currently set.
// alarmLaterInFlight promise, which tracks any in-flight request to move the alarm "later"
// than is currently set.
virtual kj::Promise<void> scheduleRun(
kj::Maybe<kj::Date> newAlarmTime, kj::Promise<void> priorTask);

Expand Down Expand Up @@ -263,9 +264,18 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH
// for the output gate lock hold trace when a non-allowUnconfirmed write occurs.
SpanParent currentCommitSpan = nullptr;

// Promise chain for serializing "move alarm later" operations to prevent races
// at the alarm manager. Each update waits for the previous one to complete.
kj::ForkedPromise<void> alarmLaterChain = kj::Promise<void>(kj::READY_NOW).fork();
// Promise for the currently in-flight "move alarm later" operation, if any.
// Used to serialize move-earlier operations against any pending move-later operation.
kj::ForkedPromise<void> alarmLaterInFlight = kj::Promise<void>(kj::READY_NOW).fork();

// True when a "move alarm later" request is currently in-flight via scheduleLaterAlarm().
bool alarmLaterIsInFlight = false;

// When a "move alarm later" request is already in-flight and we need to schedule another one,
// we store the desired alarm time here. When the in-flight request completes, it checks this variable and
// starts a new request if needed. The outer Maybe indicates whether there is a pending time at
// all; the inner Maybe<Date> is the alarm time to set (where kj::none means "clear the alarm").
kj::Maybe<kj::Maybe<kj::Date>> pendingLaterAlarmTime;

// Version counter that increments on every alarm change. Used to detect if another commit
// modified the alarm while we were async, allowing us to skip redundant post-commit alarm
Expand All @@ -286,6 +296,11 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH
kj::Promise<void> requestScheduledAlarm(
kj::Maybe<kj::Date> requestedTime, kj::Promise<void> priorTask);

// Schedules a "move alarm later" operation. If no move-later is currently in-flight, starts one
// immediately. If one is already in-flight, stores the desired time in `pendingLaterAlarmTime`
// so it will be picked up when the current in-flight operation completes.
void scheduleLaterAlarm(kj::Maybe<kj::Date> newAlarmTime, SpanParent parentSpan);

struct PrecommitAlarmState {
// Promise for the completion of precommit alarm scheduling
kj::Maybe<kj::Promise<void>> schedulingPromise;
Expand Down
Loading