Skip to content

Commit b03c49f

Browse files
committed
Better Channel close behavior.
1 parent d751bea commit b03c49f

File tree

13 files changed

+70
-50
lines changed

13 files changed

+70
-50
lines changed

examples/a-tour-of-go-channels/default-selection.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
loop do
1414
Channel.select do |s|
15-
s.take(tick) { print "tick.\n" }
15+
s.take(tick) { |t| print "tick.\n" if t }
1616
s.take(boom) do
1717
print "BOOM!\n"
1818
exit

examples/go-by-example-channels/rate-limiting.rb

+6-3
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@
1717

1818
limiter = Channel.ticker(0.2)
1919
requests.each do |req|
20-
~limiter
21-
print "request #{req} #{Channel::Tick.new}\n"
20+
print "request #{req} #{Channel::Tick.new}\n" if ~limiter
2221
end
2322
print "\n"
2423

@@ -27,8 +26,9 @@
2726
bursty_limiter << Channel::Tick.new
2827
end
2928

29+
ticker = Channel.ticker(0.2)
3030
Channel.go do
31-
Channel.ticker(0.2).each do |t|
31+
ticker.each do |t|
3232
bursty_limiter << t
3333
end
3434
end
@@ -44,6 +44,9 @@
4444
print "request #{req} #{Channel::Tick.new}\n"
4545
end
4646

47+
limiter.close
48+
ticker.close
49+
4750
__END__
4851
request 1 2012-10-19 00:38:18.687438 +0000 UTC
4952
request 2 2012-10-19 00:38:18.887471 +0000 UTC

examples/go-by-example-channels/ticker.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
ticker = Channel.ticker(0.5)
1111
Channel.go do
1212
ticker.each do |tick|
13-
print "Tick at #{tick}\n"
13+
print "Tick at #{tick}\n" if tick
1414
end
1515
end
1616

examples/go-by-example-channels/timers.rb

+2-4
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,11 @@
99

1010
timer1 = Channel.timer(2)
1111

12-
~timer1
13-
puts 'Timer 1 expired'
12+
puts 'Timer 1 expired' if ~timer1
1413

1514
timer2 = Channel.timer(1)
1615
Channel.go do
17-
~timer2
18-
print "Timer 2 expired\n"
16+
print "Timer 2 expired\n" if ~timer2
1917
end
2018

2119
stop2 = timer2.stop

lib/concurrent/channel.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class Channel
1212
include Enumerable
1313

1414
# NOTE: Move to global IO pool once stable
15-
GOROUTINES = Concurrent::CachedThreadPool.new
15+
GOROUTINES = Concurrent::CachedThreadPool.new(auto_terminate: true)
1616
private_constant :GOROUTINES
1717

