Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix redis emitter #5474

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open

Fix redis emitter #5474

wants to merge 1 commit into from

Conversation

mifi
Copy link
Contributor

@mifi mifi commented Sep 28, 2024

it had multiple issues:

  • memory leak in removeAllListeners
  • eventName === 'error' wasn't consistently handled

I believe the memory leak might have been introduced in #4623

memory leak from prod:
Screenshot 2024-09-28 at 10 56 26

it had multiple issues:
- memory leak in removeAllListeners
- eventName === 'error' wasn't consistently handled
Copy link
Contributor

Diff output files
diff --git a/packages/@uppy/companion/lib/server/emitter/redis-emitter.d.ts b/packages/@uppy/companion/lib/server/emitter/redis-emitter.d.ts
index 0d502d9..e64f5f0 100644
--- a/packages/@uppy/companion/lib/server/emitter/redis-emitter.d.ts
+++ b/packages/@uppy/companion/lib/server/emitter/redis-emitter.d.ts
@@ -1,10 +1,9 @@
 declare function _exports(redisClient: import('ioredis').Redis, redisPubSubScope: string): {
-    on: (eventName: string, handler: any) => void | EventEmitter<[never]>;
-    off: (eventName: string, handler: any) => Promise<void> | EventEmitter<[never]>;
-    once: (eventName: string, handler: any) => void | EventEmitter<[never]>;
-    emit: (eventName: string, ...args: any[]) => void;
-    removeListener: (eventName: string, handler: any) => Promise<void> | EventEmitter<[never]>;
-    removeAllListeners: (eventName: string) => Promise<void> | EventEmitter<[never]>;
+    on: (eventName: string, handler: any) => Promise<void>;
+    off: (eventName: string, handler: any) => Promise<void>;
+    once: (eventName: string, handler: any) => Promise<void>;
+    emit: (eventName: string, ...args: any[]) => Promise<void>;
+    removeListener: (eventName: string, handler: any) => Promise<void>;
+    removeAllListeners: (eventName: string) => Promise<void>;
 };
 export = _exports;
