Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions alchemy/src/cloudflare/d1-database.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { D1Database as D1DatabaseApi } from "@cloudflare/workers-types/experimental/index.ts";
import type { Context } from "../context.ts";
import { Resource, ResourceKind } from "../resource.ts";
import { Scope } from "../scope.ts";
Expand All @@ -10,9 +11,13 @@ import {
} from "./api.ts";
import { withJurisdiction } from "./bucket.ts";
import { cloneD1Database } from "./d1-clone.ts";
import { applyLocalD1Migrations } from "./d1-local-migrations.ts";
import { applyLocalD1Migrations } from "./d1-local.ts";
import { applyMigrations, listMigrationsFiles } from "./d1-migrations.ts";
import { deleteMiniflareBinding } from "./miniflare/delete.ts";
import {
makeAsyncProxyForBinding,
type Lazy,
} from "./miniflare/node-binding.ts";

const DEFAULT_MIGRATIONS_TABLE = "d1_migrations";

Expand Down Expand Up @@ -171,7 +176,7 @@ export type D1Database = Pick<
* The jurisdiction of the database
*/
jurisdiction: D1DatabaseJurisdiction;
};
} & Lazy<D1DatabaseApi>;

/**
* Creates and manages Cloudflare D1 Databases.
Expand Down Expand Up @@ -257,7 +262,7 @@ export async function D1Database(
? await listMigrationsFiles(props.migrationsDir)
: [];

return _D1Database(id, {
const database = await _D1Database(id, {
...props,
migrationsFiles,
dev: {
Expand All @@ -267,6 +272,13 @@ export async function D1Database(
force: Scope.current.local,
},
});

return makeAsyncProxyForBinding({
apiOptions: props,
name: `d1-${database.id}`,
binding: database,
properties: ["batch", "dump", "exec", "prepare", "withSession"],
});
}

const _D1Database = Resource(
Expand All @@ -275,7 +287,7 @@ const _D1Database = Resource(
this: Context<D1Database>,
id: string,
props: D1DatabaseProps,
): Promise<D1Database> {
): Promise<Omit<D1Database, keyof D1DatabaseApi>> {
const databaseName =
props.name ?? this.output?.name ?? this.scope.createPhysicalName(id);
const jurisdiction = props.jurisdiction ?? "default";
Expand All @@ -297,7 +309,6 @@ const _D1Database = Resource(
databaseId: dev.id,
migrationsTable: props.migrationsTable ?? DEFAULT_MIGRATIONS_TABLE,
migrations: props.migrationsFiles,
rootDir: this.scope.rootDir,
});
}
return {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,43 @@
import * as mf from "miniflare";
import { Scope } from "../scope.ts";
import { getDefaultPersistPath } from "./miniflare/paths.ts";

export interface D1LocalMigrationOptions {
rootDir: string;
databaseId: string;
migrationsTable: string;
migrations: { id: string; sql: string }[];
}

export const applyLocalD1Migrations = async (
options: D1LocalMigrationOptions,
) => {
export interface MiniflareD1Options {
id: string;
remoteProxyConnectionString?: mf.RemoteProxyConnectionString;
}

export async function makeMiniflareD1(database: MiniflareD1Options) {
const miniflare = new mf.Miniflare({
script: "",
modules: true,
defaultPersistRoot: getDefaultPersistPath(options.rootDir),
defaultPersistRoot: getDefaultPersistPath(Scope.current.rootDir),
d1Persist: true,
d1Databases: { DB: options.databaseId },
d1Databases: {
DB: database,
},
log: process.env.DEBUG ? new mf.Log(mf.LogLevel.DEBUG) : undefined,
});
await miniflare.ready;
return {
db: await miniflare.getD1Database("DB"),
dispose: async () => {
await miniflare.dispose();
},
};
}

export const applyLocalD1Migrations = async (
options: D1LocalMigrationOptions,
) => {
const { db, dispose } = await makeMiniflareD1({ id: options.databaseId });
try {
await miniflare.ready;
// TODO(sam): don't use `any` once prisma is fixed upstream
const db: any = await miniflare.getD1Database("DB");
const session: any = db.withSession("first-primary");
await session
.prepare(
Expand All @@ -47,10 +62,17 @@ export const applyLocalD1Migrations = async (
if (appliedMigrations.results.some((m) => m.name === migration.id)) {
continue;
}
await session.prepare(migration.sql).run();
// split large migrations to prevent D1_ERROR: statement too long: SQLITE_TOOBIG
await session.batch(
migration.sql
.split(";")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if splitting by semicolon is a good idea. Drizzle splits by statement breakpoint (https://github.com/drizzle-team/drizzle-orm/blob/c445637df39366bcf47b12601896ce851771c1c2/drizzle-orm/src/migrator.ts#L44)

.flatMap((statement) =>
statement.trim() ? session.prepare(statement) : [],
),
);
await insertRecord.bind(migration.id).run();
}
} finally {
await miniflare.dispose();
await dispose();
}
};
90 changes: 73 additions & 17 deletions alchemy/src/cloudflare/miniflare/build-worker-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ export const buildWorkerOptions = async (
watch: (signal: AbortSignal) => AsyncGenerator<miniflare.WorkerOptions>;
remoteProxy: HTTPServer | undefined;
}> => {
const remoteBindings: RemoteBinding[] = [];
const options: Partial<BaseWorkerOptions> = {
const baseOptions: Partial<BaseWorkerOptions> = {
name: input.name,
compatibilityDate: input.compatibilityDate,
compatibilityFlags: input.compatibilityFlags,
Expand All @@ -85,6 +84,73 @@ export const buildWorkerOptions = async (
routes: [input.name],
};
const port = input.port ?? (await reservePort(input.name));
const { options: bindingsOptions, remoteProxy } = await buildBindings({
api: input.api,
name: input.name,
bindings: input.bindings ?? {},
eventSources: input.eventSources ?? [],
assets: input.assets,
port,
cwd: input.cwd,
});
async function* watch(signal: AbortSignal) {
for await (const bundle of input.bundle.watch(signal)) {
const { modules, rootPath } = normalizeBundle(bundle);
yield {
...baseOptions,
...bindingsOptions,
modules,
rootPath,
};
}
}
return {
watch,
remoteProxy,
};
};

export function setPersistenceOptions(
options: miniflare.MiniflareOptions,
worker: Partial<miniflare.WorkerOptions>,
) {
if (worker.analyticsEngineDatasets) {
options.analyticsEngineDatasetsPersist = true;
}
if (worker.d1Databases) {
options.d1Persist = true;
}
if (worker.durableObjects) {
options.durableObjectsPersist = true;
}
if (worker.kvNamespaces) {
options.kvPersist = true;
}
if (worker.r2Buckets) {
options.r2Persist = true;
}
if (worker.secretsStoreSecrets) {
options.secretsStorePersist = true;
}
if (worker.workflows) {
options.workflowsPersist = true;
}
}

export async function buildBindings(input: {
api: CloudflareApi;
name: string;
bindings: Bindings;
eventSources: EventSource[] | undefined;
assets?: AssetsConfig;
port: number;
cwd: string;
}): Promise<{
options: Partial<BaseWorkerOptions>;
remoteProxy: HTTPServer | undefined;
}> {
const options: Partial<BaseWorkerOptions> = {};
const remoteBindings: RemoteBinding[] = [];
for (const [key, binding] of Object.entries(input.bindings ?? {})) {
if (typeof binding === "string") {
(options.bindings ??= {})[key] = binding;
Expand All @@ -95,11 +161,11 @@ export const buildWorkerOptions = async (
continue;
}
if (binding.type === "cloudflare::Worker::DevDomain") {
(options.bindings ??= {})[key] = `localhost:${port}`;
(options.bindings ??= {})[key] = `localhost:${input.port}`;
continue;
}
if (binding.type === "cloudflare::Worker::DevUrl") {
(options.bindings ??= {})[key] = `http://localhost:${port}`;
(options.bindings ??= {})[key] = `http://localhost:${input.port}`;
continue;
}
switch (binding.type) {
Expand Down Expand Up @@ -363,16 +429,6 @@ export const buildWorkerOptions = async (
(options.queueConsumers ??= {})[eventSource.name] = {};
}
}
async function* watch(signal: AbortSignal) {
for await (const bundle of input.bundle.watch(signal)) {
const { modules, rootPath } = normalizeBundle(bundle);
yield {
...options,
modules,
rootPath,
};
}
}
if (remoteBindings.length > 0) {
const remoteProxy = await createRemoteProxyWorker({
api: input.api,
Expand Down Expand Up @@ -453,15 +509,15 @@ export const buildWorkerOptions = async (
}
}
return {
watch,
options,
remoteProxy: remoteProxy.server,
};
}
return {
watch,
options,
remoteProxy: undefined,
};
};
}

const moduleTypes = {
esm: "ESModule",
Expand Down
115 changes: 115 additions & 0 deletions alchemy/src/cloudflare/miniflare/node-binding.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import * as mf from "miniflare";
import { Scope } from "../../scope.ts";
import {
createCloudflareApi,
type CloudflareApi,
type CloudflareApiOptions,
} from "../api.ts";
import type { Binding, Bindings } from "../bindings.ts";
import type { Bound } from "../bound.ts";
import { buildBindings } from "./build-worker-options.ts";
import { getDefaultPersistPath } from "./paths.ts";

async function makeMiniflare(
api: CloudflareApi,
name: string,
bindings: Bindings,
) {
const { options, remoteProxy } = await buildBindings({
api,
name,
bindings,
eventSources: undefined,
assets: undefined,
port: 0,
cwd: Scope.current.rootDir,
});
const miniflare = new mf.Miniflare({
script: "",
modules: true,
defaultPersistRoot: getDefaultPersistPath(Scope.current.rootDir),
log: process.env.DEBUG ? new mf.Log(mf.LogLevel.DEBUG) : undefined,
...options,
analyticsEngineDatasetsPersist: !!options.analyticsEngineDatasets,
d1Persist: !!options.d1Databases,
durableObjectsPersist: !!options.durableObjects,
kvPersist: !!options.kvNamespaces,
r2Persist: !!options.r2Buckets,
secretsStorePersist: !!options.secretsStoreSecrets,
workflowsPersist: !!options.workflows,
} as mf.MiniflareOptions);
await miniflare.ready;
return {
miniflare,
dispose: async () => {
await Promise.all([miniflare.dispose(), remoteProxy?.close()]);
},
};
}

export type Lazy<T> = T[keyof T] extends (...args: any[]) => Promise<any>
? T
: {
[k in keyof T]: EnsurePromiseReturnType<T[k]>;
};

type EnsurePromiseReturnType<T> = T extends (
...args: infer Args
) => infer Return
? (...args: Args) => Return extends Promise<any> ? Return : Promise<Return>
: never;

function makeAsyncProxy<Target extends object, Functions>(
target: Target,
make: () => Promise<Functions>,
properties: (keyof Functions)[],
): Target & Lazy<Functions> {
let promise: Promise<Functions> | undefined;
return new Proxy(target, {
get(target, prop) {
if (!properties.includes(prop as keyof Functions)) {
return Reflect.get(target, prop);
}
return async (...args: any[]) => {
promise ??= make();
const obj = await promise;
// @ts-expect-error - prop is a valid key of T
return obj[prop as keyof T].apply(obj, args);
};
},
has(target, prop) {
return (
properties.includes(prop as keyof Functions) ||
Reflect.has(target, prop)
);
},
}) as Target & Lazy<Functions>;
}

export function makeAsyncProxyForBinding<
B extends Extract<Binding, object>,
>(options: {
apiOptions: CloudflareApiOptions;
name: string;
binding: Omit<B, keyof Lazy<Bound<B>>>;
properties: (keyof NoInfer<Bound<B>>)[];
}): B {
const api = makeAsyncProxy(
{},
async () => await createCloudflareApi(options.apiOptions),
["get", "post", "put", "delete", "patch", "head"],
) as CloudflareApi;
return makeAsyncProxy(
options.binding,
async () => {
const { miniflare, dispose } = await makeMiniflare(api, options.name, {
binding: options.binding as B,
});
Scope.current.onCleanup(async () => {
await dispose();
});
return (await miniflare.getBindings())["binding"] as Bound<B>;
},
options.properties,
) as B;
}
4 changes: 3 additions & 1 deletion examples/cloudflare-worker-simple/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
.alchemy/
.alchemy

migrations/02-seed.sql
Loading
Loading