Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion alchemy/src/cloudflare/bucket-object.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { R2PutOptions } from "@cloudflare/workers-types/experimental/index.ts";
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed because it was causing a type error. Now it relies on the global from @cloudflare/workers-types.

import type { Context } from "../context.ts";
import { Resource } from "../resource.ts";
import { createCloudflareApi, type CloudflareApiOptions } from "./api.ts";
Expand Down
201 changes: 39 additions & 162 deletions alchemy/src/cloudflare/bucket.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import type { R2PutOptions } from "@cloudflare/workers-types/experimental/index.ts";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The R2PutOptions type from experimental is identical to the non-experimental type. I'm not sure the experimental type is needed any longer.

import * as mf from "miniflare";
import { isDeepStrictEqual } from "node:util";
import type { Context } from "../context.ts";
import { Resource, ResourceKind } from "../resource.ts";
import { Scope } from "../scope.ts";
import { streamToBuffer } from "../serde.ts";
import { isRetryableError } from "../state/r2-rest-state-store.ts";
import { withExponentialBackoff } from "../util/retry.ts";
import { CloudflareApiError, handleApiError } from "./api-error.ts";
Expand All @@ -22,7 +20,10 @@ import {
type R2BucketCustomDomainOptions,
} from "./bucket-custom-domain.ts";
import { deleteMiniflareBinding } from "./miniflare/delete.ts";
import { getDefaultPersistPath } from "./miniflare/paths.ts";
import {
makeAsyncProxy,
makeAsyncProxyForBinding,
} from "./miniflare/node-binding.ts";

export type R2BucketJurisdiction = "default" | "eu" | "fedramp";

Expand Down Expand Up @@ -306,23 +307,7 @@ export type R2Objects = {
}
);

export type R2Bucket = _R2Bucket & {
head(key: string): Promise<R2ObjectMetadata | null>;
get(key: string): Promise<R2ObjectContent | null>;
put(
key: string,
value:
| ReadableStream
| ArrayBuffer
| ArrayBufferView
| string
| null
| Blob,
options?: Pick<R2PutOptions, "httpMetadata">,
): Promise<PutR2ObjectResponse>;
delete(key: string): Promise<Response>;
list(options?: R2ListOptions): Promise<R2Objects>;
};
export type R2Bucket = _R2Bucket & globalThis.R2Bucket;

