Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 0adf557

Browse files
authoredSep 9, 2024··
Revert "Tidy up body code. (#181)"
This reverts commit d5f0b31.
1 parent d5f0b31 commit 0adf557

File tree

18 files changed

+286
-90
lines changed

18 files changed

+286
-90
lines changed
 

‎examples/upload/client.rb

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,27 +9,10 @@
99

1010
require 'async'
1111
require 'protocol/http/body/file'
12+
require 'async/http/body/delayed'
1213
require 'async/http/client'
1314
require 'async/http/endpoint'
1415

15-
class Delayed < ::Protocol::HTTP::Body::Wrapper
16-
def initialize(body, delay = 0.01)
17-
super(body)
18-
19-
@delay = delay
20-
end
21-
22-
def ready?
23-
false
24-
end
25-
26-
def read
27-
sleep(@delay)
28-
29-
return super
30-
end
31-
end
32-
3316
Async do
3417
endpoint = Async::HTTP::Endpoint.parse("http://localhost:9222")
3518
client = Async::HTTP::Client.new(endpoint, protocol: Async::HTTP::Protocol::HTTP2)
@@ -38,7 +21,7 @@ def read
3821
['accept', 'text/plain'],
3922
]
4023

41-
body = Delayed.new(Protocol::HTTP::Body::File.open(File.join(__dir__, "data.txt"), block_size: 32))
24+
body = Async::HTTP::Body::Delayed.new(Protocol::HTTP::Body::File.open(File.join(__dir__, "data.txt"), block_size: 32))
4225

4326
response = client.post(endpoint.path, headers, body)
4427

‎fixtures/async/http/a_protocol.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ module HTTP
162162
request_received.wait
163163
headers.add('etag', 'abcd')
164164

165-
body.close_write
165+
body.close
166166
end
167167

168168
response = client.post("/", headers, body)
@@ -187,7 +187,7 @@ module HTTP
187187
response_received.wait
188188
headers.add('etag', 'abcd')
189189

190-
body.close_write
190+
body.close
191191
end
192192

193193
::Protocol::HTTP::Response[200, headers, body]
@@ -395,9 +395,9 @@ module HTTP
395395
let(:app) do
396396
::Protocol::HTTP::Middleware.for do |request|
397397
Async::HTTP::Body::Hijack.response(request, 200, {}) do |stream|
398-
stream.write(content)
399-
stream.write(content)
400-
stream.close_write
398+
stream.write content
399+
stream.write content
400+
stream.close
401401
end
402402
end
403403
end
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2019-2023, by Samuel Williams.
5+
6+
require 'protocol/http/body/deflate'
7+
8+
module Async
9+
module HTTP
10+
module Body
11+
AWritableBody = Sus::Shared("a writable body") do
12+
it "can write and read data" do
13+
3.times do |i|
14+
body.write("Hello World #{i}")
15+
expect(body.read).to be == "Hello World #{i}"
16+
end
17+
end
18+
19+
it "can buffer data in order" do
20+
3.times do |i|
21+
body.write("Hello World #{i}")
22+
end
23+
24+
3.times do |i|
25+
expect(body.read).to be == "Hello World #{i}"
26+
end
27+
end
28+
29+
with '#join' do
30+
it "can join chunks" do
31+
3.times do |i|
32+
body.write("#{i}")
33+
end
34+
35+
body.close
36+
37+
expect(body.join).to be == "012"
38+
end
39+
end
40+
41+
with '#each' do
42+
it "can read all data in order" do
43+
3.times do |i|
44+
body.write("Hello World #{i}")
45+
end
46+
47+
body.close
48+
49+
3.times do |i|
50+
chunk = body.read
51+
expect(chunk).to be == "Hello World #{i}"
52+
end
53+
end
54+
55+
it "can propagate failures" do
56+
reactor.async do
57+
expect do
58+
body.each do |chunk|
59+
raise RuntimeError.new("It was too big!")
60+
end
61+
end.to raise_exception(RuntimeError, message: be =~ /big/)
62+
end
63+
64+
expect{
65+
body.write("Beep boop") # This will cause a failure.
66+
::Async::Task.current.yield
67+
body.write("Beep boop") # This will fail.
68+
}.to raise_exception(RuntimeError, message: be =~ /big/)
69+
end
70+
71+
it "can propagate failures in nested bodies" do
72+
nested = ::Protocol::HTTP::Body::Deflate.for(body)
73+
74+
reactor.async do
75+
expect do
76+
nested.each do |chunk|
77+
raise RuntimeError.new("It was too big!")
78+
end
79+
end.to raise_exception(RuntimeError, message: be =~ /big/)
80+
end
81+
82+
expect{
83+
body.write("Beep boop") # This will cause a failure.
84+
::Async::Task.current.yield
85+
body.write("Beep boop") # This will fail.
86+
}.to raise_exception(RuntimeError, message: be =~ /big/)
87+
end
88+
89+
it "can consume chunks" do
90+
body.write("Hello World!")
91+
body.close
92+
93+
expect(body).not.to be(:empty?)
94+
95+
body.each do |chunk|
96+
expect(chunk).to be == "Hello World!"
97+
end
98+
99+
expect(body).to be(:empty?)
100+
end
101+
end
102+
end
103+
end
104+
end
105+
end

