Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix redis emitter #5474

Merged
merged 2 commits into from
Oct 7, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 75 additions & 54 deletions packages/@uppy/companion/src/server/emitter/redis-emitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,54 +20,68 @@ function replacer(key, value) {
module.exports = (redisClient, redisPubSubScope) => {
const prefix = redisPubSubScope ? `${redisPubSubScope}:` : ''
const getPrefixedEventName = (eventName) => `${prefix}${eventName}`
const publisher = redisClient.duplicate({ lazyConnect: true })
publisher.on('error', err => logger.error('publisher redis error', err.toString()))
/** @type {import('ioredis').Redis} */
let subscriber

const connectedPromise = publisher.connect().then(() => {
subscriber = publisher.duplicate()
subscriber.on('error', err => logger.error('subscriber redis error', err.toString()))
return subscriber.connect()
})

const handlersByEvent = new Map()

const errorEmitter = new EventEmitter()
const handleError = (err) => errorEmitter.emit('error', err)

connectedPromise.catch((err) => handleError(err))
async function makeRedis() {
const publisher = redisClient.duplicate({ lazyConnect: true })
publisher.on('error', err => logger.error('publisher redis error', err.toString()))
const subscriber = publisher.duplicate()
subscriber.on('error', err => logger.error('subscriber redis error', err.toString()))
await publisher.connect()
await subscriber.connect()
return { subscriber, publisher }
}

const redisPromise = makeRedis()
redisPromise.catch((err) => handleError(err))

/**
*
* @param {(a: Awaited<typeof redisPromise>) => void} fn
*/
async function runWhenConnected (fn) {
try {
await connectedPromise
await fn()
await fn(await redisPromise)
} catch (err) {
handleError(err)
}
}

/** @type {Map<string, Map<() => unknown, () => unknown>>} */
const handlersByEvent = new Map()

/**
* Remove an event listener
*
* @param {string} eventName name of the event
* @param {any} handler the handler of the event to remove
*/
function removeListener (eventName, handler) {
if (eventName === 'error') return errorEmitter.removeListener('error', handler)
async function removeListener (eventName, handler) {
if (eventName === 'error') {
errorEmitter.removeListener('error', handler)
return
}

return runWhenConnected(() => {
const handlersByThisEventName = handlersByEvent.get(eventName)
if (handlersByThisEventName == null) return undefined
const thisEventNameActualHandlerByHandler = handlersByEvent.get(eventName)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actual handler by handler? 😅

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's a map that maps handler -> actual handler, meaning you can get the actual handler by providing a handler.

or in TS: Map<() => void, () => void>

where the key is the handler and the value is the actualHandler.

This naming was there before and i didn't have any better suggestion

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you write some docs above the Map why it is needed and why it has this structure?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thinking about it more, this feels off to me? Why do we need handler -> handler? Is it because you have multiple handlers per event type? Maybe name it subscribers then and array of handlers per event name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because the api allows you to removeListener for a specific handler, then we need to keep a reference to that handler. i believe the redis-emitter's api mimics the EventEmitter API, and that api allows removing a specific handler from an event name. That's why it's implemented like this. We could probably change the API but that's a bigger change and potentially breaking, so maybe that could be done in a separate PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a comment and renamed the variables a bit

if (thisEventNameActualHandlerByHandler == null) return

const actualHandler = handlersByThisEventName.get(handler)
if (actualHandler == null) return undefined
const actualHandler = thisEventNameActualHandlerByHandler.get(handler)
if (actualHandler == null) return

handlersByThisEventName.delete(handler)
if (handlersByThisEventName.size === 0) handlersByEvent.delete(eventName)
thisEventNameActualHandlerByHandler.delete(handler)

const didRemoveLastListener = thisEventNameActualHandlerByHandler.size === 0
if (didRemoveLastListener) {
handlersByEvent.delete(eventName)
}

await runWhenConnected(async ({ subscriber }) => {
subscriber.off('pmessage', actualHandler)
return subscriber.punsubscribe(getPrefixedEventName(eventName))
if (didRemoveLastListener) {
await subscriber.punsubscribe(getPrefixedEventName(eventName))
}
})
}

Expand All @@ -77,7 +91,13 @@ module.exports = (redisClient, redisPubSubScope) => {
* @param {*} handler
* @param {*} _once
*/
function addListener (eventName, handler, _once = false) {
async function addListener (eventName, handler, _once = false) {
if (eventName === 'error') {
if (_once) errorEmitter.once('error', handler)
else errorEmitter.addListener('error', handler)
return
}

function actualHandler (pattern, channel, message) {
if (pattern !== getPrefixedEventName(eventName)) {
return
Expand All @@ -91,19 +111,20 @@ module.exports = (redisClient, redisPubSubScope) => {
handleError(new Error(`Invalid JSON received! Channel: ${eventName} Message: ${message}`))
return
}

handler(...args)
}

let handlersByThisEventName = handlersByEvent.get(eventName)
if (handlersByThisEventName == null) {
handlersByThisEventName = new WeakMap()
handlersByEvent.set(eventName, handlersByThisEventName)
let thisEventNameActualHandlerByHandler = handlersByEvent.get(eventName)
if (thisEventNameActualHandlerByHandler == null) {
thisEventNameActualHandlerByHandler = new Map()
handlersByEvent.set(eventName, thisEventNameActualHandlerByHandler)
}
handlersByThisEventName.set(handler, actualHandler)
thisEventNameActualHandlerByHandler.set(handler, actualHandler)

runWhenConnected(() => {
await runWhenConnected(async ({ subscriber }) => {
subscriber.on('pmessage', actualHandler)
return subscriber.psubscribe(getPrefixedEventName(eventName))
await subscriber.psubscribe(getPrefixedEventName(eventName))
})
}

Expand All @@ -113,10 +134,8 @@ module.exports = (redisClient, redisPubSubScope) => {
* @param {string} eventName name of the event
* @param {any} handler the handler of the event
*/
function on (eventName, handler) {
if (eventName === 'error') return errorEmitter.on('error', handler)

return addListener(eventName, handler)
async function on (eventName, handler) {
await addListener(eventName, handler)
}

/**
Expand All @@ -125,8 +144,8 @@ module.exports = (redisClient, redisPubSubScope) => {
* @param {string} eventName name of the event
* @param {any} handler the handler of the event
*/
function off (eventName, handler) {
return removeListener(eventName, handler)
async function off (eventName, handler) {
await removeListener(eventName, handler)
}

/**
Expand All @@ -135,36 +154,38 @@ module.exports = (redisClient, redisPubSubScope) => {
* @param {string} eventName name of the event
* @param {any} handler the handler of the event
*/
function once (eventName, handler) {
if (eventName === 'error') return errorEmitter.once('error', handler)

return addListener(eventName, handler, true)
async function once (eventName, handler) {
await addListener(eventName, handler, true)
}

/**
* Announce the occurrence of an event
*
* @param {string} eventName name of the event
*/
function emit (eventName, ...args) {
runWhenConnected(
() => publisher.publish(getPrefixedEventName(eventName),
safeStringify(args, replacer)),
)
async function emit (eventName, ...args) {
await runWhenConnected(async ({ publisher }) => (
publisher.publish(getPrefixedEventName(eventName), safeStringify(args, replacer))
))
}

/**
* Remove all listeners of an event
*
* @param {string} eventName name of the event
*/
function removeAllListeners (eventName) {
if (eventName === 'error') return errorEmitter.removeAllListeners(eventName)
async function removeAllListeners (eventName) {
if (eventName === 'error') {
errorEmitter.removeAllListeners(eventName)
return
}

return runWhenConnected(() => {
handlersByEvent.delete(eventName)
return subscriber.punsubscribe(getPrefixedEventName(eventName))
})
const thisEventNameActualHandlerByHandler = handlersByEvent.get(eventName)
if (thisEventNameActualHandlerByHandler != null) {
for (const handler of thisEventNameActualHandlerByHandler.keys()) {
await removeListener(eventName, handler)
}
}
}

return {
Expand Down
Loading