Skip to content

Commit b6a3c40

Browse files
committed
Thread::Queue transfers ownership.
1 parent e513db7 commit b6a3c40

File tree

3 files changed

+387
-0
lines changed

3 files changed

+387
-0
lines changed

lib/async/safe/builtins.rb

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,30 @@
2020

2121
# ObjectSpace::WeakMap is async-safe:
2222
ObjectSpace::WeakMap::ASYNC_SAFE = true
23+
24+
module Async
25+
module Safe
26+
module TransferableThreadQueue
27+
def pop(...)
28+
object = super(...)
29+
Async::Safe.transfer(object)
30+
object
31+
end
32+
33+
def deq(...)
34+
object = super(...)
35+
Async::Safe.transfer(object)
36+
object
37+
end
38+
39+
def shift(...)
40+
object = super(...)
41+
Async::Safe.transfer(object)
42+
object
43+
end
44+
end
45+
end
46+
end
47+
48+
Thread::Queue.prepend(Async::Safe::TransferableThreadQueue)
49+
Thread::SizedQueue.prepend(Async::Safe::TransferableThreadQueue)

releases.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Releases
22

3+
## Unreleased
4+
5+
- `Thread::Queue` transfers ownership of objects popped from it.
6+
37
## v0.1.0
48

59
- Implement TracePoint-based ownership tracking.

test/async/safe/builtins.rb