‎gems.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
# gem "protocol-http2", path: "../protocol-http2"
2121
# gem "protocol-hpack", path: "../protocol-hpack"
2222

23-
gem "protocol-http", git: "https://github.com/socketry/protocol-http.git"
24-
2523
group :maintenance, optional: true do
2624
gem "bake-modernize"
2725
gem "bake-gem"

‎lib/async/http/body/delayed.rb

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2018-2023, by Samuel Williams.
5+
# Copyright, 2020, by Bruno Sutic.
6+
# Copyright, 2023, by Thomas Morgan.
7+
8+
require 'protocol/http/body/wrapper'
9+
10+
module Async
11+
module HTTP
12+
module Body
13+
class Delayed < ::Protocol::HTTP::Body::Wrapper
14+
def initialize(body, delay = 0.01)
15+
super(body)
16+
17+
@delay = delay
18+
end
19+
20+
def ready?
21+
false
22+
end
23+
24+
def read
25+
Async::Task.current.sleep(@delay)
26+
27+
return super
28+
end
29+
end
30+
end
31+
end
32+
end

‎lib/async/http/body/hijack.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def stream?
3636
end
3737

3838
def call(stream)
39-
@block.call(stream)
39+
return @block.call(stream)
4040
end
4141

4242
attr :input

‎lib/async/http/body/pipe.rb

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ def initialize(input, output = Writable.new, task: Task.current)
1717

1818
head, tail = ::Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM)
1919

20-
@head = ::IO::Stream(head)
20+
@head = ::IO::Stream::Buffered.new(head)
2121
@tail = tail
2222

2323
@reader = nil
@@ -52,10 +52,8 @@ def reader(task)
5252
end
5353

5454
@head.close_write
55-
rescue => error
56-
raise
5755
ensure
58-
@input.close(error)
56+
@input.close($!)
5957

6058
close_head if @writer&.finished?
6159
end
@@ -70,10 +68,8 @@ def writer(task)
7068
while chunk = @head.read_partial
7169
@output.write(chunk)
7270
end
73-
rescue => error
74-
raise
7571
ensure
76-
@output.close_write(error)
72+
@output.close($!)
7773

7874
close_head if @reader&.finished?
7975
end

‎lib/async/http/body/slowloris.rb

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2019-2023, by Samuel Williams.
5+
6+
require_relative 'writable'
7+
8+
require 'async/clock'
9+
10+
module Async
11+
module HTTP
12+
module Body
13+
# A dynamic body which you can write to and read from.
14+
class Slowloris < Writable
15+
class ThroughputError < StandardError
16+
def initialize(throughput, minimum_throughput, time_since_last_write)
17+
super("Slow write: #{throughput.round(1)}bytes/s less than required #{minimum_throughput.round}bytes/s.")
18+
end
19+
end
20+
21+
# In order for this implementation to work correctly, you need to use a LimitedQueue.
22+
# @param minimum_throughput [Integer] the minimum bytes per second otherwise this body will be forcefully closed.
23+
def initialize(*arguments, minimum_throughput: 1024, **options)
24+
super(*arguments, **options)
25+
26+
@minimum_throughput = minimum_throughput
27+
28+
@last_write_at = nil
29+
@last_chunk_size = nil
30+
end
31+
32+
attr :minimum_throughput
33+
34+
# If #read is called regularly to maintain throughput, that is good. If #read is not called, that is a problem. Throughput is dependent on data being available, from #write, so it doesn't seem particularly problimatic to do this check in #write.
35+
def write(chunk)
36+
if @last_chunk_size
37+
time_since_last_write = Async::Clock.now - @last_write_at
38+
throughput = @last_chunk_size / time_since_last_write
39+
40+
if throughput < @minimum_throughput
41+
error = ThroughputError.new(throughput, @minimum_throughput, time_since_last_write)
42+
43+
self.close(error)
44+
end
45+
end
46+
47+
super.tap do
48+
@last_write_at = Async::Clock.now
49+
@last_chunk_size = chunk&.bytesize
50+
end
51+
end
52+
end
53+
end
54+
end
55+
end

