Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: improve EDDB download and update times #23

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
62 changes: 45 additions & 17 deletions modules/eddb/commodities.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,25 @@ function Commodities() {
})
};

/**
 * Shared callback for MongoDB `bulkWrite` batches.
 * On failure, logs how many individual write errors occurred plus one
 * example message, then falls back to the partial result attached to
 * the error so the (optional) progress logging below still works.
 * @param {Error} err - bulkWrite error; a BulkWriteError carries `.result`,
 *   but other errors (e.g. connection loss) do not.
 * @param {Object} [result] - BulkWriteResult on success; replaced by
 *   `err.result` when the batch partially failed.
 */
const bulkUpdateCallback = function (err, result) {
    if (err) {
        // Guard: only BulkWriteError has `.result`; an unrelated error
        // (network, validation) must not crash inside the error logger.
        const errorCount = err.result ? err.result.getWriteErrorCount() : 'unknown';
        console.log(`Errors: ${errorCount}, example: ${err.message}`);
        result = err.result;
    }
    // Uncomment for database insertion progress
    // if (result) {
    //     console.log(`${result.insertedCount} inserted, ${result.matchedCount} matched, ${result.modifiedCount} modified, ${result.upsertedCount} upserted`);
    // }
}

this.downloadUpdate = function () {
let recordsUpdated = 0;
let recordsFound = 0;
let operations = [];
let stream = utilities.downloadUpdate('https://eddb.io/archive/v6/listings.csv', 'csv');
stream
.on('start', response => {
console.log(`EDDB commodity dump started with status code ${response.statusCode}`);
console.time('commodity');
this.emit('started', {
response: response,
insertion: "started",
Expand All @@ -142,26 +155,41 @@ function Commodities() {
})
.on('data', async json => {
stream.pause();
try {
await commoditiesModel.findOneAndUpdate(
{
id: json.id
},
json,
{
upsert: true,
runValidators: true
});
recordsUpdated++;
} catch (err) {
json.updated_at = utilities.modify.millisecondify(json.updated_at)
operations.push({
updateOne: {
filter: {
id: json.id,
// updated_at: { $ne: json.updated_at }
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This filter clause is commented out because, with it enabled, Mongo returns a duplicate key error for every record that was NOT updated. Without it, changed fields are still updated correctly — e.g. 0 inserted, 1000 matched, 1 modified, 0 upserted.

},
update: { $set: json },
upsert: true
}
});
recordsFound++;
if (operations.length % 1000 === 0 ) {
try {
await commoditiesModel.bulkWrite(
operations,
{ ordered: false },
bulkUpdateCallback
);
} catch (err) {
this.emit('error', err);
} finally {
stream.resume();
}
operations = [];
}
stream.resume();
})
.on('end', () => {
console.log(`${recordsUpdated} records updated`);
this.emit('done', recordsUpdated);
commoditiesModel.bulkWrite(
operations,
{ ordered: false },
bulkUpdateCallback
);
console.timeEnd('commodity');
console.log(`${recordsFound} records processed.`);
this.emit('done', recordsFound);
})
.on('error', err => {
this.emit('error', err);
Expand Down
64 changes: 45 additions & 19 deletions modules/eddb/factions.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,24 @@ function Factions() {
})
};

/**
 * Shared callback for MongoDB `bulkWrite` batches.
 * On failure, logs how many individual write errors occurred plus one
 * example message, then falls back to the partial result attached to
 * the error so the (optional) progress logging below still works.
 * @param {Error} err - bulkWrite error; a BulkWriteError carries `.result`,
 *   but other errors (e.g. connection loss) do not.
 * @param {Object} [result] - BulkWriteResult on success; replaced by
 *   `err.result` when the batch partially failed.
 */
const bulkUpdateCallback = function (err, result) {
    if (err) {
        // Guard: only BulkWriteError has `.result`; an unrelated error
        // (network, validation) must not crash inside the error logger.
        const errorCount = err.result ? err.result.getWriteErrorCount() : 'unknown';
        console.log(`Errors: ${errorCount}, example: ${err.message}`);
        result = err.result;
    }
    // if (result) {
    //     console.log(`${result.insertedCount} inserted, ${result.matchedCount} matched, ${result.modifiedCount} modified, ${result.upsertedCount} upserted`);
    // }
}

this.downloadUpdate = function () {
let recordsUpdated = 0;
let recordsFound = 0;
let operations = [];
let stream = utilities.downloadUpdate('https://eddb.io/archive/v6/factions.json', 'json');
stream
.on('start', response => {
console.log(`EDDB faction dump started with status code ${response.statusCode}`);
console.time('faction')
this.emit('started', {
response: response,
insertion: "started",
Expand All @@ -145,32 +157,46 @@ function Factions() {
})
.on('data', async json => {
stream.pause();
try {
await factionsModel.findOneAndUpdate(
{
id: json.id,
updated_at: { $ne: json.updated_at }
},
json,
{
upsert: true,
runValidators: true
});
recordsUpdated++;
} catch (err) {
json.updated_at = utilities.modify.millisecondify(json.updated_at)
json.name_lower = utilities.modify.lowerify(json.name)
operations.push({
updateOne: {
filter: {
id: json.id,
// updated_at: { $ne: json.updated_at }
},
update: { $set: json },
upsert: true
}
});
recordsFound++;
if (operations.length % 1000 === 0 ) {
try {
await factionsModel.bulkWrite(
operations,
{ ordered: false },
bulkUpdateCallback
);
} catch (err) {
this.emit('error', err);
} finally {
stream.resume();
}
operations = [];
}
stream.resume();
})
.on('end', () => {
console.log(`${recordsUpdated} records updated`);
this.emit('done', recordsUpdated);
factionsModel.bulkWrite(
operations,
{ ordered: false },
bulkUpdateCallback
);
console.timeEnd('faction');
console.log(`${recordsFound} records processed.`);
this.emit('done', recordsFound);
})
.on('error', err => {
this.emit('error', err);
})
}
}

inherits(Factions, eventEmmiter);
66 changes: 48 additions & 18 deletions modules/eddb/populated_systems.js
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,24 @@ function PopulatedSystems() {
})
};

/**
 * Shared callback for MongoDB `bulkWrite` batches.
 * On failure, logs how many individual write errors occurred plus one
 * example message, then falls back to the partial result attached to
 * the error so the (optional) progress logging below still works.
 * @param {Error} err - bulkWrite error; a BulkWriteError carries `.result`,
 *   but other errors (e.g. connection loss) do not.
 * @param {Object} [result] - BulkWriteResult on success; replaced by
 *   `err.result` when the batch partially failed.
 */
const bulkUpdateCallback = function (err, result) {
    if (err) {
        // Guard: only BulkWriteError has `.result`; an unrelated error
        // (network, validation) must not crash inside the error logger.
        const errorCount = err.result ? err.result.getWriteErrorCount() : 'unknown';
        console.log(`Errors: ${errorCount}, example: ${err.message}`);
        result = err.result;
    }
    // if (result) {
    //     console.log(`${result.insertedCount} inserted, ${result.matchedCount} matched, ${result.modifiedCount} modified, ${result.upsertedCount} upserted`);
    // }
}

this.downloadUpdate = function () {
let recordsUpdated = 0;
let recordsFound = 0;
let operations = [];
let stream = utilities.downloadUpdate('https://eddb.io/archive/v6/systems_populated.json', 'json');
stream
.on('start', response => {
console.log(`EDDB populated system dump started with status code ${response.statusCode}`);
console.time('populatedSystems')
this.emit('started', {
response: response,
insertion: "started",
Expand All @@ -148,27 +160,42 @@ function PopulatedSystems() {
.on('data', async json => {
stream.pause();
json = modify(json);
try {
await populatedSystemsModel.findOneAndUpdate(
{
id: json.id,
updated_at: { $ne: json.updated_at }
},
json,
{
upsert: true,
runValidators: true
});
recordsUpdated++;
} catch (err) {
json.updated_at = utilities.modify.millisecondify(json.updated_at);
json.name_lower = utilities.modify.lowerify(json.name);
operations.push({
updateOne: {
filter: {
id: json.id,
// updated_at: { $ne: json.updated_at }
},
update: { $set: json },
upsert: true
}
});
recordsFound++;
if (operations.length % 1000 === 0 ) {
try {
await populatedSystemsModel.bulkWrite(
operations,
{ ordered: false },
bulkUpdateCallback
);
} catch (err) {
this.emit('error', err);
} finally {
stream.resume();
}
operations = [];
}
stream.resume();
})
.on('end', () => {
console.log(`${recordsUpdated} records updated`);
this.emit('done', recordsUpdated);
populatedSystemsModel.bulkWrite(
operations,
{ ordered: false },
bulkUpdateCallback
);
console.timeEnd('populatedSystems');
console.log(`${recordsFound} records processed.`);
this.emit('done', recordsFound);
})
.on('error', err => {
this.emit('error', err);
Expand All @@ -182,6 +209,9 @@ function PopulatedSystems() {
minor_faction_presences[index].active_states = statify(minor_faction_presence.active_states);
minor_faction_presences[index].pending_states = statify(minor_faction_presence.pending_states);
minor_faction_presences[index].recovering_states = statify(minor_faction_presence.recovering_states);
if (minor_faction_presence.name) {
minor_faction_presences[index].name_lower = utilities.modify.lowerify(minor_faction_presence.name);
}
});
return json;
}
Expand Down
71 changes: 53 additions & 18 deletions modules/eddb/stations.js
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,24 @@ function Stations() {
})
}

/**
 * Shared callback for MongoDB `bulkWrite` batches.
 * On failure, logs how many individual write errors occurred plus one
 * example message, then falls back to the partial result attached to
 * the error so the (optional) progress logging below still works.
 * @param {Error} err - bulkWrite error; a BulkWriteError carries `.result`,
 *   but other errors (e.g. connection loss) do not.
 * @param {Object} [result] - BulkWriteResult on success; replaced by
 *   `err.result` when the batch partially failed.
 */
const bulkUpdateCallback = function (err, result) {
    if (err) {
        // Guard: only BulkWriteError has `.result`; an unrelated error
        // (network, validation) must not crash inside the error logger.
        const errorCount = err.result ? err.result.getWriteErrorCount() : 'unknown';
        console.log(`Errors: ${errorCount}, example: ${err.message}`);
        result = err.result;
    }
    // if (result) {
    //     console.log(`${result.insertedCount} inserted, ${result.matchedCount} matched, ${result.modifiedCount} modified, ${result.upsertedCount} upserted`);
    // }
}

this.downloadUpdate = function () {
let recordsUpdated = 0;
let recordsFound = 0;
let operations = [];
let stream = utilities.downloadUpdate('https://eddb.io/archive/v6/stations.json', 'json');
stream
.on('start', response => {
console.log(`EDDB station dump started with status code ${response.statusCode}`);
console.time('stations')
this.emit('started', {
response: response,
insertion: "started",
Expand All @@ -148,27 +160,50 @@ function Stations() {
.on('data', async json => {
stream.pause();
json = modify(json);
try {
await stationsModel.findOneAndUpdate(
{
id: json.id,
updated_at: { $ne: json.updated_at }
},
json,
{
upsert: true,
runValidators: true
});
recordsUpdated++;
} catch (err) {
json.updated_at = utilities.modify.millisecondify(json.updated_at);
if (json.shipyard_updated_at) {
json.shipyard_updated_at = utilities.modify.millisecondify(json.shipyard_updated_at);
}
if (json.outfitting_updated_at) {
json.outfitting_updated_at = utilities.modify.millisecondify(json.outfitting_updated_at);
}
if (json.market_updated_at) {
json.market_updated_at = utilities.modify.millisecondify(json.market_updated_at);
}
operations.push({
updateOne: {
filter: {
id: json.id,
// updated_at: { $ne: json.updated_at }
},
update: { $set: json },
upsert: true
}
});
recordsFound++;
if (operations.length % 1000 === 0 ) {
try {
await stationsModel.bulkWrite(
operations,
{ ordered: false },
bulkUpdateCallback
);
} catch (err) {
this.emit('error', err);
} finally {
stream.resume();
}
operations = [];
}
stream.resume();
})
.on('end', () => {
console.log(`${recordsUpdated} records updated`);
this.emit('done', recordsUpdated);
stationsModel.bulkWrite(
operations,
{ ordered: false },
bulkUpdateCallback
);
console.timeEnd('stations');
console.log(`${recordsFound} records processed.`);
this.emit('done', recordsFound);
})
.on('error', err => {
this.emit('error', err);
Expand Down
Loading