1818
BUFFER_TYPES = {

lib/concurrent/channel/buffer/base.rb

+7
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
require 'concurrent/synchronization/lockable_object'
2+
require 'concurrent/utility/at_exit'
23

34
module Concurrent
45
class Channel
@@ -32,6 +33,7 @@ def initialize(*args)
3233
@capacity = 0
3334
@buffer = nil
3435
ns_initialize(*args)
36+
AtExit.add(self) { terminate_at_exit }
3537
end
3638
end
3739

@@ -232,6 +234,11 @@ def ns_full?
232234
def ns_closed?
233235
@closed
234236
end
237+
238+
# @!visibility private
239+
def terminate_at_exit
240+
synchronize { self.closed = true }
241+
end
235242
end
236243
end
237244
end

lib/concurrent/channel/buffer/ticker.rb

+5-3
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@ def ns_initialize(interval)
1919

2020
def do_poll
2121
synchronize do
22-
if !ns_closed? && (now = Concurrent.monotonic_time) >= @next_tick
22+
if ns_closed?
23+
return Concurrent::NULL, false
24+
elsif (now = Concurrent.monotonic_time) >= @next_tick
2325
tick = Concurrent::Channel::Tick.new(@next_tick)
2426
@next_tick = now + @interval
25-
return tick
27+
return tick, true
2628
else
27-
return Concurrent::NULL
29+
return nil, true
2830
end
2931
end
3032
end

lib/concurrent/channel/buffer/timer.rb

+16-12
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,28 @@ def offer(item)
1818
end
1919

2020
def take
21-
# a Go timer will block forever if stopped
2221
loop do
23-
tick = do_poll
24-
return tick if tick != Concurrent::NULL
25-
Thread.pass
22+
tick, _ = do_poll
23+
if tick
24+
return tick
25+
else
26+
Thread.pass
27+
end
2628
end
2729
end
2830

2931
def next
30-
# a Go timer will block forever if stopped
31-
# it will always return `true` for more
3232
loop do
33-
tick = do_poll
34-
return tick, true if tick != Concurrent::NULL
33+
tick, more = do_poll
34+
return tick, more if tick
3535
Thread.pass
3636
end
3737
end
3838

3939
def poll
40-
do_poll
40+
tick, _ = do_poll
41+
tick = Concurrent::NULL unless tick
42+
tick
4143
end
4244

4345
private
@@ -61,12 +63,14 @@ def ns_full?
6163

6264
def do_poll
6365
synchronize do
64-
if !ns_closed? && Concurrent.monotonic_time >= @tick
66+
if ns_closed?
67+
return Concurrent::NULL, false
68+
elsif Concurrent.monotonic_time >= @tick
6569
# only one listener gets notified
6670
self.closed = true
67-
return Concurrent::Channel::Tick.new(@tick)
71+
return Concurrent::Channel::Tick.new(@tick), false
6872
else
69-
return Concurrent::NULL
73+
return nil, true
7074
end
7175
end
7276
end

spec/concurrent/channel/buffer/ticker_spec.rb

+18
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@ module Concurrent::Channel::Buffer
1616
expected.times { actual += 1 if subject.take.is_a? Concurrent::Channel::Tick }
1717
expect(actual).to eq expected
1818
end
19+
20+
it 'returns Concurrent::NULL when closed after trigger' do
21+
subject.take
22+
subject.close
23+
expect(subject).to be_closed
24+
expect(subject.take).to eq Concurrent::NULL
25+
end
1926
end
2027

2128
context '#poll' do
@@ -37,6 +44,17 @@ module Concurrent::Channel::Buffer
3744
expected.times { actual += 1 if subject.next.first.is_a? Concurrent::Channel::Tick }
3845
expect(actual).to eq expected
3946
end
47+
48+
it 'returns true for more while open' do
49+
_, more = subject.next
50+
expect(more).to be true
51+
end
52+
53+
it 'returns false for more once closed' do
54+
subject.close
55+
_, more = subject.next
56+
expect(more).to be false
57+
end
4058
end
4159
end
4260
end

spec/concurrent/channel/buffer/timer_spec.rb

+5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ module Concurrent::Channel::Buffer
3333
end
3434
expect(subject).to be_closed
3535
end
36+
37+
it 'returns false for more' do
38+
_, more = subject.next
39+
expect(more).to be false
40+
end
3641
end
3742
end
3843
end

spec/concurrent/channel/buffer/timing_buffer_shared.rb

+4-21
Original file line numberDiff line numberDiff line change
@@ -69,15 +69,9 @@
6969
expect(actual - start).to be >= 0.1
7070
end
7171

72-
it 'blocks forever when closed' do
72+
it 'returns Concurrent::NULL when closed' do
7373
subject.close
74-
t = Thread.new do
75-
subject.take
76-
end
77-
actual = t.join(1)
78-
t.kill # clean up
79-
80-
expect(actual).to be_falsey
74+
expect(subject.take).to eq Concurrent::NULL
8175
end
8276
end
8377

@@ -129,20 +123,9 @@
129123
expect(value).to be_a Concurrent::Channel::Tick
130124
end
131125

132-
it 'returns true for more' do
133-
_, more = subject.next
134-
expect(more).to be true
135-
end
136-
137-
it 'blocks forever when closed' do
126+
it 'returns Concurrent::NULL, false when closed' do
138127
subject.close
139-
t = Thread.new do
140-
subject.next
141-
end
142-
actual = t.join(1)
143-
t.kill # clean up
144-
145-
expect(actual).to be_falsey
128+
expect(subject.take).to eq Concurrent::NULL
146129
end
147130

148131
it 'triggers after the specified time interval' do

spec/concurrent/channel/integration_spec.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
describe 'channel integration tests', notravis: true do
1+
describe 'channel integration tests' do
22

33
let!(:examples_root) { File.expand_path(File.join(File.dirname(__FILE__), '../../../examples')) }
44

@@ -65,7 +65,7 @@
6565
expect(result).to eq expected
6666
end
6767

68-
specify 'default-selection.rb' do
68+
specify 'default-selection.rb', notravis: true do
6969
expected = <<-STDOUT
7070
.
7171
.

spec/concurrent/channel_spec.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,7 @@ module Concurrent
557557
end
558558
end
559559

560-
context 'goroutines' do
560+
context 'goroutines', notravis: true do
561561

562562
let(:default_executor) { Channel.const_get(:GOROUTINES) }
563563

@@ -608,7 +608,7 @@ module Concurrent
608608
actual < expected
609609
end
610610

611-
latch.wait(3)
611+
latch.wait(10)
612612
expect(actual).to eq expected
613613
end
614614
end

0 commit comments

Comments
 (0)