Fix leaked fiber in notification_stream_handler #5330

base: master

Conversation
Of the (technically) four fibers spawned by `create_notification_stream`, two are wrapped in an `ensure` clause so that they always unsubscribe themselves from the notification job after an error, or when they simply finish. The first is the heartbeat fiber, which is actually also the main fiber of the route handler. The second is a fiber that awaits notification pushes from the notification job through the `connection` channel.

When an error occurs within the main heartbeat fiber, its `ensure` clause runs and unsubscribes the handler from receiving any further pushes from the notification job. The problem is that this can (and in practice almost always will) happen while the notification receiver fiber is awaiting a value from the notification job. The job can no longer send anything to the receiver, since the heartbeat fiber unsubscribed it a moment earlier, so the notification receiver fiber now blocks indefinitely.

In doing so it prevents the entire execution stack of that fiber, and of the `create_notification_stream` function, from being garbage collected: the IO buffers for the contents of the request and response stay referenced, the underlying TCP/TLS sockets become inaccessible and are leaked, the parsed structures of YouTube's massive JSON objects stay allocated, and so on.

This PR simply merges the two into a single fiber via a select statement, ensuring that there will be no concurrency problems.
For reference, here's a quick benchmark I did with and without this fix (plus the patch below to tell Invidious to run …).

Before the changes:

[benchmark screenshot]

Afterwards:

[benchmark screenshot]
Make sure to increase …

Patch

diff --git a/src/invidious/helpers/handlers.cr b/src/invidious/helpers/handlers.cr
index 13ea9fe9..55f4066b 100644
--- a/src/invidious/helpers/handlers.cr
+++ b/src/invidious/helpers/handlers.cr
@@ -95,6 +95,9 @@ class AuthHandler < Kemal::Handler
return call_next env unless only_match? env
begin
+ # Bypass database call for the sake of testing the leak
+ call_next env
+
if token = env.request.headers["Authorization"]?
token = JSON.parse(URI.decode_www_form(token.lchop("Bearer ")))
session = URI.decode_www_form(token["session"].as_s)
diff --git a/src/invidious/routes/before_all.cr b/src/invidious/routes/before_all.cr
index b5269668..8ea304a9 100644
--- a/src/invidious/routes/before_all.cr
+++ b/src/invidious/routes/before_all.cr
@@ -66,6 +66,8 @@ module Invidious::Routes::BeforeAll
}.any? { |r| env.request.resource.starts_with? r }
if env.request.cookies.has_key? "SID"
+ # Bypass database call for the sake of testing the leak
+ return
sid = env.request.cookies["SID"].value
if sid.starts_with? "v1:"
diff --git a/src/invidious/routes/misc.cr b/src/invidious/routes/misc.cr
index 0b868755..45103e2e 100644
--- a/src/invidious/routes/misc.cr
+++ b/src/invidious/routes/misc.cr
@@ -38,6 +38,13 @@ module Invidious::Routes::Misc
rendered "licenses"
end
+ def self.call_gc(env)
+ 20.times do
+ GC.collect
+      sleep 0.20.seconds
+ end
+ end
+
def self.cross_instance_redirect(env)
referer = get_referer(env)
diff --git a/src/invidious/routing.cr b/src/invidious/routing.cr
index 46b71f1f..8d64b5b5 100644
--- a/src/invidious/routing.cr
+++ b/src/invidious/routing.cr
@@ -21,6 +21,7 @@ module Invidious::Routing
get "/privacy", Routes::Misc, :privacy
get "/licenses", Routes::Misc, :licenses
get "/redirect", Routes::Misc, :cross_instance_redirect
+ get "/call_gc", Routes::Misc, :call_gc
self.register_channel_routes
        self.register_watch_routes

Script

require "http"
require "wait_group"
HEADERS = HTTP::Headers{
  "Connection" => "Keep-Alive",
}

COOKIES = HTTP::Cookies.new
COOKIES.add_request_headers(HEADERS)

CLIENTS_MADE = [] of HTTP::Client

BATCH_SIZE  = 500
BATCH_COUNT = 3

TARGET = URI.parse(<<INSERT URI>>)

# Used to modify program state during execution
module Global
  class_property? exit_request = false
  class_property! await_cancellation : WaitGroup
end

def make_client_and_request
  client = HTTP::Client.new(TARGET)

  client.get("/api/v1/auth/notifications", headers: HEADERS) do |response|
    CLIENTS_MADE << client
    Fiber.yield

    until response.body_io.closed?
      if Global.exit_request?
        return Global.await_cancellation.done
      else
        response.body_io.gets
      end
    end
  end
ensure
  client.try &.close
end

BATCH_COUNT.times do |current_cycle|
  BATCH_SIZE.times { spawn { make_client_and_request } }

  while CLIENTS_MADE.size < (BATCH_SIZE * (current_cycle + 1))
    puts "Batch #{current_cycle + 1}: #{CLIENTS_MADE.size}/#{BATCH_SIZE * (current_cycle + 1)} made"
    sleep 2.seconds
  end

  puts "Batch #{current_cycle + 1}: Finished creating #{CLIENTS_MADE.size}/#{BATCH_SIZE * (current_cycle + 1)} clients"
  puts "#{BATCH_COUNT - (current_cycle + 1)} more batches left to go | #{CLIENTS_MADE.size}/#{BATCH_COUNT * BATCH_SIZE}"

  sleep 5.seconds

  Global.exit_request = true
  await_cancellation = WaitGroup.new(BATCH_SIZE)
  Global.await_cancellation = await_cancellation

  puts "Canceling all connections to prepare for the next batch."
  await_cancellation.wait
  Global.exit_request = false
  puts "Done. Starting next batch\n\n"
end

puts "Finished making #{BATCH_COUNT * BATCH_SIZE} requests"
sleep
exit
I don't believe that closing the fiber's channels is strictly necessary, but it doesn't hurt to do.
Thanks, nice catch!
@Fijxu would you mind trying it just to be safe? Then it will be good to merge.
TL;DR: A dangling fiber was blocking indefinitely, which prevented a bunch of stuff from being garbage collected.