‎lib/async/http/protocol/http1/server.rb

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -68,31 +68,19 @@ def each(task: Task.current)
6868
stream = write_upgrade_body(protocol)
6969

7070
# At this point, the request body is hijacked, so we don't want to call #finish below.
71-
request = nil
71+
request = nil unless request.body
7272
response = nil
7373

7474
# We must return here as no further request processing can be done:
7575
return body.call(stream)
76-
elsif response.status == 101
77-
# This code path is to support legacy behavior where the response status is set to 101, but the protocol is not upgraded. This may not be a valid use case, but it is supported for compatibility. We expect the response headers to contain the `upgrade` header.
78-
write_response(@version, response.status, response.headers)
79-
80-
stream = write_tunnel_body(request.version)
81-
82-
# Same as above:
83-
request = nil
84-
response = nil
85-
86-
# We must return here as no further request processing can be done:
87-
return body&.call(stream)
8876
else
8977
write_response(@version, response.status, response.headers)
9078

9179
if request.connect? and response.success?
9280
stream = write_tunnel_body(request.version)
9381

9482
# Same as above:
95-
request = nil
83+
request = nil unless request.body
9684
response = nil
9785

9886
# We must return here as no further request processing can be done:

‎lib/async/http/protocol/http2/input.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33
# Released under the MIT License.
44
# Copyright, 2020-2023, by Samuel Williams.
55

6-
require 'protocol/http/body/writable'
6+
require_relative '../../body/writable'
77

88
module Async
99
module HTTP
1010
module Protocol
1111
module HTTP2
1212
# A writable body which requests window updates when data is read from it.
13-
class Input < ::Protocol::HTTP::Body::Writable
13+
class Input < Body::Writable
1414
def initialize(stream, length)
1515
super(length)
1616

‎lib/async/http/protocol/http2/output.rb

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -50,25 +50,18 @@ def write(chunk)
5050
end
5151
end
5252

53-
def close_write(error = nil)
54-
if stream = @stream
55-
@stream = nil
56-
stream.finish_output(error)
57-
end
58-
end
59-
6053
# This method should only be called from within the context of the output task.
6154
def close(error = nil)
62-
close_write(error)
63-
stop(error)
55+
if @stream
56+
@stream.finish_output(error)
57+
@stream = nil
58+
end
6459
end
6560

6661
# This method should only be called from within the context of the HTTP/2 stream.
6762
def stop(error)
68-
if task = @task
69-
@task = nil
70-
task.stop(error)
71-
end
63+
@task&.stop
64+
@task = nil
7265
end
7366

7467
private
@@ -77,12 +70,10 @@ def stream(task)
7770
task.annotate("Streaming #{@body} to #{@stream}.")
7871

7972
input = @stream.wait_for_input
80-
stream = ::Protocol::HTTP::Body::Stream.new(input, self)
8173

82-
@body.call(stream)
83-
rescue => error
84-
self.close(error)
85-
raise
74+
@body.call(::Protocol::HTTP::Body::Stream.new(input, self))
75+
rescue Async::Stop
76+
# Ignore.
8677
end
8778

8879
# Reads chunks from the given body and writes them to the stream as fast as possible.
@@ -95,17 +86,11 @@ def passthrough(task)
9586
# chunk.clear unless chunk.frozen?
9687
# GC.start
9788
end
98-
rescue => error
99-
raise
100-
ensure
101-
# Ensure the body we are reading from is fully closed:
102-
if body = @body
103-
@body = nil
104-
body.close(error)
105-
end
10689

107-
# Ensure the output of this body is closed:
108-
self.close_write(error)
90+
self.close
91+
ensure
92+
@body&.close($!)
93+
@body = nil
10994
end
11095

