Skip to content

Commit f1a865f

Browse files
Ensure a job enqueued on a worker must be run within the same macro task
1 parent 3b92b8c commit f1a865f

File tree

2 files changed

+225
-28
lines changed

2 files changed

+225
-28
lines changed

Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift

+61-28
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ import WASILibc
8787
/// }
8888
/// ```
8989
///
90+
/// ## Scheduling invariants
91+
///
92+
/// * Jobs enqueued on a worker are guaranteed to run within the same macrotask in which they were scheduled.
93+
///
9094
/// ## Known limitations
9195
///
9296
/// Currently, the Cooperative Global Executor of Swift runtime has a bug around
@@ -135,22 +139,26 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
135139
/// +---------+ +------------+
136140
/// +----->| Idle |--[terminate]-->| Terminated |
137141
/// | +---+-----+ +------------+
138-
/// | |
139-
/// | [enqueue]
140-
/// | |
141-
/// [no more job] |
142-
/// | v
143-
/// | +---------+
144-
/// +------| Running |
145-
/// +---------+
142+
/// | | \
143+
/// | | \------------------+
144+
/// | | |
145+
/// | [enqueue] [enqueue] (on other thread)
146+
/// | | |
147+
/// [no more job] | |
148+
/// | v v
149+
/// | +---------+ +---------+
150+
/// +------| Running |<--[wake]--| Ready |
151+
/// +---------+ +---------+
146152
///
147153
enum State: UInt32, AtomicRepresentable {
148154
/// The worker is idle and waiting for a new job.
149155
case idle = 0
156+
/// A wake message is sent to the worker, but it has not been received it yet
157+
case ready = 1
150158
/// The worker is processing a job.
151-
case running = 1
159+
case running = 2
152160
/// The worker is terminated.
153-
case terminated = 2
161+
case terminated = 3
154162
}
155163
let state: Atomic<State> = Atomic(.idle)
156164
/// TODO: Rewrite it to use real queue :-)
@@ -197,32 +205,42 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
197205
func enqueue(_ job: UnownedJob) {
198206
statsIncrement(\.enqueuedJobs)
199207
var locked: Bool
208+
let onTargetThread = Self.currentThread === self
209+
// If it's on the thread and it's idle, we can directly schedule a `Worker/run` microtask.
210+
let desiredState: State = onTargetThread ? .running : .ready
200211
repeat {
201212
let result: Void? = jobQueue.withLockIfAvailable { queue in
202213
queue.append(job)
214+
trace("Worker.enqueue idle -> running")
203215
// Wake up the worker to process a job.
204-
switch state.exchange(.running, ordering: .sequentiallyConsistent) {
205-
case .idle:
206-
if Self.currentThread === self {
216+
trace("Worker.enqueue idle -> \(desiredState)")
217+
switch state.compareExchange(expected: .idle, desired: desiredState, ordering: .sequentiallyConsistent) {
218+
case (true, _):
219+
if onTargetThread {
207220
// Enqueueing a new job to the current worker thread, but it's idle now.
208221
// This is usually the case when a continuation is resumed by JS events
209222
// like `setTimeout` or `addEventListener`.
210223
// We can run the job and subsequently spawned jobs immediately.
211-
// JSPromise.resolve(JSValue.undefined).then { _ in
212-
_ = JSObject.global.queueMicrotask!(
213-
JSOneshotClosure { _ in
214-
self.run()
215-
return JSValue.undefined
216-
}
217-
)
224+
scheduleRunWithinMacroTask()
218225
} else {
219226
let tid = self.tid.load(ordering: .sequentiallyConsistent)
220227
swjs_wake_up_worker_thread(tid)
221228
}
222-
case .running:
229+
case (false, .idle):
230+
preconditionFailure("unreachable: idle -> \(desiredState) should return exchanged=true")
231+
case (false, .ready):
232+
// A wake message is sent to the worker, but it has not been received it yet
233+
if onTargetThread {
234+
// This means the job is enqueued outside of `Worker/run` (typically triggered
235+
// JS microtasks not awaited by Swift), then schedule a `Worker/run` within
236+
// the same macrotask.
237+
state.store(.running, ordering: .sequentiallyConsistent)
238+
scheduleRunWithinMacroTask()
239+
}
240+
case (false, .running):
223241
// The worker is already running, no need to wake up.
224242
break
225-
case .terminated:
243+
case (false, .terminated):
226244
// Will not wake up the worker because it's already terminated.
227245
break
228246
}
@@ -231,7 +249,7 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
231249
} while !locked
232250
}
233251

234-
func scheduleNextRun() {
252+
func scheduleRunWithinMacroTask() {
235253
_ = JSObject.global.queueMicrotask!(
236254
JSOneshotClosure { _ in
237255
self.run()
@@ -265,12 +283,27 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
265283
trace("Worker.start tid=\(tid)")
266284
}
267285

286+
/// On receiving a wake-up message from other thread
287+
func wakeUpFromOtherThread() {
288+
let (exchanged, _) = state.compareExchange(
289+
expected: .ready,
290+
desired: .running,
291+
ordering: .sequentiallyConsistent
292+
)
293+
guard exchanged else {
294+
// `Worker/run` was scheduled on the thread before JS event loop starts
295+
// a macrotask handling wake-up message.
296+
return
297+
}
298+
run()
299+
}
300+
268301
/// Process jobs in the queue.
269302
///
270303
/// Return when the worker has no more jobs to run or terminated.
271304
/// This method must be called from the worker thread after the worker
272305
/// is started by `start(executor:)`.
273-
func run() {
306+
private func run() {
274307
trace("Worker.run")
275308
guard let executor = parentTaskExecutor else {
276309
preconditionFailure("The worker must be started with a parent executor.")
@@ -290,7 +323,7 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
290323
queue.removeFirst()
291324
return job
292325
}
293-
// No more jobs to run now. Wait for a new job to be enqueued.
326+
// No more jobs to run now.
294327
let (exchanged, original) = state.compareExchange(
295328
expected: .running,
296329
desired: .idle,
@@ -301,7 +334,7 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
301334
case (true, _):
302335
trace("Worker.run exited \(original) -> idle")
303336
return nil // Regular case
304-
case (false, .idle):
337+
case (false, .idle), (false, .ready):
305338
preconditionFailure("unreachable: Worker/run running in multiple threads!?")
306339
case (false, .running):
307340
preconditionFailure("unreachable: running -> idle should return exchanged=true")
@@ -657,12 +690,12 @@ func _swjs_enqueue_main_job_from_worker(_ job: UnownedJob) {
657690
@_expose(wasm, "swjs_wake_worker_thread")
658691
#endif
659692
func _swjs_wake_worker_thread() {
660-
WebWorkerTaskExecutor.Worker.currentThread!.run()
693+
WebWorkerTaskExecutor.Worker.currentThread!.wakeUpFromOtherThread()
661694
}
662695

663696
private func trace(_ message: String) {
664697
#if JAVASCRIPTKIT_TRACE
665-
JSObject.global.process.stdout.write("[trace tid=\(swjs_get_worker_thread_id())] \(message)\n")
698+
_ = JSObject.global.console.warn("[trace tid=\(swjs_get_worker_thread_id())] \(message)\n")
666699
#endif
667700
}
668701

Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift

+164
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#if compiler(>=6.1) && _runtime(_multithreaded)
2+
import Synchronization
23
import XCTest
34
import _CJavaScriptKit // For swjs_get_worker_thread_id
45
@testable import JavaScriptKit
@@ -22,6 +23,7 @@ func pthread_mutex_lock(_ mutex: UnsafeMutablePointer<pthread_mutex_t>) -> Int32
2223
}
2324
#endif
2425

26+
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
2527
final class WebWorkerTaskExecutorTests: XCTestCase {
2628
func testTaskRunOnMainThread() async throws {
2729
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1)
@@ -97,6 +99,168 @@ final class WebWorkerTaskExecutorTests: XCTestCase {
9799
executor.terminate()
98100
}
99101

102+
func testScheduleJobWithinMacroTask1() async throws {
103+
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1)
104+
defer { executor.terminate() }
105+
106+
final class Context: @unchecked Sendable {
107+
let hasEndedFirstWorkerWakeLoop = Atomic<Bool>(false)
108+
let hasEnqueuedFromMain = Atomic<Bool>(false)
109+
let hasReachedNextMacroTask = Atomic<Bool>(false)
110+
let hasJobBEnded = Atomic<Bool>(false)
111+
let hasJobCEnded = Atomic<Bool>(false)
112+
}
113+
114+
// Scenario 1.
115+
// | Main | Worker |
116+
// | +---------------------+--------------------------+
117+
// | | | Start JS macrotask |
118+
// | | | Start 1st wake-loop |
119+
// | | | Enq JS microtask A |
120+
// | | | End 1st wake-loop |
121+
// | | | Start a JS microtask A |
122+
// time | Enq job B to Worker | [PAUSE] |
123+
// | | | Enq Swift job C |
124+
// | | | End JS microtask A |
125+
// | | | Start 2nd wake-loop |
126+
// | | | Run Swift job B |
127+
// | | | Run Swift job C |
128+
// | | | End 2nd wake-loop |
129+
// v | | End JS macrotask |
130+
// +---------------------+--------------------------+
131+
132+
let context = Context()
133+
Task {
134+
while !context.hasEndedFirstWorkerWakeLoop.load(ordering: .sequentiallyConsistent) {
135+
try! await Task.sleep(nanoseconds: 1_000)
136+
}
137+
// Enqueue job B to Worker
138+
Task(executorPreference: executor) {
139+
XCTAssertFalse(isMainThread())
140+
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
141+
context.hasJobBEnded.store(true, ordering: .sequentiallyConsistent)
142+
}
143+
XCTAssertTrue(isMainThread())
144+
// Resume worker thread to let it enqueue job C
145+
context.hasEnqueuedFromMain.store(true, ordering: .sequentiallyConsistent)
146+
}
147+
148+
// Start worker
149+
await Task(executorPreference: executor) {
150+
// Schedule a new macrotask to detect if the current macrotask has completed
151+
JSObject.global.setTimeout.function!(JSOneshotClosure { _ in
152+
context.hasReachedNextMacroTask.store(true, ordering: .sequentiallyConsistent)
153+
return .undefined
154+
}, 0)
155+
156+
// Enqueue a microtask, not managed by WebWorkerTaskExecutor
157+
JSObject.global.queueMicrotask.function!(JSOneshotClosure { _ in
158+
// Resume the main thread and let it enqueue job B
159+
context.hasEndedFirstWorkerWakeLoop.store(true, ordering: .sequentiallyConsistent)
160+
// Wait until the enqueue has completed
161+
while !context.hasEnqueuedFromMain.load(ordering: .sequentiallyConsistent) {}
162+
// Should be still in the same macrotask
163+
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
164+
// Enqueue job C
165+
Task(executorPreference: executor) {
166+
// Should be still in the same macrotask
167+
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
168+
// Notify that job C has completed
169+
context.hasJobCEnded.store(true, ordering: .sequentiallyConsistent)
170+
}
171+
return .undefined
172+
}, 0)
173+
// Wait until job B, C and the next macrotask have completed
174+
while !context.hasJobBEnded.load(ordering: .sequentiallyConsistent) ||
175+
!context.hasJobCEnded.load(ordering: .sequentiallyConsistent) ||
176+
!context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent) {
177+
try! await Task.sleep(nanoseconds: 1_000)
178+
}
179+
}.value
180+
}
181+
182+
func testScheduleJobWithinMacroTask2() async throws {
183+
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1)
184+
defer { executor.terminate() }
185+
186+
final class Context: @unchecked Sendable {
187+
let hasEndedFirstWorkerWakeLoop = Atomic<Bool>(false)
188+
let hasEnqueuedFromMain = Atomic<Bool>(false)
189+
let hasReachedNextMacroTask = Atomic<Bool>(false)
190+
let hasJobBEnded = Atomic<Bool>(false)
191+
let hasJobCEnded = Atomic<Bool>(false)
192+
}
193+
194+
// Scenario 2.
195+
// (The order of enqueue of job B and C are reversed from Scenario 1)
196+
//
197+
// | Main | Worker |
198+
// | +---------------------+--------------------------+
199+
// | | | Start JS macrotask |
200+
// | | | Start 1st wake-loop |
201+
// | | | Enq JS microtask A |
202+
// | | | End 1st wake-loop |
203+
// | | | Start a JS microtask A |
204+
// | | | Enq Swift job C |
205+
// time | Enq job B to Worker | [PAUSE] |
206+
// | | | End JS microtask A |
207+
// | | | Start 2nd wake-loop |
208+
// | | | Run Swift job B |
209+
// | | | Run Swift job C |
210+
// | | | End 2nd wake-loop |
211+
// v | | End JS macrotask |
212+
// +---------------------+--------------------------+
213+
214+
let context = Context()
215+
Task {
216+
while !context.hasEndedFirstWorkerWakeLoop.load(ordering: .sequentiallyConsistent) {
217+
try! await Task.sleep(nanoseconds: 1_000)
218+
}
219+
// Enqueue job B to Worker
220+
Task(executorPreference: executor) {
221+
XCTAssertFalse(isMainThread())
222+
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
223+
context.hasJobBEnded.store(true, ordering: .sequentiallyConsistent)
224+
}
225+
XCTAssertTrue(isMainThread())
226+
// Resume worker thread to let it enqueue job C
227+
context.hasEnqueuedFromMain.store(true, ordering: .sequentiallyConsistent)
228+
}
229+
230+
// Start worker
231+
await Task(executorPreference: executor) {
232+
// Schedule a new macrotask to detect if the current macrotask has completed
233+
JSObject.global.setTimeout.function!(JSOneshotClosure { _ in
234+
context.hasReachedNextMacroTask.store(true, ordering: .sequentiallyConsistent)
235+
return .undefined
236+
}, 0)
237+
238+
// Enqueue a microtask, not managed by WebWorkerTaskExecutor
239+
JSObject.global.queueMicrotask.function!(JSOneshotClosure { _ in
240+
// Enqueue job C
241+
Task(executorPreference: executor) {
242+
// Should be still in the same macrotask
243+
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
244+
// Notify that job C has completed
245+
context.hasJobCEnded.store(true, ordering: .sequentiallyConsistent)
246+
}
247+
// Resume the main thread and let it enqueue job B
248+
context.hasEndedFirstWorkerWakeLoop.store(true, ordering: .sequentiallyConsistent)
249+
// Wait until the enqueue has completed
250+
while !context.hasEnqueuedFromMain.load(ordering: .sequentiallyConsistent) {}
251+
// Should be still in the same macrotask
252+
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
253+
return .undefined
254+
}, 0)
255+
// Wait until job B, C and the next macrotask have completed
256+
while !context.hasJobBEnded.load(ordering: .sequentiallyConsistent) ||
257+
!context.hasJobCEnded.load(ordering: .sequentiallyConsistent) ||
258+
!context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent) {
259+
try! await Task.sleep(nanoseconds: 1_000)
260+
}
261+
}.value
262+
}
263+
100264
func testTaskGroupRunOnSameThread() async throws {
101265
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 3)
102266

0 commit comments

Comments
 (0)