Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions spec/std/fiber_spec.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "spec"
require "wait_group"

describe Fiber do
it "#resumable?" do
Expand All @@ -18,4 +19,61 @@ describe Fiber do

resumable.should be_false
end

describe ".sleep" do
it "expires" do
cancelation_token = nil
channel = Channel(Fiber::TimeoutResult).new

spawn do
result = Fiber.sleep(10.milliseconds) { |token| cancelation_token = token }
channel.send(result)
end

channel.receive.should eq(Fiber::TimeoutResult::EXPIRED)
end

it "is canceled" do
cancelation_token = nil
channel = Channel(Fiber::TimeoutResult).new

fiber = spawn do
result = Fiber.sleep(1.second) { |token| cancelation_token = token }
channel.send(result)
end

until cancelation_token
Fiber.yield
end

if fiber.resolve_timer?(cancelation_token.not_nil!)
fiber.enqueue
end

channel.receive.should eq(Fiber::TimeoutResult::CANCELED)
end

it "expires or is canceled" do
20.times do
WaitGroup.wait do |wg|
cancelation_token = nil

suspended_fiber = wg.spawn do
Fiber.sleep(10.milliseconds) do |token|
# save the token so another fiber can try to cancel the timer
cancelation_token = token
end
end

sleep rand(9..11).milliseconds

# let's try to cancel the timer
if suspended_fiber.resolve_timer?(cancelation_token.not_nil!)
# canceled: we must enqueue the fiber
suspended_fiber.enqueue
end
end
end
end
end
end
2 changes: 1 addition & 1 deletion src/concurrent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ end
# fibers might start their execution.
def sleep(time : Time::Span) : Nil
Crystal.trace :sched, "sleep", for: time
Crystal::EventLoop.current.sleep(time)
Fiber.sleep(time) { }
end

# Blocks the current fiber forever.
Expand Down
8 changes: 6 additions & 2 deletions src/crystal/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,12 @@ abstract class Crystal::EventLoop
# time in parallel, but this assumption may change in the future!
abstract def interrupt : Nil

# Suspend the current fiber for *duration*.
abstract def sleep(duration : Time::Span) : Nil
# Suspends the current fiber until the absolute *time* is reached (as per the
# monotonic clock). Must resolve the timeout using `Fiber.resolve_timer?` and
# *token* before resuming the fiber.
#
# Returns true if the timer expired, false if it was canceled.
abstract def sleep(until time : Time::Span, token : Fiber::CancelationToken) : Bool

# Create a new resume event for a fiber.
#
Expand Down
36 changes: 24 additions & 12 deletions src/crystal/event_loop/iocp.cr
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,12 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
fiber = timer.value.fiber

case timer.value.type
in .sleep?, .timeout?
in .sleep?
# the timer might have been canceled already, and we must synchronize with
# the resumed `#timeout` fiber; by rule we must always resume the fiber,
# regardless of whether we resolve the timeout or not.
timer.value.timed_out! if fiber.resolve_timer?(timer.value.cancelation_token)
in .timeout?
timer.value.timed_out!
in .select_timeout?
return unless select_action = fiber.timeout_select_action
Expand All @@ -168,10 +173,11 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
end
end

protected def delete_timer(timer : Pointer(Timer)) : Nil
protected def delete_timer(timer : Pointer(Timer)) : Bool
@mutex.synchronize do
_, was_next_ready = @timers.delete(timer)
dequeued, was_next_ready = @timers.delete(timer)
rearm_waitable_timer(@timers.next_ready?, interruptible: false) if was_next_ready
dequeued
end
end

Expand All @@ -195,25 +201,31 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
end
end

def sleep(duration : Time::Span) : Nil
timer = Timer.new(:sleep, Fiber.current, duration)
def sleep(until time : Time::Span, token : Fiber::CancelationToken) : Bool
timer = Timer.new(:sleep, Fiber.current)
timer.wake_at = time
timer.cancelation_token = token
add_timer(pointerof(timer))

Fiber.suspend

# safety check
return if timer.timed_out?
unless timer.timed_out? || delete_timer(pointerof(timer))
# the timeout was canceled while another thread dequeued the timer while
# running the event loop: we must synchronize with #process_timer
# (otherwise *event* might go out of scope); by rule the timer will always
# enqueue the fiber and we must suspend again.
Fiber.suspend
end

# try to avoid a double resume if possible, but another thread might be
# running the evloop and dequeue the event in parallel, so a "can't resume
# dead fiber" can still happen in a MT execution context.
delete_timer(pointerof(timer))
raise "BUG: #{timer.fiber} called sleep but was manually resumed before the timer expired!"
timer.timed_out?
end

