Skip to content

Commit

Permalink
Implement event listener priority queue for optimized event handling (#…
Browse files Browse the repository at this point in the history
…45)

* feat: performance metrics

* removed temp benchmark

* Implement event listener priority queue for optimized event handling (#44)

* refactor: Implement event listener priority queue for optimized event handling

* refactor: Implement event listener priority queue for optimized event handling

* refactor: improved test for complex scenario

* removed temp benchmark

* version and deps

* refactor: Implement event listener priority queue for optimized event handling
  • Loading branch information
valehasadli authored Nov 22, 2023
1 parent 1f9c185 commit 5c0f0e0
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 129 deletions.
44 changes: 44 additions & 0 deletions benchmarks/loadTest.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
const Emitter = require('../dist/core/Emitter').default;

const userEmitter = new Emitter();

// High-priority listener for userLoggedIn
userEmitter.subscribe('userLoggedIn', (username) => {
console.log(`[High Priority] ${username} logged in`);
}, 100);

// Medium-priority listener for userLoggedIn
userEmitter.subscribe('userLoggedIn', (username) => {
for (let i = 0; i < 1000; i++) {} // Simulate processing delay
console.log(`[Medium Priority] Welcome message sent to ${username}`);
}, 50);

// Low-priority listener for userLoggedIn
userEmitter.subscribe('userLoggedIn', (username) => {
for (let i = 0; i < 10000; i++) {} // Simulate heavier task
console.log(`[Low Priority] Analytics updated for ${username}`);
}, 10);

// Additional listeners for systemCheck and dataUpdate
userEmitter.subscribe('systemCheck', (message) => {
console.log(`[System Check] ${message}`);
});

userEmitter.subscribe('dataUpdate', (message) => {
console.log(`[Data Update] ${message}`);
});

console.time('emitLoop');

// Emitting a mix of events
for (let i = 0; i < 1000000; i++) {
userEmitter.emit('userLoggedIn', `User${i}`);
if (i % 100 === 0) {
userEmitter.emit('systemCheck', `System Check at ${i}`);
userEmitter.emit('dataUpdate', `Data Update at ${i}`);
}
}

console.timeEnd('emitLoop');

module.exports = Emitter;
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "blink-hub",
"version": "0.3.8",
"version": "0.3.9",
"description": "A versatile and efficient event-handling library designed for the modern JS/TS ecosystem.",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
25 changes: 12 additions & 13 deletions src/core/EventRegistry.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
import { Listener } from "../types";
import PriorityQueue from "./PriorityQueue";

export class EventRegistry<T extends Record<string, (...args: any[]) => void>> {
private events: Partial<Record<keyof T, Set<Listener<T[keyof T]>>>> = {};
private events: Partial<Record<keyof T, PriorityQueue<T[keyof T]>>> = {};

subscribe<K extends keyof T>(name: K, callback: T[K], priority: number = 0): () => void {
if (!this.events[name]) {
this.events[name] = new Set();
this.events[name] = new PriorityQueue<T[keyof T]>();
}

const listener: Listener<T[K]> = { callback, priority };

this.events[name] = new Set([...this.events[name]!]
.concat(listener)
.sort((a, b) => b.priority - a.priority));
this.events[name]!.enqueue(listener);

return (): void => {
this.events[name]?.forEach(listener => {
if (listener.callback === callback) {
this.events[name]?.delete(listener);
}
});
this.events[name]?.remove(listener);
};
}

Expand Down Expand Up @@ -62,13 +56,18 @@ export class EventRegistry<T extends Record<string, (...args: any[]) => void>> {
return [];
}

const listeners = this.events[name]!;
const results: (ReturnType<T[K]> | Error)[] = [];
const listeners = this.events[name]!.getListeners();

for (const listener of listeners) {
try {
// Call the listener and capture any return value
const result = listener.callback(...args);
results.push(result as ReturnType<T[K]>);

// If result is undefined (void), we don't push it to results
if (result !== undefined) {
results.push(result as ReturnType<T[K]>);
}
} catch (error) {
results.push(error as Error);
}
Expand Down
26 changes: 26 additions & 0 deletions src/core/PriorityQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Listener } from "../types";

export default class PriorityQueue<T extends (...args: any[]) => any> {
private items: Array<Listener<T>> = [];

enqueue(listener: Listener<T>): void {
let index = this.items.findIndex(item => item.priority < listener.priority);
index = index === -1 ? this.items.length : index;
this.items.splice(index, 0, listener);
}

getListeners(): Array<Listener<T>> {
return [...this.items];
}

remove(listenerToRemove: Listener<T>): void {
const index = this.items.findIndex(listener =>
listener.callback === listenerToRemove.callback &&
listener.priority === listenerToRemove.priority
);

if (index !== -1) {
this.items.splice(index, 1);
}
}
}
4 changes: 1 addition & 3 deletions src/types/Listener.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import { Callback } from "./Callback";

export type Listener<T extends Callback<any[]>> = {
export type Listener<T> = {
callback: T;
priority: number;
};
Loading

0 comments on commit 5c0f0e0

Please sign in to comment.