diff --git a/alchemy-web/src/content/docs/providers/cloudflare/d1-database.md b/alchemy-web/src/content/docs/providers/cloudflare/d1-database.md
index e10c7479f..ee2f24005 100644
--- a/alchemy-web/src/content/docs/providers/cloudflare/d1-database.md
+++ b/alchemy-web/src/content/docs/providers/cloudflare/d1-database.md
@@ -60,6 +60,21 @@ const db = await D1Database("my-db", {
 });
 ```
 
+## Importing Data
+
+Create a database that imports data from a directory of SQL files.
+
+```ts
+import { D1Database } from "alchemy/cloudflare";
+
+const db = await D1Database("my-db", {
+  name: "my-db",
+  importDir: "./imports",
+});
+```
+
+The import files are applied after any migrations, using [Cloudflare's D1 import API](https://developers.cloudflare.com/d1/best-practices/import-export-data/).
+
 ## With Location Hint
 
 Create a database with a specific location hint for optimal performance.
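The new option composes with the existing `migrationsDir`. A minimal sketch of a configuration using both, with an illustrative worker attached; the `DB` binding name and the entrypoint path are assumptions for the example, not part of this diff:

```ts
import { D1Database, Worker } from "alchemy/cloudflare";

// Migrations run first and are tracked in the migrations table;
// files under importDir are then pushed through the D1 import API.
const db = await D1Database("my-db", {
  name: "my-db",
  migrationsDir: "./migrations", // schema changes, applied in filename order
  importDir: "./imports", // bulk seed data, applied after migrations
});

// Illustrative worker consuming the database as a binding.
export const worker = await Worker("api", {
  entrypoint: "./src/worker.ts",
  bindings: { DB: db },
});
```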
diff --git a/alchemy/src/cloudflare/d1-database.ts b/alchemy/src/cloudflare/d1-database.ts
index 672ff62a8..f3f0f0028 100644
--- a/alchemy/src/cloudflare/d1-database.ts
+++ b/alchemy/src/cloudflare/d1-database.ts
@@ -10,8 +10,10 @@ import {
 } from "./api.ts";
 import { withJurisdiction } from "./bucket.ts";
 import { cloneD1Database } from "./d1-clone.ts";
+import { importD1Database } from "./d1-import.ts";
 import { applyLocalD1Migrations } from "./d1-local-migrations.ts";
-import { applyMigrations, listMigrationsFiles } from "./d1-migrations.ts";
+import { applyMigrations } from "./d1-migrations.ts";
+import { listSqlFiles, type D1SqlFile } from "./d1-sql-file.ts";
 import { deleteMiniflareBinding } from "./miniflare/delete.ts";
 
 const DEFAULT_MIGRATIONS_TABLE = "d1_migrations";
@@ -85,11 +87,10 @@ export interface D1DatabaseProps extends CloudflareApiOptions {
   clone?: D1Database | { id: string } | { name: string };
 
   /**
-   * These files will be generated internally with the D1Database wrapper function when migrationsDir is specified
-   *
-   * @private
+   * Directory containing SQL files to import.
+   * After migrations are applied, these files are run through [Cloudflare's D1 import API](https://developers.cloudflare.com/d1/best-practices/import-export-data/).
    */
-  migrationsFiles?: Array<{ id: string; sql: string }>;
+  importDir?: string;
 
   /**
    * Name of the table used to track migrations. Only used if migrationsDir is specified. Defaults to 'd1_migrations'
@@ -102,6 +103,7 @@ export interface D1DatabaseProps extends CloudflareApiOptions {
    * This is analogous to wrangler's `migrations_dir`.
    */
   migrationsDir?: string;
+
   /**
    * Whether to emulate the database locally when Alchemy is running in watch mode.
    */
@@ -139,6 +141,7 @@ export type D1Database = Pick<
   | "migrationsTable"
   | "primaryLocationHint"
   | "readReplication"
+  | "importDir"
 > & {
   type: "d1";
   /**
@@ -251,15 +254,17 @@
  */
 export async function D1Database(
   id: string,
-  props: Omit<D1DatabaseProps, "migrationsFiles"> = {},
+  props: Omit<D1DatabaseProps, "migrationsFiles" | "importFiles"> = {},
 ): Promise<D1Database> {
-  const migrationsFiles = props.migrationsDir
-    ? await listMigrationsFiles(props.migrationsDir)
-    : [];
+  const [migrationsFiles, importFiles] = await Promise.all([
+    props.migrationsDir ? await listSqlFiles(props.migrationsDir) : [],
+    props.importDir ? await listSqlFiles(props.importDir) : [],
+  ]);
   return _D1Database(id, {
     ...props,
     migrationsFiles,
+    importFiles,
     dev: {
       ...(props.dev ?? {}),
       // force local migrations to run even if the database was already deployed live
@@ -274,7 +279,10 @@ const _D1Database = Resource(
   async function (
     this: Context<D1Database>,
     id: string,
-    props: D1DatabaseProps,
+    props: D1DatabaseProps & {
+      migrationsFiles: Array<D1SqlFile>;
+      importFiles: Array<D1SqlFile>;
+    },
   ): Promise<D1Database> {
     const databaseName =
       props.name ?? this.output?.name ?? this.scope.createPhysicalName(id);
@@ -292,11 +300,12 @@
     const adopt = props.adopt ?? this.scope.adopt;
 
     if (local) {
-      if (props.migrationsFiles && props.migrationsFiles.length > 0) {
+      if (props.migrationsFiles?.length || props.importFiles?.length) {
         await applyLocalD1Migrations({
           databaseId: dev.id,
           migrationsTable: props.migrationsTable ?? DEFAULT_MIGRATIONS_TABLE,
-          migrations: props.migrationsFiles,
+          migrations: props.migrationsFiles ?? [],
+          imports: props.importFiles ?? [],
           rootDir: this.scope.rootDir,
         });
       }
@@ -308,6 +317,7 @@
       primaryLocationHint: props.primaryLocationHint,
       migrationsDir: props.migrationsDir,
       migrationsTable: props.migrationsTable ?? DEFAULT_MIGRATIONS_TABLE,
+      importDir: props.importDir,
       dev,
       jurisdiction,
     };
@@ -401,12 +411,13 @@
       dbData = await createDatabase(api, databaseName, props);
     }
 
+    const databaseId = dbData.result.uuid!;
+
     // Run migrations if provided
     if (props.migrationsFiles && props.migrationsFiles.length > 0) {
       try {
         const migrationsTable = props.migrationsTable || DEFAULT_MIGRATIONS_TABLE;
-        const databaseId = dbData.result.uuid || this.output?.id;
 
         if (!databaseId) {
           throw new Error("Database ID not found for migrations");
@@ -424,19 +435,27 @@
         throw migrationErr;
       }
     }
-    if (!dbData.result.uuid) {
-      // TODO(sam): why would this ever happen?
-      throw new Error("Database ID not found");
+    if (props.importFiles?.length) {
+      await Promise.all(
+        props.importFiles.map(async (file) => {
+          await importD1Database(api, {
+            databaseId,
+            sqlData: file.sql,
+            filename: file.id,
+          });
+        }),
+      );
     }
 
     return {
       type: "d1",
-      id: dbData.result.uuid!,
+      id: databaseId,
       name: databaseName,
       readReplication: props.readReplication,
      primaryLocationHint: props.primaryLocationHint,
       dev,
       migrationsDir: props.migrationsDir,
       migrationsTable: props.migrationsTable ?? DEFAULT_MIGRATIONS_TABLE,
+      importDir: props.importDir,
       jurisdiction,
     };
   },
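`importD1Database` lives in `d1-import.ts`, which is not part of this diff; all we see here is its call shape, `importD1Database(api, { databaseId, sqlData, filename })`. For readers unfamiliar with the underlying endpoint, here is a hedged sketch of Cloudflare's documented init, upload, ingest, poll sequence. The request and response field names follow the public D1 docs at the time of writing and may differ from the helper's real implementation:

```ts
import crypto from "node:crypto";

// Sketch of Cloudflare's D1 import flow; illustrative only, not the
// implementation in d1-import.ts.
async function importSqlSketch(
  accountId: string,
  databaseId: string,
  apiToken: string,
  filename: string,
  sqlData: string,
): Promise<void> {
  const url = `https://api.cloudflare.com/client/v4/accounts/${accountId}/d1/database/${databaseId}/import`;
  const headers = {
    Authorization: `Bearer ${apiToken}`,
    "Content-Type": "application/json",
  };
  // D1 identifies uploads by the MD5 of the file contents.
  const etag = crypto.createHash("md5").update(sqlData).digest("hex");

  // 1. init: request a signed upload URL for this etag.
  const init = (await fetch(url, {
    method: "POST",
    headers,
    body: JSON.stringify({ action: "init", etag }),
  }).then((r) => r.json())) as { result: { upload_url: string } };

  // 2. upload the raw SQL to the signed URL.
  await fetch(init.result.upload_url, { method: "PUT", body: sqlData });

  // 3. ingest: tell D1 to start importing the uploaded file.
  await fetch(url, {
    method: "POST",
    headers,
    body: JSON.stringify({ action: "ingest", etag, filename }),
  });

  // 4. the import then runs server-side; a real client polls the same
  //    endpoint with { action: "poll" } until D1 reports completion.
}
```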
diff --git a/alchemy/src/cloudflare/d1-local-migrations.ts b/alchemy/src/cloudflare/d1-local-migrations.ts
index 51d26202b..271f7c227 100644
--- a/alchemy/src/cloudflare/d1-local-migrations.ts
+++ b/alchemy/src/cloudflare/d1-local-migrations.ts
@@ -1,11 +1,13 @@
 import * as mf from "miniflare";
+import type { D1SqlFile } from "./d1-sql-file.ts";
 import { getDefaultPersistPath } from "./miniflare/paths.ts";
 
 export interface D1LocalMigrationOptions {
   rootDir: string;
   databaseId: string;
   migrationsTable: string;
-  migrations: { id: string; sql: string }[];
+  migrations: Array<D1SqlFile>;
+  imports: Array<D1SqlFile>;
 }
 
 export const applyLocalD1Migrations = async (
@@ -21,9 +23,8 @@ export const applyLocalD1Migrations = async (
   });
   try {
     await miniflare.ready;
-    // TODO(sam): don't use `any` once prisma is fixed upstream
-    const db: any = await miniflare.getD1Database("DB");
-    const session: any = db.withSession("first-primary");
+    const db = await miniflare.getD1Database("DB");
+    const session = db.withSession("first-primary");
     await session
       .prepare(
         `CREATE TABLE IF NOT EXISTS ${options.migrationsTable} (
@@ -33,22 +34,42 @@ export const applyLocalD1Migrations = async (
         )`,
       )
       .run();
-    const appliedMigrations: {
-      results: { name: string }[];
+    await session
+      .prepare(
+        `ALTER TABLE ${options.migrationsTable} ADD COLUMN type TEXT NOT NULL DEFAULT 'migration';`,
+      )
+      .run().catch(() => {}); // tolerate "duplicate column" when the persisted local DB already has `type`
+    const applied: {
+      results: { name: string; type: "migration" | "import" }[];
     } = await session
       .prepare(
-        `SELECT name FROM ${options.migrationsTable} ORDER BY applied_at ASC`,
+        `SELECT name, type FROM ${options.migrationsTable} ORDER BY applied_at ASC`,
       )
       .all();
     const insertRecord = session.prepare(
-      `INSERT INTO ${options.migrationsTable} (name) VALUES (?)`,
+      `INSERT INTO ${options.migrationsTable} (name, type) VALUES (?, ?)`,
     );
-    for (const migration of options.migrations) {
-      if (appliedMigrations.results.some((m) => m.name === migration.id)) {
+    for (const { id, sql } of options.migrations) {
+      if (applied.results.some((m) => m.name === id)) {
+        continue;
+      }
+      await session.prepare(sql).run();
+      await insertRecord.bind(id, "migration").run();
+    }
+    for (const { id, sql, hash } of options.imports) {
+      const name = `${id}-${hash}`;
+      if (applied.results.some((m) => m.name === name)) {
         continue;
       }
-      await session.prepare(migration.sql).run();
-      await insertRecord.bind(migration.id).run();
+      await session.batch(
+        // Split into statements to prevent D1_ERROR: statement too long: SQLITE_TOOBIG.
+        // This splits naively on semicolons followed by newlines, which is not perfect but should work 99% of the time.
+        sql
+          .split(/;\r?\n/)
+          .filter((s) => s.trim())
+          .map((s) => session.prepare(s)),
+      );
+      await insertRecord.bind(name, "import").run();
     }
   } finally {
     await miniflare.dispose();
diff --git a/alchemy/src/cloudflare/d1-migrations.ts b/alchemy/src/cloudflare/d1-migrations.ts
index 0c054d0a7..7d8afbc6e 100644
--- a/alchemy/src/cloudflare/d1-migrations.ts
+++ b/alchemy/src/cloudflare/d1-migrations.ts
@@ -1,6 +1,3 @@
-import { glob } from "glob";
-import * as fs from "node:fs/promises";
-import path from "pathe";
 import { logger } from "../util/logger.ts";
 import { handleApiError } from "./api-error.ts";
 import type { CloudflareApi } from "./api.ts";
@@ -14,16 +11,6 @@ export interface D1MigrationOptions {
   quiet?: boolean;
 }
 
-const getPrefix = (name: string) => {
-  const prefix = name.split("_")[0];
-  const num = Number.parseInt(prefix, 10);
-  return Number.isNaN(num) ? null : num;
-};
-
-async function readMigrationFile(filePath: string): Promise<string> {
-  return fs.readFile(filePath, "utf-8");
-}
-
 /**
  * Detects the current schema of the migration table.
  * Returns info about the table structure to determine if migration is needed.
@@ -153,36 +140,6 @@ async function migrateLegacySchema(
   }
 }
 
-/**
- * Reads migration SQL files from the migrationsDir, sorted by filename.
- * @param migrationsDir Directory containing .sql migration files
- */
-export async function listMigrationsFiles(
-  migrationsDir: string,
-): Promise<Array<{ id: string; sql: string }>> {
-  const entries = await glob("**/*.sql", {
-    cwd: migrationsDir,
-  });
-
-  const sqlFiles = entries.sort((a: string, b: string) => {
-    const aNum = getPrefix(a);
-    const bNum = getPrefix(b);
-
-    if (aNum !== null && bNum !== null) return aNum - bNum;
-    if (aNum !== null) return -1;
-    if (bNum !== null) return 1;
-
-    return a.localeCompare(b);
-  });
-
-  return await Promise.all(
-    sqlFiles.map(async (file) => ({
-      id: file,
-      sql: await readMigrationFile(path.join(migrationsDir, file)),
-    })),
-  );
-}
-
 /**
  * Ensures the migrations table exists in the D1 database with wrangler-compatible schema.
  * Handles migration from legacy 2-column schema to 3-column schema if needed.
diff --git a/alchemy/src/cloudflare/d1-sql-file.ts b/alchemy/src/cloudflare/d1-sql-file.ts
new file mode 100644
index 000000000..33ce10d99
--- /dev/null
+++ b/alchemy/src/cloudflare/d1-sql-file.ts
@@ -0,0 +1,49 @@
+import crypto from "node:crypto";
+import fs from "node:fs/promises";
+import path from "pathe";
+
+export interface D1SqlFile {
+  id: string;
+  sql: string;
+  hash: string;
+}
+
+/**
+ * Lists SQL files from the directory, sorted by filename.
+ * @param directory Directory containing .sql files
+ */
+export async function listSqlFiles(directory: string): Promise<D1SqlFile[]> {
+  const files = await Array.fromAsync(
+    fs.glob("**/*.sql", {
+      cwd: directory,
+    }),
+  );
+
+  const sortedFiles = files.sort((a: string, b: string) => {
+    const aNum = getPrefix(a);
+    const bNum = getPrefix(b);
+
+    if (aNum !== null && bNum !== null) return aNum - bNum;
+    if (aNum !== null) return -1;
+    if (bNum !== null) return 1;
+
+    return a.localeCompare(b);
+  });
+
+  return Promise.all(
+    sortedFiles.map(async (id) => {
+      const sql = await fs.readFile(path.join(directory, id), "utf-8");
+      const hash = crypto.createHash("sha256").update(sql).digest("hex");
+      const file: D1SqlFile = { id, sql, hash };
+      // Make the sql property non-enumerable so it's not included in state. This prevents state store errors caused by large sql files.
+      Object.defineProperty(file, "sql", { enumerable: false });
+      return file;
+    }),
+  );
+}
+
+const getPrefix = (name: string) => {
+  const prefix = name.split("_")[0];
+  const num = Number.parseInt(prefix, 10);
+  return Number.isNaN(num) ? null : num;
+};
diff --git a/alchemy/src/state/d1-state-store.ts b/alchemy/src/state/d1-state-store.ts
index 378ca143a..4452619f8 100644
--- a/alchemy/src/state/d1-state-store.ts
+++ b/alchemy/src/state/d1-state-store.ts
@@ -78,12 +78,11 @@ const upsertDatabase = async (api: CloudflareApi, databaseName: string) => {
   const { listDatabases, createDatabase } = await import(
     "../cloudflare/d1-database.ts"
   );
-  const { applyMigrations, listMigrationsFiles } = await import(
-    "../cloudflare/d1-migrations.ts"
-  );
+  const { listSqlFiles } = await import("../cloudflare/d1-sql-file.ts");
+  const { applyMigrations } = await import("../cloudflare/d1-migrations.ts");
   const migrate = async (databaseId: string) => {
     await applyMigrations({
-      migrationsFiles: await listMigrationsFiles(MIGRATIONS_DIRECTORY),
+      migrationsFiles: await listSqlFiles(MIGRATIONS_DIRECTORY),
       migrationsTable: "migrations",
       accountId: api.accountId,
       databaseId,
diff --git a/examples/cloudflare-worker-simple/.gitignore b/examples/cloudflare-worker-simple/.gitignore
index 3bca80fbd..b281b50d1 100644
--- a/examples/cloudflare-worker-simple/.gitignore
+++ b/examples/cloudflare-worker-simple/.gitignore
@@ -1 +1,2 @@
-.alchemy/
\ No newline at end of file
+.alchemy/
+imports/
\ No newline at end of file
diff --git a/examples/cloudflare-worker-simple/alchemy.run.ts b/examples/cloudflare-worker-simple/alchemy.run.ts
index 076cdd8ba..fb247f972 100644
--- a/examples/cloudflare-worker-simple/alchemy.run.ts
+++ b/examples/cloudflare-worker-simple/alchemy.run.ts
@@ -9,6 +9,7 @@ import {
 } from "alchemy/cloudflare";
 import assert from "node:assert";
 import { spawn } from "node:child_process";
+import fs from "node:fs/promises";
 import type { DO } from "./src/worker1.ts";
 
 const app = await alchemy("cloudflare-worker-simple");
@@ -16,11 +17,30 @@ const app = await alchemy("cloudflare-worker-simple");
 // to test with remote bindings, set to true
 const remote = false;
 
+// create a large seed file for the database to test large imports
+if (
+  await fs
+    .stat("imports/seed.sql")
+    .then(() => false)
+    .catch(() => true)
+) {
+  await fs.mkdir("imports", { recursive: true });
+  await fs.writeFile(
+    "imports/seed.sql",
+    Array.from(
+      { length: 40_000 },
+      () =>
+        `INSERT INTO users (name, email) VALUES ('${crypto.randomUUID()}', '${crypto.randomUUID()}@example.com');\n`,
+    ).join(""),
+  );
+}
+
 const [d1, kv, r2] = await Promise.all([
   D1Database("d1", {
     name: `${app.name}-${app.stage}-d1`,
     adopt: true,
     migrationsDir: "migrations",
+    importDir: "imports",
    dev: { remote },
   }),
   KVNamespace("kv", {
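Locally, each import file is recorded in the migrations table as `${id}-${hash}`, so regenerating or editing `imports/seed.sql` changes its tracking key and triggers a re-import on the next run, while unchanged files are skipped. A sketch of how that key is derived, mirroring `listSqlFiles` above:

```ts
import crypto from "node:crypto";
import fs from "node:fs/promises";

// Derive the local tracking key for an import file, mirroring the
// sha256 hashing in d1-sql-file.ts and the `${id}-${hash}` naming in
// d1-local-migrations.ts.
const id = "seed.sql";
const sql = await fs.readFile(`imports/${id}`, "utf-8");
const hash = crypto.createHash("sha256").update(sql).digest("hex");
const trackingKey = `${id}-${hash}`;

console.log(trackingKey); // e.g. "seed.sql-3b0c44298fc1..."
```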
diff --git a/examples/cloudflare-worker-simple/e2e.test.ts b/examples/cloudflare-worker-simple/e2e.test.ts
index 190421950..8ccce7c11 100644
--- a/examples/cloudflare-worker-simple/e2e.test.ts
+++ b/examples/cloudflare-worker-simple/e2e.test.ts
@@ -43,23 +43,40 @@ describe("cloudflare-worker-simple", () => {
 
   describe("d1", () => {
     before(async () => {
-      const res = await api.d1.$get().then((res) => res.json());
-      for (const item of res) {
-        const res = await api.d1[":id"].$delete({
-          param: {
-            id: item.id,
-          },
-        });
-        assert.equal(res.status, 200);
-        await res.body?.cancel();
-      }
+      const res = await api.d1.$get();
+      const json = await res.json();
+      assert.equal(res.status, 200);
+      assert.equal(json.length, 40_000); // this is the number of users in the seed file
     });
 
     after(async () => {
       const res = await api.d1.$get();
       const json = await res.json();
       assert.equal(res.status, 200);
-      assert.deepEqual(json, []);
+      assert.equal(json.length, 40_000);
     });
+
+    it("import", async () => {
+      const line = await fs
+        .readFile("imports/seed.sql", "utf-8")
+        .then(async (file) => file.split("\n")[0]);
+      const match = line.match(
+        /INSERT INTO users \(name, email\) VALUES \('([^']+)', '([^']+)'\);/,
+      );
+      const name = match?.[1];
+      const email = match?.[2];
+      assert.equal(typeof name, "string");
+      assert.equal(typeof email, "string");
+      const res = await api.d1[":id"].$get({
+        param: {
+          id: "1",
+        },
+      });
+      const json = await res.json();
+      assert.equal(res.status, 200);
+      assert.equal(json.name, name);
+      assert.equal(json.email, email);
+      assert.equal(typeof json.created_at, "string");
+    });
 
     it("create, get, delete", async () => {
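The test client above implies Hono-style routes on the example worker. A hypothetical handler consistent with the assertions; the route shapes, table columns, and binding name are assumptions for illustration, not code from this PR:

```ts
import { Hono } from "hono";
import type { D1Database } from "@cloudflare/workers-types";

// Hypothetical D1-backed routes matching the e2e assertions:
// GET /d1 returns all users; GET /d1/:id returns one user with
// name, email, and created_at columns.
type Env = { Bindings: { DB: D1Database } };

const app = new Hono<Env>()
  .get("/d1", async (c) => {
    const { results } = await c.env.DB.prepare("SELECT * FROM users").all();
    return c.json(results);
  })
  .get("/d1/:id", async (c) => {
    const row = await c.env.DB.prepare("SELECT * FROM users WHERE id = ?")
      .bind(c.req.param("id"))
      .first();
    return c.json(row);
  });

export default app;
```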