Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions spec/std/crystal/fd_lock_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
require "spec"
require "crystal/fd_lock"
require "wait_group"

describe Crystal::FdLock do
describe "#reference" do
it "acquires" do
lock = Crystal::FdLock.new
called = 0

lock.reference { called += 1 }
lock.reference { called += 1 }

called.should eq(2)
end

it "allows reentrancy (side effect)" do
lock = Crystal::FdLock.new
called = 0

lock.reference { called += 1 }
lock.reference do
lock.reference { called += 1 }
end

called.should eq(2)
end

it "acquires shared reference" do
lock = Crystal::FdLock.new

ready = WaitGroup.new(1)
release = Channel(String).new

spawn do
lock.reference do
ready.done

select
when release.send("ok")
when timeout(1.second)
release.send("timeout")
end
end
end

ready.wait
lock.reference { }

release.receive.should eq("ok")
end

it "raises when closed" do
lock = Crystal::FdLock.new
lock.try_close? { }

called = false
expect_raises(IO::Error, "Closed") do
lock.reference { called = true }
end

called.should be_false
end
end

describe "#try_close?" do
it "closes" do
lock = Crystal::FdLock.new
lock.closed?.should be_false

called = false
lock.try_close? { called = true }.should be_true
lock.closed?.should be_true
called.should be_true
end

it "closes once" do
lock = Crystal::FdLock.new

called = 0

WaitGroup.wait do |wg|
10.times do
wg.spawn do
lock.try_close? { called += 1 }
lock.try_close? { called += 1 }
end
end
end

called.should eq(1)
end

it "waits for all references to return" do
lock = Crystal::FdLock.new

ready = WaitGroup.new(10)
exceptions = Channel(Exception).new(10)

WaitGroup.wait do |wg|
10.times do
wg.spawn do
begin
lock.reference do
ready.done
Fiber.yield
end
rescue ex
exceptions.send(ex)
end
end
end

ready.wait

called = false
lock.try_close? { called = true }.should eq(true)
lock.closed?.should be_true
end
exceptions.close

if ex = exceptions.receive?
raise ex
end
end
end

it "#reset" do
lock = Crystal::FdLock.new
lock.try_close? { }
lock.reset
lock.try_close? { }.should eq(true)
end
end
2 changes: 1 addition & 1 deletion src/crystal/event_loop/iocp.cr
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
# AcceptEx does not automatically set the socket options on the accepted
# socket to match those of the listening socket, we need to ask for that
# explicitly with SO_UPDATE_ACCEPT_CONTEXT
socket.system_setsockopt client_handle, LibC::SO_UPDATE_ACCEPT_CONTEXT, socket.fd
System::Socket.setsockopt client_handle, LibC::SO_UPDATE_ACCEPT_CONTEXT, socket.fd

true
else
Expand Down
10 changes: 8 additions & 2 deletions src/crystal/event_loop/libevent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,13 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
file_descriptor.evented_close
end

def close(file_descriptor : Crystal::System::FileDescriptor) : Nil
def before_close(file_descriptor : Crystal::System::FileDescriptor) : Nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: The names before_close and close are not very descriptive about their actual purpose. Maybe we can find more explicit names?
before_close closes the wrapper object, while close closes the system resource. Maybe close_wrapper and close_system_fd? Not entirely convinced these are the best choices, but they seem better than the current ones...

Copy link
Contributor Author

@ysbaddaden ysbaddaden Sep 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you mean with "wrapper object". Maybe #before_close could be #resume_all but it becomes a directive (do this) rather than a broader "we're about to close, do whatever you need to"

But #close becomes more to the point: close this fd/sock (and only useful for io_uring that can close async).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe "wrapper object" is not a good term. I was thinking about FileDescriptor as being a wrapper of the system API. And before_close takes care of closing the aspects specific to the runtime, while close closes the actual file descriptor resource for the OS.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that the behavior of #before_close here is identical to the #reopened hook called after we reopen a file descriptor: resume any pending waiter for the given IO::FileDescriptor or Socket. We might as well have a single #resume_all, maybe?

# perform cleanup before LibC.close. Using a file descriptor after it has
# been closed is never defined and can always lead to undefined results
file_descriptor.evented_close
end

def close(file_descriptor : Crystal::System::FileDescriptor) : Nil
file_descriptor.file_descriptor_close
end

Expand Down Expand Up @@ -299,10 +302,13 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
end
end

def close(socket : ::Socket) : Nil
def before_close(socket : ::Socket) : Nil
# perform cleanup before LibC.close. Using a file descriptor after it has
# been closed is never defined and can always lead to undefined results
socket.evented_close
end

def close(socket : ::Socket) : Nil
socket.socket_close
end

Expand Down
10 changes: 8 additions & 2 deletions src/crystal/event_loop/polling.cr
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,13 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
resume_all(file_descriptor)
end

def close(file_descriptor : System::FileDescriptor) : Nil
def before_close(file_descriptor : System::FileDescriptor) : Nil
# perform cleanup before LibC.close. Using a file descriptor after it has
# been closed is never defined and can always lead to undefined results
resume_all(file_descriptor)
end

def close(file_descriptor : System::FileDescriptor) : Nil
file_descriptor.file_descriptor_close
end

Expand Down Expand Up @@ -363,10 +366,13 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
end
end

