fix(cache): backup oom killed #1338

Draft
wants to merge 2 commits into base: main
123 changes: 81 additions & 42 deletions scripts/plugins/s3-cache.ts
@@ -1,15 +1,21 @@
import { S3, S3Bucket, S3Object } from "https://deno.land/x/s3@0.5.0/mod.ts";
import * as tar from "https://deno.land/std@0.153.0/archive/tar.ts";
import { parse } from "https://deno.land/std@0.153.0/flags/mod.ts";
import { ensureFile } from "https://deno.land/std@0.153.0/fs/ensure_file.ts";
import { ensureDir } from "https://deno.land/std@0.153.0/fs/ensure_dir.ts";
import { copy } from "https://deno.land/std@0.153.0/streams/conversion.ts";
import { readerFromStreamReader } from "https://deno.land/std@0.153.0/streams/conversion.ts";
import { ensureFile } from "https://deno.land/std@0.153.0/fs/ensure_file.ts";
import { walk } from "https://deno.land/std@0.153.0/fs/walk.ts";
import { Buffer } from "https://deno.land/std@0.153.0/io/buffer.ts";
import * as tar from "https://deno.land/std@0.153.0/archive/tar.ts";
import {
copy,
readableStreamFromReader,
readerFromStreamReader,
} from "https://deno.land/std@0.153.0/streams/conversion.ts";
import { S3, S3Bucket, S3Object } from "https://deno.land/x/s3@0.5.0/mod.ts";
import { ClientOptions } from "https://deno.land/x/[email protected]/client.ts";
import { ServerError } from "https://deno.land/x/[email protected]/errors.ts";
import { S3Client } from "https://deno.land/x/[email protected]/mod.ts";
import * as transform from "https://deno.land/x/[email protected]/mod.ts";

const DEFAULT_KEEP_COUNT = 0;
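// Part size used for the streaming multipart upload of the backup archive (64 MiB).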
const UPLOAD_PART_BYTES = 64 * 1024 * 1024;
/**
* CLI args
* --op=backup/restore
@@ -22,22 +28,23 @@ const DEFAULT_KEEP_COUNT = 0;
await main();

// ---------------------implement-------------------

async function main() {
const bucket = getBucket();
const args = parse(Deno.args);

if ("op" in args) {
const path = args["path"] || ".";
const op = args["op"];
const key = args["key"];
const keyPrefix = args["key-prefix"];

const bucket = getBucket();
switch (op) {
case "restore":
await restoreToDir(bucket, path, key!, args["key-prefix"]);
break;
case "remove":
await bucket.deleteObject(key!);
console.log(`deleted ${key}`);
break;
case "shrink":
{
@@ -63,7 +70,6 @@ async function main() {
});
}
break;

default:
throw new Error(`not supported operation: ${args["op"]}`);
}
@@ -75,20 +81,41 @@ async function main() {
function getBucket() {
const bucketName = Deno.env.get("BUCKET_NAME")!;
// Create an S3 instance.
const port = Number(Deno.env.get("BUCKET_PORT") || "80");
let endpointURL = `http://${Deno.env.get("BUCKET_HOST")!}`;
if (port != 80) {
endpointURL += `:${port}`;
}

const s3 = new S3({
accessKeyID: Deno.env.get("AWS_ACCESS_KEY_ID")!,
secretKey: Deno.env.get("AWS_SECRET_ACCESS_KEY")!,
region: Deno.env.get("BUCKET_REGION") || "ci",
endpointURL: `http://${Deno.env.get("BUCKET_HOST")}:${
Deno.env.get("BUCKET_PORT")
}`!,
endpointURL,
});

return s3.getBucket(bucketName);
}

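// Upload client: s3_lite_client supports streaming multipart uploads, so the
// backup archive no longer has to be assembled in memory before it is sent.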
function getBucketForBigUpload() {
const params: ClientOptions = {
endPoint: Deno.env.get("BUCKET_HOST")!,
region: Deno.env.get("BUCKET_REGION") || "ci",
accessKey: Deno.env.get("AWS_ACCESS_KEY_ID")!,
secretKey: Deno.env.get("AWS_SECRET_ACCESS_KEY")!,
bucket: Deno.env.get("BUCKET_NAME")!,
useSSL: false,
};

const port = Number(Deno.env.get("BUCKET_PORT") || "80");
if (port != 80) params.port = port;

return new S3Client(params);
}

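// Archives `path` into a gzipped tar and streams it to the bucket under `key`;
// skips the upload entirely when an object with that key already exists.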
async function save(
bucket: S3Bucket,
putBucket: S3Client,
path: string,
key: string,
filter?: string,
@@ -98,44 +125,58 @@ async function save(
const ret = await bucket.headObject(key);
if (ret) {
console.debug("object existed, skip");
return;
return key;
}

const { GzEncoder } = transform.Transformers;
const to = new tar.Tar();
const matchRegs = filter ? [new RegExp(filter)] : undefined;
// first clean space, then push new one.
if (cleanPrefix && cleanKeepCount) {
await cleanOld(bucket, cleanPrefix, cleanKeepCount);
}

const cwd = Deno.cwd();
Deno.chdir(path);
const stream = await newBackupReadableStream(filter);

await putBucket.putObject(key, stream, {
metadata: {
contentType: "application/x-tar",
contentEncoding: "gzip",
cacheControl: "public, no-transform",
},
partSize: UPLOAD_PART_BYTES,
})
.catch((e: ServerError) => {
const { code, statusCode, cause, bucketName, key, resource } = e;
console.error({ code, statusCode, cause, bucketName, key, resource });
throw e;
})
.finally(() => Deno.chdir(cwd));

const newRet = await bucket.headObject(key).catch((e: ServerError) => {
const { code, statusCode, cause, bucketName, key, resource } = e;
console.error({ code, statusCode, cause, bucketName, key, resource });
console.trace(e);
throw e;
});
console.debug(`uploaded item:`);
console.debug(newRet);
}

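// Walks the current directory (optionally filtered), tars the matching files,
// pipes the tar through gzip, and exposes the result as a ReadableStream, so
// the archive is produced while it is uploaded rather than buffered up front.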
async function newBackupReadableStream(filter?: string) {
const { GzEncoder } = transform.Transformers;
const to = new tar.Tar();
const matchRegs = filter ? [new RegExp(filter)] : undefined;

console.debug(`start tar at: ${Date.now()}`);
for await (const entry of walk("./", { match: matchRegs })) {
if (!entry.isFile) {
continue;
}

await to.append(entry.path, { filePath: entry.path });
}
console.debug(`end tar at: ${Date.now()}`);

console.debug(`start pipeline to buffer at: ${Date.now()}`);
const reader = await transform.pipeline(to.getReader(), new GzEncoder());
const buf = new Buffer();
await reader.to(buf).finally(() => Deno.chdir(cwd));
console.debug(`end pipeline to buffer at: ${Date.now()}`);

// first clean space, then push new one.
if (cleanPrefix && cleanKeepCount) {
await cleanOld(bucket, cleanPrefix, cleanKeepCount);
}

console.debug(`start put object at: ${Date.now()}`);
await bucket.putObject(key, buf.bytes(), {
contentType: "application/x-tar",
contentEncoding: "gzip",
cacheControl: "public, no-transform",
});
console.debug(`end put object at: ${Date.now()}`);
const reader = transform.pipeline(to.getReader(), new GzEncoder());
return readableStreamFromReader(reader);
}

async function restoreToDir(
@@ -157,14 +198,12 @@ async function restore(
keyPrefix?: string,
) {
const restoreKey = await getRestoreKey(bucket, key, keyPrefix);
console.debug(restoreKey);

if (!restoreKey) {
console.log("cache missed");
return;
}

console.log(`key: ${restoreKey}`);
console.log(`will restore from key: ${restoreKey}`);
const ret = await bucket.getObject(restoreKey);
if (!ret) {
console.error(`get content failed for key: ${restoreKey}`);
@@ -194,10 +233,10 @@ async function getRestoreKey(
) {
const ret = await bucket.headObject(key);
if (ret) {
console.debug("key hit");
console.debug("key existed");
return key;
}
console.debug("key miss");
console.debug("key not existed");

// restore from other objects.
if (!keyPrefix) {
@@ -238,7 +277,7 @@ async function cleanOld(

const list = await listObjectsByModifiedTime(bucket, keyPrefix);
if (!list) {
console.log(`none objects founds by prefix ${keyPrefix}`);
console.log(`no objects found by prefix: ${keyPrefix}`);
return;
}
