Skip to content

Commit

Permalink
Merge pull request #5 from chientrm/batch
Browse files Browse the repository at this point in the history
D1.batch()
  • Loading branch information
chientrm committed Aug 28, 2023
2 parents e05513b + 5b7a078 commit 520ca04
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 44 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ import { connectD1 } from 'wrangler-proxy';
| Function | Status |
| ----------- | ------ |
| `prepare()` ||
| `batch()` | |
| `batch()` | |
| `dump()` ||
| `exec()` ||

Expand Down
5 changes: 5 additions & 0 deletions src/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,8 @@ export const jsonInit: ResponseInit = {
export const stringInit: ResponseInit = {
headers: { 'Content-Type': 'text/plain' },
};

export type R2PutOptionsExtra = R2PutOptions & {
onlyIfArr: [key: string, value: string][] | undefined;
httpMetadataArr: [key: string, value: string][] | undefined;
};
3 changes: 3 additions & 0 deletions src/factory.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Data, Params } from './data';
import { D1DatabaseBatchProxy } from './proxies/d1_database/batch/proxy';
import { D1DatabaseExecProxy } from './proxies/d1_database/exec/proxy';
import { D1DatabasePreparedStatementAllProxy } from './proxies/d1_database/prepared_statement/all/proxy';
import { D1DatabasePreparedStatementFirstProxy } from './proxies/d1_database/prepared_statement/first/proxy';
Expand Down Expand Up @@ -31,6 +32,8 @@ class ProxyFactory {
return new D1DatabasePreparedStatementRawProxy({ name, metadata });
case D1DatabaseExecProxy.proxyType:
return new D1DatabaseExecProxy({ name, metadata });
case D1DatabaseBatchProxy.proxyType:
return new D1DatabaseBatchProxy({ name, metadata });
case FetcherFetchProxy.proxyType:
return new FetcherFetchProxy({ name, metadata, data });
case KVPutProxy.proxyType:
Expand Down
42 changes: 42 additions & 0 deletions src/proxies/d1_database/batch/proxy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { jsonInit } from '../../../data';
import { Proxy } from '../../proxy';
import {
D1DatabasePreparedStatementMetadata,
D1DatabasePreparedStatementProxy,
} from '../prepared_statement/proxy';

class D1DatabaseBatchProxy extends Proxy<
D1DatabasePreparedStatementMetadata[]
> {
static readonly proxyType = 'D1DatabaseBatchProxy';
constructor({
host,
name,
metadata,
}: {
host?: string;
name: string;
metadata: D1DatabasePreparedStatementMetadata[];
}) {
const proxyType = D1DatabaseBatchProxy.proxyType;
super({ proxyType, host, name, metadata, data: null });
}
async execute(env: any) {
const { name, metadata } = this,
d1 = env[name] as D1Database,
preparedStatements = metadata.map((metadata) =>
new D1DatabasePreparedStatementProxy({
name,
metadata,
data: null,
}).statement(d1)
),
result = await d1.batch(preparedStatements);
return new Response(JSON.stringify(result), jsonInit);
}
receive(response: Response): Promise<any> {
return response.json();
}
}

export { D1DatabaseBatchProxy };
11 changes: 11 additions & 0 deletions src/proxies/d1_database/prepared_statement/proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ interface Metadata {
values?: any[];
}

export type D1DatabasePreparedStatementMetadata = Metadata;

class D1DatabasePreparedStatementProxy
extends ProxyHolder<Metadata>
implements D1PreparedStatement
Expand Down Expand Up @@ -61,6 +63,15 @@ class D1DatabasePreparedStatementProxy
});
return proxy.post();
}
statement(d1: D1Database) {
const { metadata } = this,
{ query, values } = metadata;
if (values) {
return d1.prepare(query).bind(...values);
} else {
return d1.prepare(query);
}
}
}

export { D1DatabasePreparedStatementProxy };
8 changes: 6 additions & 2 deletions src/proxies/d1_database/proxy_holder.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { ProxyHolder } from '../proxy';
import { D1DatabaseBatchProxy } from './batch/proxy';
import { D1DatabaseExecProxy } from './exec/proxy';
import { D1DatabasePreparedStatementProxy } from './prepared_statement/proxy';

