Skip to content

Commit c267258

Browse files
author
Dogan AY
committed
feat(sse): implement Sinatra SSE + improvement on connection handling
1 parent 7061c57 commit c267258

File tree

4 files changed

+514
-28
lines changed
  • packages
    • forest_admin_datasource_rpc
    • forest_admin_rpc_agent
      • lib/forest_admin_rpc_agent/routes
      • spec/lib/forest_admin_rpc_agent/routes

4 files changed

+514
-28
lines changed

packages/forest_admin_datasource_rpc/lib/forest_admin_datasource_rpc/Utils/sse_client.rb

Lines changed: 71 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@
66
module ForestAdminDatasourceRpc
77
module Utils
88
class SseClient
9+
attr_reader :closed
10+
911
def initialize(uri, auth_secret, &on_rpc_stop)
1012
@uri = uri
1113
@auth_secret = auth_secret
1214
@on_rpc_stop = on_rpc_stop
1315
@client = nil
1416
@closed = false
17+
@connection_attempts = 0
1518
end
1619

1720
def start
@@ -26,26 +29,47 @@ def start
2629
'X_SIGNATURE' => signature
2730
}
2831

29-
ForestAdminRpcAgent::Facades::Container.logger.log('Debug', "Connecting to SSE at #{@uri}.")
32+
@connection_attempts += 1
33+
ForestAdminRpcAgent::Facades::Container.logger&.log(
34+
'Debug',
35+
"[SSE Client] Connecting to #{@uri} (attempt ##{@connection_attempts})"
36+
)
3037

31-
@client = SSE::Client.new(@uri, headers: headers) do |client|
32-
client.on_event do |event|
33-
handle_event(event)
34-
end
38+
begin
39+
@client = SSE::Client.new(@uri, headers: headers) do |client|
40+
client.on_event do |event|
41+
handle_event(event)
42+
end
3543

36-
client.on_error do |err|
37-
# TODO: optimisation on client close
38-
# ForestAdminRpcAgent::Facades::Container.logger.log('Warn', "[SSE] Error: #{err.class} - #{err.message}")
44+
client.on_error do |err|
45+
handle_error(err)
46+
end
3947
end
48+
49+
ForestAdminRpcAgent::Facades::Container.logger&.log('Debug', '[SSE Client] Connected successfully')
50+
rescue StandardError => e
51+
ForestAdminRpcAgent::Facades::Container.logger&.log(
52+
'Error',
53+
"[SSE Client] Failed to connect: #{e.class} - #{e.message}"
54+
)
55+
raise
4056
end
4157
end
4258

4359
def close
4460
return if @closed
4561

4662
@closed = true
47-
@client&.close
48-
# ForestAdminRpcAgent::Facades::Container.logger.log('Debug', '[SSE] Client closed')
63+
ForestAdminRpcAgent::Facades::Container.logger&.log('Debug', '[SSE Client] Closing connection')
64+
65+
begin
66+
@client&.close
67+
rescue StandardError => e
68+
ForestAdminRpcAgent::Facades::Container.logger&.log('Debug',
69+
"[SSE Client] Error during close: #{e.message}")
70+
end
71+
72+
ForestAdminRpcAgent::Facades::Container.logger&.log('Debug', '[SSE Client] Connection closed')
4973
end
5074

5175
private
@@ -56,14 +80,46 @@ def handle_event(event)
5680

5781
case type
5882
when 'heartbeat'
59-
# ForestAdminRpcAgent::Facades::Container.logger.log('Debug', '[SSE] Heartbeat')
83+
# Heartbeat received - connection is alive
6084
when 'RpcServerStop'
61-
# ForestAdminRpcAgent::Facades::Container.logger.log('Debug', '[SSE] RpcServerStop received')
62-
@on_rpc_stop&.call
85+
ForestAdminRpcAgent::Facades::Container.logger&.log('Debug', '[SSE Client] RpcServerStop received')
86+
handle_rpc_stop
6387
else
64-
ForestAdminRpcAgent::Facades::Container.logger.log('Debug',
65-
"[SSE] Unknown event: #{type} with payload: #{data}")
88+
ForestAdminRpcAgent::Facades::Container.logger&.log(
89+
'Debug',
90+
"[SSE Client] Unknown event: #{type} with payload: #{data}"
91+
)
6692
end
93+
rescue StandardError => e
94+
ForestAdminRpcAgent::Facades::Container.logger&.log(
95+
'Error',
96+
"[SSE Client] Error handling event: #{e.class} - #{e.message}"
97+
)
98+
end
99+
100+
def handle_error(err)
101+
# Ignore errors when client is intentionally closed
102+
return if @closed
103+
104+
error_message = case err
105+
when EOFError, IOError
106+
"Connection lost: #{err.class}"
107+
when StandardError
108+
"#{err.class} - #{err.message}"
109+
else
110+
err.to_s
111+
end
112+
113+
ForestAdminRpcAgent::Facades::Container.logger&.log('Warn', "[SSE Client] Error: #{error_message}")
114+
end
115+
116+
def handle_rpc_stop
117+
@on_rpc_stop&.call
118+
rescue StandardError => e
119+
ForestAdminRpcAgent::Facades::Container.logger&.log(
120+
'Error',
121+
"[SSE Client] Error in RPC stop callback: #{e.class} - #{e.message}"
122+
)
67123
end
68124

