Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 10 additions & 17 deletions src/invidious/helpers/helpers.cr
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,11 @@ def create_notification_stream(env, topics, connection_channel)
end
end

spawn do
begin
loop do
event = connection.receive

begin
heartbeat_timer = 0.seconds
loop do
select
when event = connection.receive
notification = JSON.parse(event.payload)
topic = notification["topic"].as_s
video_id = notification["videoId"].as_s
Expand All @@ -139,27 +139,20 @@ def create_notification_stream(env, topics, connection_channel)

env.response.puts "id: #{id}"
env.response.puts "data: #{response.to_json}"
env.response.puts
env.response.flush

id += 1
when timeout heartbeat_timer
# Send heartbeat on every timeout
env.response.puts ":keepalive #{Time.utc.to_unix}"
end
rescue ex
ensure
connection_channel.send({false, connection})
end
end

begin
# Send heartbeat
loop do
env.response.puts ":keepalive #{Time.utc.to_unix}"
heartbeat_timer = ((20 + rand(11)).seconds)
env.response.puts
env.response.flush
sleep (20 + rand(11)).seconds
end
rescue ex
ensure
connection.close
connection_channel.send({false, connection})
end
end
Expand Down
8 changes: 7 additions & 1 deletion src/invidious/jobs/notification_job.cr
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@ class Invidious::Jobs::NotificationJob < Invidious::Jobs::BaseJob
def begin
connections = [] of ::Channel(PQ::Notification)

PG.connect_listen(pg_url, "notifications") { |event| connections.each(&.send(event)) }
PG.connect_listen(pg_url, "notifications") do |event|
connections.each do |channel|
channel.send(event)
rescue ::Channel::ClosedError
# Notification stream was closed.
end
end

# hash of channels to their videos (id+published) that need notifying
to_notify = Hash(String, Set(VideoNotification)).new(
Expand Down
Loading