Skip to content

Commit

Permalink
10554 One-time script for cleaning up attachments on corrupted messages
Browse files Browse the repository at this point in the history
  • Loading branch information
En-8 committed Dec 12, 2024
1 parent 226e59e commit ac9d2f3
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 6 deletions.
2 changes: 1 addition & 1 deletion scripts/env/environments/00-common
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ REGION="${DEFAULT_REGION}"
CURRENT_COLOR=$(./scripts/dynamo/get-current-color.sh "${ENV}")
SOURCE_TABLE=$(./scripts/dynamo/get-source-table.sh "${ENV}")
SOURCE_TABLE_VERSION="${SOURCE_TABLE//efcms-${ENV}-/}"
DB_HOST=$(./scripts/postgres/get-host.sh -h)
DB_HOST=$(./scripts/postgres/get-host.sh -h -w) # -w for writeable

# region hard-coded; all ES domains and Cognito user pools are in us-east-1
ELASTICSEARCH_ENDPOINT=$(aws es describe-elasticsearch-domain \
Expand Down
2 changes: 1 addition & 1 deletion scripts/postgres/restoreDbFromSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async function main() {
}
void main();

async function describeRDSInstance({
export async function describeRDSInstance({
environment,
rdsClient,
useWriter = false,
Expand Down
219 changes: 219 additions & 0 deletions scripts/run-once-scripts/cleanup-corrupt-messages.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
#!/usr/bin/env npx ts-node --transpile-only

import { type ScriptConfig, parseArguments } from '../reports/reportUtils';
import {
type ServerApplicationContext,
createApplicationContext,
} from '../../web-api/src/applicationContext';
import { requireEnvVars } from '../../shared/admin-tools/util';
import { connect } from '../../web-api/src/database';
import PQueue from 'p-queue';
import fs from 'fs';
import { queryFull } from '../../web-api/src/persistence/dynamodbClientService';
import { Signer } from '@aws-sdk/rds-signer';
import path from 'path';
import { Kysely } from 'kysely';
import { Database } from '../../web-api/src/database-types';

// Environment variables this script declares as required. Only DB_NAME,
// DB_HOST and DB_USER are read directly below; REGION is not referenced by
// name anywhere else in this file.
// NOTE(review): requireEnvVars is defined in shared/admin-tools/util; presumably
// it aborts when a variable is missing — confirm against that helper.
requireEnvVars(['REGION', 'DB_NAME', 'DB_HOST', 'DB_USER']);
const { DB_NAME, DB_HOST, DB_USER } = process.env;
// Port used both for the RDS auth-token signer and for the database connection.
const DB_PORT = 5432;

// Command-line options handed to parseArguments. The only option is the
// optional boolean `--live-run` (default false); database rows are updated
// only when it is set, otherwise the script just produces the audit output.
const scriptConfig: ScriptConfig = {
parameters: {
liveRun: {
required: false,
type: 'boolean',
default: false,
long: 'live-run',
description:
'If true, will proceed with removing the attachments from the impacted messages.',
},
},
};

// The subset of `dwMessage` columns this script selects and works with.
type MessageFragment = {
// Attachment entries of the message; this script reads `documentId` from each
// entry. May be undefined, in which case the message is left untouched.
attachments: any[] | undefined;
// Docket number of the case the message belongs to; used to look up the
// docket entry IDs the attachments are compared against.
docketNumber: string;
// Identifies the row to update in `dwMessage`.
messageId: string;
};

/**
 * Looks up the docket entry IDs stored under each of the given docket numbers.
 *
 * One query is issued per docket number against the `case|<docketNumber>`
 * partition key, restricted to sort keys beginning with `docket-entry|`. The
 * queries are pushed through a queue that runs at most 50 of them at a time.
 *
 * @param params.applicationContext - context passed through to `queryFull`
 * @param params.docketNumbers - docket numbers to look up
 * @returns a map from docket number to the `docketEntryId` of every record
 *   returned for that docket number
 */
const getDocketEntryIdsByDocketNumbers = async ({
  applicationContext,
  docketNumbers,
}: {
  applicationContext: ServerApplicationContext;
  docketNumbers: string[];
}): Promise<Record<string, string[]>> => {
  console.log(`Fetching docket entries for each docket number...`);

  const idsByDocketNumber: Record<string, string[]> = {};

  // Fetches the docket entries of a single case and records their IDs.
  const fetchIdsFor = async (docketNumber: string): Promise<void> => {
    const records = (await queryFull({
      ExpressionAttributeNames: { '#pk': 'pk', '#sk': 'sk' },
      ExpressionAttributeValues: {
        ':pk': `case|${docketNumber}`,
        ':prefix': 'docket-entry|',
      },
      KeyConditionExpression: '#pk = :pk AND begins_with(#sk, :prefix)',
      applicationContext,
    })) as RawDocketEntry[];

    idsByDocketNumber[docketNumber] = records.map(
      ({ docketEntryId }) => docketEntryId,
    );
  };

  // Cap the number of in-flight queries rather than firing them all at once.
  const queue = new PQueue({ concurrency: 50 });
  const tasks = docketNumbers.map(
    docketNumber => () => fetchIdsFor(docketNumber),
  );
  await queue.addAll(tasks);

  return idsByDocketNumber;
};

/**
 * Strips from each message every attachment whose `documentId` is not among
 * the docket entry IDs known for the message's docket number.
 *
 * Messages without an `attachments` value are skipped. A message whose docket
 * number has no entry in `docketEntryIdsByDocketNumber` loses all of its
 * attachments. Affected message objects are updated in place (their
 * `attachments` property is replaced with the filtered array) and each one is
 * returned exactly once, no matter how many attachments were removed from it.
 *
 * @param params.messageFragments - messages to inspect; affected entries are mutated
 * @param params.docketEntryIdsByDocketNumber - valid docket entry IDs keyed by docket number
 * @returns `deletedAttachmentAuditRecords`, one record per removed attachment, and
 *   `updatedMessageFragments`, the distinct messages that had at least one
 *   attachment removed
 */
const removePoisonAttachmentsFromMessages = async ({
  messageFragments,
  docketEntryIdsByDocketNumber,
}: {
  messageFragments: MessageFragment[];
  docketEntryIdsByDocketNumber: Record<string, string[]>;
}): Promise<{
  deletedAttachmentAuditRecords: {
    messageId: string;
    docketEntryId: string;
  }[];
  updatedMessageFragments: MessageFragment[];
}> => {
  const updatedMessageFragments: MessageFragment[] = [];
  const deletedAttachmentAuditRecords: {
    messageId: string;
    docketEntryId: string;
  }[] = [];

  // Many messages share a docket number, so build each lookup set only once.
  const idSetsByDocketNumber = new Map<string, Set<string>>();
  const knownIdsFor = (docketNumber: string): Set<string> => {
    let knownIds = idSetsByDocketNumber.get(docketNumber);
    if (!knownIds) {
      knownIds = new Set(docketEntryIdsByDocketNumber[docketNumber] ?? []);
      idSetsByDocketNumber.set(docketNumber, knownIds);
    }
    return knownIds;
  };

  for (const message of messageFragments) {
    if (!message.attachments) {
      continue;
    }

    const knownIds = knownIdsFor(message.docketNumber);
    const keptAttachments: NonNullable<MessageFragment['attachments']> = [];
    let removedCount = 0;

    for (const attachment of message.attachments) {
      if (knownIds.has(attachment.documentId)) {
        keptAttachments.push(attachment);
        continue;
      }

      removedCount += 1;
      deletedAttachmentAuditRecords.push({
        messageId: message.messageId,
        docketEntryId: attachment.documentId,
      });
      console.log(
        `Removing attachment ${attachment.documentId} from message ${message.messageId}`,
      );
    }

    // Record the message once, even when several attachments were removed, so
    // the impacted-message count and the subsequent DB updates are not duplicated.
    if (removedCount > 0) {
      message.attachments = keptAttachments;
      updatedMessageFragments.push(message);
    }
  }

  return {
    deletedAttachmentAuditRecords,
    updatedMessageFragments,
  };
};

/**
 * Writes the current attachment list of every given message back to the
 * `dwMessage` table.
 *
 * The updates are issued one after another inside a single
 * `db.transaction().execute(...)` call; each row is matched by `messageId` and
 * its `attachments` column is set to the JSON-serialized attachment array.
 *
 * @param db - database handle used to open the transaction
 * @param updatedMessageFragments - messages whose attachments should be persisted
 */
const udpateMessagesInDb = async (
  db: Kysely<Database>,
  updatedMessageFragments: MessageFragment[],
) => {
  await db.transaction().execute(async transaction => {
    for (const { attachments, messageId } of updatedMessageFragments) {
      const serializedAttachments = JSON.stringify(attachments);
      const updateQuery = transaction
        .updateTable('dwMessage')
        .set({ attachments: serializedAttachments })
        .where('messageId', '=', messageId);

      await updateQuery.execute();
    }
  });
};

// Script entry point:
//   1. connect to the database using an RDS auth token,
//   2. load every message that has not been replied to,
//   3. fetch the docket entry IDs for each docket number those messages reference,
//   4. work out which attachments do not match any of those docket entry IDs,
//   5. when --live-run is set, persist the cleaned attachment lists,
//   6. write an audit file of the removed attachments and print a summary.
// eslint-disable-next-line @typescript-eslint/no-floating-promises
(async () => {
  const applicationContext: ServerApplicationContext = createApplicationContext(
    {},
  );

  const { liveRun } = parseArguments(scriptConfig);

  // NOTE(review): the signer region is hard-coded even though REGION is listed
  // as a required environment variable above — confirm this is intentional.
  const sourceSigner = new Signer({
    hostname: DB_HOST!,
    port: DB_PORT,
    region: 'us-east-1',
    username: DB_USER!,
  });
  const sourcePassword = await sourceSigner.getAuthToken();

  const config = {
    database: DB_NAME,
    host: DB_HOST,
    idleTimeoutMillis: 1000,
    max: 1,
    password: sourcePassword,
    port: DB_PORT,
    user: DB_USER,
    ssl: {
      // Read relative to the current working directory.
      ca: fs.readFileSync('global-bundle.pem').toString(),
    },
  };

  const db = await connect(config);

  console.log('Fetching messages that have not been replied to...');
  const messageFragments = await db
    .selectFrom('dwMessage')
    .select(['attachments', 'messageId', 'docketNumber'])
    .where('isRepliedTo', '=', false)
    .execute();

  // collect all unique docket numbers from messages
  console.log('Collecting unique docket numbers from messages...');
  const docketNumbers = Array.from(
    new Set(messageFragments.map(message => message.docketNumber)),
  );

  const docketEntryIdsByDocketNumber = await getDocketEntryIdsByDocketNumbers({
    applicationContext,
    docketNumbers,
  });

  const { deletedAttachmentAuditRecords, updatedMessageFragments } =
    await removePoisonAttachmentsFromMessages({
      docketEntryIdsByDocketNumber,
      messageFragments,
    });

  // Without --live-run nothing is written to the database; only the audit
  // file and the summary below are produced.
  if (liveRun) {
    console.log(`Updating ${updatedMessageFragments.length} messages in DB...`);
    await udpateMessagesInDb(db, updatedMessageFragments);
  }

  const auditFilename = 'corruptMessageCleanupAudit.json';
  // Resolve the path once, against the current working directory (where a
  // relative write lands), so the location reported below is the file that was
  // actually written rather than a path next to this script.
  const auditFilePath = path.resolve(auditFilename);
  fs.writeFileSync(
    auditFilePath,
    JSON.stringify(deletedAttachmentAuditRecords, null, 2),
  );

  console.log(
    '------------------------------------------------------------------',
  );
  console.log(
    `A log of attachments removed can be found here: ${auditFilePath}`,
  );
  console.log(
    'Removed attachments count: ',
    deletedAttachmentAuditRecords.length,
  );
  console.log('Impacted messages count: ', updatedMessageFragments.length);
})();
6 changes: 4 additions & 2 deletions web-api/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,13 @@ async function createConnection<T>({
throw new Error('token does not exist');
}

dbInstances[dbKey] = await connect({
const config = {
...POOL,
host,
password: token,
});
};

dbInstances[dbKey] = await connect(config);

return await cb(dbInstances[dbKey]!);
} catch (err) {
Expand Down
3 changes: 1 addition & 2 deletions web-api/src/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ export const environment = {
user: process.env.POSTGRES_USER || 'postgres',
},
readHost: process.env.POSTGRES_READ_HOST!,
useGlobalCert:
process.env.NODE_ENV === 'production' || process.env.CIRCLE_BRANCH,
useGlobalCert: true,
},
region,
s3Endpoint: isLocal
Expand Down

0 comments on commit ac9d2f3

Please sign in to comment.