11196
# Send `maximum_size` bytes of data using the specified `stream`. If the buffer has no more chunks, `END_STREAM` will be sent on the final chunk.

‎lib/async/http/protocol/http2/stream.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def process_headers(frame)
5959

6060
# TODO this might need to be in an ensure block:
6161
if @input and frame.end_stream?
62-
@input.close_write
62+
@input.close($!)
6363
@input = nil
6464
end
6565
rescue ::Protocol::HTTP2::HeaderError => error
@@ -98,7 +98,7 @@ def process_data(frame)
9898
end
9999

100100
if frame.end_stream?
101-
@input.close_write
101+
@input.close
102102
@input = nil
103103
end
104104
end
@@ -149,7 +149,7 @@ def closed(error)
149149
super
150150

151151
if @input
152-
@input.close_write(error)
152+
@input.close(error)
153153
@input = nil
154154
end
155155

‎test/async/http/body.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
output.write(chunk.reverse)
2424
end
2525

26-
output.close_write
26+
output.close
2727
end
2828

2929
Protocol::HTTP::Response[200, [], output]
@@ -35,7 +35,7 @@
3535

3636
reactor.async do |task|
3737
output.write("Hello World!")
38-
output.close_write
38+
output.close
3939
end
4040

4141
response = client.post("/", {}, output)
@@ -58,7 +58,7 @@
5858
notification.wait
5959
end
6060

61-
body.close_write
61+
body.close
6262
end
6363

6464
Protocol::HTTP::Response[200, {}, body]

‎test/async/http/body/hijack.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
3.times do
1616
stream.write(content)
1717
end
18-
stream.close_write
18+
stream.close
1919
end
2020
end
2121

‎test/async/http/body/pipe.rb

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
include Sus::Fixtures::Async::ReactorContext
2121

2222
let(:input_write_duration) {0}
23-
let(:io) {pipe.to_io}
23+
let(:io) { pipe.to_io }
2424

2525
def before
2626
super
@@ -31,12 +31,14 @@ def before
3131
input.write("#{first} ")
3232
sleep(input_write_duration) if input_write_duration > 0
3333
input.write(second)
34-
input.close_write
34+
input.close
3535
end
3636
end
3737

38-
after do
38+
def aftrer
3939
io.close
40+
41+
super
4042
end
4143

4244
it "returns an io socket" do

‎test/async/http/body/slowloris.rb

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2019-2023, by Samuel Williams.
5+
6+
require 'async/http/body/slowloris'
7+
8+
require 'sus/fixtures/async'
9+
require 'async/http/body/a_writable_body'
10+
11+
describe Async::HTTP::Body::Slowloris do
12+
include Sus::Fixtures::Async::ReactorContext
13+
14+
let(:body) {subject.new}
15+
16+
it_behaves_like Async::HTTP::Body::AWritableBody
17+
18+
it "closes body with error if throughput is not maintained" do
19+
body.write("Hello World")
20+
21+
sleep 0.1
22+
23+
expect do
24+
body.write("Hello World")
25+
end.to raise_exception(Async::HTTP::Body::Slowloris::ThroughputError, message: be =~ /Slow write/)
26+
end
27+
28+
it "doesn't close body if throughput is exceeded" do
29+
body.write("Hello World")
30+
31+
expect do
32+
body.write("Hello World")
33+
end.not.to raise_exception
34+
end
35+
end

‎test/async/http/body/writable.rb

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2018-2023, by Samuel Williams.
5+
6+
require 'async/http/body/slowloris'
7+
8+
require 'sus/fixtures/async'
9+
require 'async/http/body/a_writable_body'
10+
11+
describe Async::HTTP::Body::Writable do
12+
include Sus::Fixtures::Async::ReactorContext
13+
14+
let(:body) {subject.new}
15+
16+
it_behaves_like Async::HTTP::Body::AWritableBody
17+
end

‎test/async/http/proxy.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
expect(response).to be(:success?)
5858

5959
input.write(data)
60-
input.close_write
60+
input.close
6161

6262
expect(response.read).to be == data
6363
end
@@ -74,7 +74,7 @@
7474
stream.flush
7575
end
7676

77-
stream.close_write
77+
stream.close
7878
end
7979
end
8080
end

0 commit comments

Comments
 (0)
Please sign in to comment.