-import { EventEmitter } from "events";
diff --git a/packages/@uppy/companion/lib/server/emitter/redis-emitter.js b/packages/@uppy/companion/lib/server/emitter/redis-emitter.js
index 62796bc..b946923 100644
--- a/packages/@uppy/companion/lib/server/emitter/redis-emitter.js
+++ b/packages/@uppy/companion/lib/server/emitter/redis-emitter.js
@@ -19,52 +19,60 @@ function replacer(key, value) {
 module.exports = (redisClient, redisPubSubScope) => {
   const prefix = redisPubSubScope ? `${redisPubSubScope}:` : "";
   const getPrefixedEventName = (eventName) => `${prefix}${eventName}`;
-  const publisher = redisClient.duplicate({ lazyConnect: true });
-  publisher.on("error", err => logger.error("publisher redis error", err.toString()));
-  /** @type {import('ioredis').Redis} */
-  let subscriber;
-  const connectedPromise = publisher.connect().then(() => {
-    subscriber = publisher.duplicate();
-    subscriber.on("error", err => logger.error("subscriber redis error", err.toString()));
-    return subscriber.connect();
-  });
-  const handlersByEvent = new Map();
   const errorEmitter = new EventEmitter();
   const handleError = (err) => errorEmitter.emit("error", err);
-  connectedPromise.catch((err) => handleError(err));
+  async function makeRedis() {
+    const publisher = redisClient.duplicate({ lazyConnect: true });
+    publisher.on("error", err => logger.error("publisher redis error", err.toString()));
+    const subscriber = publisher.duplicate();
+    subscriber.on("error", err => logger.error("subscriber redis error", err.toString()));
+    await publisher.connect();
+    await subscriber.connect();
+    return { subscriber, publisher };
+  }
+  const redisPromise = makeRedis();
+  redisPromise.catch((err) => handleError(err));
+  /**
+   * @param {(a: Awaited<typeof redisPromise>) => void} fn
+   */
   async function runWhenConnected(fn) {
     try {
-      await connectedPromise;
-      await fn();
+      await fn(await redisPromise);
     } catch (err) {
       handleError(err);
     }
   }
+  /** @type {Map<string, Map<() => unknown, () => unknown>>} */
+  const handlersByEvent = new Map();
   /**
    * Remove an event listener
    *
    * @param {string} eventName name of the event
    * @param {any} handler the handler of the event to remove
    */
-  function removeListener(eventName, handler) {
+  async function removeListener(eventName, handler) {
     if (eventName === "error") {
-      return errorEmitter.removeListener("error", handler);
+      errorEmitter.removeListener("error", handler);
+      return;
     }
-    return runWhenConnected(() => {
-      const handlersByThisEventName = handlersByEvent.get(eventName);
-      if (handlersByThisEventName == null) {
-        return undefined;
-      }
-      const actualHandler = handlersByThisEventName.get(handler);
-      if (actualHandler == null) {
-        return undefined;
-      }
-      handlersByThisEventName.delete(handler);
-      if (handlersByThisEventName.size === 0) {
-        handlersByEvent.delete(eventName);
-      }
+    const thisEventNameActualHandlerByHandler = handlersByEvent.get(eventName);
+    if (thisEventNameActualHandlerByHandler == null) {
+      return;
+    }
+    const actualHandler = thisEventNameActualHandlerByHandler.get(handler);
+    if (actualHandler == null) {
+      return;
+    }
+    thisEventNameActualHandlerByHandler.delete(handler);
+    const didRemoveLastListener = thisEventNameActualHandlerByHandler.size === 0;
+    if (didRemoveLastListener) {
+      handlersByEvent.delete(eventName);
+    }
+    await runWhenConnected(async ({ subscriber }) => {
       subscriber.off("pmessage", actualHandler);
-      return subscriber.punsubscribe(getPrefixedEventName(eventName));
+      if (didRemoveLastListener) {
+        await subscriber.punsubscribe(getPrefixedEventName(eventName));
+      }
     });
   }
   /**
@@ -72,7 +80,15 @@ module.exports = (redisClient, redisPubSubScope) => {
    * @param {*} handler
    * @param {*} _once
    */
-  function addListener(eventName, handler, _once = false) {
+  async function addListener(eventName, handler, _once = false) {
+    if (eventName === "error") {
+      if (_once) {
+        errorEmitter.once("error", handler);
+      } else {
+        errorEmitter.addListener("error", handler);
+      }
+      return;
+    }
     function actualHandler(pattern, channel, message) {
       if (pattern !== getPrefixedEventName(eventName)) {
         return;
@@ -89,15 +105,15 @@ module.exports = (redisClient, redisPubSubScope) => {
       }
       handler(...args);
     }
-    let handlersByThisEventName = handlersByEvent.get(eventName);
-    if (handlersByThisEventName == null) {
-      handlersByThisEventName = new WeakMap();
-      handlersByEvent.set(eventName, handlersByThisEventName);
+    let thisEventNameActualHandlerByHandler = handlersByEvent.get(eventName);
+    if (thisEventNameActualHandlerByHandler == null) {
+      thisEventNameActualHandlerByHandler = new Map();
+      handlersByEvent.set(eventName, thisEventNameActualHandlerByHandler);
     }
-    handlersByThisEventName.set(handler, actualHandler);
-    runWhenConnected(() => {
+    thisEventNameActualHandlerByHandler.set(handler, actualHandler);
+    await runWhenConnected(async ({ subscriber }) => {
       subscriber.on("pmessage", actualHandler);
-      return subscriber.psubscribe(getPrefixedEventName(eventName));
+      await subscriber.psubscribe(getPrefixedEventName(eventName));
     });
   }
   /**
@@ -106,11 +122,8 @@ module.exports = (redisClient, redisPubSubScope) => {
    * @param {string} eventName name of the event
    * @param {any} handler the handler of the event
    */
-  function on(eventName, handler) {
-    if (eventName === "error") {
-      return errorEmitter.on("error", handler);
-    }
-    return addListener(eventName, handler);
+  async function on(eventName, handler) {
+    await addListener(eventName, handler);
   }
   /**
    * Remove an event listener
@@ -118,8 +131,8 @@ module.exports = (redisClient, redisPubSubScope) => {
    * @param {string} eventName name of the event
    * @param {any} handler the handler of the event
    */
-  function off(eventName, handler) {
-    return removeListener(eventName, handler);
+  async function off(eventName, handler) {
+    await removeListener(eventName, handler);
   }
   /**
    * Add an event listener (will be triggered at most once)
@@ -127,33 +140,35 @@ module.exports = (redisClient, redisPubSubScope) => {
    * @param {string} eventName name of the event
    * @param {any} handler the handler of the event
    */
-  function once(eventName, handler) {
-    if (eventName === "error") {
-      return errorEmitter.once("error", handler);
-    }
-    return addListener(eventName, handler, true);
+  async function once(eventName, handler) {
+    await addListener(eventName, handler, true);
   }
   /**
    * Announce the occurrence of an event
    *
    * @param {string} eventName name of the event
    */
-  function emit(eventName, ...args) {
-    runWhenConnected(() => publisher.publish(getPrefixedEventName(eventName), safeStringify(args, replacer)));
+  async function emit(eventName, ...args) {
+    await runWhenConnected(async (
+      { publisher },
+    ) => (publisher.publish(getPrefixedEventName(eventName), safeStringify(args, replacer))));
   }
   /**
    * Remove all listeners of an event
    *
    * @param {string} eventName name of the event
    */
-  function removeAllListeners(eventName) {
+  async function removeAllListeners(eventName) {
     if (eventName === "error") {
-      return errorEmitter.removeAllListeners(eventName);
+      errorEmitter.removeAllListeners(eventName);
+      return;
+    }
+    const thisEventNameActualHandlerByHandler = handlersByEvent.get(eventName);
+    if (thisEventNameActualHandlerByHandler != null) {
+      for (const handler of thisEventNameActualHandlerByHandler.keys()) {
+        await removeListener(eventName, handler);
+      }
     }
-    return runWhenConnected(() => {
-      handlersByEvent.delete(eventName);
-      return subscriber.punsubscribe(getPrefixedEventName(eventName));
-    });
   }
   return {
     on,

return runWhenConnected(() => {
const handlersByThisEventName = handlersByEvent.get(eventName)
if (handlersByThisEventName == null) return undefined
const thisEventNameActualHandlerByHandler = handlersByEvent.get(eventName)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actual handler by handler? 😅

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's a map that maps handler -> actual handler, meaning you can get the actual handler by providing a handler.

or in TS: Map<() => void, () => void>

where the key is the handler and the value is the actualHandler.

This naming was there before and i didn't have any better suggestion

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you write some docs above the Map why it is needed and why it has this structure?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thinking about it more, this feels off to me? Why do we need handler -> handler? Is it because you have multiple handlers per event type? Maybe name it subscribers then and array of handlers per event name.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants