Skip to content

Commit

Permalink
Fix reconnecting
Browse files Browse the repository at this point in the history
  • Loading branch information
joshbetz committed Nov 22, 2022
1 parent 4687587 commit 24778d5
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 51 deletions.
33 changes: 14 additions & 19 deletions src/hashpool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ export default class HashPool extends EventEmitter {
socketTimeout: 1000,
}, opts );

this.opts.forwardPoolErrors = true;

this.retries = 0;
this.isReady = false;
this.hashring = new HashRing();
Expand All @@ -55,20 +57,19 @@ export default class HashPool extends EventEmitter {
}

const [ host, port ] = node.split( ':' );
let pool: Pool;
try {
pool = new Pool( parseInt( port, 10 ), host, this.opts );
} catch ( error ) {
return;
}

const pool = new Pool( parseInt( port, 10 ), host, this.opts );
pool.on( 'error', ( error: NodeJS.ErrnoException ) => {
const host = this.nodes.get( node );
if ( error?.code === 'ECONNREFUSED' && !host?.reconnecting ) {
if ( !host || host.reconnecting ) {
return;
}

if ( error?.code === 'ECONNREFUSED' ) {
// If the connection is closed, remove it and reconnect immediately
return this.disconnect( node );
}

if ( host && !host.reconnecting && host.errors++ > this.opts.failures ) {
if ( host.errors++ > this.opts.failures ) {
return this.disconnect( node );
}
} );
Expand All @@ -87,21 +88,15 @@ export default class HashPool extends EventEmitter {
this.isReady = true;
this.emit( 'ready' );
} )
.catch( ( error: NodeJS.ErrnoException ) => {
if ( error?.code === 'ECONNREFUSED' ) {
// This is already handled by the event emitter
return;
}

this.nodes.delete( node );
this.reconnect( node );
.catch( () => {
this.disconnect( node );
} );
}

reconnect( node: string ) {
setTimeout( () => {
this.connect( node );
}, this.opts.retry( this.retries++ ) ).unref();
}, this.opts.retry( this.retries++ ) );
}

disconnect( node: string, reconnect = true ) {
Expand Down Expand Up @@ -133,7 +128,7 @@ export default class HashPool extends EventEmitter {
return new Promise<void>( ( resolve, reject ) => {
const timeout = setTimeout( () => {
reject( new Error( 'No hosts' ) );
}, this.opts.socketTimeout ).unref();
}, this.opts.timeout ).unref();

this.once( 'ready', () => {
clearTimeout( timeout );
Expand Down
4 changes: 2 additions & 2 deletions src/memcached.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export default class Memcached extends EventEmitter {
this.client.setTimeout( this.opts.socketTimeout, () => {
this.emit( 'error', new Error( 'Socket Timeout' ) );
this.client.destroy();
} ).unref();
} );
this.client.once( 'connect', () => this.client.setTimeout( 0 ) );
this.client.once( 'ready', () => { this.isReady = true; } );

Expand Down Expand Up @@ -206,7 +206,7 @@ export default class Memcached extends EventEmitter {

async end(): Promise<void> {
return new Promise( resolve => {
const timeout = setTimeout( this.client.destroy, this.opts.socketTimeout );
const timeout = setTimeout( () => this.client.destroy(), this.opts.socketTimeout );
this.client.once( 'close', () => {
clearTimeout( timeout );
resolve();
Expand Down
39 changes: 9 additions & 30 deletions src/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@ export default class Pool extends EventEmitter {
opts: any;
pool: GenericPool<Memcached>;
failures: number;
closing: boolean;

constructor( port: number, host: string, opts?: any ) {
super();

this.failures = 0;
this.closing = false;

this.opts = Object.assign( {
failures: 5,
forwardPoolErrors: false,

// Pool options
max: 10,
Expand All @@ -38,10 +37,15 @@ export default class Pool extends EventEmitter {
this.pool = createPool( {
create: async () => {
const memcached = new Memcached( port, host, this.opts );
if ( this.opts.forwardPoolErrors ) {
memcached.on( 'error', ( error: Error ) => this.emit( 'error', error ) );
}

await memcached.ready();
return memcached;
},
destroy: async ( memcached: Memcached ) => {
memcached.removeAllListeners();
return memcached.end();
},
validate: async ( memcached: Memcached ) => {
Expand All @@ -55,41 +59,17 @@ export default class Pool extends EventEmitter {
}

async ready() {
if ( this.closing ) {
throw new Error( 'Closing' );
}

if ( this.pool.available >= this.pool.min ) {
return true;
}

return new Promise( ( resolve, reject ) => {
const timeout = setTimeout( () => reject( new Error( 'Timeout' ) ), this.opts.timeout ).unref();

const isReady = () => {
if ( this.pool.available >= this.pool.min ) {
clearTimeout( timeout );
resolve( true );
} else {
setTimeout( isReady, 100 ).unref();
}
};

isReady();
} );
return this.pool.ready();
}

async use( fn: ( client: Memcached ) => Promise<any> ): Promise<any> {
let client;
let value;
try {
client = await this.pool.acquire();
value = await this.pool.use( fn );
} catch ( error ) {
return false;
}

const value = await fn( client );
await this.pool.release( client );

return value;
}

Expand Down Expand Up @@ -122,7 +102,6 @@ export default class Pool extends EventEmitter {
}

async end() {
this.closing = true;
await this.pool.drain();
await this.pool.clear();
}
Expand Down

0 comments on commit 24778d5

Please sign in to comment.