Skip to content

Commit

Permalink
feat: add transaction support
Browse files Browse the repository at this point in the history
Signed-off by: Sergey Nosenko <[email protected]>
Signed-off-by: Rifa Achrinza <[email protected]>
  • Loading branch information
darknos authored and achrinza committed Aug 12, 2021
1 parent baf0759 commit 91c865b
Show file tree
Hide file tree
Showing 2 changed files with 363 additions and 30 deletions.
159 changes: 129 additions & 30 deletions lib/mongodb.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ exports.initialize = function initializeDataSource(dataSource, callback) {

s.safe = s.safe !== false;
s.w = s.w || 1;
s.writeConcern = s.writeConcern || {
w: s.w,
wtimeout: s.wtimeout || null,
j: s.j || null,
journal: s.journal || null,
fsync: s.fsync || null,
};
s.url = s.url || generateMongoDBURL(s);
s.useNewUrlParser = s.useNewUrlParser !== false;
s.useUnifiedTopology = s.useUnifiedTopology !== false;
Expand Down Expand Up @@ -251,9 +258,6 @@ MongoDB.prototype.connect = function(callback) {
'acceptableLatencyMS',
'connectWithNoPrimary',
'authSource',
'w',
'wtimeout',
'j',
'forceServerObjectId',
'serializeFunctions',
'ignoreUndefined',
Expand All @@ -278,13 +282,13 @@ MongoDB.prototype.connect = function(callback) {
'password',
'authMechanism',
'compression',
'fsync',
'readPreferenceTags',
'numberOfRetries',
'auto_reconnect',
'minSize',
'useNewUrlParser',
'useUnifiedTopology',
'writeConcern',
// Ignored options
'native_parser',
// Legacy options
Expand All @@ -293,6 +297,11 @@ MongoDB.prototype.connect = function(callback) {
'replSet',
'mongos',
'db',
'w',
'wtimeout',
'j',
'journal',
'fsync',
];

const lbOptions = Object.keys(self.settings);
Expand Down Expand Up @@ -683,7 +692,7 @@ MongoDB.prototype.exists = function(modelName, id, options, callback) {
debug('exists', modelName, id);
}
id = self.coerceId(modelName, id, options);
this.execute(modelName, 'findOne', {_id: id}, function(err, data) {
this.execute(modelName, 'findOne', {_id: id}, buildOptions({}, options), function(err, data) {
if (self.debug) {
debug('exists.callback', modelName, id, err, data);
}
Expand All @@ -704,7 +713,7 @@ MongoDB.prototype.find = function find(modelName, id, options, callback) {
}
const idName = self.idName(modelName);
const oid = self.coerceId(modelName, id, options);
this.execute(modelName, 'findOne', {_id: oid}, function(err, data) {
this.execute(modelName, 'findOne', {_id: oid}, buildOptions({}, options), function(err, data) {
if (self.debug) {
debug('find.callback', modelName, id, err, data);
}
Expand Down Expand Up @@ -893,7 +902,7 @@ MongoDB.prototype.destroy = function destroy(modelName, id, options, callback) {
debug('delete', modelName, id);
}
id = self.coerceId(modelName, id, options);
this.execute(modelName, 'deleteOne', {_id: id}, function(err, result) {
this.execute(modelName, 'deleteOne', {_id: id}, buildOptions({}, options), function(err, result) {
if (self.debug) {
debug('delete.callback', modelName, id, err, result);
}
Expand Down Expand Up @@ -1034,6 +1043,16 @@ MongoDB.prototype.buildWhere = function(modelName, where, options) {

query[k] = {$regex: cond};
} else {
if (isObjectIDProperty(modelCtor, propDef, cond, options)) {
if (Array.isArray(cond)) {
cond = cond.map(function(c) {
return ObjectID(c);
});
} else {
cond = ObjectID(cond);
}
}

query[k] = {};
query[k]['$' + spec] = cond;
}
Expand All @@ -1044,8 +1063,15 @@ MongoDB.prototype.buildWhere = function(modelName, where, options) {
query[k] = {$type: 10};
} else {
if (isObjectIDProperty(modelCtor, propDef, cond, options)) {
cond = ObjectID(cond);
if (Array.isArray(cond)) {
cond = cond.map(function(c) {
return ObjectID(c);
});
} else {
cond = ObjectID(cond);
}
}

query[k] = cond;
}
}
Expand Down Expand Up @@ -1312,8 +1338,17 @@ MongoDB.prototype.convertColumnNames = function(model, data, direction) {
}

if (direction === 'database') {
data[columnName] = data[propName];
delete data[propName];
// Handle data is Array object - in case of fields filter
if (Array.isArray(data)) {
const idx = data.indexOf(propName);
if (idx !== -1) {
data.push(columnName);
delete data[idx];
}
} else { // Handle data as Object - in case to create / update
data[columnName] = data[propName];
delete data[propName];
}
}

if (direction === 'property') {
Expand Down Expand Up @@ -1351,17 +1386,23 @@ MongoDB.prototype.all = function all(modelName, filter, options, callback) {
if (filter.where) {
query = self.buildWhere(modelName, filter.where, options);
}
let fields = filter.fields;
// Use Object.assign to avoid change filter.fields
// which will cause error when create model from data
let fields = undefined;
if (typeof filter.fields !== 'undefined') {
fields = [];
Object.assign(fields, filter.fields);
}

// Convert custom column names
fields = self.fromPropertyToDatabaseNames(modelName, fields);

options = buildOptions({}, options);

if (fields) {
const findOpts = {projection: fieldsArrayToObj(fields)};
this.execute(modelName, 'find', query, findOpts, processResponse);
} else {
this.execute(modelName, 'find', query, processResponse);
options.projection = fieldsArrayToObj(fields);

This comment has been minimized.

Copy link
@mCassanoNsi

mCassanoNsi Oct 19, 2021

#648 options.projection gets used on line 1442 causing included models to use the same projection, resulting in empty included results if they dont have a fields filter defined in scope

}
this.execute(modelName, 'find', query, options, processResponse);

function processResponse(err, cursor) {
if (err) {
Expand Down Expand Up @@ -1461,7 +1502,7 @@ MongoDB.prototype.destroyAll = function destroyAll(
where = self.buildWhere(modelName, where, options);
if (debug.enabled) debug('destroyAll where %s', util.inspect(where));

this.execute(modelName, 'deleteMany', where || {}, function(err, info) {
this.execute(modelName, 'deleteMany', where || {}, buildOptions({}, options), function(err, info) {
if (err) return callback && callback(err);

if (self.debug) debug('destroyAll.callback', modelName, where, err, info);
Expand All @@ -1488,15 +1529,26 @@ MongoDB.prototype.count = function count(modelName, where, options, callback) {
debug('count', modelName, where);
}
where = self.buildWhere(modelName, where, options) || {};
const method = Object.keys(where).length === 0 ? 'estimatedDocumentCount' : 'countDocuments';
this.execute(modelName, method, where, function(err, count) {
if (self.debug) {
debug('count.callback', modelName, err, count);
}
if (callback) {
callback(err, count);
}
});
options = buildOptions({}, options);
if (Object.keys(where).length === 0 && !options.session) {
this.execute(modelName, 'estimatedDocumentCount', function(err, count) {
if (self.debug) {
debug('count.callback', modelName, err, count);
}
if (callback) {
callback(err, count);
}
});
} else {
this.execute(modelName, 'countDocuments', where, options, function(err, count) {
if (self.debug) {
debug('count.callback', modelName, err, count);
}
if (callback) {
callback(err, count);
}
});
}
};

/**
Expand Down Expand Up @@ -1538,7 +1590,7 @@ MongoDB.prototype.replaceWithOptions = function(modelName, id, data, options, cb
const idName = self.idName(modelName);
delete data[idName];
data = self.toDatabase(modelName, data);
this.execute(modelName, 'replaceOne', {_id: id}, data, options, function(
this.execute(modelName, 'replaceOne', {_id: id}, data, buildOptions({}, options), function(
err,
info,
) {
Expand Down Expand Up @@ -1735,11 +1787,11 @@ MongoDB.prototype.upsertWithWhere = function upsertWithWhere(
'findOneAndUpdate',
where,
updateData,
{
buildOptions({
upsert: true,
returnOriginal: false,
sort: [['_id', 'asc']],
},
}, options),
function(err, result) {
if (err) return cb && cb(err);

Expand Down Expand Up @@ -2015,6 +2067,48 @@ MongoDB.prototype.ping = function(cb) {
}
};

MongoDB.prototype.beginTransaction = function(isolationLevel, cb) {
// TODO: think about how to convert READ_COMMITED, etc. to transactionOptions
const transactionOptions = {
readPreference: 'primary',
readConcern: {level: 'local'},
writeConcern: {w: 'majority'},
};
if (isolationLevel instanceof Object) {
Object.assign(transactionOptions, isolationLevel || {});
}
const session = this.client.startSession();
session.startTransaction(transactionOptions);
cb(null, session);
};

MongoDB.prototype.commit = function(tx, cb) {
tx.commitTransaction(function(err) {
tx.endSession(null, function(error) {
if (err) return cb(err);
if (error) return cb(error);
cb();
});
});
};

MongoDB.prototype.rollback = function(tx, cb) {
tx.abortTransaction(function(err) {
tx.endSession(null, function(error) {
if (err) return cb(err);
if (error) return cb(error);
cb();
});
});
};

function isInTransation(options) {
const ops = {};
if (options && options.transaction && options.transaction.isInTransation)
ops.session = options.transaction.session;
return ops;
}

// Case insensitive check if a string looks like "ObjectID"
function typeIsObjectId(input) {
if (!input) return false;
Expand Down Expand Up @@ -2072,7 +2166,8 @@ function coerceToObjectId(modelCtor, propDef, propValue) {
function isObjectIDProperty(modelCtor, propDef, value, options) {
if (!propDef) return false;

if (typeof value === 'string' && value.match(ObjectIdValueRegex)) {
if ((typeof value === 'string' && value.match(ObjectIdValueRegex)) ||
(Array.isArray(value) && value.every((v) => v.match(ObjectIdValueRegex)))) {
if (isStoredAsObjectID(propDef)) return true;
else return !isStrictObjectIDCoercionEnabled(modelCtor, options);
} else if (value instanceof mongodb.ObjectID) {
Expand Down Expand Up @@ -2306,5 +2401,9 @@ function hasDataType(dataType, propertyDef) {
* @param {*} connectorOptions User specified Options
*/
function buildOptions(requiredOptions, connectorOptions) {
return Object.assign({}, connectorOptions, requiredOptions);
if (connectorOptions && connectorOptions.transaction && connectorOptions.transaction.isActive()) {
return Object.assign({session: connectorOptions.transaction.connection}, connectorOptions, requiredOptions);
} else {
return Object.assign({}, connectorOptions, requiredOptions);
}
}
Loading

0 comments on commit 91c865b

Please sign in to comment.