def close(socket : ::Socket) : Nil
def before_close(socket : ::Socket) : Nil
# perform cleanup before LibC.close. Using a file descriptor after it has
# been closed is never defined and can always lead to undefined results
resume_all(socket)
end

def close(socket : ::Socket) : Nil
socket.socket_close
end

Expand Down
10 changes: 8 additions & 2 deletions src/crystal/event_loop/wasi.cr
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop
raise NotImplementedError.new("Crystal::EventLoop#reopened(FileDescriptor)")
end

def close(file_descriptor : Crystal::System::FileDescriptor) : Nil
def before_close(file_descriptor : Crystal::System::FileDescriptor) : Nil
file_descriptor.evented_close
end

def close(file_descriptor : Crystal::System::FileDescriptor) : Nil
file_descriptor.file_descriptor_close
end

Expand Down Expand Up @@ -133,8 +136,11 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop
raise NotImplementedError.new "Crystal::Wasi::EventLoop#accept"
end

def close(socket : ::Socket) : Nil
def before_close(socket : ::Socket) : Nil
socket.evented_close
end

def close(socket : ::Socket) : Nil
socket.socket_close
end

Expand Down
124 changes: 124 additions & 0 deletions src/crystal/fd_lock.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# :nodoc:
#
# Tracks active references over a system file descriptor (fd).
#
# The fdlock can be closed at any time, but the actual system close will wait
# until there are no more references left. This avoids potential races when a
# thread might try to read a fd that has been closed and has been reused by the
# OS.
struct Crystal::FdLock
CLOSED = 1_u32 # the fdlock has been closed
REF = 2_u32 # the ref counter increment
MASK = ~(REF - 1) # mask for the ref counter

{% if flag?(:preview_mt) %}
@m = Atomic(UInt32).new(0_u32)
@closing : Fiber?
{% else %}
@closed = false
{% end %}

# Borrows a reference for the duration of the block. Raises if the fdlock is
# closed while trying to borrow.
def reference(& : -> F) : F forall F
{% if flag?(:preview_mt) %}
m, success = @m.compare_and_set(0_u32, REF, :acquire, :relaxed)
increment_slow(m) unless success

begin
yield
ensure
m = @m.sub(REF, :release)
handle_last_ref(m)
end
{% else %}
raise IO::Error.new("Closed") if @closed
yield
{% end %}
end

private def increment_slow(m)
while true
if (m & CLOSED) == CLOSED
raise IO::Error.new("Closed")
end
m, success = @m.compare_and_set(m, m + REF, :acquire, :relaxed)
break if success
end
end

private def handle_last_ref(m)
return unless (m & CLOSED) == CLOSED # is closed?
return unless (m & MASK) == REF # was the last ref?

# the last ref after close is responsible to resume the closing fiber
@closing.not_nil!("BUG: expected a closing fiber to resume.").enqueue
end

# Closes the fdlock. Blocks for as long as there are references.
#
# Returns true if the fdlock has been closed: no fiber can acquire a reference
# anymore, the calling fiber fully owns the fd and can safely close it.
#
# Returns false if the fdlock has already been closed: the calling fiber
# doesn't own the fd and musn't close it (there might still be active
# references).
def try_close?(&before_close : ->) : Bool
{% if flag?(:preview_mt) %}
m = @m.get(:relaxed)

# increment ref and close (abort if already closed)
while true
if (m & CLOSED) == CLOSED
return false
end
m, success = @m.compare_and_set(m, (m + REF) | CLOSED, :acquire, :relaxed)
break if success
end

# set the current fiber as the closing fiber (to be resumed by the last ref)
# then decrement ref
@closing = Fiber.current
m = @m.sub(REF, :release)

begin
# before close callback
yield
ensure
# wait for the last ref... unless we're the last ref!
Fiber.suspend unless (m & MASK) == REF
end

@closing = nil
true
{% else %}
if @closed
false
else
@closed = true
yield
true
end
{% end %}
end

# Resets the fdlock back to its pristine state so it can be used again.
# Assumes the caller owns the fdlock. This is required by
# `TCPSocket#initialize`.
def reset : Nil
{% if flag?(:preview_mt) %}
@m.lazy_set(0_u32)
@closing = nil
{% else %}
@closed = false
{% end %}
end

def closed? : Bool
{% if flag?(:preview_mt) %}
(@m.get(:relaxed) & CLOSED) == CLOSED
{% else %}
@closed
{% end %}
end
end
8 changes: 5 additions & 3 deletions src/crystal/system/socket.cr
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ module Crystal::System::Socket

# private def system_linger=(val)

# private def system_getsockopt(fd, optname, optval, level = LibC::SOL_SOCKET, &)
# private def system_getsockopt(optname, optval, level = LibC::SOL_SOCKET, &)

# private def system_getsockopt(fd, optname, optval, level = LibC::SOL_SOCKET)
# private def system_getsockopt(optname, optval, level = LibC::SOL_SOCKET)

# private def system_setsockopt(fd, optname, optval, level = LibC::SOL_SOCKET)
# private def system_setsockopt(optname, optval, level = LibC::SOL_SOCKET)

# private def system_blocking?

Expand All @@ -70,6 +70,8 @@ module Crystal::System::Socket

# private def system_close_on_exec=(arg : Bool)

# private def system_fcntl(cmd, arg = 0)

# def self.fcntl(fd, cmd, arg = 0)

# def self.socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol, blocking : Bool) : {Handle, Handle}
Expand Down
Loading
Loading