/**
* Output returned after R2 Bucket creation/update
Expand Down Expand Up @@ -461,9 +446,6 @@ export async function R2Bucket(
id: string,
props: BucketProps = {},
): Promise<R2Bucket> {
const scope = Scope.current;
const isLocal = scope.local && props.dev?.remote !== true;
const api = await createCloudflareApi(props);
const bucket = await _R2Bucket(id, {
...props,
dev: {
Expand All @@ -472,144 +454,31 @@ export async function R2Bucket(
},
});

let _miniflare: mf.Miniflare | undefined;
const miniflare = () => {
if (_miniflare) {
return _miniflare;
}
_miniflare = new mf.Miniflare({
script: "",
modules: true,
defaultPersistRoot: getDefaultPersistPath(scope.rootDir),
r2Buckets: [bucket.dev.id],
log: process.env.DEBUG ? new mf.Log(mf.LogLevel.DEBUG) : undefined,
});
scope.onCleanup(async () => _miniflare?.dispose());
return _miniflare;
};
const localBucket = () => miniflare().getR2Bucket(bucket.dev.id);

return {
...bucket,
head: async (key: string) => {
if (isLocal) {
const result = await (await localBucket()).head(key);
if (result) {
return {
key: result.key,
etag: result.etag,
uploaded: result.uploaded,
size: result.size,
httpMetadata: result.httpMetadata,
} as R2ObjectMetadata;
}
return null;
}
return headObject(api, {
bucketName: bucket.name,
key,
});
},
get: async (key: string) => {
if (isLocal) {
const result = await (await localBucket()).get(key);
if (result) {
// cast because workers vs node built-ins
return result as unknown as R2ObjectContent;
}
return null;
}
const response = await getObject(api, {
bucketName: bucket.name,
key,
});
if (response.ok) {
return parseR2Object(key, response);
} else if (response.status === 404) {
return null;
} else {
throw await handleApiError(response, "get", "object", key);
}
},
list: async (options?: R2ListOptions): Promise<R2Objects> => {
if (isLocal) {
return (await localBucket()).list(options);
}
return listObjects(api, bucket.name, {
...options,
jurisdiction: bucket.jurisdiction,
});
},
put: async (
key: string,
value: PutObjectObject,
options?: Pick<R2PutOptions, "httpMetadata">,
): Promise<PutR2ObjectResponse> => {
if (isLocal) {
return await (await localBucket()).put(
key,
typeof value === "string"
? value
: Buffer.isBuffer(value) ||
value instanceof Uint8Array ||
value instanceof ArrayBuffer
? new Uint8Array(value)
: value instanceof Blob
? new Uint8Array(await value.arrayBuffer())
: value instanceof ReadableStream
? new Uint8Array(await streamToBuffer(value))
: value,
options,
);
}
const response = await putObject(api, {
bucketName: bucket.name,
key: key,
object: value,
options: options,
});
const body = (await response.json()) as {
result: {
key: string;
etag: string;
uploaded: string;
version: string;
size: string;
};
};
return {
key: body.result.key,
etag: body.result.etag,
uploaded: new Date(body.result.uploaded),
version: body.result.version,
size: Number(body.result.size),
};
},
delete: async (key: string) => {
if (isLocal) {
await (await localBucket()).delete(key);
}
return deleteObject(api, {
bucketName: bucket.name,
key: key,
});
return makeAsyncProxyForBinding({
apiOptions: props,
name: id,
binding: bucket as Omit<R2Bucket, keyof globalThis.R2Bucket>,
properties: {
createMultipartUpload: true,
delete: true,
get: true,
head: true,
list: true,
put: true,
resumeMultipartUpload: (promise) => (key: string, uploadId: string) =>
makeAsyncProxy(
{ key, uploadId },
promise.then((bucket) => bucket.resumeMultipartUpload(key, uploadId)),
{
uploadPart: true,
abort: true,
complete: true,
},
),
Comment on lines +468 to +477
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resumeMultipartUpload does not return a promise. To match the type of the runtime API, we make this synchronous by returning another proxy.

This is also done for D1 prepared statements and sessions.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you show me an example of what the proxy fixes? Are all the properties Promise<T> or functions returning Promises?

},
};
});
}

const parseR2Object = (key: string, response: Response): R2ObjectContent => ({
etag: response.headers.get("ETag")!,
uploaded: parseDate(response.headers),
key,
size: Number(response.headers.get("Content-Length")),
httpMetadata: mapHeadersToHttpMetadata(response.headers),
arrayBuffer: () => response.arrayBuffer(),
bytes: () => response.bytes(),
text: () => response.text(),
json: () => response.json(),
blob: () => response.blob(),
});

const parseDate = (headers: Headers) =>
new Date(headers.get("Last-Modified") ?? headers.get("Date")!);

Expand Down Expand Up @@ -1143,7 +1012,9 @@ export async function putBucketLifecycleRules(
api.put(
`/accounts/${api.accountId}/r2/buckets/${bucketName}/lifecycle`,
rulesBody,
{ headers: withJurisdiction(props) },
{
headers: withJurisdiction(props),
},
),
);
}
Expand All @@ -1158,7 +1029,9 @@ export async function getBucketLifecycleRules(
): Promise<R2BucketLifecycleRule[]> {
const res = await api.get(
`/accounts/${api.accountId}/r2/buckets/${bucketName}/lifecycle`,
{ headers: withJurisdiction(props) },
{
headers: withJurisdiction(props),
},
);
const json: any = await res.json();
if (!json?.success) {
Expand Down Expand Up @@ -1196,7 +1069,9 @@ export async function putBucketLockRules(
api.put(
`/accounts/${api.accountId}/r2/buckets/${bucketName}/lock`,
rulesBody,
{ headers: withJurisdiction(props) },
{
headers: withJurisdiction(props),
},
),
);
}
Expand All @@ -1211,7 +1086,9 @@ export async function getBucketLockRules(
): Promise<R2BucketLockRule[]> {
const res = await api.get(
`/accounts/${api.accountId}/r2/buckets/${bucketName}/lock`,
{ headers: withJurisdiction(props) },
{
headers: withJurisdiction(props),
},
);
const json: any = await res.json();
if (!json?.success) {
Expand Down
61 changes: 58 additions & 3 deletions alchemy/src/cloudflare/d1-database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import { cloneD1Database } from "./d1-clone.ts";
import { applyLocalD1Migrations } from "./d1-local-migrations.ts";
import { applyMigrations, listMigrationsFiles } from "./d1-migrations.ts";
import { deleteMiniflareBinding } from "./miniflare/delete.ts";
import {
makeAsyncProxy,
makeAsyncProxyForBinding,
} from "./miniflare/node-binding.ts";

const DEFAULT_MIGRATIONS_TABLE = "d1_migrations";

Expand Down Expand Up @@ -171,7 +175,7 @@ export type D1Database = Pick<
* The jurisdiction of the database
*/
jurisdiction: D1DatabaseJurisdiction;
};
} & globalThis.D1Database;

/**
* Creates and manages Cloudflare D1 Databases.
Expand Down Expand Up @@ -257,7 +261,7 @@ export async function D1Database(
? await listMigrationsFiles(props.migrationsDir)
: [];

return _D1Database(id, {
const database = await _D1Database(id, {
...props,
migrationsFiles,
dev: {
Expand All @@ -267,6 +271,57 @@ export async function D1Database(
force: Scope.current.local,
},
});

function makePreparedStatementProxy(
promise: Promise<D1PreparedStatement>,
): D1PreparedStatement {
return makeAsyncProxy({}, promise, {
bind:
(promise) =>
(...args) =>
makePreparedStatementProxy(
promise.then((statement) => statement.bind(...args)),
),
first: true,
run: true,
all: true,
raw: true,
});
}

return makeAsyncProxyForBinding({
apiOptions: props,
name: id,
binding: database,
properties: {
prepare: (promise) => (query) =>
makePreparedStatementProxy(
promise.then((database) => database.prepare(query)),
),
batch: true,
exec: true,
withSession: (promise) => (constraintOrBookmark) =>
makeAsyncProxy(
{},
promise.then((database) =>
database.withSession(constraintOrBookmark),
),
{
prepare: (session) => (query) =>
makePreparedStatementProxy(
session.then((session) => session.prepare(query)),
),
batch: true,
getBookmark: () => () => {
throw new Error(
"D1DatabaseSession.getBookmark is not implemented",
);
},
},
),
Comment on lines +303 to +321
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need to add some tests for each of these APIs on all the resources given how much custom logic there is. Please add tests

dump: true,
},
});
}

const _D1Database = Resource(
Expand All @@ -275,7 +330,7 @@ const _D1Database = Resource(
this: Context<D1Database>,
id: string,
props: D1DatabaseProps,
): Promise<D1Database> {
): Promise<Omit<D1Database, keyof globalThis.D1Database>> {
const databaseName =
props.name ?? this.output?.name ?? this.scope.createPhysicalName(id);
const jurisdiction = props.jurisdiction ?? "default";
Expand Down
Loading
Loading