Lines changed: 356 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,356 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2025, by Samuel Williams.
5+
6+
require "async/safe"
7+
require "set"
8+
9+
describe "Async::Safe Builtins" do
10+
before do
11+
Async::Safe.enable!
12+
end
13+
14+
after do
15+
Async::Safe.disable!
16+
end
17+
18+
# Simple test to verify the transfer mechanism works
19+
it "can manually transfer object ownership" do
20+
test_object = Class.new do
21+
def process
22+
"processed"
23+
end
24+
end.new
25+
26+
# Use the object in main fiber to establish ownership
27+
test_object.process
28+
29+
# Access from different fiber should raise error
30+
expect do
31+
Fiber.new do
32+
test_object.process
33+
end.resume
34+
end.to raise_exception(Async::Safe::ViolationError)
35+
36+
# But after manual transfer, it should work
37+
Fiber.new do
38+
Async::Safe.transfer(test_object)
39+
test_object.process # Should not raise
40+
end.resume
41+
end
42+
43+
with "Thread::Queue" do
44+
it "is marked as async-safe" do
45+
expect(Thread::Queue.async_safe?).to be == true
46+
end
47+
48+
it "allows concurrent access without transfer" do
49+
queue = Thread::Queue.new
50+
queue.push("item1")
51+
52+
expect do
53+
Fiber.new do
54+
queue.push("item2") # Should be OK - class is async-safe
55+
end.resume
56+
end.not.to raise_exception
57+
end
58+
59+
it "transfers ownership of objects via pop" do
60+
queue = Thread::Queue.new
61+
62+
# Create an object that will be monitored
63+
test_object = Class.new do
64+
def process
65+
"processed"
66+
end
67+
end.new
68+
69+
# Use the object in main fiber to establish ownership
70+
test_object.process
71+
72+
# Push it into the queue
73+
queue.push(test_object)
74+
75+
# Pop from different fiber - should transfer ownership
76+
result = nil
77+
exception_raised = false
78+
79+
begin
80+
Fiber.new do
81+
result = queue.pop
82+
# Should be able to use the object without violation after transfer
83+
result.process
84+
end.resume
85+
rescue Async::Safe::ViolationError
86+
exception_raised = true
87+
end
88+
89+
# The transfer should work, so no exception should be raised
90+
expect(exception_raised).to be == false
91+
expect(result).to be == test_object
92+
end
93+
94+
it "transfers ownership of objects via deq" do
95+
queue = Thread::Queue.new
96+
97+
# Create an object that will be monitored
98+
test_object = Class.new do
99+
def process
100+
"processed"
101+
end
102+
end.new
103+
104+
# Use the object in main fiber to establish ownership
105+
test_object.process
106+
107+
# Push it into the queue
108+
queue.push(test_object)
109+
110+
# Deq from different fiber - should transfer ownership
111+
result = nil
112+
exception_raised = false
113+
114+
begin
115+
Fiber.new do
116+
result = queue.deq
117+
# Should be able to use the object without violation after transfer
118+
result.process
119+
end.resume
120+
rescue Async::Safe::ViolationError
121+
exception_raised = true
122+
end
123+
124+
# The transfer should work, so no exception should be raised
125+
expect(exception_raised).to be == false
126+
expect(result).to be == test_object
127+
end
128+
129+
it "transfers ownership of objects via shift" do
130+
queue = Thread::Queue.new
131+
132+
# Create an object that will be monitored
133+
test_object = Class.new do
134+
def process
135+
"processed"
136+
end
137+
end.new
138+
139+
# Use the object in main fiber to establish ownership
140+
test_object.process
141+
142+
# Push it into the queue
143+
queue.push(test_object)
144+
145+
# Shift from different fiber - should transfer ownership
146+
result = nil
147+
exception_raised = false
148+
149+
begin
150+
Fiber.new do
151+
result = queue.shift
152+
# Should be able to use the object without violation after transfer
153+
result.process
154+
end.resume
155+
rescue Async::Safe::ViolationError
156+
exception_raised = true
157+
end
158+
159+
# The transfer should work, so no exception should be raised
160+
expect(exception_raised).to be == false
161+
expect(result).to be == test_object
162+
end
163+
164+
it "handles multiple objects in queue correctly" do
165+
queue = Thread::Queue.new
166+
167+
# Create multiple objects with different owners
168+
object1 = Class.new do
169+
def id; 1; end
170+
end.new
171+
172+
object2 = Class.new do
173+
def id; 2; end
174+
end.new
175+
176+
# Establish ownership in main fiber
177+
object1.id
178+
object2.id
179+
180+
# Push both objects
181+
queue.push(object1)
182+
queue.push(object2)
183+
184+
# Pop from different fiber
185+
results = []
186+
ids = []
187+
exception_raised = false
188+
189+
begin
190+
Fiber.new do
191+
results << queue.pop # Should get object2 (LIFO for Queue)
192+
results << queue.pop # Should get object1
193+
194+
# Should be able to use both objects after transfer
195+
ids << results[0].id
196+
ids << results[1].id
197+
end.resume
198+
rescue Async::Safe::ViolationError
199+
exception_raised = true
200+
end
201+
202+
expect(exception_raised).to be == false
203+
# Queue order can vary, but both objects should be retrievable
204+
expect(ids.sort).to be == [1, 2]
205+
end
206+
end
207+
208+
with "Thread::SizedQueue" do
209+
it "is marked as async-safe" do
210+
expect(Thread::SizedQueue.async_safe?).to be == true
211+
end
212+
213+
it "allows concurrent access without transfer" do
214+
queue = Thread::SizedQueue.new(2)
215+
queue.push("item1")
216+
217+
expect do
218+
Fiber.new do
219+
queue.push("item2") # Should be OK - class is async-safe
220+
end.resume
221+
end.not.to raise_exception
222+
end
223+
224+
it "transfers ownership of objects via pop" do
225+
queue = Thread::SizedQueue.new(2)
226+
227+
# Create an object that will be monitored
228+
test_object = Class.new do
229+
def process
230+
"processed"
231+
end
232+
end.new
233+
234+
# Use the object in main fiber to establish ownership
235+
test_object.process
236+
237+
# Push it into the queue
238+
queue.push(test_object)
239+
240+
# Pop from different fiber - should transfer ownership
241+
result = nil
242+
exception_raised = false
243+
244+
begin
245+
Fiber.new do
246+
result = queue.pop
247+
# Should be able to use the object without violation after transfer
248+
result.process
249+
end.resume
250+
rescue Async::Safe::ViolationError
251+
exception_raised = true
252+
end
253+
254+
# The transfer should work, so no exception should be raised
255+
expect(exception_raised).to be == false
256+
expect(result).to be == test_object
257+
end
258+
259+
it "transfers ownership with size limits" do
260+
queue = Thread::SizedQueue.new(1) # Only allow 1 item
261+
262+
# Create an object
263+
test_object = Class.new do
264+
def process
265+
"processed"
266+
end
267+
end.new
268+
269+
# Establish ownership
270+
test_object.process
271+
272+
# Fill the queue
273+
queue.push(test_object)
274+
275+
# Pop from different fiber to test transfer
276+
result = nil
277+
exception_raised = false
278+
279+
begin
280+
Fiber.new do
281+
result = queue.pop
282+
# Should be able to use the object without violation after transfer
283+
result.process
284+
end.resume
285+
rescue Async::Safe::ViolationError
286+
exception_raised = true
287+
end
288+
289+
# The transfer should work, so no exception should be raised
290+
expect(exception_raised).to be == false
291+
expect(result).to be == test_object
292+
end
293+
end
294+
295+
with "Immutable objects" do
296+
it "doesn't track ownership for frozen objects" do
297+
queue = Thread::Queue.new
298+
299+
# Frozen objects should not be tracked
300+
frozen_string = "test".freeze
301+
frozen_array = [1, 2, 3].freeze
302+
303+
queue.push(frozen_string)
304+
queue.push(frozen_array)
305+
306+
# Should be able to access from different fiber without issue
307+
retrieved_string = nil
308+
retrieved_array = nil
309+
exception_raised = false
310+
311+
begin
312+
Fiber.new do
313+
retrieved_string = queue.pop
314+
retrieved_array = queue.pop
315+
end.resume
316+
rescue Async::Safe::ViolationError
317+
exception_raised = true
318+
end
319+
320+
# These shouldn't cause violations since they're frozen
321+
expect(exception_raised).to be == false
322+
expect(retrieved_string).to be == "test"
323+
expect(retrieved_array).to be == [1, 2, 3]
324+
end
325+
326+
it "doesn't track basic immutable values" do
327+
queue = Thread::Queue.new
328+
329+
# Basic immutable values
330+
queue.push(nil)
331+
queue.push(true)
332+
queue.push(false)
333+
queue.push(42)
334+
queue.push(:symbol)
335+
336+
values = []
337+
exception_raised = false
338+
339+
begin
340+
Fiber.new do
341+
5.times { values << queue.pop }
342+
end.resume
343+
rescue Async::Safe::ViolationError
344+
exception_raised = true
345+
end
346+
347+
expect(exception_raised).to be == false
348+
# Check that all expected values are present (order may vary due to queue LIFO behavior)
349+
expect(values.length).to be == 5
350+
# Convert to sets for comparison since order doesn't matter
351+
actual_set = Set.new(values)
352+
expected_set = Set.new([nil, true, false, 42, :symbol])
353+
expect(actual_set).to be == expected_set
354+
end
355+
end
356+
end

0 commit comments

Comments
 (0)