-
Notifications
You must be signed in to change notification settings - Fork 100
feat(cloudflare): node bindings for kv, d1, r2, and queue #1255
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,10 +1,8 @@ | ||
| import type { R2PutOptions } from "@cloudflare/workers-types/experimental/index.ts"; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
| import * as mf from "miniflare"; | ||
| import { isDeepStrictEqual } from "node:util"; | ||
| import type { Context } from "../context.ts"; | ||
| import { Resource, ResourceKind } from "../resource.ts"; | ||
| import { Scope } from "../scope.ts"; | ||
| import { streamToBuffer } from "../serde.ts"; | ||
| import { isRetryableError } from "../state/r2-rest-state-store.ts"; | ||
| import { withExponentialBackoff } from "../util/retry.ts"; | ||
| import { CloudflareApiError, handleApiError } from "./api-error.ts"; | ||
|
|
@@ -22,7 +20,10 @@ import { | |
| type R2BucketCustomDomainOptions, | ||
| } from "./bucket-custom-domain.ts"; | ||
| import { deleteMiniflareBinding } from "./miniflare/delete.ts"; | ||
| import { getDefaultPersistPath } from "./miniflare/paths.ts"; | ||
| import { | ||
| makeAsyncProxy, | ||
| makeAsyncProxyForBinding, | ||
| } from "./miniflare/node-binding.ts"; | ||
|
|
||
| export type R2BucketJurisdiction = "default" | "eu" | "fedramp"; | ||
|
|
||
|
|
@@ -306,23 +307,7 @@ export type R2Objects = { | |
| } | ||
| ); | ||
|
|
||
| export type R2Bucket = _R2Bucket & { | ||
| head(key: string): Promise<R2ObjectMetadata | null>; | ||
| get(key: string): Promise<R2ObjectContent | null>; | ||
| put( | ||
| key: string, | ||
| value: | ||
| | ReadableStream | ||
| | ArrayBuffer | ||
| | ArrayBufferView | ||
| | string | ||
| | null | ||
| | Blob, | ||
| options?: Pick<R2PutOptions, "httpMetadata">, | ||
| ): Promise<PutR2ObjectResponse>; | ||
| delete(key: string): Promise<Response>; | ||
| list(options?: R2ListOptions): Promise<R2Objects>; | ||
| }; | ||
| export type R2Bucket = _R2Bucket & globalThis.R2Bucket; | ||
|
|
||
| /** | ||
| * Output returned after R2 Bucket creation/update | ||
|
|
@@ -461,9 +446,6 @@ export async function R2Bucket( | |
| id: string, | ||
| props: BucketProps = {}, | ||
| ): Promise<R2Bucket> { | ||
| const scope = Scope.current; | ||
| const isLocal = scope.local && props.dev?.remote !== true; | ||
| const api = await createCloudflareApi(props); | ||
| const bucket = await _R2Bucket(id, { | ||
| ...props, | ||
| dev: { | ||
|
|
@@ -472,144 +454,31 @@ export async function R2Bucket( | |
| }, | ||
| }); | ||
|
|
||
| let _miniflare: mf.Miniflare | undefined; | ||
| const miniflare = () => { | ||
| if (_miniflare) { | ||
| return _miniflare; | ||
| } | ||
| _miniflare = new mf.Miniflare({ | ||
| script: "", | ||
| modules: true, | ||
| defaultPersistRoot: getDefaultPersistPath(scope.rootDir), | ||
| r2Buckets: [bucket.dev.id], | ||
| log: process.env.DEBUG ? new mf.Log(mf.LogLevel.DEBUG) : undefined, | ||
| }); | ||
| scope.onCleanup(async () => _miniflare?.dispose()); | ||
| return _miniflare; | ||
| }; | ||
| const localBucket = () => miniflare().getR2Bucket(bucket.dev.id); | ||
|
|
||
| return { | ||
| ...bucket, | ||
| head: async (key: string) => { | ||
| if (isLocal) { | ||
| const result = await (await localBucket()).head(key); | ||
| if (result) { | ||
| return { | ||
| key: result.key, | ||
| etag: result.etag, | ||
| uploaded: result.uploaded, | ||
| size: result.size, | ||
| httpMetadata: result.httpMetadata, | ||
| } as R2ObjectMetadata; | ||
| } | ||
| return null; | ||
| } | ||
| return headObject(api, { | ||
| bucketName: bucket.name, | ||
| key, | ||
| }); | ||
| }, | ||
| get: async (key: string) => { | ||
| if (isLocal) { | ||
| const result = await (await localBucket()).get(key); | ||
| if (result) { | ||
| // cast because workers vs node built-ins | ||
| return result as unknown as R2ObjectContent; | ||
| } | ||
| return null; | ||
| } | ||
| const response = await getObject(api, { | ||
| bucketName: bucket.name, | ||
| key, | ||
| }); | ||
| if (response.ok) { | ||
| return parseR2Object(key, response); | ||
| } else if (response.status === 404) { | ||
| return null; | ||
| } else { | ||
| throw await handleApiError(response, "get", "object", key); | ||
| } | ||
| }, | ||
| list: async (options?: R2ListOptions): Promise<R2Objects> => { | ||
| if (isLocal) { | ||
| return (await localBucket()).list(options); | ||
| } | ||
| return listObjects(api, bucket.name, { | ||
| ...options, | ||
| jurisdiction: bucket.jurisdiction, | ||
| }); | ||
| }, | ||
| put: async ( | ||
| key: string, | ||
| value: PutObjectObject, | ||
| options?: Pick<R2PutOptions, "httpMetadata">, | ||
| ): Promise<PutR2ObjectResponse> => { | ||
| if (isLocal) { | ||
| return await (await localBucket()).put( | ||
| key, | ||
| typeof value === "string" | ||
| ? value | ||
| : Buffer.isBuffer(value) || | ||
| value instanceof Uint8Array || | ||
| value instanceof ArrayBuffer | ||
| ? new Uint8Array(value) | ||
| : value instanceof Blob | ||
| ? new Uint8Array(await value.arrayBuffer()) | ||
| : value instanceof ReadableStream | ||
| ? new Uint8Array(await streamToBuffer(value)) | ||
| : value, | ||
| options, | ||
| ); | ||
| } | ||
| const response = await putObject(api, { | ||
| bucketName: bucket.name, | ||
| key: key, | ||
| object: value, | ||
| options: options, | ||
| }); | ||
| const body = (await response.json()) as { | ||
| result: { | ||
| key: string; | ||
| etag: string; | ||
| uploaded: string; | ||
| version: string; | ||
| size: string; | ||
| }; | ||
| }; | ||
| return { | ||
| key: body.result.key, | ||
| etag: body.result.etag, | ||
| uploaded: new Date(body.result.uploaded), | ||
| version: body.result.version, | ||
| size: Number(body.result.size), | ||
| }; | ||
| }, | ||
| delete: async (key: string) => { | ||
| if (isLocal) { | ||
| await (await localBucket()).delete(key); | ||
| } | ||
| return deleteObject(api, { | ||
| bucketName: bucket.name, | ||
| key: key, | ||
| }); | ||
| return makeAsyncProxyForBinding({ | ||
| apiOptions: props, | ||
| name: id, | ||
| binding: bucket as Omit<R2Bucket, keyof globalThis.R2Bucket>, | ||
| properties: { | ||
| createMultipartUpload: true, | ||
| delete: true, | ||
| get: true, | ||
| head: true, | ||
| list: true, | ||
| put: true, | ||
| resumeMultipartUpload: (promise) => (key: string, uploadId: string) => | ||
| makeAsyncProxy( | ||
| { key, uploadId }, | ||
| promise.then((bucket) => bucket.resumeMultipartUpload(key, uploadId)), | ||
| { | ||
| uploadPart: true, | ||
| abort: true, | ||
| complete: true, | ||
| }, | ||
| ), | ||
|
Comment on lines
+468
to
+477
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This is also done for D1 prepared statements and sessions.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you show me an example of what the proxy fixes? Are all the properties |
||
| }, | ||
| }; | ||
| }); | ||
| } | ||
|
|
||
| const parseR2Object = (key: string, response: Response): R2ObjectContent => ({ | ||
| etag: response.headers.get("ETag")!, | ||
| uploaded: parseDate(response.headers), | ||
| key, | ||
| size: Number(response.headers.get("Content-Length")), | ||
| httpMetadata: mapHeadersToHttpMetadata(response.headers), | ||
| arrayBuffer: () => response.arrayBuffer(), | ||
| bytes: () => response.bytes(), | ||
| text: () => response.text(), | ||
| json: () => response.json(), | ||
| blob: () => response.blob(), | ||
| }); | ||
|
|
||
| const parseDate = (headers: Headers) => | ||
| new Date(headers.get("Last-Modified") ?? headers.get("Date")!); | ||
|
|
||
|
|
@@ -1143,7 +1012,9 @@ export async function putBucketLifecycleRules( | |
| api.put( | ||
| `/accounts/${api.accountId}/r2/buckets/${bucketName}/lifecycle`, | ||
| rulesBody, | ||
| { headers: withJurisdiction(props) }, | ||
| { | ||
| headers: withJurisdiction(props), | ||
| }, | ||
| ), | ||
| ); | ||
| } | ||
|
|
@@ -1158,7 +1029,9 @@ export async function getBucketLifecycleRules( | |
| ): Promise<R2BucketLifecycleRule[]> { | ||
| const res = await api.get( | ||
| `/accounts/${api.accountId}/r2/buckets/${bucketName}/lifecycle`, | ||
| { headers: withJurisdiction(props) }, | ||
| { | ||
| headers: withJurisdiction(props), | ||
| }, | ||
| ); | ||
| const json: any = await res.json(); | ||
| if (!json?.success) { | ||
|
|
@@ -1196,7 +1069,9 @@ export async function putBucketLockRules( | |
| api.put( | ||
| `/accounts/${api.accountId}/r2/buckets/${bucketName}/lock`, | ||
| rulesBody, | ||
| { headers: withJurisdiction(props) }, | ||
| { | ||
| headers: withJurisdiction(props), | ||
| }, | ||
| ), | ||
| ); | ||
| } | ||
|
|
@@ -1211,7 +1086,9 @@ export async function getBucketLockRules( | |
| ): Promise<R2BucketLockRule[]> { | ||
| const res = await api.get( | ||
| `/accounts/${api.accountId}/r2/buckets/${bucketName}/lock`, | ||
| { headers: withJurisdiction(props) }, | ||
| { | ||
| headers: withJurisdiction(props), | ||
| }, | ||
| ); | ||
| const json: any = await res.json(); | ||
| if (!json?.success) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,6 +13,10 @@ import { cloneD1Database } from "./d1-clone.ts"; | |
| import { applyLocalD1Migrations } from "./d1-local-migrations.ts"; | ||
| import { applyMigrations, listMigrationsFiles } from "./d1-migrations.ts"; | ||
| import { deleteMiniflareBinding } from "./miniflare/delete.ts"; | ||
| import { | ||
| makeAsyncProxy, | ||
| makeAsyncProxyForBinding, | ||
| } from "./miniflare/node-binding.ts"; | ||
|
|
||
| const DEFAULT_MIGRATIONS_TABLE = "d1_migrations"; | ||
|
|
||
|
|
@@ -171,7 +175,7 @@ export type D1Database = Pick< | |
| * The jurisdiction of the database | ||
| */ | ||
| jurisdiction: D1DatabaseJurisdiction; | ||
| }; | ||
| } & globalThis.D1Database; | ||
|
|
||
| /** | ||
| * Creates and manages Cloudflare D1 Databases. | ||
|
|
@@ -257,7 +261,7 @@ export async function D1Database( | |
| ? await listMigrationsFiles(props.migrationsDir) | ||
| : []; | ||
|
|
||
| return _D1Database(id, { | ||
| const database = await _D1Database(id, { | ||
| ...props, | ||
| migrationsFiles, | ||
| dev: { | ||
|
|
@@ -267,6 +271,57 @@ export async function D1Database( | |
| force: Scope.current.local, | ||
| }, | ||
| }); | ||
|
|
||
| function makePreparedStatementProxy( | ||
| promise: Promise<D1PreparedStatement>, | ||
| ): D1PreparedStatement { | ||
| return makeAsyncProxy({}, promise, { | ||
| bind: | ||
| (promise) => | ||
| (...args) => | ||
| makePreparedStatementProxy( | ||
| promise.then((statement) => statement.bind(...args)), | ||
| ), | ||
| first: true, | ||
| run: true, | ||
| all: true, | ||
| raw: true, | ||
| }); | ||
| } | ||
|
|
||
| return makeAsyncProxyForBinding({ | ||
| apiOptions: props, | ||
| name: id, | ||
| binding: database, | ||
| properties: { | ||
| prepare: (promise) => (query) => | ||
| makePreparedStatementProxy( | ||
| promise.then((database) => database.prepare(query)), | ||
| ), | ||
| batch: true, | ||
| exec: true, | ||
| withSession: (promise) => (constraintOrBookmark) => | ||
| makeAsyncProxy( | ||
| {}, | ||
| promise.then((database) => | ||
| database.withSession(constraintOrBookmark), | ||
| ), | ||
| { | ||
| prepare: (session) => (query) => | ||
| makePreparedStatementProxy( | ||
| session.then((session) => session.prepare(query)), | ||
| ), | ||
| batch: true, | ||
| getBookmark: () => () => { | ||
| throw new Error( | ||
| "D1DatabaseSession.getBookmark is not implemented", | ||
| ); | ||
| }, | ||
| }, | ||
| ), | ||
|
Comment on lines
+303
to
+321
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We will need to add some tests for each of these APIs on all the resources given how much custom logic there is. Please add tests |
||
| dump: true, | ||
| }, | ||
| }); | ||
| } | ||
|
|
||
| const _D1Database = Resource( | ||
|
|
@@ -275,7 +330,7 @@ const _D1Database = Resource( | |
| this: Context<D1Database>, | ||
| id: string, | ||
| props: D1DatabaseProps, | ||
| ): Promise<D1Database> { | ||
| ): Promise<Omit<D1Database, keyof globalThis.D1Database>> { | ||
| const databaseName = | ||
| props.name ?? this.output?.name ?? this.scope.createPhysicalName(id); | ||
| const jurisdiction = props.jurisdiction ?? "default"; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed because it was causing a type error. Now it relies on the global from @cloudflare/workers-types.