Expand All @@ -16,9 +17,12 @@ class D1DatabaseProxyHolder extends ProxyHolder<{}> implements D1Database {
throw new Error('Method not implemented.');
}
batch<T = unknown>(
statements: D1PreparedStatement[]
statements: D1DatabasePreparedStatementProxy[]
): Promise<D1Result<T>[]> {
throw new Error('Method not implemented.');
const { host, name } = this,
metadata = statements.map((statement) => statement.metadata),
proxy = new D1DatabaseBatchProxy({ host, name, metadata });
return proxy.post();
}
async exec(query: string) {
const { host, name } = this,
Expand Down
47 changes: 27 additions & 20 deletions src/proxies/proxy.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
import { Data } from '../data';
import { ProxyHolder } from './proxy_holder';

const checkOk = async (response: Response) => {
if (!response.ok) {
throw new Error(
JSON.stringify({
status: response.status,
statusText: response.statusText,
message: await response.text(),
})
);
}
};

abstract class Proxy<T> extends ProxyHolder<T> {
proxyType: string;
constructor({
Expand All @@ -21,29 +33,24 @@ abstract class Proxy<T> extends ProxyHolder<T> {
}
async post() {
const { host, name, proxyType, metadata, data } = this,
params = new URLSearchParams({
name,
proxyType,
metadata: JSON.stringify(metadata),
}),
method = 'POST',
response = await fetch(`${host!}?${params}`, {
method,
body: data,
code = Math.floor(Math.random() * 1000000).toString(),
response1 = await fetch(`${host!}/instruction`, {
method: 'POST',
headers: { 'X-Code': code },
body: JSON.stringify({ name, proxyType, metadata }),
// @ts-ignore
duplex: 'half',
});
if (response.ok) {
return this.receive(response);
} else {
throw new Error(
JSON.stringify({
status: response.status,
statusText: response.statusText,
message: await response.text(),
})
);
}
await checkOk(response1);
const response2 = await fetch(`${host!}/data`, {
method: 'POST',
headers: { 'X-Code': code },
body: data,
// @ts-ignore
duplex: 'half',
});
await checkOk(response2);
return this.receive(response2);
}
abstract execute(env: any): Promise<Response>;
abstract receive(response: Response): Promise<any>;
Expand Down
31 changes: 27 additions & 4 deletions src/proxies/r2/proxy_holder.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { R2PutOptionsExtra } from '../../data';
import { ProxyHolder } from '../proxy';
import { R2DeleteProxy } from './delete/proxy';
import { R2GetProxy } from './get/proxy';
Expand Down Expand Up @@ -99,14 +100,36 @@ export class R2ProxyHolder extends ProxyHolder<{}> implements R2Bucket {
if (typeof key !== 'string' || !data) {
throw new Error('Method not implemented.');
}
const { host, name } = this,
proxy = new R2PutProxy({
const { host, name } = this;
if (!options) {
const proxy = new R2PutProxy({
host,
name,
metadata: { key, options: options as R2PutOptions },
metadata: { key },
data,
});
return proxy.post();
return proxy.post();
} else {
const _options = options as R2PutOptions,
newOptions: R2PutOptionsExtra = {
..._options,
onlyIfArr:
_options.onlyIf instanceof Headers
? [..._options.onlyIf]
: undefined,
httpMetadataArr:
_options.httpMetadata instanceof Headers
? [..._options.httpMetadata]
: undefined,
},
proxy = new R2PutProxy({
host,
name,
metadata: { key, options: newOptions },
data,
});
return proxy.post();
}
}
createMultipartUpload(
key: string,
Expand Down
19 changes: 16 additions & 3 deletions src/proxies/r2/put/proxy.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { Data } from '../../../data';
import { Data, R2PutOptionsExtra } from '../../../data';
import { Proxy } from '../../proxy';

interface Metadata {
key: string;
options?: R2PutOptions;
options?: R2PutOptionsExtra;
}

export class R2PutProxy extends Proxy<Metadata> {
Expand All @@ -27,7 +27,20 @@ export class R2PutProxy extends Proxy<Metadata> {
{ key, options } = metadata,
r2 = env[name] as R2Bucket,
value = data!;
await r2.put(key, value, options);
if (options) {
const newOptions: R2PutOptions = {
...options,
onlyIf: options.onlyIfArr
? new Headers(options.onlyIfArr)
: options.onlyIf,
httpMetadata: options.httpMetadataArr
? new Headers(options.httpMetadataArr)
: options.httpMetadata,
};
await r2.put(key, value, newOptions);
} else {
await r2.put(key, value);
}
return new Response();
}
receive(response: Response): Promise<any> {
Expand Down
37 changes: 23 additions & 14 deletions src/worker.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,38 @@
import { Params } from './data';
import { ProxyFactory } from './factory';
import { D1DatabaseProxyHolder } from './proxies/d1_database/proxy_holder';
import { FetcherProxyHolder } from './proxies/fetcher/proxy_holder';
import { KVProxyHolder } from './proxies/kv/proxy_holder';
import { R2ProxyHolder } from './proxies/r2/proxy_holder';

const defaultHostname = 'http://127.0.0.1:8787',
paramsMap: Record<string, any> = {},
createWorker = () =>
<ExportedHandler>{
async fetch(
request: Request,
env: any,
ctx: ExecutionContext
): Promise<Response> {
if (request.method === 'POST')
if (request.method === 'POST') {
try {
const url = new URL(request.url),
searchParams = url.searchParams,
params: Params = {
name: searchParams.get('name')!,
proxyType: searchParams.get('proxyType')!,
metadata: JSON.parse(searchParams.get('metadata')!),
},
data = request.body,
proxy = ProxyFactory.getProxy(params, data),
response = await proxy.execute(env);
return response;
const url = new URL(request.url);
const code = request.headers.get('X-Code')!;
if (url.pathname === '/instruction') {
const values: Uint8Array[] = [];
for await (const value of request.body!) {
values.push(value);
}
const buffer = await new Blob(values).arrayBuffer(),
params = JSON.parse(new TextDecoder().decode(buffer));
paramsMap[code] = params;
return new Response();
} else if (url.pathname === '/data') {
const params = paramsMap[code];
delete paramsMap[code];
const proxy = ProxyFactory.getProxy(params, request.body),
response = await proxy.execute(env);
return response;
}
} catch (e: any) {
return new Response(
JSON.stringify({
Expand All @@ -36,6 +43,7 @@ const defaultHostname = 'http://127.0.0.1:8787',
{ status: 500 }
);
}
}
return new Response(null, { status: 404 });
},
},
Expand Down Expand Up @@ -87,5 +95,6 @@ export {
connectR2,
connectServiceBinding,
createWorker,
waitUntil,
waitUntil
};

0 comments on commit 520ca04

Please sign in to comment.