Commit ddfe7ae

Introduce kill. (#43)
1 parent ab26d27 commit ddfe7ae

8 files changed: +239 -47 lines changed

examples/container.rb

Lines changed: 18 additions & 7 deletions
@@ -6,22 +6,33 @@
66
# Copyright, 2019, by Yuji Yaginuma.
77
# Copyright, 2022, by Anton Sozontov.
88

9-
require "../lib/async/container"
9+
require_relative "../lib/async/container"
1010

1111
Console.logger.debug!
1212

1313
container = Async::Container.new
1414

15-
Console.debug "Spawning 2 containers..."
15+
Console.debug "Spawning 2 children..."
1616

1717
2.times do
18-
container.spawn do |task|
19-
Console.debug task, "Sleeping..."
20-
sleep(2)
21-
Console.debug task, "Waking up!"
18+
container.spawn do |instance|
19+
Signal.trap(:INT) {}
20+
Signal.trap(:TERM) {}
21+
22+
Console.debug instance, "Sleeping..."
23+
while true
24+
sleep
25+
end
26+
Console.debug instance, "Waking up!"
2227
end
2328
end
2429

2530
Console.debug "Waiting for container..."
26-
container.wait
31+
begin
32+
container.wait
33+
rescue Interrupt
34+
# Okay, done.
35+
ensure
36+
container.stop(true)
37+
end
2738
Console.debug "Finished."

fixtures/async/container/a_container.rb

Lines changed: 85 additions & 0 deletions
@@ -246,6 +246,91 @@ module Container
246246
expect(container.statistics).to have_attributes(failures: be > 0)
247247
end
248248
end
249+
250+
with "broken children" do
251+
it "can handle children that ignore termination with SIGKILL fallback" do
252+
# Test behavior that works for both processes (signals) and threads (exceptions)
253+
container.spawn(restart: false) do |instance|
254+
instance.ready!
255+
256+
# Ignore termination attempts in a way appropriate to the container type
257+
if container.class.multiprocess?
258+
# For multiprocess containers - ignore signals
259+
Signal.trap(:INT) {}
260+
Signal.trap(:TERM) {}
261+
while true
262+
sleep(0.1)
263+
end
264+
else
265+
# For threaded containers - ignore exceptions
266+
while true
267+
begin
268+
sleep(0.1)
269+
rescue Async::Container::Interrupt, Async::Container::Terminate
270+
# Ignore termination attempts
271+
end
272+
end
273+
end
274+
end
275+
276+
container.wait_until_ready
277+
278+
# Try to stop with a very short timeout to force escalation
279+
start_time = Time.now
280+
container.stop(0.1) # Very short timeout
281+
end_time = Time.now
282+
283+
# Should stop successfully via SIGKILL/thread termination
284+
expect(container.size).to be == 0
285+
286+
# Should not hang - escalation should work
287+
expect(end_time - start_time).to be < 2.0
288+
end
289+
290+
it "can handle unresponsive children that close pipes but don't exit" do
291+
container.spawn(restart: false) do |instance|
292+
instance.ready!
293+
294+
# Close communication pipe to simulate hung process:
295+
begin
296+
if instance.respond_to?(:out)
297+
instance.out.close if instance.out && !instance.out.closed?
298+
end
299+
rescue
300+
# Ignore close errors.
301+
end
302+
303+
# Become unresponsive:
304+
if container.class.multiprocess?
305+
# For multiprocess containers - ignore signals and close file descriptors:
306+
Signal.trap(:INT) {}
307+
Signal.trap(:TERM) {}
308+
(4..256).each do |fd|
309+
begin
310+
IO.for_fd(fd).close
311+
rescue
312+
# Ignore errors
313+
end
314+
end
315+
loop {} # Tight loop
316+
else
317+
# For threaded containers - just become unresponsive
318+
loop {} # Tight loop, no exception handling
319+
end
320+
end
321+
322+
container.wait_until_ready
323+
324+
# Should not hang even with unresponsive children
325+
start_time = Time.now
326+
container.stop(1.0)
327+
end_time = Time.now
328+
329+
expect(container.size).to be == 0
330+
# Should complete reasonably quickly via hang prevention
331+
expect(end_time - start_time).to be < 5.0
332+
end
333+
end
249334
end
250335
end
251336
end

guides/getting-started/readme.md

Lines changed: 25 additions & 10 deletions
@@ -39,6 +39,25 @@ container.wait
3939
Console.debug "Finished."
4040
```
4141

42+
### Stopping Child Processes
43+
44+
Containers provide three approaches for stopping child processes (or threads). When you call `container.stop()`, a progressive approach is used:
45+
46+
- **Interrupt** means **"Please start shutting down gracefully"**. This is the gentlest shutdown request, giving applications maximum time to finish current work and clean up resources.
47+
48+
- **Terminate** means **"Shut down now"**. This is more urgent: the process should stop what it's doing and terminate promptly, but still has a chance to clean up.
49+
50+
- **Kill** means **"Die immediately"**. This forcefully terminates the process with no cleanup opportunity. This is the method of last resort.
51+
52+
The escalation sequence follows this pattern:
53+
1. interrupt → wait for timeout → still running?
54+
2. terminate → wait for timeout → still running?
55+
3. kill → process terminated.
56+
57+
This gives well-behaved processes multiple opportunities to shut down gracefully, while ensuring that unresponsive processes are eventually killed.
58+
59+
**Implementation Note:** For forked containers, these methods send Unix signals (`SIGINT`, `SIGTERM`, `SIGKILL`). For threaded containers, they use different mechanisms appropriate to threads. The container abstraction hides these implementation details.
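
The escalation described above can be sketched for a single forked child using only Ruby's standard library. This is an illustration under stated assumptions, not the library's implementation: `spawn_stubborn_child`, `wait_with_timeout` and `stop_with_escalation` are hypothetical helpers, and the timeouts and polling interval are arbitrary.

```ruby
# Hypothetical helpers for illustration only; not part of async-container.

# Fork a child that ignores SIGINT and SIGTERM. Returns the pid once the
# child has installed its signal traps.
def spawn_stubborn_child
	reader, writer = IO.pipe
	
	pid = fork do
		reader.close
		Signal.trap(:INT) {}
		Signal.trap(:TERM) {}
		writer.close # Closing our end tells the parent the traps are installed.
		loop {sleep(1)}
	end
	
	writer.close
	reader.read # Returns at end-of-file, i.e. after the child closed its writer.
	reader.close
	
	return pid
end

# Poll for up to `timeout` seconds. Returns the exit status, or nil if the
# child is still running when the deadline passes.
def wait_with_timeout(pid, timeout)
	deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
	
	loop do
		_, status = Process.wait2(pid, Process::WNOHANG)
		return status if status
		return nil if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
		sleep(0.01)
	end
end

# Interrupt, then terminate, then kill, waiting between each step.
def stop_with_escalation(pid, interrupt_timeout: 0.2, terminate_timeout: 0.2)
	Process.kill(:INT, pid)
	status = wait_with_timeout(pid, interrupt_timeout)
	return status if status
	
	Process.kill(:TERM, pid)
	status = wait_with_timeout(pid, terminate_timeout)
	return status if status
	
	# Last resort: SIGKILL cannot be trapped, so a blocking wait is safe here.
	Process.kill(:KILL, pid)
	_, status = Process.wait2(pid)
	return status
end
```

Calling `stop_with_escalation(spawn_stubborn_child)` returns a `Process::Status` whose `termsig` is the `SIGKILL` number, because the child traps both of the gentler signals.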
60+
4261
## Controllers
4362

4463
The controller provides the life-cycle management for one or more containers of processes. It provides behaviour like starting, restarting, reloading and stopping. You can see some [example implementations in Falcon](https://github.com/socketry/falcon/blob/master/lib/falcon/controller/). If the process running the controller receives `SIGHUP` it will recreate the container gracefully.
@@ -72,16 +91,12 @@ controller.run
7291
# If you send SIGHUP to this process, it will recreate the container.
7392
```
7493

75-
## Signal Handling
76-
77-
`SIGHUP` is the reload signal. You may send this to a program to request that it reload its configuration. The default behavior is to gracefully reload the container.
78-
79-
`SIGINT` is the interrupt signal. The terminal sends it to the foreground process when the user presses **ctrl-c**. The default behavior is to terminate the process, but it can be caught or ignored. The intention is to provide a mechanism for an orderly, graceful shutdown.
80-
81-
`SIGQUIT` is the dump core signal. The terminal sends it to the foreground process when the user presses **ctrl-\\**. The default behavior is to terminate the process and dump core, but it can be caught or ignored. The intention is to provide a mechanism for the user to abort the process. You can look at `SIGINT` as "user-initiated happy termination" and `SIGQUIT` as "user-initiated unhappy termination."
94+
### Controller Signal Handling
8295

83-
`SIGTERM` is the termination signal. The default behavior is to terminate the process, but it also can be caught or ignored. The intention is to kill the process, gracefully or not, but to first allow it a chance to cleanup.
96+
Controllers are designed to run at the process level and are therefore responsible for processing signals. When your controller process receives these signals:
8497

85-
`SIGKILL` is the kill signal. The only behavior is to kill the process, immediately. As the process cannot catch the signal, it cannot cleanup, and thus this is a signal of last resort.
98+
- `SIGHUP` → Gracefully reload the container (restart with new configuration).
99+
- `SIGINT` → Begin graceful shutdown of the entire controller and all children.
100+
- `SIGTERM` → Begin immediate shutdown of the controller and all children.
86101

87-
`SIGSTOP` is the pause signal. The only behavior is to pause the process; the signal cannot be caught or ignored. The shell uses pausing (and its counterpart, resuming via `SIGCONT`) to implement job control.
102+
Ideally, do not send `SIGKILL` to a controller, as it will immediately terminate the controller without giving it a chance to gracefully shut down child processes. This can leave orphaned processes running and prevent proper cleanup.

lib/async/container/error.rb

Lines changed: 10 additions & 0 deletions
@@ -21,6 +21,16 @@ def initialize
2121
end
2222
end
2323

24+
# Similar to {Terminate}, but represents `SIGKILL`.
25+
class Kill < SignalException
26+
SIGKILL = Signal.list["KILL"]
27+
28+
# Create a new kill error.
29+
def initialize
30+
super(SIGKILL)
31+
end
32+
end
33+
2434
# Similar to {Interrupt}, but represents `SIGHUP`.
2535
class Restart < SignalException
2636
SIGHUP = Signal.list["HUP"]
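
Because `Kill` subclasses `SignalException`, it carries a signal number and can be rescued like the other signal-based errors. A minimal standalone sketch follows; the `Sketch` namespace is an assumption made only to avoid clashing with the real `Async::Container::Kill`.

```ruby
# Standalone illustration; the Sketch module is hypothetical.
module Sketch
	class Kill < SignalException
		SIGKILL = Signal.list["KILL"]
		
		def initialize
			super(SIGKILL)
		end
	end
end

begin
	raise Sketch::Kill
rescue SignalException => error
	# The exception exposes the numeric signal it represents.
	signal_number = error.signo
end
```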

lib/async/container/forked.rb

Lines changed: 9 additions & 4 deletions
@@ -231,20 +231,25 @@ def restart!
231231
# Wait for the child process to exit.
232232
# @asynchronous This method may block.
233233
#
234+
# @parameter timeout [Numeric | Nil] Maximum time to wait before forceful termination.
234235
# @returns [::Process::Status] The process exit status.
235-
def wait
236+
def wait(timeout = 0.1)
236237
if @pid && @status.nil?
237238
Console.debug(self, "Waiting for process to exit...", pid: @pid)
238239

239240
_, @status = ::Process.wait2(@pid, ::Process::WNOHANG)
240241

241-
while @status.nil?
242-
sleep(0.1)
242+
if @status.nil?
243+
sleep(timeout) if timeout
243244

244245
_, @status = ::Process.wait2(@pid, ::Process::WNOHANG)
245246

246247
if @status.nil?
247-
Console.warn(self) {"Process #{@pid} is blocking, has it exited?"}
248+
Console.warn(self) {"Process #{@pid} is blocking, sending kill signal..."}
249+
self.kill!
250+
251+
# Wait for the process to exit:
252+
_, @status = ::Process.wait2(@pid)
248253
end
249254
end
250255
end

lib/async/container/group.rb

Lines changed: 68 additions & 24 deletions
@@ -119,36 +119,78 @@ def terminate
119119
end
120120
end
121121

122-
# Stop all child processes using {#terminate}.
123-
# @parameter timeout [Boolean | Numeric | Nil] If specified, invoke a graceful shutdown using {#interrupt} first.
124-
def stop(timeout = 1)
125-
Console.debug(self, "Stopping all processes...", timeout: timeout)
126-
# Use a default timeout if not specified:
127-
timeout = 1 if timeout == true
122+
# Kill all running processes.
123+
# This resumes the controlling fiber with an instance of {Kill}.
124+
def kill
125+
Console.info(self, "Sending kill to #{@running.size} running processes...")
126+
@running.each_value do |fiber|
127+
fiber.resume(Kill)
128+
end
129+
end
130+
131+
private def wait_for_exit(clock, timeout)
132+
while self.any?
133+
duration = timeout - clock.total
134+
135+
if duration >= 0
136+
self.wait_for_children(duration)
137+
else
138+
self.wait_for_children(0)
139+
break
140+
end
141+
end
142+
end
143+
144+
# Stop all child processes with a multi-phase shutdown sequence.
145+
#
146+
# A graceful shutdown performs the following sequence:
147+
# 1. Send SIGINT and wait up to `interrupt_timeout` seconds
148+
# 2. Send SIGTERM and wait up to `terminate_timeout` seconds
149+
# 3. Send SIGKILL and wait indefinitely for process cleanup
150+
#
151+
# If `graceful` is false, skips the SIGINT phase and goes directly to SIGTERM → SIGKILL.
152+
#
153+
# @parameter graceful [Boolean] Whether to send SIGINT first or skip directly to SIGTERM.
154+
# @parameter interrupt_timeout [Numeric | Nil] Time to wait after SIGINT before escalating to SIGTERM.
155+
# @parameter terminate_timeout [Numeric | Nil] Time to wait after SIGTERM before escalating to SIGKILL.
156+
def stop(graceful = true, interrupt_timeout: 1, terminate_timeout: 1)
157+
case graceful
158+
when true
159+
# Use defaults.
160+
when false
161+
interrupt_timeout = nil
162+
when Numeric
163+
interrupt_timeout = graceful
164+
terminate_timeout = graceful
165+
end
128166

129-
if timeout
130-
start_time = Async::Clock.now
167+
Console.debug(self, "Stopping all processes...", interrupt_timeout: interrupt_timeout, terminate_timeout: terminate_timeout)
168+
169+
# If a timeout is specified, interrupt the children first:
170+
if interrupt_timeout
171+
clock = Async::Clock.start
131172

173+
# Interrupt the children:
132174
self.interrupt
133175

134-
while self.any?
135-
duration = Async::Clock.now - start_time
136-
remaining = timeout - duration
137-
138-
if remaining >= 0
139-
self.wait_for_children(duration)
140-
else
141-
self.wait_for_children(0)
142-
break
143-
end
144-
end
176+
# Wait for the children to exit:
177+
self.wait_for_exit(clock, interrupt_timeout)
145178
end
146179

147-
# Terminate all children:
148-
self.terminate if any?
180+
if terminate_timeout
181+
clock = Async::Clock.start
182+
183+
# If the children are still running, terminate them:
184+
self.terminate
185+
186+
# Wait for the children to exit:
187+
self.wait_for_exit(clock, terminate_timeout)
188+
end
149189

150-
# Wait for all children to exit:
151-
self.wait
190+
if any?
191+
self.kill
192+
self.wait
193+
end
152194
end
153195

154196
# Wait for a message in the specified {Channel}.
@@ -165,6 +207,8 @@ def wait_for(channel)
165207
channel.interrupt!
166208
elsif result == Terminate
167209
channel.terminate!
210+
elsif result == Kill
211+
channel.kill!
168212
elsif result
169213
yield result
170214
elsif message = channel.receive
@@ -184,7 +228,7 @@ def wait_for_children(duration = nil)
184228
# This log is a bit noisy and doesn't really provide a lot of useful information.
185229
# Console.debug(self, "Waiting for children...", duration: duration, running: @running)
186230

187-
if !@running.empty?
231+
unless @running.empty?
188232
# Maybe consider using a proper event loop here:
189233
if ready = self.select(duration)
190234
ready.each do |io|
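
The way the new `stop` signature maps its first argument onto the two timeouts can be shown in isolation. The sketch below copies only the `case graceful` logic from the diff above; `normalize_stop_arguments` is a hypothetical name and does not exist in the library.

```ruby
# Hypothetical helper mirroring the argument handling in Group#stop.
# Returns [interrupt_timeout, terminate_timeout].
def normalize_stop_arguments(graceful = true, interrupt_timeout: 1, terminate_timeout: 1)
	case graceful
	when true
		# Use the defaults.
	when false
		# Skip the interrupt phase entirely.
		interrupt_timeout = nil
	when Numeric
		# A bare number is used for both phases.
		interrupt_timeout = graceful
		terminate_timeout = graceful
	end
	
	return [interrupt_timeout, terminate_timeout]
end
```

Under this logic, `stop(true)` keeps the defaults, `stop(false)` skips straight to terminate, and `stop(0.1)` (as used in the new tests) applies 0.1 seconds to both phases.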

lib/async/container/threaded.rb

Lines changed: 12 additions & 2 deletions
@@ -216,10 +216,20 @@ def restart!
216216
end
217217

218218
# Wait for the thread to exit and return the exit status.
219+
# @asynchronous This method may block.
220+
#
221+
# @parameter timeout [Numeric | Nil] Maximum time to wait before forceful termination.
219222
# @returns [Status]
220-
def wait
223+
def wait(timeout = 0.1)
221224
if @waiter
222-
@waiter.join
225+
Console.debug(self, "Waiting for thread to exit...", timeout: timeout)
226+
227+
unless @waiter.join(timeout)
228+
Console.warn(self) {"Thread #{@thread} is blocking, sending kill signal..."}
229+
self.kill!
230+
@waiter.join
231+
end
232+
223233
@waiter = nil
224234
end
225235

releases.md

Lines changed: 12 additions & 0 deletions
@@ -1,5 +1,17 @@
11
# Releases
22

3+
## Unreleased
4+
5+
### Production Reliability Improvements
6+
7+
This release significantly improves container reliability by eliminating production hangs caused by unresponsive child processes.
8+
9+
**SIGKILL Fallback Support**: Containers now automatically escalate to SIGKILL when child processes ignore SIGINT and SIGTERM signals. This prevents the critical production issue where containers would hang indefinitely waiting for uncooperative processes to exit.
10+
11+
**Hang Prevention**: Individual child processes now have timeout-based hang prevention. If a process closes its notification pipe but doesn't actually exit, the container will detect this and escalate to SIGKILL after a reasonable timeout instead of hanging forever.
12+
13+
**Improved Three-Phase Shutdown**: The `Group#stop()` method now uses a cleaner interrupt → terminate → kill escalation sequence with configurable timeouts for each phase, giving well-behaved processes multiple opportunities to shut down gracefully while ensuring unresponsive processes are eventually terminated.
14+
315
## v0.25.0
416

517
- Introduce `async:container:notify:log:ready?` task for detecting process readiness.
