Skip to content

Commit a3f33e8

Browse files
authored
Another fix for connection interrupted/resumed. Add test. (#27)
1 parent 9d41512 commit a3f33e8

File tree

4 files changed

+51
-3
lines changed

4 files changed

+51
-3
lines changed

Project.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name = "AWSCRT"
22
uuid = "df31ea59-17a4-4ebd-9d69-4f45266dc2c7"
3-
version = "0.4.0"
3+
version = "0.4.1"
44

55
[deps]
66
CountDownLatches = "621fb831-fdad-4fff-93ac-1af7b7ed19e3"

src/AWSMQTT.jl

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ function _c_on_connection_interrupted(
222222

223223
data = Base.unsafe_pointer_to_objref(userdata)::_OnConnectionInterruptedUserData
224224
try
225-
put!(data.ch, _OnConnectionInterruptedEvent(data.callback, error_code))
225+
put!(data.ch, _OnConnectionInterruptedEvent(data.callback, data.conn, error_code))
226226
catch ex
227227
if ex isa InvalidStateException && ex.state == :closed
228228
else
@@ -265,7 +265,12 @@ function _c_on_connection_resumed(
265265
try
266266
put!(
267267
data.ch,
268-
_OnConnectionResumedEvent(data.callback, aws_mqtt_connect_return_code(return_code), session_present != 0),
268+
_OnConnectionResumedEvent(
269+
data.callback,
270+
data.conn,
271+
aws_mqtt_connect_return_code(return_code),
272+
session_present != 0,
273+
),
269274
)
270275
catch ex
271276
if ex isa InvalidStateException && ex.state == :closed

test/interrupt_connection.jl

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
@testset "interrupt and resume connection" begin
2+
topic1 = "test-topic-$(Random.randstring(6))"
3+
payload1 = Random.randstring(48)
4+
client_id1 = random_client_id()
5+
@show topic1 payload1 client_id1
6+
7+
client = MQTTClient(new_tls_ctx())
8+
connection = MQTTConnection(client)
9+
10+
interruptions = Threads.Atomic{Int}(0)
11+
resumes = Threads.Atomic{Int}(0)
12+
13+
task = connect(
14+
connection,
15+
ENV["ENDPOINT"],
16+
8883,
17+
client_id1;
18+
will = Will(topic1, AWS_MQTT_QOS_AT_LEAST_ONCE, "The client has gone offline!", false),
19+
on_connection_interrupted = (conn, error_code) -> begin
20+
Threads.atomic_add!(interruptions, 1)
21+
@warn "connection interrupted" error_code
22+
end,
23+
on_connection_resumed = (conn, return_code, session_present) -> begin
24+
Threads.atomic_add!(resumes, 1)
25+
@info "connection resumed" return_code session_present
26+
end,
27+
)
28+
@test fetch(task) == Dict(:session_present => false)
29+
30+
# kill the MQTT connection
31+
run(`sudo ss -K dport = 8883`)
32+
33+
wait_for(() -> resumes[] > 0, Timer(30))
34+
@test interruptions[] == 1
35+
@test resumes[] == 1
36+
37+
disconnect(connection)
38+
end

test/runtests.jl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ end)
2222
Aqua.test_all(AWSCRT, ambiguities = false)
2323
Aqua.test_ambiguities(AWSCRT)
2424
end
25+
@testset "interrupt_connection.jl" begin
26+
# running this in parallel will kill the connections used by other tests so don't do that
27+
@info "Starting interrupt_connection.jl"
28+
include("interrupt_connection.jl")
29+
end
2530
end
2631
@testset "mqtt_test.jl" begin
2732
@info "Starting mqtt_test.jl"

0 commit comments

Comments
 (0)