69125
def generate_signature(timestamp)

packages/forest_admin_datasource_rpc/spec/lib/forest_admin_datasource_rpc/utils/sse_client_spec.rb

Lines changed: 223 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,27 @@ module Utils
1212
allow(ForestAdminRpcAgent::Facades::Container).to receive(:logger).and_return(logger)
1313
end
1414

15+
describe '#initialize' do
16+
it 'initializes with correct attributes' do
17+
client = described_class.new(uri, secret) { callback.call }
18+
19+
expect(client.instance_variable_get(:@uri)).to eq(uri)
20+
expect(client.instance_variable_get(:@auth_secret)).to eq(secret)
21+
expect(client.instance_variable_get(:@closed)).to be false
22+
expect(client.instance_variable_get(:@connection_attempts)).to eq(0)
23+
end
24+
25+
it 'exposes closed status via attr_reader' do
26+
client = described_class.new(uri, secret) { callback.call }
27+
28+
expect(client.closed).to be false
29+
30+
client.close
31+
32+
expect(client.closed).to be true
33+
end
34+
end
35+
1536
describe '#start' do
1637
it 'connects to SSE with the expected headers' do
1738
fake_client = instance_double(SSE::Client)
@@ -49,6 +70,63 @@ module Utils
4970
# finally, we check that it didn't try to connect
5071
expect(SSE::Client).not_to have_received(:new)
5172
end
73+
74+
it 'increments connection attempts counter' do
75+
fake_client = instance_double(SSE::Client)
76+
allow(SSE::Client).to receive(:new).and_yield(fake_client).and_return(fake_client)
77+
allow(fake_client).to receive(:on_event)
78+
allow(fake_client).to receive(:on_error)
79+
80+
client = described_class.new(uri, secret) { callback.call }
81+
82+
expect(client.instance_variable_get(:@connection_attempts)).to eq(0)
83+
84+
client.start
85+
86+
expect(client.instance_variable_get(:@connection_attempts)).to eq(1)
87+
88+
# Simulate reconnection
89+
client.instance_variable_set(:@closed, false)
90+
client.start
91+
92+
expect(client.instance_variable_get(:@connection_attempts)).to eq(2)
93+
end
94+
95+
it 'logs connection attempt with attempt number' do
96+
fake_client = instance_double(SSE::Client)
97+
allow(SSE::Client).to receive(:new).and_yield(fake_client)
98+
allow(fake_client).to receive(:on_event)
99+
allow(fake_client).to receive(:on_error)
100+
101+
client = described_class.new(uri, secret) { callback.call }
102+
client.start
103+
104+
expect(logger).to have_received(:log).with('Debug', /Connecting to.*attempt #1/)
105+
end
106+
107+
it 'logs successful connection' do
108+
fake_client = instance_double(SSE::Client)
109+
allow(SSE::Client).to receive(:new).and_yield(fake_client)
110+
allow(fake_client).to receive(:on_event)
111+
allow(fake_client).to receive(:on_error)
112+
113+
client = described_class.new(uri, secret) { callback.call }
114+
client.start
115+
116+
expect(logger).to have_received(:log).with('Debug', '[SSE Client] Connected successfully')
117+
end
118+
119+
it 'logs and re-raises connection errors' do
120+
allow(SSE::Client).to receive(:new).and_raise(StandardError, 'Connection failed')
121+
122+
client = described_class.new(uri, secret) { callback.call }
123+
124+
expect do
125+
client.start
126+
end.to raise_error(StandardError, 'Connection failed')
127+
128+
expect(logger).to have_received(:log).with('Error', /Failed to connect.*StandardError/)
129+
end
52130
end
53131

54132
describe '#close' do
@@ -71,6 +149,45 @@ module Utils
71149

72150
expect { client.close }.not_to raise_error
73151
end
152+
153+
it 'logs closing and closed messages' do
154+
fake_client = instance_double(SSE::Client, close: true)
155+
allow(SSE::Client).to receive(:new).and_return(fake_client)
156+
allow(fake_client).to receive(:on_event)
157+
allow(fake_client).to receive(:on_error)
158+
159+
client = described_class.new(uri, secret) { callback.call }
160+
client.start
161+
client.close
162+
163+
expect(logger).to have_received(:log).with('Debug', '[SSE Client] Closing connection')
164+
expect(logger).to have_received(:log).with('Debug', '[SSE Client] Connection closed')
165+
end
166+
167+
it 'handles errors during close gracefully' do
168+
fake_client = instance_double(SSE::Client)
169+
allow(SSE::Client).to receive(:new).and_return(fake_client)
170+
allow(fake_client).to receive(:on_event)
171+
allow(fake_client).to receive(:on_error)
172+
allow(fake_client).to receive(:close).and_raise(StandardError, 'Close failed')
173+
174+
client = described_class.new(uri, secret) { callback.call }
175+
client.start
176+
177+
expect { client.close }.not_to raise_error
178+
179+
expect(logger).to have_received(:log).with('Debug', /Error during close/)
180+
end
181+
182+
it 'sets closed flag to true' do
183+
client = described_class.new(uri, secret) { callback.call }
184+
185+
expect(client.closed).to be false
186+
187+
client.close
188+
189+
expect(client.closed).to be true
190+
end
74191
end
75192

76193
describe '#handle_event' do
@@ -99,7 +216,112 @@ module Utils
99216
event = Struct.new(:type, :data).new('FooEvent', 'hello')
100217
client.send(:handle_event, event)
101218

102-
expect(logger).to have_received(:log).with('Debug', '[SSE] Unknown event: FooEvent with payload: hello')
219+
expect(logger).to have_received(:log).with('Debug', '[SSE Client] Unknown event: FooEvent with payload: hello')
220+
end
221+
222+
it 'logs RpcServerStop event when received' do
223+
client = described_class.new(uri, secret) { callback.call }
224+
225+
event = Struct.new(:type, :data).new('RpcServerStop', '')
226+
client.send(:handle_event, event)
227+
228+
expect(logger).to have_received(:log).with('Debug', '[SSE Client] RpcServerStop received')
229+
end
230+
231+
it 'handles errors during event processing' do
232+
client = described_class.new(uri, secret) { callback.call }
233+
234+
event = Struct.new(:type, :data).new('invalid', nil)
235+
allow(event).to receive(:type).and_raise(StandardError, 'Event parsing failed')
236+
237+
expect { client.send(:handle_event, event) }.not_to raise_error
238+
239+
expect(logger).to have_received(:log).with('Error', /Error handling event/)
240+
end
241+
242+
it 'strips whitespace from event type and data' do
243+
client = described_class.new(uri, secret) { callback.call }
244+
245+
event = Struct.new(:type, :data).new(' FooEvent ', ' data ')
246+
client.send(:handle_event, event)
247+
248+
expect(logger).to have_received(:log).with('Debug', '[SSE Client] Unknown event: FooEvent with payload: data')
249+
end
250+
end
251+
252+
describe '#handle_error' do
253+
it 'logs connection errors' do
254+
client = described_class.new(uri, secret) { callback.call }
255+
256+
error = StandardError.new('Connection interrupted')
257+
client.send(:handle_error, error)
258+
259+
expect(logger).to have_received(:log).with('Warn', /Error: StandardError - Connection interrupted/)
260+
end
261+
262+
it 'identifies EOFError as connection lost' do
263+
client = described_class.new(uri, secret) { callback.call }
264+
265+
error = EOFError.new
266+
client.send(:handle_error, error)
267+
268+
expect(logger).to have_received(:log).with('Warn', /Connection lost: EOFError/)
269+
end
270+
271+
it 'identifies IOError as connection lost' do
272+
client = described_class.new(uri, secret) { callback.call }
273+
274+
error = IOError.new
275+
client.send(:handle_error, error)
276+
277+
expect(logger).to have_received(:log).with('Warn', /Connection lost: IOError/)
278+
end
279+
280+
it 'does not log errors when client is closed' do
281+
client = described_class.new(uri, secret) { callback.call }
282+
client.close
283+
284+
error = StandardError.new('Error after close')
285+
client.send(:handle_error, error)
286+
287+
expect(logger).not_to have_received(:log).with('Warn', anything)
288+
end
289+
end
290+
291+
describe '#handle_rpc_stop' do
292+
it 'executes the callback safely' do
293+
client = described_class.new(uri, secret) { callback.call }
294+
295+
client.send(:handle_rpc_stop)
296+
297+
expect(callback).to have_received(:call)
298+
end
299+
300+
it 'handles callback errors without crashing' do
301+
error_callback = proc { raise StandardError, 'Callback failed' }
302+
client = described_class.new(uri, secret, &error_callback)
303+
304+
expect { client.send(:handle_rpc_stop) }.not_to raise_error
305+
306+
expect(logger).to have_received(:log).with('Error', /Error in RPC stop callback/)
307+
end
308+
309+
it 'handles nil callback gracefully' do
310+
client = described_class.new(uri, secret)
311+
312+
expect { client.send(:handle_rpc_stop) }.not_to raise_error
313+
end
314+
end
315+
316+
describe '#generate_signature' do
317+
it 'generates correct HMAC signature' do
318+
client = described_class.new(uri, secret) { callback.call }
319+
timestamp = '2025-01-01T12:00:00Z'
320+
321+
signature = client.send(:generate_signature, timestamp)
322+
323+
expected_signature = OpenSSL::HMAC.hexdigest('SHA256', secret, timestamp)
324+
expect(signature).to eq(expected_signature)
103325
end
104326
end
105327
end

0 commit comments

Comments
 (0)