Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(NODE-6398): bulkWrite internals to use async/await #4252

Merged
merged 3 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 80 additions & 120 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import { promisify } from 'util';

import { type BSONSerializeOptions, type Document, EJSON, resolveBSONOptions } from '../bson';
import type { Collection } from '../collection';
import {
type AnyError,
MongoBatchReExecutionError,
MONGODB_ERROR_CODES,
MongoInvalidArgumentError,
MongoRuntimeError,
MongoServerError,
MongoWriteConcernError
} from '../error';
Expand All @@ -22,7 +21,6 @@ import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import {
applyRetryableWrites,
type Callback,
getTopology,
hasAtomicOperators,
maybeAddIdToDocuments,
Expand Down Expand Up @@ -500,86 +498,46 @@ export function mergeBatchResults(
}
}

function executeCommands(
async function executeCommands(
bulkOperation: BulkOperationBase,
options: BulkWriteOptions,
callback: Callback<BulkWriteResult>
) {
options: BulkWriteOptions
): Promise<BulkWriteResult> {
if (bulkOperation.s.batches.length === 0) {
return callback(
undefined,
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
);
return new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
}

const batch = bulkOperation.s.batches.shift() as Batch;
for (const batch of bulkOperation.s.batches) {
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
const finalOptions = resolveOptions(bulkOperation, {
...options,
ordered: bulkOperation.isOrdered
});

function resultHandler(err?: AnyError, result?: Document) {
// Error is a driver related error not a bulk op error, return early
if (err && 'message' in err && !(err instanceof MongoWriteConcernError)) {
return callback(
new MongoBulkWriteError(
err,
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
)
);
if (finalOptions.bypassDocumentValidation !== true) {
delete finalOptions.bypassDocumentValidation;
}
baileympearson marked this conversation as resolved.
Show resolved Hide resolved

if (err instanceof MongoWriteConcernError) {
return handleMongoWriteConcernError(
batch,
bulkOperation.s.bulkResult,
bulkOperation.isOrdered,
err,
callback
);
// Is the bypassDocumentValidation options specific
if (bulkOperation.s.bypassDocumentValidation === true) {
finalOptions.bypassDocumentValidation = true;
}

// Merge the results together
mergeBatchResults(batch, bulkOperation.s.bulkResult, err, result);
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
if (bulkOperation.handleWriteError(callback, writeResult)) return;

// Execute the next command in line
executeCommands(bulkOperation, options, callback);
}

const finalOptions = resolveOptions(bulkOperation, {
...options,
ordered: bulkOperation.isOrdered
});

if (finalOptions.bypassDocumentValidation !== true) {
delete finalOptions.bypassDocumentValidation;
}

// Set an operationIf if provided
if (bulkOperation.operationId) {
resultHandler.operationId = bulkOperation.operationId;
}

// Is the bypassDocumentValidation options specific
if (bulkOperation.s.bypassDocumentValidation === true) {
finalOptions.bypassDocumentValidation = true;
}

// Is the checkKeys option disabled
if (bulkOperation.s.checkKeys === false) {
finalOptions.checkKeys = false;
}

if (finalOptions.retryWrites) {
if (isUpdateBatch(batch)) {
finalOptions.retryWrites = finalOptions.retryWrites && !batch.operations.some(op => op.multi);
// Is the checkKeys option disabled
if (bulkOperation.s.checkKeys === false) {
finalOptions.checkKeys = false;
}

if (isDeleteBatch(batch)) {
finalOptions.retryWrites =
finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0);
if (finalOptions.retryWrites) {
if (isUpdateBatch(batch)) {
finalOptions.retryWrites =
finalOptions.retryWrites && !batch.operations.some(op => op.multi);
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
}

if (isDeleteBatch(batch)) {
finalOptions.retryWrites =
finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0);
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

try {
const operation = isInsertBatch(batch)
? new InsertOperation(bulkOperation.s.namespace, batch.operations, finalOptions)
: isUpdateBatch(batch)
Expand All @@ -588,39 +546,50 @@ function executeCommands(
? new DeleteOperation(bulkOperation.s.namespace, batch.operations, finalOptions)
: null;

if (operation != null) {
executeOperation(bulkOperation.s.collection.client, operation).then(
result => resultHandler(undefined, result),
error => resultHandler(error)
);
if (operation == null) throw new MongoRuntimeError(`Unknown batchType: ${batch.batchType}`);
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved

let thrownError = null;
let result;
try {
result = await executeOperation(bulkOperation.s.collection.client, operation);
} catch (error) {
thrownError = error;
}
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved

if (thrownError != null) {
if (thrownError instanceof MongoWriteConcernError) {
mergeBatchResults(batch, bulkOperation.s.bulkResult, thrownError, result);
const writeResult = new BulkWriteResult(
bulkOperation.s.bulkResult,
bulkOperation.isOrdered
);

throw new MongoBulkWriteError(
{
message: thrownError.result.writeConcernError.errmsg,
code: thrownError.result.writeConcernError.code
},
writeResult
);
} else {
// Error is a driver related error not a bulk op error, return early
throw new MongoBulkWriteError(
thrownError,
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
);
}
}
} catch (err) {
// Force top level error
err.ok = 0;
// Merge top level error and return
mergeBatchResults(batch, bulkOperation.s.bulkResult, err, undefined);
callback();

mergeBatchResults(batch, bulkOperation.s.bulkResult, thrownError, result);
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
bulkOperation.handleWriteError(writeResult);
}
}

function handleMongoWriteConcernError(
batch: Batch,
bulkResult: BulkResult,
isOrdered: boolean,
err: MongoWriteConcernError,
callback: Callback<BulkWriteResult>
) {
mergeBatchResults(batch, bulkResult, undefined, err.result);

callback(
new MongoBulkWriteError(
{
message: err.result.writeConcernError.errmsg,
code: err.result.writeConcernError.code
},
new BulkWriteResult(bulkResult, isOrdered)
)
);
bulkOperation.s.batches.length = 0;
baileympearson marked this conversation as resolved.
Show resolved Hide resolved

const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
bulkOperation.handleWriteError(writeResult);
return writeResult;
}

/**
Expand Down Expand Up @@ -875,8 +844,6 @@ export interface BulkWriteOptions extends CommandOperationOptions {
let?: Document;
}

const executeCommandsAsync = promisify(executeCommands);

/**
* TODO(NODE-4063)
* BulkWrites merge complexity is implemented in executeCommands
Expand All @@ -895,15 +862,15 @@ export class BulkWriteShimOperation extends AbstractOperation {
return 'bulkWrite' as const;
}

execute(_server: Server, session: ClientSession | undefined): Promise<any> {
async execute(_server: Server, session: ClientSession | undefined): Promise<any> {
if (this.options.session == null) {
// An implicit session could have been created by 'executeOperation'
// So if we stick it on finalOptions here, each bulk operation
// will use this same session, it'll be passed in the same way
// an explicit session would be
this.options.session = session;
}
return executeCommandsAsync(this.bulkOperation, this.options);
return await executeCommands(this.bulkOperation, this.options);
}
}

Expand Down Expand Up @@ -1239,33 +1206,26 @@ export abstract class BulkOperationBase {
* Handles the write error before executing commands
* @internal
*/
handleWriteError(callback: Callback<BulkWriteResult>, writeResult: BulkWriteResult): boolean {
handleWriteError(writeResult: BulkWriteResult): void {
if (this.s.bulkResult.writeErrors.length > 0) {
const msg = this.s.bulkResult.writeErrors[0].errmsg
? this.s.bulkResult.writeErrors[0].errmsg
: 'write operation failed';

callback(
new MongoBulkWriteError(
{
message: msg,
code: this.s.bulkResult.writeErrors[0].code,
writeErrors: this.s.bulkResult.writeErrors
},
writeResult
)
throw new MongoBulkWriteError(
{
message: msg,
code: this.s.bulkResult.writeErrors[0].code,
writeErrors: this.s.bulkResult.writeErrors
},
writeResult
);

return true;
}

const writeConcernError = writeResult.getWriteConcernError();
if (writeConcernError) {
callback(new MongoBulkWriteError(writeConcernError, writeResult));
return true;
throw new MongoBulkWriteError(writeConcernError, writeResult);
}

return false;
}

abstract addToOperationsList(
Expand Down
7 changes: 3 additions & 4 deletions src/bulk/unordered.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import type { Collection } from '../collection';
import { MongoInvalidArgumentError } from '../error';
import type { DeleteStatement } from '../operations/delete';
import type { UpdateStatement } from '../operations/update';
import { type Callback } from '../utils';
import {
Batch,
BatchType,
Expand All @@ -20,12 +19,12 @@ export class UnorderedBulkOperation extends BulkOperationBase {
super(collection, options, false);
}

override handleWriteError(callback: Callback, writeResult: BulkWriteResult): boolean {
override handleWriteError(writeResult: BulkWriteResult): void {
if (this.s.batches.length) {
return false;
return;
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
}

return super.handleWriteError(callback, writeResult);
return super.handleWriteError(writeResult);
}

addToOperationsList(
Expand Down
22 changes: 10 additions & 12 deletions test/integration/crud/crud_api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import {
Collection,
CommandFailedEvent,
CommandSucceededEvent,
MongoBulkWriteError,
type MongoClient,
MongoError,
MongoServerError,
ObjectId,
ReturnDocument
Expand Down Expand Up @@ -1093,22 +1093,16 @@ describe('CRUD API', function () {
}
});

it('should correctly throw error on illegal callback when unordered bulkWrite encounters error', {
// Add a tag that our runner can trigger on
// in this case we are setting that node needs to be higher than 0.10.X to run
metadata: {
requires: { topology: ['single', 'replicaset', 'sharded', 'ssl', 'heap', 'wiredtiger'] }
},

test: async function () {
describe('when performing a multi-batch unordered bulk write that has a duplicate key', function () {
it('throws a MongoBulkWriteError indicating the duplicate key document failed', async function () {
const ops = [];
// Create a set of operations that go over the 1000 limit causing two messages
let i = 0;
for (; i < 1005; i++) {
ops.push({ insertOne: { _id: i, a: i } });
}

ops.push({ insertOne: { _id: 0, a: i } });
ops[500] = { insertOne: { _id: 0, a: i } };

const db = client.db();

Expand All @@ -1117,8 +1111,12 @@ describe('CRUD API', function () {
.bulkWrite(ops, { ordered: false, writeConcern: { w: 1 } })
.catch(error => error);

expect(error).to.be.instanceOf(MongoError);
}
expect(error).to.be.instanceOf(MongoBulkWriteError);
// 1004 because one of them is duplicate key
// but since it is unordered we continued to write
expect(error).to.have.property('insertedCount', 1004);
expect(error.writeErrors[0]).to.have.nested.property('err.index', 500);
});
});

it('should correctly throw error on illegal callback when ordered bulkWrite encounters error', {
Expand Down