# Suspend the current fiber for *duration* and returns true if the timer
# expired and false if the fiber was resumed early.
#
# Specific to IOCP to handle IO timeouts.
#
# TODO: use sleep(time, token) instead
def timeout(duration : Time::Span) : Bool
event = Fiber.current.resume_event
event.add(duration)
Expand Down
2 changes: 2 additions & 0 deletions src/crystal/event_loop/iocp/timer.cr
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ struct Crystal::EventLoop::IOCP::Timer
# trigger. Nil for IO events without a timeout.
getter! wake_at : Time::Span

property! cancelation_token : Fiber::CancelationToken

# True if an IO event has timed out (i.e. we're past `#wake_at`).
getter? timed_out : Bool = false

Expand Down
32 changes: 30 additions & 2 deletions src/crystal/event_loop/libevent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,37 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
event_base.loop_exit
end

def sleep(duration : ::Time::Span) : Nil
Fiber.current.resume_event.add(duration)
class SleepData
property fiber : Fiber
property cancelation_token : Fiber::CancelationToken
property? expired : Bool = false

def initialize(@fiber, @cancelation_token)
end
end

def sleep(until time : Time::Span, token : Fiber::CancelationToken) : Bool
arg = SleepData.new(Fiber.current, token)

event = event_base.new_event(-1, LibEvent2::EventFlags::None, arg) do |s, flags, data|
d = data.as(SleepData)
f = d.fiber
if f.resolve_timer?(d.cancelation_token)
d.expired = true
{% if flag?(:execution_context) %}
event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent)
event_loop.callback_enqueue(f)
{% else %}
f.enqueue
{% end %}
end
end
event.add(time - Time.monotonic)

Fiber.suspend

event.delete unless arg.expired?
arg.expired?
end

# Create a new resume event for a fiber (sleep).
Expand Down
27 changes: 17 additions & 10 deletions src/crystal/event_loop/polling.cr
Original file line number Diff line number Diff line change
Expand Up @@ -141,19 +141,23 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop

# fiber interface, see Crystal::EventLoop

def sleep(duration : ::Time::Span) : Nil
event = Event.new(:sleep, Fiber.current, timeout: duration)
def sleep(until time : Time::Span, token : Fiber::CancelationToken) : Bool
event = Event.new(:sleep, Fiber.current)
event.wake_at = time
event.cancelation_token = token
add_timer(pointerof(event))

Fiber.suspend

# safety check
return if event.timed_out?
unless event.timed_out? || delete_timer(pointerof(event))
# the timeout was canceled while another thread dequeued the timer while
# running the event loop: we must synchronize with #process_timer
# (otherwise *event* might go out of scope); by rule the timer will always
# enqueue the fiber and we must suspend again.
Fiber.suspend
end

# try to avoid a double resume if possible, but another thread might be
# running the evloop and dequeue the event in parallel, so a "can't resume
# dead fiber" can still happen in a MT execution context.
delete_timer(pointerof(event))
raise "BUG: #{event.fiber} called sleep but was manually resumed before the timer expired!"
event.timed_out?
end

def create_timeout_event(fiber : Fiber) : FiberEvent
Expand Down Expand Up @@ -607,7 +611,10 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
return unless select_action.time_expired?
fiber.@timeout_event.as(FiberEvent).clear
when .sleep?
event.value.timed_out!
# the timer might have been canceled already, and we must synchronize with
# the resumed `#timeout` fiber; by rule we must always resume the fiber,
# regardless of whether we resolve the timeout or not.
event.value.timed_out! if fiber.resolve_timer?(event.value.cancelation_token)
else
raise RuntimeError.new("BUG: unexpected event in timers: #{event.value}%s\n")
end
Expand Down
2 changes: 2 additions & 0 deletions src/crystal/event_loop/polling/event.cr
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ struct Crystal::EventLoop::Polling::Event
# True if an IO event has timed out (i.e. we're past `#wake_at`).
getter? timed_out : Bool = false

property! cancelation_token : Fiber::CancelationToken

# The event can be added to `Waiters` lists.
include PointerLinkedList::Node

Expand Down
4 changes: 2 additions & 2 deletions src/crystal/event_loop/wasi.cr
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop
raise NotImplementedError.new("Crystal::Wasi::EventLoop.interrupt")
end

def sleep(duration : ::Time::Span) : Nil
raise NotImplementedError.new("Crystal::Wasi::EventLoop.sleep")
def sleep(until time : Time::Span, token : Fiber::CancelationToken) : Bool
raise NotImplementedError.new("Crystal::Wasi::EventLoop#sleep")
end

# Creates a timeout_event.
Expand Down
Loading