diff --git a/alchemy/src/cloudflare/bucket-object.ts b/alchemy/src/cloudflare/bucket-object.ts index 5242a97a1..cc32848cc 100644 --- a/alchemy/src/cloudflare/bucket-object.ts +++ b/alchemy/src/cloudflare/bucket-object.ts @@ -1,4 +1,3 @@ -import type { R2PutOptions } from "@cloudflare/workers-types/experimental/index.ts"; import type { Context } from "../context.ts"; import { Resource } from "../resource.ts"; import { createCloudflareApi, type CloudflareApiOptions } from "./api.ts"; diff --git a/alchemy/src/cloudflare/bucket.ts b/alchemy/src/cloudflare/bucket.ts index 5c36957e1..d00806ece 100644 --- a/alchemy/src/cloudflare/bucket.ts +++ b/alchemy/src/cloudflare/bucket.ts @@ -1,10 +1,8 @@ import type { R2PutOptions } from "@cloudflare/workers-types/experimental/index.ts"; -import * as mf from "miniflare"; import { isDeepStrictEqual } from "node:util"; import type { Context } from "../context.ts"; import { Resource, ResourceKind } from "../resource.ts"; import { Scope } from "../scope.ts"; -import { streamToBuffer } from "../serde.ts"; import { isRetryableError } from "../state/r2-rest-state-store.ts"; import { withExponentialBackoff } from "../util/retry.ts"; import { CloudflareApiError, handleApiError } from "./api-error.ts"; @@ -22,7 +20,10 @@ import { type R2BucketCustomDomainOptions, } from "./bucket-custom-domain.ts"; import { deleteMiniflareBinding } from "./miniflare/delete.ts"; -import { getDefaultPersistPath } from "./miniflare/paths.ts"; +import { + makeAsyncProxy, + makeAsyncProxyForBinding, +} from "./miniflare/node-binding.ts"; export type R2BucketJurisdiction = "default" | "eu" | "fedramp"; @@ -306,23 +307,7 @@ export type R2Objects = { } ); -export type R2Bucket = _R2Bucket & { - head(key: string): Promise; - get(key: string): Promise; - put( - key: string, - value: - | ReadableStream - | ArrayBuffer - | ArrayBufferView - | string - | null - | Blob, - options?: Pick, - ): Promise; - delete(key: string): Promise; - list(options?: R2ListOptions): Promise; -}; +export type R2Bucket = _R2Bucket & globalThis.R2Bucket; /** * Output returned after R2 Bucket creation/update @@ -461,9 +446,6 @@ export async function R2Bucket( id: string, props: BucketProps = {}, ): Promise { - const scope = Scope.current; - const isLocal = scope.local && props.dev?.remote !== true; - const api = await createCloudflareApi(props); const bucket = await _R2Bucket(id, { ...props, dev: { @@ -472,144 +454,31 @@ export async function R2Bucket( }, }); - let _miniflare: mf.Miniflare | undefined; - const miniflare = () => { - if (_miniflare) { - return _miniflare; - } - _miniflare = new mf.Miniflare({ - script: "", - modules: true, - defaultPersistRoot: getDefaultPersistPath(scope.rootDir), - r2Buckets: [bucket.dev.id], - log: process.env.DEBUG ? new mf.Log(mf.LogLevel.DEBUG) : undefined, - }); - scope.onCleanup(async () => _miniflare?.dispose()); - return _miniflare; - }; - const localBucket = () => miniflare().getR2Bucket(bucket.dev.id); - - return { - ...bucket, - head: async (key: string) => { - if (isLocal) { - const result = await (await localBucket()).head(key); - if (result) { - return { - key: result.key, - etag: result.etag, - uploaded: result.uploaded, - size: result.size, - httpMetadata: result.httpMetadata, - } as R2ObjectMetadata; - } - return null; - } - return headObject(api, { - bucketName: bucket.name, - key, - }); - }, - get: async (key: string) => { - if (isLocal) { - const result = await (await localBucket()).get(key); - if (result) { - // cast because workers vs node built-ins - return result as unknown as R2ObjectContent; - } - return null; - } - const response = await getObject(api, { - bucketName: bucket.name, - key, - }); - if (response.ok) { - return parseR2Object(key, response); - } else if (response.status === 404) { - return null; - } else { - throw await handleApiError(response, "get", "object", key); - } - }, - list: async (options?: R2ListOptions): Promise => { - if (isLocal) { - return (await localBucket()).list(options); - } - return listObjects(api, bucket.name, { - ...options, - jurisdiction: bucket.jurisdiction, - }); - }, - put: async ( - key: string, - value: PutObjectObject, - options?: Pick, - ): Promise => { - if (isLocal) { - return await (await localBucket()).put( - key, - typeof value === "string" - ? value - : Buffer.isBuffer(value) || - value instanceof Uint8Array || - value instanceof ArrayBuffer - ? new Uint8Array(value) - : value instanceof Blob - ? new Uint8Array(await value.arrayBuffer()) - : value instanceof ReadableStream - ? new Uint8Array(await streamToBuffer(value)) - : value, - options, - ); - } - const response = await putObject(api, { - bucketName: bucket.name, - key: key, - object: value, - options: options, - }); - const body = (await response.json()) as { - result: { - key: string; - etag: string; - uploaded: string; - version: string; - size: string; - }; - }; - return { - key: body.result.key, - etag: body.result.etag, - uploaded: new Date(body.result.uploaded), - version: body.result.version, - size: Number(body.result.size), - }; - }, - delete: async (key: string) => { - if (isLocal) { - await (await localBucket()).delete(key); - } - return deleteObject(api, { - bucketName: bucket.name, - key: key, - }); + return makeAsyncProxyForBinding({ + apiOptions: props, + name: id, + binding: bucket as Omit, + properties: { + createMultipartUpload: true, + delete: true, + get: true, + head: true, + list: true, + put: true, + resumeMultipartUpload: (promise) => (key: string, uploadId: string) => + makeAsyncProxy( + { key, uploadId }, + promise.then((bucket) => bucket.resumeMultipartUpload(key, uploadId)), + { + uploadPart: true, + abort: true, + complete: true, + }, + ), }, - }; + }); } -const parseR2Object = (key: string, response: Response): R2ObjectContent => ({ - etag: response.headers.get("ETag")!, - uploaded: parseDate(response.headers), - key, - size: Number(response.headers.get("Content-Length")), - httpMetadata: mapHeadersToHttpMetadata(response.headers), - arrayBuffer: () => response.arrayBuffer(), - bytes: () => response.bytes(), - text: () => response.text(), - json: () => response.json(), - blob: () => response.blob(), -}); - const parseDate = (headers: Headers) => new Date(headers.get("Last-Modified") ?? headers.get("Date")!); @@ -1143,7 +1012,9 @@ export async function putBucketLifecycleRules( api.put( `/accounts/${api.accountId}/r2/buckets/${bucketName}/lifecycle`, rulesBody, - { headers: withJurisdiction(props) }, + { + headers: withJurisdiction(props), + }, ), ); } @@ -1158,7 +1029,9 @@ export async function getBucketLifecycleRules( ): Promise { const res = await api.get( `/accounts/${api.accountId}/r2/buckets/${bucketName}/lifecycle`, - { headers: withJurisdiction(props) }, + { + headers: withJurisdiction(props), + }, ); const json: any = await res.json(); if (!json?.success) { @@ -1196,7 +1069,9 @@ export async function putBucketLockRules( api.put( `/accounts/${api.accountId}/r2/buckets/${bucketName}/lock`, rulesBody, - { headers: withJurisdiction(props) }, + { + headers: withJurisdiction(props), + }, ), ); } @@ -1211,7 +1086,9 @@ export async function getBucketLockRules( ): Promise { const res = await api.get( `/accounts/${api.accountId}/r2/buckets/${bucketName}/lock`, - { headers: withJurisdiction(props) }, + { + headers: withJurisdiction(props), + }, ); const json: any = await res.json(); if (!json?.success) { diff --git a/alchemy/src/cloudflare/d1-database.ts b/alchemy/src/cloudflare/d1-database.ts index 672ff62a8..dec7e6dc9 100644 --- a/alchemy/src/cloudflare/d1-database.ts +++ b/alchemy/src/cloudflare/d1-database.ts @@ -13,6 +13,10 @@ import { cloneD1Database } from "./d1-clone.ts"; import { applyLocalD1Migrations } from "./d1-local-migrations.ts"; import { applyMigrations, listMigrationsFiles } from "./d1-migrations.ts"; import { deleteMiniflareBinding } from "./miniflare/delete.ts"; +import { + makeAsyncProxy, + makeAsyncProxyForBinding, +} from "./miniflare/node-binding.ts"; const DEFAULT_MIGRATIONS_TABLE = "d1_migrations"; @@ -171,7 +175,7 @@ export type D1Database = Pick< * The jurisdiction of the database */ jurisdiction: D1DatabaseJurisdiction; -}; +} & globalThis.D1Database; /** * Creates and manages Cloudflare D1 Databases. @@ -257,7 +261,7 @@ export async function D1Database( ? await listMigrationsFiles(props.migrationsDir) : []; - return _D1Database(id, { + const database = await _D1Database(id, { ...props, migrationsFiles, dev: { @@ -267,6 +271,57 @@ export async function D1Database( force: Scope.current.local, }, }); + + function makePreparedStatementProxy( + promise: Promise, + ): D1PreparedStatement { + return makeAsyncProxy({}, promise, { + bind: + (promise) => + (...args) => + makePreparedStatementProxy( + promise.then((statement) => statement.bind(...args)), + ), + first: true, + run: true, + all: true, + raw: true, + }); + } + + return makeAsyncProxyForBinding({ + apiOptions: props, + name: id, + binding: database, + properties: { + prepare: (promise) => (query) => + makePreparedStatementProxy( + promise.then((database) => database.prepare(query)), + ), + batch: true, + exec: true, + withSession: (promise) => (constraintOrBookmark) => + makeAsyncProxy( + {}, + promise.then((database) => + database.withSession(constraintOrBookmark), + ), + { + prepare: (session) => (query) => + makePreparedStatementProxy( + session.then((session) => session.prepare(query)), + ), + batch: true, + getBookmark: () => () => { + throw new Error( + "D1DatabaseSession.getBookmark is not implemented", + ); + }, + }, + ), + dump: true, + }, + }); } const _D1Database = Resource( @@ -275,7 +330,7 @@ const _D1Database = Resource( this: Context, id: string, props: D1DatabaseProps, - ): Promise { + ): Promise> { const databaseName = props.name ?? this.output?.name ?? this.scope.createPhysicalName(id); const jurisdiction = props.jurisdiction ?? "default"; diff --git a/alchemy/src/cloudflare/kv-namespace.ts b/alchemy/src/cloudflare/kv-namespace.ts index f83c6e50c..00ee8106e 100644 --- a/alchemy/src/cloudflare/kv-namespace.ts +++ b/alchemy/src/cloudflare/kv-namespace.ts @@ -1,3 +1,4 @@ +import * as mf from "miniflare"; import type { Context } from "../context.ts"; import { Resource, ResourceKind } from "../resource.ts"; import { Scope } from "../scope.ts"; @@ -13,7 +14,7 @@ import { type CloudflareApiOptions, } from "./api.ts"; import { deleteMiniflareBinding } from "./miniflare/delete.ts"; -import * as mf from "miniflare"; +import { makeAsyncProxyForBinding } from "./miniflare/node-binding.ts"; import { getDefaultPersistPath } from "./miniflare/paths.ts"; /** @@ -136,7 +137,7 @@ export type KVNamespace = Omit & { */ remote: boolean; }; -}; +} & globalThis.KVNamespace; /** * A Cloudflare KV Namespace is a key-value store that can be used to store data for your application. @@ -196,13 +197,26 @@ export async function KVNamespace( id: string, props: KVNamespaceProps = {}, ): Promise { - return await _KVNamespace(id, { + const namespace = await _KVNamespace(id, { ...props, dev: { ...(props.dev ?? {}), force: Scope.current.local, }, }); + + return makeAsyncProxyForBinding({ + apiOptions: props, + name: id, + binding: namespace, + properties: { + get: true, + list: true, + put: true, + getWithMetadata: true, + delete: true, + }, + }); } const _KVNamespace = Resource( @@ -211,7 +225,7 @@ const _KVNamespace = Resource( this: Context, id: string, props: KVNamespaceProps, - ): Promise { + ): Promise> { const title = props.title ?? this.output?.title ?? this.scope.createPhysicalName(id); diff --git a/alchemy/src/cloudflare/miniflare/build-worker-options.ts b/alchemy/src/cloudflare/miniflare/build-worker-options.ts index 22bd66566..7ae907486 100644 --- a/alchemy/src/cloudflare/miniflare/build-worker-options.ts +++ b/alchemy/src/cloudflare/miniflare/build-worker-options.ts @@ -69,11 +69,53 @@ export const buildWorkerOptions = async ( watch: (signal: AbortSignal) => AsyncGenerator; remoteProxy: HTTPServer | undefined; }> => { - const remoteBindings: RemoteBinding[] = []; - const options: Partial = { + const baseOptions: Partial = { name: input.name, compatibilityDate: input.compatibilityDate, compatibilityFlags: input.compatibilityFlags, + // This exposes the worker as a route that can be accessed by setting the MF-Route-Override header. + routes: [input.name], + }; + const port = input.port ?? (await reservePort(input.name)); + const { options: bindingsOptions, remoteProxy } = await buildBindings({ + api: input.api, + name: input.name, + bindings: input.bindings ?? {}, + eventSources: input.eventSources ?? [], + assets: input.assets, + port, + cwd: input.cwd, + }); + async function* watch(signal: AbortSignal) { + for await (const bundle of input.bundle.watch(signal)) { + const { modules, rootPath } = normalizeBundle(bundle); + yield { + ...baseOptions, + ...bindingsOptions, + modules, + rootPath, + }; + } + } + return { + watch, + remoteProxy, + }; +}; + +export async function buildBindings(input: { + api: CloudflareApi; + name: string; + bindings: Bindings; + eventSources: EventSource[] | undefined; + assets?: AssetsConfig; + port: number; + cwd: string; +}): Promise<{ + options: Partial; + remoteProxy: HTTPServer | undefined; +}> { + const options: Partial = { unsafeDirectSockets: [ // This matches the Wrangler configuration by exposing the default handler (e.g. `export default { fetch }`). { @@ -81,10 +123,8 @@ export const buildWorkerOptions = async ( proxy: true, }, ], - // This exposes the worker as a route that can be accessed by setting the MF-Route-Override header. - routes: [input.name], }; - const port = input.port ?? (await reservePort(input.name)); + const remoteBindings: RemoteBinding[] = []; for (const [key, binding] of Object.entries(input.bindings ?? {})) { if (typeof binding === "string") { (options.bindings ??= {})[key] = binding; @@ -95,11 +135,11 @@ export const buildWorkerOptions = async ( continue; } if (binding.type === "cloudflare::Worker::DevDomain") { - (options.bindings ??= {})[key] = `localhost:${port}`; + (options.bindings ??= {})[key] = `localhost:${input.port}`; continue; } if (binding.type === "cloudflare::Worker::DevUrl") { - (options.bindings ??= {})[key] = `http://localhost:${port}`; + (options.bindings ??= {})[key] = `http://localhost:${input.port}`; continue; } switch (binding.type) { @@ -363,16 +403,6 @@ export const buildWorkerOptions = async ( (options.queueConsumers ??= {})[eventSource.name] = {}; } } - async function* watch(signal: AbortSignal) { - for await (const bundle of input.bundle.watch(signal)) { - const { modules, rootPath } = normalizeBundle(bundle); - yield { - ...options, - modules, - rootPath, - }; - } - } if (remoteBindings.length > 0) { const remoteProxy = await createRemoteProxyWorker({ api: input.api, @@ -453,15 +483,15 @@ export const buildWorkerOptions = async ( } } return { - watch, + options, remoteProxy: remoteProxy.server, }; } return { - watch, + options, remoteProxy: undefined, }; -}; +} const moduleTypes = { esm: "ESModule", diff --git a/alchemy/src/cloudflare/miniflare/node-binding.ts b/alchemy/src/cloudflare/miniflare/node-binding.ts new file mode 100644 index 000000000..cc6e570a1 --- /dev/null +++ b/alchemy/src/cloudflare/miniflare/node-binding.ts @@ -0,0 +1,97 @@ +import * as mf from "miniflare"; +import { Scope } from "../../scope.ts"; +import { createCloudflareApi, type CloudflareApiOptions } from "../api.ts"; +import type { Binding } from "../bindings.ts"; +import type { Bound } from "../bound.ts"; +import { buildBindings } from "./build-worker-options.ts"; +import { getDefaultPersistPath } from "./paths.ts"; + +export function makeAsyncProxyForBinding< + B extends Extract, + const P extends Properties>, +>(input: { + apiOptions: CloudflareApiOptions; + name: string; + binding: Omit>; + properties: P; +}): B { + return makeAsyncProxy( + input.binding, + async () => { + const { options, remoteProxy } = await buildBindings({ + api: makeAsyncProxy({}, () => createCloudflareApi(input.apiOptions)), + name: input.name, + bindings: { + binding: input.binding as B, + }, + eventSources: undefined, + assets: undefined, + port: 0, + cwd: Scope.current.rootDir, + }); + const miniflare = new mf.Miniflare({ + script: "", + modules: true, + defaultPersistRoot: getDefaultPersistPath(Scope.current.rootDir), + log: process.env.DEBUG ? new mf.Log(mf.LogLevel.DEBUG) : undefined, + ...options, + analyticsEngineDatasetsPersist: !!options.analyticsEngineDatasets, + d1Persist: !!options.d1Databases, + durableObjectsPersist: !!options.durableObjects, + kvPersist: !!options.kvNamespaces, + r2Persist: !!options.r2Buckets, + secretsStorePersist: !!options.secretsStoreSecrets, + workflowsPersist: !!options.workflows, + } as mf.MiniflareOptions); + Scope.current.onCleanup(async () => { + await remoteProxy?.close(); + await miniflare.dispose(); + }); + await miniflare.ready; + return (await miniflare.getBindings())["binding"] as Bound; + }, + input.properties as any, + ) as B; +} + +type Properties = { + [K in keyof T]: T[K] extends (...args: any[]) => Promise + ? true + : (promise: Promise) => T[K]; +}; + +export function makeAsyncProxy< + Target extends object, + Value, + P extends Properties>, +>( + target: Target, + get: Promise | (() => Promise), + properties?: P, +): Target & Value { + let promise = typeof get === "function" ? undefined : get; + return new Proxy(target, { + get(target, prop) { + const property = properties?.[prop as keyof Omit]; + if (Reflect.has(target, prop) || !property) { + return Reflect.get(target, prop); + } + if (typeof property === "function") { + promise ??= typeof get === "function" ? get() : get; + return property(promise); + } + return async (...args: any[]) => { + promise ??= typeof get === "function" ? get() : get; + const obj = await promise; + // @ts-expect-error - prop is a valid key of Value + return obj[prop].apply(obj, args); + }; + }, + has(target, prop) { + return ( + !!properties?.[prop as keyof Omit] || + Reflect.has(target, prop) + ); + }, + }) as any; +} diff --git a/alchemy/src/cloudflare/queue.ts b/alchemy/src/cloudflare/queue.ts index 51df4dab4..b7dd83ee2 100644 --- a/alchemy/src/cloudflare/queue.ts +++ b/alchemy/src/cloudflare/queue.ts @@ -7,6 +7,7 @@ import { type CloudflareApi, type CloudflareApiOptions, } from "./api.ts"; +import { makeAsyncProxyForBinding } from "./miniflare/node-binding.ts"; /** * Settings for a Cloudflare Queue @@ -148,7 +149,7 @@ export type Queue = Omit & { */ remote: boolean; }; -}; +} & globalThis.Queue; /** * Creates and manages Cloudflare Queues. @@ -233,19 +234,28 @@ export async function Queue( id: string, props: QueueProps = {}, ): Promise> { - return await _Queue(id, { + const queue = await _Queue(id, { ...props, dev: { ...(props.dev ?? {}), force: Scope.current.local, }, }); + return makeAsyncProxyForBinding({ + apiOptions: props, + name: id, + binding: queue, + properties: { + send: true, + sendBatch: true, + }, + }); } const _Queue = Resource("cloudflare::Queue", async function < T = unknown, >(this: Context>, id: string, props: QueueProps = {}): Promise< - Queue + Omit, keyof globalThis.Queue> > { const queueName = props.name ?? this.output?.name ?? this.scope.createPhysicalName(id); diff --git a/alchemy/test/cloudflare/bucket.test.ts b/alchemy/test/cloudflare/bucket.test.ts index 262cb3020..5c8b9cb43 100644 --- a/alchemy/test/cloudflare/bucket.test.ts +++ b/alchemy/test/cloudflare/bucket.test.ts @@ -451,14 +451,15 @@ describe("R2 Bucket Resource", async () => { const testContent = "Hello, R2 Bucket Operations!"; const updatedContent = "Updated content for testing"; await bucket.delete(testKey); - let putObj = await bucket.put(testKey, testContent); + // TODO(john): this is a problem with @cloudflare/workers-types, it should not be nullable unless options.onlyIf is used + let putObj = (await bucket.put(testKey, testContent)) as R2Object; expect(putObj.size).toBeTypeOf("number"); expect(putObj.size).toEqual(testContent.length); let obj = await bucket.head(testKey); expect(obj).toBeDefined(); expect(obj?.etag).toEqual(putObj.etag); expect(obj?.size).toEqual(putObj.size); - putObj = await bucket.put(testKey, updatedContent); + putObj = (await bucket.put(testKey, updatedContent)) as R2Object; obj = await bucket.head(testKey); expect(obj?.etag).toEqual(putObj.etag); const getObj = await bucket.get(testKey);