Commit df3f24b

cleanup S3 client before exiting
fennifith committed Dec 9, 2024
1 parent f7ef1f9 commit df3f24b
Showing 4 changed files with 41 additions and 27 deletions.
2 changes: 1 addition & 1 deletion worker/package.json
@@ -8,7 +8,7 @@
"scripts": {
"build": "pkgroll",
"start": "node dist/index.js",
"dev": "DOTENV_CONFIG_PATH=../.env tsx -r dotenv/config src/index.ts",
"dev": "DOTENV_CONFIG_PATH=../.env tsx --inspect -r dotenv/config src/index.ts",
"lint": "eslint \"src/**/*.{js,mjs,ts}\" \"*.{js,mjs,ts}\"",
"lint:fix": "eslint --cache --fix \"src/**/*.{js,mjs,ts}\" \"*.{js,mjs,ts}\""
},
51 changes: 31 additions & 20 deletions worker/src/index.ts
@@ -1,27 +1,26 @@
import { S3 } from "./s3";
import sql from "./sql";
import { urlMetadataTask } from "./url-metadata/task";
import { setTimeout } from "node:timers/promises";

const BATCH_SIZE = 1;

let lastTaskTime = Date.now();

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const tasks: Record<string, (input: any) => Promise<any>> = {
["url-metadata"]: urlMetadataTask,
};

while (true) {
async function pollTasks(): Promise<boolean> {
let hadRequests: boolean = false;

await sql.begin(async tx => {
const requests = await tx`SELECT * FROM TaskRequests FOR UPDATE SKIP LOCKED LIMIT ${BATCH_SIZE}`;
hadRequests = requests.length > 0;

await Promise.all(requests.map(async request => {
console.log(request);
const taskName = String(request.task);
const id = String(request.id);
console.log(`Begin task: ${taskName} / ${id}`);

const task = tasks[taskName];

@@ -37,27 +36,39 @@ while (true) {
result = null;
}

console.log("result", result);
await tx`DELETE FROM TaskRequests WHERE task=${taskName} AND id=${id}`;
await tx`INSERT INTO TaskResults (task, id, output) VALUES (${taskName}, ${id}, ${result})`;

console.log(`End task: ${taskName} / ${id}`, result);
}));
});

if (hadRequests) {
lastTaskTime = Date.now();
}

// WORKER_EXIT_WHEN_DONE will be used in production, to avoid running when no tasks are needed
// - workers are then manually invoked by the fly.io API on any incoming request
if (process.env.WORKER_EXIT_WHEN_DONE && Date.now() - lastTaskTime > 10000) {
console.log("10s withou any new tasks; exiting task loop...")
break;
} else {
await setTimeout(100);
}
return hadRequests;
}

console.log("Closing sql connection...");
await sql.end();
let lastTaskTime = Date.now();

try {
while (true) {
const hadRequests = await pollTasks();

if (hadRequests) {
lastTaskTime = Date.now();
}

console.log("Exiting...");
// WORKER_EXIT_WHEN_DONE will be used in production, to avoid running when no tasks are needed
// - workers are then manually invoked by the fly.io API on any incoming request
if (process.env.WORKER_EXIT_WHEN_DONE && Date.now() - lastTaskTime > 10000) {
console.log("10s without any new tasks; exiting task loop...")
break;
} else {
await setTimeout(100);
}
}
} finally {
console.log("Closing sql connection...");
await sql.end();
S3.destroy();

console.log("Exiting...");
}
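
For context on the queue protocol this loop implements: the worker pulls rows from TaskRequests under FOR UPDATE SKIP LOCKED, deletes each request, and writes a row to TaskResults. A hypothetical producer-side sketch (not part of this commit) could look like the following; the TaskRequests input column and the result-polling approach are assumptions, since only the (task, id, output) columns of TaskResults appear in this diff.

// Hypothetical producer-side sketch: enqueue a request and wait for the worker's result.
import { randomUUID } from "node:crypto";
import { setTimeout } from "node:timers/promises";
import sql from "./sql";

export async function runTask(task: string, input: Record<string, unknown>): Promise<unknown> {
	const id = randomUUID();
	// Assumed `input` column; the worker's task handlers receive some form of input.
	await sql`INSERT INTO TaskRequests (task, id, input) VALUES (${task}, ${id}, ${sql.json(input)})`;

	// The worker deletes the request row and inserts into TaskResults when the task finishes.
	while (true) {
		const [row] = await sql`SELECT output FROM TaskResults WHERE task=${task} AND id=${id}`;
		if (row) return row.output;
		await setTimeout(250);
	}
}

FOR UPDATE SKIP LOCKED lets several worker processes poll the same table without picking up the same request twice.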
1 change: 1 addition & 0 deletions worker/src/s3.ts
@@ -70,6 +70,7 @@ export async function exists(bucket: string, key: string): Promise<boolean> {
}

export async function upload(bucket: string, key: string, tags: Record<string, string>, file: stream.Readable, contentType: string) {
console.log(`Uploading ${bucket}/${key}`);
const searchParams = new URLSearchParams(tags);

const upload = new Upload({
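
The S3 export that index.ts now destroys is only partially visible here. A minimal sketch of what worker/src/s3.ts might look like, assuming AWS SDK v3 (@aws-sdk/client-s3 plus @aws-sdk/lib-storage, which the Upload usage suggests) and illustrative environment variable names — not the repository's actual implementation:

// Hypothetical sketch of worker/src/s3.ts; env var names are illustrative.
import stream from "node:stream";
import { S3Client, HeadObjectCommand } from "@aws-sdk/client-s3";
import { Upload } from "@aws-sdk/lib-storage";

// Single shared client; its keep-alive sockets can hold the Node event loop open,
// which is why index.ts now calls S3.destroy() before exiting.
export const S3 = new S3Client({
	region: process.env.S3_REGION,
	endpoint: process.env.S3_ENDPOINT,
	credentials: {
		accessKeyId: process.env.S3_ACCESS_KEY_ID ?? "",
		secretAccessKey: process.env.S3_SECRET_ACCESS_KEY ?? "",
	},
});

export async function exists(bucket: string, key: string): Promise<boolean> {
	try {
		// HeadObject throws if the key is missing or inaccessible.
		await S3.send(new HeadObjectCommand({ Bucket: bucket, Key: key }));
		return true;
	} catch {
		return false;
	}
}

export async function upload(bucket: string, key: string, tags: Record<string, string>, file: stream.Readable, contentType: string) {
	console.log(`Uploading ${bucket}/${key}`);
	const searchParams = new URLSearchParams(tags);

	// Multipart/streaming upload from @aws-sdk/lib-storage.
	const upload = new Upload({
		client: S3,
		params: {
			Bucket: bucket,
			Key: key,
			Body: file,
			ContentType: contentType,
			Tagging: searchParams.toString(),
		},
	});
	await upload.done();
}

S3Client#destroy() closes the client's idle keep-alive connections; without it, those open sockets can keep the Node.js process alive after the task loop ends, which is presumably why the commit adds the call before exiting.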
14 changes: 8 additions & 6 deletions worker/src/url-metadata/image-to-s3.ts
@@ -21,14 +21,16 @@ export async function imageToS3(

if (path.extname(url.pathname) === ".svg") {
// If the image is an svg, optimize with svgo
const svg = await request.text();
const optimizedSvg = svgo.optimize(svg, { multipass: true }).data;
const uploadKey = `${key}-${urlHash}.svg`;

if (await exists(bucket, uploadKey)) {
console.log(`Using existing object for ${uploadKey}`);
await body.cancel();
return uploadKey;
} else {
console.log("Optimizing svg...");
const svg = await request.text();
const optimizedSvg = svgo.optimize(svg, { multipass: true }).data;
await upload(bucket, uploadKey, tags, stream.Readable.from([optimizedSvg]), "image/svg+xml");
return uploadKey;
}
@@ -42,15 +44,15 @@
const extension = metadata.format;
if (!extension) throw new Error(`Image format for ${url} could not be found.`);

// rescale the image to [size]
const transformer = sharp().resize(Math.min(width, metadata.width || width));
const transformerStream = metadataStream.pipe(transformer);

const uploadKey = `${key}-${urlHash}.${extension}`;
if (await exists(bucket, uploadKey)) {
console.log(`Using existing object for ${uploadKey}`);
return uploadKey;
} else {
// rescale the image to [size]
const transformer = sharp().resize(Math.min(width, metadata.width || width));
const transformerStream = metadataStream.pipe(transformer);

await upload(bucket, uploadKey, tags, transformerStream, `image/${extension}`);
return uploadKey;
}
