Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(NODE-5788): rework change stream close rejection test #4232

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 109 additions & 88 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { on, once } from 'events';
import { gte, lt } from 'semver';
import * as sinon from 'sinon';
import { PassThrough } from 'stream';
import { setTimeout } from 'timers';
import { clearTimeout, setTimeout } from 'timers';

import {
type ChangeStream,
Expand Down Expand Up @@ -780,115 +780,136 @@ describe('Change Streams', function () {
});
});

describe('should properly handle a changeStream event being processed mid-close', function () {
let client, coll, changeStream;

function write() {
return Promise.resolve()
.then(() => coll.insertOne({ a: 1 }))
.then(() => coll.insertOne({ b: 2 }));
}

function lastWrite() {
return coll.insertOne({ c: 3 });
describe('when close is called while changes are pending', function () {
let client;
let db;
let collection: Collection<{ insertCount: number }>;
let changeStream: ChangeStream<{ insertCount: number }>;
let insertInterval = undefined;
let insertCount = 0;

/** insertOne every 300ms without running the next insert before the previous one completes */
function setInsertInterval() {
// start an insert
// if first one, create a timeout and refresh
// if NOT first one, just refresh
collection?.insertOne({ insertCount: insertCount++ }).then(() => {
insertInterval ??= setTimeout(setInsertInterval, 300);
insertInterval.refresh();
});
}

beforeEach(function () {
beforeEach(async function () {
client = this.configuration.newClient();
return client.connect().then(_client => {
client = _client;
coll = client.db(this.configuration.db).collection('tester');
changeStream = coll.watch();
});
await client.connect();
db = client.db('test');
collection = db.collection('test_close');
await collection.drop().catch(() => null);
changeStream = collection.watch();

insertCount = 0;
setInsertInterval();
});

afterEach(async function () {
await changeStream?.close();
await client?.close();
coll = undefined;
changeStream = undefined;
clearTimeout(insertInterval);
await collection.drop().catch(() => null);
await client.close();

db = undefined;
client = undefined;
collection = undefined;
changeStream = undefined;
insertInterval = undefined;
insertCount = 0;
});

it('when invoked with promises', {
metadata: { requires: { topology: 'replicaset' } },
test: function () {
const read = () => {
return Promise.resolve()
.then(() => changeStream.next())
.then(() => changeStream.next())
.then(() => {
this.defer(lastWrite());
const nextP = changeStream.next();
return changeStream.close().then(() => nextP);
});
};
it(
'rejects promises already returned by next',
{ requires: { topology: 'replicaset' } },
async function () {
const changes = Array.from({ length: 20 }, () => changeStream.next());
await changeStream.close();
const results = await Promise.allSettled(changes);

const statuses = results.map(({ status, reason, value }) => {
const res =
status === 'rejected'
? reason.message
: value.operationType === 'insert'
? `insert count = ${value.fullDocument.insertCount}`
: null;
return `${status}:${res}`;
});

return Promise.all([read(), write()]).then(
() => Promise.reject(new Error('Expected operation to fail with error')),
err => expect(err.message).to.equal('ChangeStream is closed')
expect(statuses).to.deep.equal(
Array.from({ length: 20 }, () => 'rejected:ChangeStream is closed')
);
}
});

it('when invoked with callbacks', {
metadata: { requires: { topology: 'replicaset' } },
test: function (done) {
const ops = [];
changeStream.next(() => {
changeStream.next(() => {
ops.push(lastWrite());

// explicitly close the change stream after the write has begun
ops.push(changeStream.close());
);

changeStream.next(err => {
try {
expect(err)
.property('message')
.to.match(/ChangeStream is closed/);
Promise.all(ops).then(() => done(), done);
} catch (e) {
done(e);
}
});
});
it.skip(
'rejects promises already returned by next after awaiting the first one',
{ requires: { topology: 'replicaset' } },
async function () {
const changes = Array.from({ length: 20 }, () => changeStream.next());
await changes[0];
const allChanges = Promise.allSettled(changes);

await changeStream.close();

const results = await allChanges;

const statuses = results.map(({ status, reason, value }) => {
const res =
status === 'rejected'
? reason.message
: value.operationType === 'insert'
? `insert count = ${value.fullDocument.insertCount}`
: null;
return `${status}:${res}`;
});

ops.push(
write().catch(() => {
// ignore
})
);
console.log(statuses);

expect(statuses).to.deep.equal([
'fulfilled:insert count = 1',
...Array.from({ length: 19 }, () => 'rejected:ChangeStream is closed')
]);
}
});
).skipReason = 'TODO(NODE-5221): Parallel change streams and close are nondeterministic';

it.skip('when invoked using eventEmitter API', {
metadata: {
requires: { topology: 'replicaset' }
},
async test() {
const changes = on(changeStream, 'change');
await once(changeStream.cursor, 'init');
it.skip(
'rejects promises already returned by next after awaiting half of them',
{ requires: { topology: 'replicaset' } },
async function () {
const changes = Array.from({ length: 20 }, () => changeStream.next());
const allChanges = Promise.allSettled(changes);

await write();
await lastWrite().catch(() => null);
await Promise.allSettled(changes.slice(10));

let counter = 0;
await changeStream.close();

for await (const _ of changes) {
counter += 1;
if (counter === 2) {
await changeStream.close();
break;
}
}
const results = await allChanges;

const statuses = results.map(({ status, reason, value }) => {
const res =
status === 'rejected'
? reason.message
: value.operationType === 'insert'
? `insert count = ${value.fullDocument.insertCount}`
: null;
return `${status}:${res}`;
});

console.log(statuses);

const result = await Promise.race([changes.next(), sleep(800).then(() => 42)]);
expect(result, 'should not have recieved a third event').to.equal(42);
expect(statuses).to.deep.equal([
...Array.from({ length: 1 }, () => 'fulfilled:insert count = 0'),
...Array.from({ length: 19 }, () => 'fulfilled:insert count = 1')
]);
}
}).skipReason =
'This test only worked because of timing, changeStream.close does not remove the change listener';
).skipReason = 'TODO(NODE-5221): Parallel change streams and close are nondeterministic';
});

describe('iterator api', function () {
Expand Down