diff --git a/modules/eddb/commodities.js b/modules/eddb/commodities.js index f32d5d0..29a96c7 100644 --- a/modules/eddb/commodities.js +++ b/modules/eddb/commodities.js @@ -128,12 +128,25 @@ function Commodities() { }) }; + const bulkUpdateCallback = function(err, result){ + if (err) { + console.log(`Errors: ${err.result.getWriteErrorCount()}, example: ${err.message}`); + result = err.result; + } + // Uncomment for database insertion progress +// if (result) { +// console.log(`${result.insertedCount} inserted, ${result.matchedCount} matched, ${result.modifiedCount} modified, ${result.upsertedCount} upserted`); +// } + } + this.downloadUpdate = function () { - let recordsUpdated = 0; + let recordsFound = 0; + let operations = []; let stream = utilities.downloadUpdate('https://eddb.io/archive/v6/listings.csv', 'csv'); stream .on('start', response => { console.log(`EDDB commodity dump started with status code ${response.statusCode}`); + console.time('commodity'); this.emit('started', { response: response, insertion: "started", @@ -142,26 +155,41 @@ function Commodities() { }) .on('data', async json => { stream.pause(); - try { - await commoditiesModel.findOneAndUpdate( - { - id: json.id - }, - json, - { - upsert: true, - runValidators: true - }); - recordsUpdated++; - } catch (err) { + json.updated_at = utilities.modify.millisecondify(json.updated_at) + operations.push({ + updateOne: { + filter: { + id: json.id, +// updated_at: { $ne: json.updated_at } + }, + update: { $set: json }, + upsert: true + } + }); + recordsFound++; + if (operations.length % 1000 === 0 ) { + try { + await commoditiesModel.bulkWrite( + operations, + { ordered: false }, + bulkUpdateCallback + ); + } catch (err) { this.emit('error', err); - } finally { - stream.resume(); + } + operations = []; } + stream.resume(); }) .on('end', () => { - console.log(`${recordsUpdated} records updated`); - this.emit('done', recordsUpdated); + commoditiesModel.bulkWrite( + operations, + { ordered: false }, + 
bulkUpdateCallback + ); + console.timeEnd('commodity'); + console.log(`${recordsFound} records processed.`); + this.emit('done', recordsFound); }) .on('error', err => { this.emit('error', err); diff --git a/modules/eddb/factions.js b/modules/eddb/factions.js index a3f5b9e..fc5d01d 100644 --- a/modules/eddb/factions.js +++ b/modules/eddb/factions.js @@ -131,12 +131,24 @@ function Factions() { }) }; + const bulkUpdateCallback = function(err, result){ + if (err) { + console.log(`Errors: ${err.result.getWriteErrorCount()}, example: ${err.message}`); + result = err.result; + } +// if (result) { +// console.log(`${result.insertedCount} inserted, ${result.matchedCount} matched, ${result.modifiedCount} modified, ${result.upsertedCount} upserted`); +// } + } + this.downloadUpdate = function () { - let recordsUpdated = 0; + let recordsFound = 0; + let operations = []; let stream = utilities.downloadUpdate('https://eddb.io/archive/v6/factions.json', 'json'); stream .on('start', response => { console.log(`EDDB faction dump started with status code ${response.statusCode}`); + console.time('faction') this.emit('started', { response: response, insertion: "started", @@ -145,32 +157,46 @@ function Factions() { }) .on('data', async json => { stream.pause(); - try { - await factionsModel.findOneAndUpdate( - { - id: json.id, - updated_at: { $ne: json.updated_at } - }, - json, - { - upsert: true, - runValidators: true - }); - recordsUpdated++; - } catch (err) { + json.updated_at = utilities.modify.millisecondify(json.updated_at) + json.name_lower = utilities.modify.lowerify(json.name) + operations.push({ + updateOne: { + filter: { + id: json.id, +// updated_at: { $ne: json.updated_at } + }, + update: { $set: json }, + upsert: true + } + }); + recordsFound++; + if (operations.length % 1000 === 0 ) { + try { + await factionsModel.bulkWrite( + operations, + { ordered: false }, + bulkUpdateCallback + ); + } catch (err) { this.emit('error', err); - } finally { - stream.resume(); + } + 
operations = []; } + stream.resume(); }) .on('end', () => { - console.log(`${recordsUpdated} records updated`); - this.emit('done', recordsUpdated); + factionsModel.bulkWrite( + operations, + { ordered: false }, + bulkUpdateCallback + ); + console.timeEnd('faction'); + console.log(`${recordsFound} records processed.`); + this.emit('done', recordsFound); }) .on('error', err => { this.emit('error', err); }) } } - inherits(Factions, eventEmmiter); diff --git a/modules/eddb/populated_systems.js b/modules/eddb/populated_systems.js index f8afe42..402c84e 100644 --- a/modules/eddb/populated_systems.js +++ b/modules/eddb/populated_systems.js @@ -133,12 +133,24 @@ function PopulatedSystems() { }) }; + const bulkUpdateCallback = function(err, result){ + if (err) { + console.log(`Errors: ${err.result.getWriteErrorCount()}, example: ${err.message}`); + result = err.result; + } +// if (result) { +// console.log(`${result.insertedCount} inserted, ${result.matchedCount} matched, ${result.modifiedCount} modified, ${result.upsertedCount} upserted`); +// } + } + this.downloadUpdate = function () { - let recordsUpdated = 0; + let recordsFound = 0; + let operations = []; let stream = utilities.downloadUpdate('https://eddb.io/archive/v6/systems_populated.json', 'json'); stream .on('start', response => { console.log(`EDDB populated system dump started with status code ${response.statusCode}`); + console.time('populatedSystems') this.emit('started', { response: response, insertion: "started", @@ -148,27 +160,42 @@ function PopulatedSystems() { .on('data', async json => { stream.pause(); json = modify(json); - try { - await populatedSystemsModel.findOneAndUpdate( - { - id: json.id, - updated_at: { $ne: json.updated_at } - }, - json, - { - upsert: true, - runValidators: true - }); - recordsUpdated++; - } catch (err) { + json.updated_at = utilities.modify.millisecondify(json.updated_at); + json.name_lower = utilities.modify.lowerify(json.name); + operations.push({ + updateOne: { + filter: { 
+ id: json.id, +// updated_at: { $ne: json.updated_at } + }, + update: { $set: json }, + upsert: true + } + }); + recordsFound++; + if (operations.length % 1000 === 0 ) { + try { + await populatedSystemsModel.bulkWrite( + operations, + { ordered: false }, + bulkUpdateCallback + ); + } catch (err) { this.emit('error', err); - } finally { - stream.resume(); + } + operations = []; } + stream.resume(); }) .on('end', () => { - console.log(`${recordsUpdated} records updated`); - this.emit('done', recordsUpdated); + populatedSystemsModel.bulkWrite( + operations, + { ordered: false }, + bulkUpdateCallback + ); + console.timeEnd('populatedSystems'); + console.log(`${recordsFound} records processed.`); + this.emit('done', recordsFound); }) .on('error', err => { this.emit('error', err); @@ -182,6 +209,9 @@ function PopulatedSystems() { minor_faction_presences[index].active_states = statify(minor_faction_presence.active_states); minor_faction_presences[index].pending_states = statify(minor_faction_presence.pending_states); minor_faction_presences[index].recovering_states = statify(minor_faction_presence.recovering_states); + if (minor_faction_presence.name) { + minor_faction_presences[index].name_lower = utilities.modify.lowerify(minor_faction_presence.name); + } }); return json; } diff --git a/modules/eddb/stations.js b/modules/eddb/stations.js index f117b31..5cd158a 100644 --- a/modules/eddb/stations.js +++ b/modules/eddb/stations.js @@ -133,12 +133,24 @@ function Stations() { }) } + const bulkUpdateCallback = function(err, result){ + if (err) { + console.log(`Errors: ${err.result.getWriteErrorCount()}, example: ${err.message}`); + result = err.result; + } +// if (result) { +// console.log(`${result.insertedCount} inserted, ${result.matchedCount} matched, ${result.modifiedCount} modified, ${result.upsertedCount} upserted`); +// } + } + this.downloadUpdate = function () { - let recordsUpdated = 0; + let recordsFound = 0; + let operations = []; let stream = 
utilities.downloadUpdate('https://eddb.io/archive/v6/stations.json', 'json'); stream .on('start', response => { console.log(`EDDB station dump started with status code ${response.statusCode}`); + console.time('stations') this.emit('started', { response: response, insertion: "started", @@ -148,27 +160,50 @@ function Stations() { .on('data', async json => { stream.pause(); json = modify(json); - try { - await stationsModel.findOneAndUpdate( - { - id: json.id, - updated_at: { $ne: json.updated_at } - }, - json, - { - upsert: true, - runValidators: true - }); - recordsUpdated++; - } catch (err) { + json.updated_at = utilities.modify.millisecondify(json.updated_at); + if (json.shipyard_updated_at) { + json.shipyard_updated_at = utilities.modify.millisecondify(json.shipyard_updated_at); + } + if (json.outfitting_updated_at) { + json.outfitting_updated_at = utilities.modify.millisecondify(json.outfitting_updated_at); + } + if (json.market_updated_at) { + json.market_updated_at = utilities.modify.millisecondify(json.market_updated_at); + } + operations.push({ + updateOne: { + filter: { + id: json.id, +// updated_at: { $ne: json.updated_at } + }, + update: { $set: json }, + upsert: true + } + }); + recordsFound++; + if (operations.length % 1000 === 0 ) { + try { + await stationsModel.bulkWrite( + operations, + { ordered: false }, + bulkUpdateCallback + ); + } catch (err) { this.emit('error', err); - } finally { - stream.resume(); + } + operations = []; } + stream.resume(); }) .on('end', () => { - console.log(`${recordsUpdated} records updated`); - this.emit('done', recordsUpdated); + stationsModel.bulkWrite( + operations, + { ordered: false }, + bulkUpdateCallback + ); + console.timeEnd('stations'); + console.log(`${recordsFound} records processed.`); + this.emit('done', recordsFound); }) .on('error', err => { this.emit('error', err); diff --git a/modules/eddb/systems.js b/modules/eddb/systems.js index 19c229f..c31f0ad 100644 --- a/modules/eddb/systems.js +++ 
b/modules/eddb/systems.js @@ -131,12 +131,24 @@ function Systems() { }) } + const bulkUpdateCallback = function(err, result){ + if (err) { + console.log(`Errors: ${err.result.getWriteErrorCount()}, example: ${err.message}`); + result = err.result; + } +// if (result) { +// console.log(`${result.insertedCount} inserted, ${result.matchedCount} matched, ${result.modifiedCount} modified, ${result.upsertedCount} upserted`); +// } + } + this.downloadUpdate = function () { - let recordsUpdated = 0; + let recordsFound = 0; + let operations = []; let stream = utilities.downloadUpdate('https://eddb.io/archive/v6/systems.csv', 'csv'); stream .on('start', response => { console.log(`EDDB system dump started with status code ${response.statusCode}`); + console.time('systems') this.emit('started', { response: response, insertion: "started", @@ -145,27 +157,42 @@ function Systems() { }) .on('data', async json => { stream.pause(); - try { - await systemsModel.findOneAndUpdate( - { - id: json.id, - updated_at: { $ne: json.updated_at } - }, - json, - { - upsert: true, - runValidators: true - }); - recordsUpdated++; - } catch (err) { + json.updated_at = utilities.modify.millisecondify(json.updated_at) + json.name_lower = utilities.modify.lowerify(json.name) + operations.push({ + updateOne: { + filter: { + id: json.id, +// updated_at: { $ne: json.updated_at } + }, + update: { $set: json }, + upsert: true + } + }); + recordsFound++; + if (operations.length % 10000 === 0 ) { + try { + await systemsModel.bulkWrite( + operations, + { ordered: false }, + bulkUpdateCallback + ); + } catch (err) { this.emit('error', err); - } finally { - stream.resume(); + } + operations = []; } + stream.resume(); }) .on('end', () => { - console.log(`${recordsUpdated} records updated`); - this.emit('done', recordsUpdated); + systemsModel.bulkWrite( + operations, + { ordered: false }, + bulkUpdateCallback + ); + console.timeEnd('systems'); + console.log(`${recordsFound} records processed.`); + 
this.emit('done', recordsFound); }) .on('error', err => { this.emit('error', err); diff --git a/modules/utilities/modify.js b/modules/utilities/modify.js index 046629d..036651a 100644 --- a/modules/utilities/modify.js +++ b/modules/utilities/modify.js @@ -23,4 +23,14 @@ function statify(ref) { return ref; } -module.exports = { objectify, statify } +function millisecondify(ts) { + ts *= 1000; + return ts; +} + +function lowerify(name) { + name = name.toLowerCase(); + return name; +} + +module.exports = { objectify, statify, millisecondify, lowerify }