Skip to content

Commit

Permalink
fix(NODE-6394): data events missed while awaiting drain (#4249)
Browse files Browse the repository at this point in the history
Co-authored-by: Neal Beeken <[email protected]>
  • Loading branch information
baileympearson and nbbeeken authored Sep 27, 2024
1 parent 681ddd8 commit 3f9d243
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
.on('error', this.onError.bind(this));
this.socket.on('close', this.onClose.bind(this));
this.socket.on('timeout', this.onTimeout.bind(this));

this.messageStream.pause();
}

public get hello() {
Expand Down Expand Up @@ -651,6 +653,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
private async *readMany(): AsyncGenerator<OpMsgResponse | OpReply> {
try {
this.dataEvents = onData(this.messageStream);
this.messageStream.resume();
for await (const message of this.dataEvents) {
const response = await decompressResponse(message);
yield response;
Expand All @@ -661,6 +664,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}
} finally {
this.dataEvents = null;
this.messageStream.pause();
this.throwIfAborted();
}
}
Expand Down
181 changes: 181 additions & 0 deletions test/unit/cmap/connection.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import { Socket } from 'node:net';

import { expect } from 'chai';
import * as sinon from 'sinon';
import { setTimeout } from 'timers/promises';

import {
connect,
Connection,
isHello,
MongoClientAuthProviders,
MongoDBCollectionNamespace,
MongoNetworkTimeoutError,
ns
} from '../../mongodb';
Expand Down Expand Up @@ -142,4 +146,181 @@ describe('new Connection()', function () {
expect(beforeHandshakeSymbol).to.be.a('symbol');
expect(error).to.have.property(beforeHandshakeSymbol, true);
});

describe('NODE-6370: regression test', function () {
class MockSocket extends Socket {
override write(_data: string | Buffer) {
return false;
}
}

let socket: MockSocket;
let connection: Connection;

this.timeout(10_000);

beforeEach(function () {
socket = new MockSocket();
connection = new Connection(socket, {});
});

const validResponse = Buffer.from(
'a30000002a0800004b010000dd07000000000000008e000000016f6b00000000000000f03f0324636c757374657254696d65005800000011636c757374657254696d65001c00000093f6f266037369676e61747572650033000000056861736800140000000072d8d6eab4e0703d2d50846e2db7adb5d2733cc4126b65794964000200000026f6f2660000116f7065726174696f6e54696d65001c00000093f6f26600',
'hex'
);

const chunks = [validResponse.slice(0, 10), validResponse.slice(10)];

describe('when data is emitted before drain', function () {
describe('first command', function () {
describe('when there is no delay between data and drain', function () {
it('does not hang', async function () {
const result$ = connection.command(
MongoDBCollectionNamespace.fromString('foo.bar'),
{ ping: 1 },
{}
);
// there is an await in writeCommand, we must move the event loop forward just enough
// so that we reach the `await drain`. Otherwise, we'll emit both data and drain before
// listeners are attached.
await setTimeout(0);

socket.emit('data', validResponse);
socket.emit('drain');

await result$;
});
});

describe('when there is a delay between data and drain', function () {
it('does not hang', async function () {
const result$ = connection.command(
MongoDBCollectionNamespace.fromString('foo.bar'),
{ ping: 1 },
{}
);

// there is an await in writeCommand, we must move the event loop forward just enough
// so that we reach the `await drain`. Otherwise, we'll emit both data and drain before
// listeners are attached.
await setTimeout(0);
socket.emit('data', validResponse);

await setTimeout(10);

socket.emit('drain');
await result$;
});
});

describe('when the data comes in multiple chunks', function () {
it('does not hang', async function () {
const result$ = connection.command(
MongoDBCollectionNamespace.fromString('foo.bar'),
{ ping: 1 },
{}
);

// there is an await in writeCommand, we must move the event loop forward just enough
// so that we reach the `await drain`. Otherwise, we'll emit both data and drain before
// listeners are attached.
await setTimeout(0);
socket.emit('data', chunks[0]);

await setTimeout(10);
socket.emit('drain');

socket.emit('data', chunks[1]);

await result$;
});
});
});

describe('not first command', function () {
beforeEach(async function () {
const result$ = connection.command(
MongoDBCollectionNamespace.fromString('foo.bar'),
{ ping: 1 },
{}
);

// there is an await in writeCommand, we must move the event loop forward just enough
// so that we reach the `await drain`. Otherwise, we'll emit both data and drain before
// listeners are attached.
await setTimeout(0);
socket.emit('drain');
socket.emit('data', validResponse);

await result$;
});

describe('when there is no delay between data and drain', function () {
it('does not hang', async function () {
const result$ = connection.command(
MongoDBCollectionNamespace.fromString('foo.bar'),
{ ping: 1 },
{}
);

// there is an await in writeCommand, we must move the event loop forward just enough
// so that we reach the `await drain`. Otherwise, we'll emit both data and drain before
// listeners are attached.
await setTimeout(0);
socket.emit('data', validResponse);

// await setTimeout(0);
// await setTimeout(10);
socket.emit('drain');
await result$;
});
});

describe('when there is a delay between data and drain', function () {
it('does not hang', async function () {
const result$ = connection.command(
MongoDBCollectionNamespace.fromString('foo.bar'),
{ ping: 1 },
{}
);

// there is an await in writeCommand, we must move the event loop forward just enough
// so that we reach the `await drain`. Otherwise, we'll emit both data and drain before
// listeners are attached.
await setTimeout(0);
socket.emit('data', validResponse);

await setTimeout(10);
// await setTimeout(10);
socket.emit('drain');
await result$;
});
});

describe('when the data comes in multiple chunks', function () {
it('does not hang', async function () {
const result$ = connection.command(
MongoDBCollectionNamespace.fromString('foo.bar'),
{ ping: 1 },
{}
);

// there is an await in writeCommand, we must move the event loop forward just enough
// so that we reach the `await drain`. Otherwise, we'll emit both data and drain before
// listeners are attached.
await setTimeout(0);

socket.emit('data', chunks[0]);

await setTimeout(10);

socket.emit('drain');

socket.emit('data', chunks[1]);
await result$;
});
});
});
});
});
});

0 comments on commit 3f9d243

Please sign in to comment.