Skip to content

Commit

Permalink
feat: Batching for bulk insertions and SIS API querying
Browse files: browse the repository at this point in the history
  • Loading branch information
mathhulk committed Oct 30, 2024
1 parent 4e7a43b commit 58a11f8
Showing 1 changed file with 91 additions and 120 deletions.
211 changes: 91 additions & 120 deletions apps/backend/src/scripts/update-catalog.ts
Original file line number | Diff line number | Diff line change
Expand Up @@ -29,20 +29,21 @@ const queryPage = async <V>(
key: string,
url: string,
field: string,
page: number,
params?: Record<string, string>
) => {
let retries = 3;

console.log("Querying SIS API page...");
console.log(`URL: ${url}`);
if (params) console.log(`Params: ${params.toString()}`);
// console.log("Querying SIS API page...");
// console.log(`URL: ${url}`);
// if (params) console.log(`Params: ${params.toString()}`);

while (retries > 0) {
try {
const _params = new URLSearchParams({
"page-number": "1",
"page-size": "100",
...params,
"page-number": page.toString(),
"page-size": "100",
});

const response = await fetch(`${url}?${_params}`, {
Expand All @@ -62,17 +63,17 @@ const queryPage = async <V>(
? data.apiResponse.response[field]
: data.response[field];
} catch (error) {
console.log(`Unexpected error querying SIS API. Error: "${error}"`);
// console.log(`Unexpected error querying SIS API. Error: "${error}"`);

if (retries === 0) {
console.log(`Too many errors querying SIS API. Terminating update...`);
// console.log(`Too many errors querying SIS API. Terminating update...`);

break;
}

retries--;

console.log(`Retrying...`);
// console.log(`Retrying...`);

continue;
}
Expand All @@ -88,74 +89,31 @@ const queryPages = async <V>(
field: string,
params?: Record<string, string>
) => {
let page = 1;
let retries = 3;

const values: V[] = [];

console.log("Querying SIS API pages...");
console.log(`URL: ${url}`);
if (params) console.log(`Params: ${params}`);

while (retries > 0) {
try {
const _params = new URLSearchParams({
"page-number": page.toString(),
"page-size": "100",
...params,
});

const response = await fetch(`${url}?${_params}`, {
headers: {
app_id: id,
app_key: key,
},
});

if (response.status !== 200) throw new Error(response.statusText);

const data = (await response.json()) as
| DeprecatedSISResponse<V>
| SISResponse<V>;

const _values = data.apiResponse
? data.apiResponse.response[field]
: data.response[field];

values.push(..._values);
// Query courses in batches of 50
const queryBatchSize = 50;
let page = 1;

if (_values.length < 100) {
console.log(
`No more data found on page ${page}. Terminating update...`
);
while (values.length % 100 === 0) {
console.log(`Querying ${queryBatchSize} pages from page ${page}...`);

break;
}
} catch (error) {
console.log(`Unexpected error querying SIS API. Error: "${error}"`);
const promises = [];

if (retries === 0) {
console.log(`Too many errors querying SIS API. Terminating update...`);
for (let i = 0; i < queryBatchSize; i++) {
promises.push(queryPage<V>(id, key, url, field, page + i, params));
}

break;
}
const results = await Promise.all(promises);
const flattenedResults = results.flat();

retries--;
if (flattenedResults.length === 0) break;

console.log(`Retrying...`);
values.push(...flattenedResults);

continue;
}

page++;
page += queryBatchSize;
}

console.log(
`Finished querying SIS API. Received ${values.length} objects in ${
page
} pages.`
);

return values;
};

Expand All @@ -167,25 +125,25 @@ const updateCourses = async () => {
config.sis.COURSE_APP_KEY,
SIS_COURSE_URL,
"courses"
// {
// "status-code": "ACTIVE",
// }
);

const operations = courses.map((course) => ({
replaceOne: {
filter: { classDisplayName: course.classDisplayName },
replacement: course,
upsert: true,
},
}));
console.log(`Received ${courses.length} courses from SIS API.`);

const { upsertedCount, modifiedCount } =
await CourseModel.bulkWrite(operations);
// Remove all courses
await CourseModel.deleteMany({});

console.log(
`Completed updating database with new course data. Created ${upsertedCount} and updated ${modifiedCount} course objects.`
);
// Insert courses in batches of 5000
const insertBatchSize = 5000;

for (let i = 0; i < courses.length; i += insertBatchSize) {
const batch = courses.slice(i, i + insertBatchSize);

console.log(`Inserting batch ${i / insertBatchSize + 1}...`);

await CourseModel.insertMany(batch, { ordered: false });
}

console.log(`Completed updating database with new course data.`);
};

const updateClasses = async (currentTerms: TermType[]) => {
Expand All @@ -207,22 +165,27 @@ const updateClasses = async (currentTerms: TermType[]) => {
classes.push(...termClasses);
}

console.log("Updating database with new class data...");
console.log(`Received ${classes.length} classes from SIS API.`);

const operations = classes.map((_class) => ({
replaceOne: {
filter: { displayName: _class.displayName },
replacement: _class,
upsert: true,
// Remove all classes
await ClassModel.deleteMany({
"session.term.name": {
$in: currentTerms.map((term) => term.name),
},
}));
});

const { upsertedCount, modifiedCount } =
await ClassModel.bulkWrite(operations);
// Split classes into batches of 5000
const batchSize = 5000;

console.log(
`Completed updating database with new class data. Created ${upsertedCount} and updated ${modifiedCount} class objects.`
);
for (let i = 0; i < classes.length; i += batchSize) {
const batch = classes.slice(i, i + batchSize);

console.log(`Inserting batch ${i / batchSize + 1}...`);

await ClassModel.insertMany(batch, { ordered: false });
}

console.log(`Completed updating database with new class data.`);
};

const updateSections = async (currentTerms: TermType[]) => {
Expand All @@ -231,33 +194,38 @@ const updateSections = async (currentTerms: TermType[]) => {
for (const term of currentTerms) {
console.log(`Updating sections for ${term.name}...`);

const termClasses = await queryPages<SectionType>(
const termSections = await queryPages<SectionType>(
config.sis.CLASS_APP_ID,
config.sis.CLASS_APP_KEY,
SIS_SECTION_URL,
"classSections",
{ "term-id": term.id }
);

sections.push(...termClasses);
sections.push(...termSections);
}

console.log("Updating database with new section data...");
console.log(`Received ${sections.length} sections from SIS API.`);

const operations = sections.map((section) => ({
replaceOne: {
filter: { displayName: section.displayName },
replacement: section,
upsert: true,
// Remove all sections
await SectionModel.deleteMany({
"class.session.term.name": {
$in: currentTerms.map((term) => term.name),
},
}));
});

const { upsertedCount, modifiedCount } =
await SectionModel.bulkWrite(operations);
// Split sections into batches of 5000
const batchSize = 5000;

console.log(
`Completed updating database with new section data. Created ${upsertedCount} and updated ${modifiedCount} section objects.`
);
for (let i = 0; i < sections.length; i += batchSize) {
const batch = sections.slice(i, i + batchSize);

console.log(`Inserting batch ${i / batchSize + 1}...`);

await SectionModel.insertMany(batch, { ordered: false });
}

console.log(`Completed updating database with new section data.`);
};

const updateTerms = async () => {
Expand All @@ -269,6 +237,7 @@ const updateTerms = async () => {
config.sis.TERM_APP_KEY,
SIS_TERM_URL,
"terms",
1,
{
"temporal-position": "Next",
}
Expand All @@ -285,6 +254,7 @@ const updateTerms = async () => {
config.sis.TERM_APP_KEY,
SIS_TERM_URL,
"terms",
1,
{
"temporal-position": "Previous",
"as-of-date": currentTerm.beginDate as unknown as string,
Expand All @@ -294,22 +264,23 @@ const updateTerms = async () => {
if (currentTerm) terms.push(currentTerm);
}

console.log("Updating database with new term data...");
console.log(`Received ${terms.length} terms from SIS API.`);

const operations = terms.map((term) => ({
replaceOne: {
filter: { name: term.name },
replacement: term,
upsert: true,
},
}));
// Remove all terms
await TermModel.deleteMany({});

const { upsertedCount, modifiedCount } =
await TermModel.bulkWrite(operations);
// Split terms into batches of 5000
const batchSize = 5000;

console.log(
`Completed updating database with new term data. Created ${upsertedCount} and updated ${modifiedCount} term objects.`
);
for (let i = 0; i < terms.length; i += batchSize) {
const batch = terms.slice(i, i + batchSize);

console.log(`Inserting batch ${i / batchSize + 1}...`);

await TermModel.insertMany(batch, { ordered: false });
}

console.log(`Completed updating database with new term data.`);
};

const initialize = async () => {
Expand All @@ -323,7 +294,7 @@ const initialize = async () => {
await updateCourses();

const currentTerms = await TermModel.find({
temporalPosition: "Current",
temporalPosition: { $or: ["Current", "Next"] },
}).lean();

console.log("\n=== UPDATE CLASSES ===");
Expand Down

0 comments on commit 58a11f8

Please sign in to comment.