From 14e8b6d0ccb0d9ab78939bb95fbb1ea0c7d7a840 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Tue, 28 May 2024 13:19:59 +0100 Subject: [PATCH] feat: use blob-fetcher lib (#105) Uses `@web3-storage/blob-fetcher`, `@web3-storage/public-bucket` and `multipart-byte-range` to simplify code and ensure that freeway uses the same code as hoverboard to fetch blocks via HTTP range requests informed by location content claims. This removes the content claims code from the existing R2 blockstore so that it can be decommissioned more easily, as there is a clean separation between the new world and the old world. --- package-lock.json | 227 ++++++++++++++++++++++++++-- package.json | 5 +- src/lib/blockstore.js | 55 +++---- src/lib/dag-index/api.ts | 19 +-- src/lib/dag-index/car.js | 10 +- src/lib/dag-index/content-claims.js | 176 --------------------- src/lib/dag-index/entry.js | 25 --- src/middleware.js | 53 +++++-- test/helpers/bucket.js | 71 +++------ test/helpers/content-claims.js | 119 ++------------- test/index.spec.js | 84 +++------- 11 files changed, 328 insertions(+), 516 deletions(-) delete mode 100644 src/lib/dag-index/content-claims.js delete mode 100644 src/lib/dag-index/entry.js diff --git a/package-lock.json b/package-lock.json index 4e2f961..25e667c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,6 +13,7 @@ "@ipld/dag-cbor": "^9.0.8", "@ipld/dag-json": "^10.1.7", "@ipld/dag-pb": "^4.0.8", + "@web3-storage/blob-fetcher": "^2.1.0", "@web3-storage/content-claims": "^4.0.5", "@web3-storage/gateway-lib": "^5.0.1", "cardex": "^3.0.0", @@ -26,16 +27,16 @@ "@aws-sdk/client-s3": "^3.490.0", "@cloudflare/workers-types": "^4.20231218.0", "@ucanto/principal": "^8.1.0", - "@web3-storage/public-r2-bucket": "^1.2.1", + "@web3-storage/public-bucket": "^1.0.0", "ava": "^5.3.1", - "byteranges": "^1.1.0", "carbites": "^1.0.6", "carstream": "^2.1.0", "dotenv": "^16.3.1", "esbuild": "^0.18.20", "files-from-path": "^0.2.6", "ipfs-car": "^0.9.2", - "miniflare": "^2.14.2", + "miniflare": "^2.14.1", + "multipart-byte-range": "^3.0.1", "standard": "^17.1.0", "typescript": "^5.3.3", "uint8arrays": "^4.0.10" }, @@ -5373,6 +5374,149 @@ "web-streams-polyfill": "^3.1.1" } }, + "node_modules/@web3-storage/blob-fetcher": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/@web3-storage/blob-fetcher/-/blob-fetcher-2.1.0.tgz", + "integrity": "sha512-Mb8pVWYA2/xBzKFMXtRBuFQOQ2tO4ZVjICfr2J/1WuZVw8PEJJQIerVr6pDHzFhzLGb+r8XbsH2Ynk5ctde73A==", + "dependencies": { + "@ucanto/interface": "^10.0.1", + "@web3-storage/blob-index": "^1.0.2", + "@web3-storage/content-claims": "^4.0.5", + "multiformats": "^13.1.0", + "multipart-byte-range": "^3.0.1", + "p-defer": "^4.0.1", + "p-queue": "^8.0.1" + } + }, + "node_modules/@web3-storage/blob-fetcher/node_modules/@ucanto/interface": { + "version": "10.0.1", + "resolved": "https://registry.npmjs.org/@ucanto/interface/-/interface-10.0.1.tgz", + "integrity": "sha512-+Vr/N4mLsdynV9/bqtdFiq7WsUf3265/Qx2aHJmPtXo9/QvWKthJtpe0g8U4NWkWpVfqIFvyAO2db6D9zWQfQw==", + "dependencies": { + "@ipld/dag-ucan": "^3.4.0", + "multiformats": "^11.0.2" + } + }, + "node_modules/@web3-storage/blob-fetcher/node_modules/@ucanto/interface/node_modules/multiformats": { + "version": "11.0.2", + "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-11.0.2.tgz", + "integrity": "sha512-b5mYMkOkARIuVZCpvijFj9a6m5wMVLC7cf/jIPd5D/ARDOfLC5+IFkbgDXQgcU2goIsTD/O9NY4DI/Mt4OGvlg==", + "engines": { + "node": ">=16.0.0", + "npm": ">=7.0.0" + } + }, + 
"node_modules/@web3-storage/blob-fetcher/node_modules/multiformats": { + "version": "13.1.0", + "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-13.1.0.tgz", + "integrity": "sha512-HzdtdBwxsIkzpeXzhQ5mAhhuxcHbjEHH+JQoxt7hG/2HGFjjwyolLo7hbaexcnhoEuV4e0TNJ8kkpMjiEYY4VQ==" + }, + "node_modules/@web3-storage/blob-index": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/@web3-storage/blob-index/-/blob-index-1.0.2.tgz", + "integrity": "sha512-N+yMIk2cmgaGYVy9EewsRx1sxSDv67i2IBlZ4y72a/+lVIAmb3ZP0IwZ+Med0xrNZShA4blxIGJm1LVF7Q4mSg==", + "dependencies": { + "@ipld/dag-cbor": "^9.0.6", + "@ucanto/core": "^10.0.1", + "@ucanto/interface": "^10.0.1", + "@web3-storage/capabilities": "^17.1.0", + "carstream": "^2.1.0", + "multiformats": "^13.0.1", + "uint8arrays": "^5.0.3" + }, + "engines": { + "node": ">=16.15" + } + }, + "node_modules/@web3-storage/blob-index/node_modules/@ucanto/interface": { + "version": "10.0.1", + "resolved": "https://registry.npmjs.org/@ucanto/interface/-/interface-10.0.1.tgz", + "integrity": "sha512-+Vr/N4mLsdynV9/bqtdFiq7WsUf3265/Qx2aHJmPtXo9/QvWKthJtpe0g8U4NWkWpVfqIFvyAO2db6D9zWQfQw==", + "dependencies": { + "@ipld/dag-ucan": "^3.4.0", + "multiformats": "^11.0.2" + } + }, + "node_modules/@web3-storage/blob-index/node_modules/@ucanto/interface/node_modules/multiformats": { + "version": "11.0.2", + "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-11.0.2.tgz", + "integrity": "sha512-b5mYMkOkARIuVZCpvijFj9a6m5wMVLC7cf/jIPd5D/ARDOfLC5+IFkbgDXQgcU2goIsTD/O9NY4DI/Mt4OGvlg==", + "engines": { + "node": ">=16.0.0", + "npm": ">=7.0.0" + } + }, + "node_modules/@web3-storage/blob-index/node_modules/uint8arrays": { + "version": "5.1.0", + "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.1.0.tgz", + "integrity": "sha512-vA6nFepEmlSKkMBnLBaUMVvAC4G3CTmO58C12y4sq6WPDOR7mOFYOi7GlrQ4djeSbP6JG9Pv9tJDM97PedRSww==", + "dependencies": { + "multiformats": "^13.0.0" + } + }, + "node_modules/@web3-storage/capabilities": { + "version": "17.1.0", + "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-17.1.0.tgz", + "integrity": "sha512-p5Wn2O3TSEZ7JFSph2KY9OuFnofbkhKi7Tp+1zcPEYAUsEvDWGabd1NvSPDDMpFBE74UX4ZljE8aQzDAtI3qRw==", + "dependencies": { + "@ucanto/core": "^10.0.1", + "@ucanto/interface": "^10.0.1", + "@ucanto/principal": "^9.0.1", + "@ucanto/transport": "^9.1.1", + "@ucanto/validator": "^9.0.2", + "@web3-storage/data-segment": "^3.2.0", + "uint8arrays": "^5.0.3" + } + }, + "node_modules/@web3-storage/capabilities/node_modules/@ucanto/interface": { + "version": "10.0.1", + "resolved": "https://registry.npmjs.org/@ucanto/interface/-/interface-10.0.1.tgz", + "integrity": "sha512-+Vr/N4mLsdynV9/bqtdFiq7WsUf3265/Qx2aHJmPtXo9/QvWKthJtpe0g8U4NWkWpVfqIFvyAO2db6D9zWQfQw==", + "dependencies": { + "@ipld/dag-ucan": "^3.4.0", + "multiformats": "^11.0.2" + } + }, + "node_modules/@web3-storage/capabilities/node_modules/@ucanto/interface/node_modules/multiformats": { + "version": "11.0.2", + "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-11.0.2.tgz", + "integrity": "sha512-b5mYMkOkARIuVZCpvijFj9a6m5wMVLC7cf/jIPd5D/ARDOfLC5+IFkbgDXQgcU2goIsTD/O9NY4DI/Mt4OGvlg==", + "engines": { + "node": ">=16.0.0", + "npm": ">=7.0.0" + } + }, + "node_modules/@web3-storage/capabilities/node_modules/@ucanto/principal": { + "version": "9.0.1", + "resolved": "https://registry.npmjs.org/@ucanto/principal/-/principal-9.0.1.tgz", + "integrity": 
"sha512-8eAvaZHW1vyET4X90rkJv6pmW1IOdEYlZYwO3wDgTkC5m9VytBEywCvpzP57cavdYIbbPse5QS9nMEGvk87zhw==", + "dependencies": { + "@ipld/dag-ucan": "^3.4.0", + "@noble/curves": "^1.2.0", + "@noble/ed25519": "^1.7.3", + "@noble/hashes": "^1.3.2", + "@ucanto/interface": "^10.0.0", + "multiformats": "^11.0.2", + "one-webcrypto": "^1.0.3" + } + }, + "node_modules/@web3-storage/capabilities/node_modules/@ucanto/principal/node_modules/multiformats": { + "version": "11.0.2", + "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-11.0.2.tgz", + "integrity": "sha512-b5mYMkOkARIuVZCpvijFj9a6m5wMVLC7cf/jIPd5D/ARDOfLC5+IFkbgDXQgcU2goIsTD/O9NY4DI/Mt4OGvlg==", + "engines": { + "node": ">=16.0.0", + "npm": ">=7.0.0" + } + }, + "node_modules/@web3-storage/capabilities/node_modules/uint8arrays": { + "version": "5.1.0", + "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.1.0.tgz", + "integrity": "sha512-vA6nFepEmlSKkMBnLBaUMVvAC4G3CTmO58C12y4sq6WPDOR7mOFYOi7GlrQ4djeSbP6JG9Pv9tJDM97PedRSww==", + "dependencies": { + "multiformats": "^13.0.0" + } + }, "node_modules/@web3-storage/content-claims": { "version": "4.0.5", "resolved": "https://registry.npmjs.org/@web3-storage/content-claims/-/content-claims-4.0.5.tgz", @@ -5423,6 +5567,25 @@ "npm": ">=7.0.0" } }, + "node_modules/@web3-storage/data-segment": { + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-3.2.0.tgz", + "integrity": "sha512-SM6eNumXzrXiQE2/J59+eEgCRZNYPxKhRoHX2QvV3/scD4qgcf4g+paWBc3UriLEY1rCboygGoPsnqYJNyZyfA==", + "dependencies": { + "@ipld/dag-cbor": "^9.0.5", + "multiformats": "^11.0.2", + "sync-multihash-sha2": "^1.0.0" + } + }, + "node_modules/@web3-storage/data-segment/node_modules/multiformats": { + "version": "11.0.2", + "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-11.0.2.tgz", + "integrity": "sha512-b5mYMkOkARIuVZCpvijFj9a6m5wMVLC7cf/jIPd5D/ARDOfLC5+IFkbgDXQgcU2goIsTD/O9NY4DI/Mt4OGvlg==", + "engines": { + "node": ">=16.0.0", + "npm": ">=7.0.0" + } + }, "node_modules/@web3-storage/gateway-lib": { "version": "5.0.1", "resolved": "https://registry.npmjs.org/@web3-storage/gateway-lib/-/gateway-lib-5.0.1.tgz", @@ -5482,6 +5645,14 @@ "dagula": "bin.js" } }, + "node_modules/@web3-storage/gateway-lib/node_modules/multipart-byte-range": { + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/multipart-byte-range/-/multipart-byte-range-2.0.2.tgz", + "integrity": "sha512-S1t3lY/FeYpfpVG6XD1BIRBOsGwwBXyqK/T+w9sKVNDS1OSUFxjjhTBYkRyMw2VT5oBwZInlKK58qAgB8Rcniw==", + "dependencies": { + "byteranges": "^1.1.0" + } + }, "node_modules/@web3-storage/gateway-lib/node_modules/uint8arrays": { "version": "5.0.1", "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.0.1.tgz", @@ -5510,17 +5681,25 @@ "uglify-js": "^3.1.4" } }, - "node_modules/@web3-storage/public-r2-bucket": { - "version": "1.2.1", - "resolved": "https://registry.npmjs.org/@web3-storage/public-r2-bucket/-/public-r2-bucket-1.2.1.tgz", - "integrity": "sha512-l5Ac3U9vxescuOBdn6j94bwZfu9CK6ZyE1fN7VWYJZyxAKqUyAGDlN8DMVmGbpvRqcfNVEdUzCuCsGZCl7zfeQ==", + "node_modules/@web3-storage/public-bucket": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/@web3-storage/public-bucket/-/public-bucket-1.0.0.tgz", + "integrity": "sha512-Yn7E4ze50ldLPtK6zAuyYM/mEGGoQhFan6pX0zPScEoB8ehlJCwqALD+vLg7KA4DO3DGEPuFbhN0O+qR3ihr3Q==", "dev": true, "dependencies": { "@httpland/range-parser": "^1.2.0", - "@web3-storage/gateway-lib": "^5.0.1", "multipart-byte-range": "^2.0.2" } }, + 
"node_modules/@web3-storage/public-bucket/node_modules/multipart-byte-range": { + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/multipart-byte-range/-/multipart-byte-range-2.0.2.tgz", + "integrity": "sha512-S1t3lY/FeYpfpVG6XD1BIRBOsGwwBXyqK/T+w9sKVNDS1OSUFxjjhTBYkRyMw2VT5oBwZInlKK58qAgB8Rcniw==", + "dev": true, + "dependencies": { + "byteranges": "^1.1.0" + } + }, "node_modules/@zxing/text-encoding": { "version": "0.9.0", "resolved": "https://registry.npmjs.org/@zxing/text-encoding/-/text-encoding-0.9.0.tgz", @@ -6407,7 +6586,6 @@ "version": "2.1.0", "resolved": "https://registry.npmjs.org/carstream/-/carstream-2.1.0.tgz", "integrity": "sha512-4kYIT1Y+GW/+o6wxS2tZlKnnINcgm4ceODBmyoLNaiQ17G2FNmzvUnQnVQkugC4NORTMCzD6KZEMT534XMJ4Yw==", - "dev": true, "dependencies": { "@ipld/dag-cbor": "^9.0.3", "multiformats": "^13.0.1", @@ -11342,11 +11520,12 @@ "integrity": "sha512-bt3R5iXe2O8xpp3wkmQhC73b/lC4S2ihU8Dndwcsysqbydqb8N+bpP116qMcClZ17g58iSIwtXUTcg2zT4sniA==" }, "node_modules/multipart-byte-range": { - "version": "2.0.2", - "resolved": "https://registry.npmjs.org/multipart-byte-range/-/multipart-byte-range-2.0.2.tgz", - "integrity": "sha512-S1t3lY/FeYpfpVG6XD1BIRBOsGwwBXyqK/T+w9sKVNDS1OSUFxjjhTBYkRyMw2VT5oBwZInlKK58qAgB8Rcniw==", + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/multipart-byte-range/-/multipart-byte-range-3.0.1.tgz", + "integrity": "sha512-+LVbrwBmkEyClExud3FSN9xZuo0ZFWJ2yg0PxJCvnwjMch2x4Q0IXkpc97rF8k17/kNG37vN82NxVgOrQ07wlg==", "dependencies": { - "byteranges": "^1.1.0" + "streamsearch-web": "^1.0.0", + "uint8arraylist": "^2.4.8" } }, "node_modules/murmurhash3js-revisited": { @@ -11723,9 +11902,9 @@ } }, "node_modules/p-defer": { - "version": "4.0.0", - "resolved": "https://registry.npmjs.org/p-defer/-/p-defer-4.0.0.tgz", - "integrity": "sha512-Vb3QRvQ0Y5XnF40ZUWW7JfLogicVh/EnA5gBIvKDJoYpeI82+1E3AlB9yOcKFS0AhHrWVnAQO39fbR0G99IVEQ==", + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/p-defer/-/p-defer-4.0.1.tgz", + "integrity": "sha512-Mr5KC5efvAK5VUptYEIopP1bakB85k2IWXaRC0rsh1uwn1L6M0LVml8OIQ4Gudg4oyZakf7FmeRLkMMtZW1i5A==", "engines": { "node": ">=12" }, @@ -13152,6 +13331,14 @@ "node": ">=10.0.0" } }, + "node_modules/streamsearch-web": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/streamsearch-web/-/streamsearch-web-1.0.0.tgz", + "integrity": "sha512-KBBU/O/xSjbr1z+NPwLE9iTrE3Pc/Ue7HumjvjjP1t7oYIM35OOMYRy/lZBoIwsiSKTnQ+uF8QbaJEa7FdJIzA==", + "engines": { + "node": ">=18.0.0" + } + }, "node_modules/string_decoder": { "version": "1.3.0", "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz", @@ -13380,6 +13567,14 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/sync-multihash-sha2": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/sync-multihash-sha2/-/sync-multihash-sha2-1.0.0.tgz", + "integrity": "sha512-A5gVpmtKF0ov+/XID0M0QRJqF2QxAsj3x/LlDC8yivzgoYCoWkV+XaZPfVu7Vj1T/hYzYS1tfjwboSbXjqocug==", + "dependencies": { + "@noble/hashes": "^1.3.1" + } + }, "node_modules/temp-dir": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/temp-dir/-/temp-dir-3.0.0.tgz", diff --git a/package.json b/package.json index 4edbea7..8135a51 100644 --- a/package.json +++ b/package.json @@ -40,6 +40,7 @@ "@ipld/dag-cbor": "^9.0.8", "@ipld/dag-json": "^10.1.7", "@ipld/dag-pb": "^4.0.8", + "@web3-storage/blob-fetcher": "^2.1.0", "@web3-storage/content-claims": "^4.0.5", "@web3-storage/gateway-lib": "^5.0.1", "cardex": "^3.0.0", @@ -53,9 +54,8 @@ 
"@aws-sdk/client-s3": "^3.490.0", "@cloudflare/workers-types": "^4.20231218.0", "@ucanto/principal": "^8.1.0", - "@web3-storage/public-r2-bucket": "^1.2.1", + "@web3-storage/public-bucket": "^1.0.0", "ava": "^5.3.1", - "byteranges": "^1.1.0", "carbites": "^1.0.6", "carstream": "^2.1.0", "dotenv": "^16.3.1", @@ -63,6 +63,7 @@ "files-from-path": "^0.2.6", "ipfs-car": "^0.9.2", "miniflare": "^2.14.1", + "multipart-byte-range": "^3.0.1", "standard": "^17.1.0", "typescript": "^5.3.3", "uint8arrays": "^4.0.10" diff --git a/src/lib/blockstore.js b/src/lib/blockstore.js index 88bb13b..c499467 100644 --- a/src/lib/blockstore.js +++ b/src/lib/blockstore.js @@ -2,7 +2,7 @@ import { readBlockHead, asyncIterableReader } from '@ipld/car/decoder' import { base58btc } from 'multiformats/bases/base58' import defer from 'p-defer' import { OrderedCarBlockBatcher } from './block-batch.js' -import * as IndexEntry from './dag-index/entry.js' +import { resolveRange } from '@web3-storage/blob-fetcher/fetcher' /** * @typedef {import('multiformats').UnknownLink} UnknownLink @@ -37,19 +37,6 @@ export class R2Blockstore { const entry = await this._idx.get(cid) if (!entry) return - if (IndexEntry.isLocated(entry)) { - for (const { url, range } of IndexEntry.toRequestCandidates(entry)) { - const headers = { Range: `bytes=${range[0]}-${range[1]}` } - const res = await fetch(url, { headers }) - if (!res.ok) { - console.warn(`failed to fetch ${url}: ${res.status} ${await res.text()}`) - continue - } - return { cid, bytes: new Uint8Array(await res.arrayBuffer()) } - } - return - } - const carPath = `${entry.origin}/${entry.origin}.car` const range = { offset: entry.offset } const res = await this._dataBucket.get(carPath, { range }) @@ -72,13 +59,8 @@ export class R2Blockstore { /** @param {UnknownLink} cid */ async stat (cid) { - const entry = await this._idx.get(cid) - if (!entry) return - - // stat API exists only for blobs (i.e. location claimed) - if (IndexEntry.isLocated(entry)) { - return { size: entry.site.range.length } - } + const block = await this.get(cid) + return block ? { size: block.bytes.size } : undefined } /** @@ -86,18 +68,21 @@ export class R2Blockstore { * @param {import('dagula').AbortOptions & import('dagula').RangeOptions} [options] */ async stream (cid, options) { - const entry = await this._idx.get(cid) - if (!entry) return - - for (const { url, range } of IndexEntry.toRequestCandidates(entry, options)) { - const headers = { Range: `bytes=${range[0]}-${range[1]}` } - const res = await fetch(url, { headers }) - if (!res.ok) { - console.warn(`failed to fetch ${url}: ${res.status} ${await res.text()}`) - continue + // Simulated streaming - it's not anticipated that legacy blockstore will + // be serving large blobs. + const block = await this.get(cid) + if (!block) return + return new ReadableStream({ + pull (controller) { + if (options?.range) { + const range = resolveRange(options.range, block.bytes.length) + controller.enqueue(block.bytes.slice(range[0], range[1] + 1)) + } else { + controller.enqueue(block.bytes) + } + controller.close() } - return /** @type {ReadableStream|undefined} */ (res?.body) - } + }) } } @@ -247,12 +232,6 @@ export class BatchingR2Blockstore extends R2Blockstore { const entry = await this._idx.get(cid) if (!entry) return - // TODO: batch with multipart gyte range request when we switch to reading - // from any URL. 
- if (IndexEntry.isLocated(entry)) { - return super.get(cid) - } - this.#batcher.add({ carCid: entry.origin, blockCid: cid, offset: entry.offset }) if (!entry.multihash) throw new Error('missing entry multihash') diff --git a/src/lib/dag-index/api.ts b/src/lib/dag-index/api.ts index a358135..e1a7a39 100644 --- a/src/lib/dag-index/api.ts +++ b/src/lib/dag-index/api.ts @@ -1,31 +1,16 @@ -import { MultihashDigest, UnknownLink } from 'multiformats' +import { UnknownLink } from 'multiformats' import { MultihashIndexItem } from 'cardex/multihash-index-sorted/api' import { CARLink } from 'cardex/api' -import { ByteRange } from '@web3-storage/content-claims/client/api' /** * A legacy index entry for which the exact location of the blob that contains * the block is unknown - assumed to be present in a bucket that freeway has * access to. */ -export interface NotLocatedIndexEntry extends MultihashIndexItem { +export interface IndexEntry extends MultihashIndexItem { origin: CARLink } -/** - * An index entry where the exact location of the block (URL and byte offset + - * length) has been found via a content claim. - */ -export interface LocatedIndexEntry { - digest: MultihashDigest - site: { - location: URL[], - range: Required - } -} - -export type IndexEntry = NotLocatedIndexEntry | LocatedIndexEntry - export interface Index { get (c: UnknownLink): Promise } diff --git a/src/lib/dag-index/car.js b/src/lib/dag-index/car.js index 79db37d..64477c7 100644 --- a/src/lib/dag-index/car.js +++ b/src/lib/dag-index/car.js @@ -4,7 +4,7 @@ import defer from 'p-defer' /** * @typedef {import('multiformats').UnknownLink} UnknownLink - * @typedef {import('./api.js').NotLocatedIndexEntry} NotLocatedIndexEntry + * @typedef {import('./api.js').IndexEntry} IndexEntry * @typedef {import('multiformats').ToString} MultihashString * @typedef {import('./api.js').Index} Index */ @@ -26,7 +26,7 @@ export class MultiCarIndex { /** * @param {UnknownLink} cid - * @returns {Promise} + * @returns {Promise} */ async get (cid) { const deferred = defer() @@ -56,10 +56,10 @@ export class StreamingCarIndex { /** @type {import('../../bindings.js').IndexSource} */ #source - /** @type {Map} */ + /** @type {Map} */ #idx = new Map() - /** @type {Map>>} */ + /** @type {Map>>} */ #promisedIdx = new Map() /** @type {boolean} */ @@ -86,7 +86,7 @@ export class StreamingCarIndex { const { done, value } = await idxReader.read() if (done) break - const entry = /** @type {import('./api.js').NotLocatedIndexEntry} */(value) + const entry = /** @type {import('./api.js').IndexEntry} */(value) entry.origin = entry.origin ?? this.#source.origin const key = mhToString(entry.multihash) diff --git a/src/lib/dag-index/content-claims.js b/src/lib/dag-index/content-claims.js deleted file mode 100644 index 58add91..0000000 --- a/src/lib/dag-index/content-claims.js +++ /dev/null @@ -1,176 +0,0 @@ -/* global ReadableStream */ -import * as Link from 'multiformats/link' -import * as raw from 'multiformats/codecs/raw' -import * as Claims from '@web3-storage/content-claims/client' -import { MultihashIndexSortedReader } from 'cardex/multihash-index-sorted' -import { Map as LinkMap } from 'lnmap' -import * as CAR from '../car.js' - -/** - * @typedef {import('multiformats').UnknownLink} UnknownLink - * @typedef {import('./api.js').NotLocatedIndexEntry} NotLocatedIndexEntry - * @typedef {import('./api.js').IndexEntry} IndexEntry - * @typedef {import('./api.js').Index} Index - */ - -/** @implements {Index} */ -export class ContentClaimsIndex { - /** - * Index store. 
- * @type {import('../../bindings.js').SimpleBucket} - */ - #bucket - /** - * Cached index entries. - * @type {Map} - */ - #cache - /** - * CIDs for which we have already fetched claims. - * - * Note: _only_ the CIDs which have been explicitly queried, for which we - * have made a content claim request. Not using `this.#cache` because reading - * a claim may cause us to add other CIDs to the cache that we haven't read - * claims for. - * - * Note: implemented as a Map not a Set so that we take advantage of the - * key cache that `lnmap` provides, so we don't duplicate base58 encoded - * multihash keys. - * @type {Map} - */ - #claimFetched - /** - * @type {URL|undefined} - */ - #serviceURL - - /** - * @param {import('../../bindings.js').SimpleBucket} bucket Bucket that stores CARs. - * @param {{ serviceURL?: URL }} [options] - */ - constructor (bucket, options) { - this.#bucket = bucket - this.#cache = new LinkMap() - this.#claimFetched = new LinkMap() - this.#serviceURL = options?.serviceURL - } - - /** - * @param {UnknownLink} cid - * @returns {Promise} - */ - async get (cid) { - // get the index data for this CID (CAR CID & offset) - let indexItem = this.#cache.get(cid) - - // read the index for _this_ CID to get the index data for it's _links_. - // - // when we get to the bottom of the tree (raw blocks), we want to be able - // to send back the index information without having to read claims for - // each leaf. We can only do that if we read the claims for the parent now. - if (indexItem) { - // we found the index data! ...if this CID is raw, then there's no links - // and no more index information to discover so don't read claims. - if (cid.code !== raw.code) { - await this.#readClaims(cid) - } - } else { - // we not found the index data! - await this.#readClaims(cid) - // seeing as we just read the index for this CID we _should_ have some - // index information for it now. - indexItem = this.#cache.get(cid) - // if not then, well, it's not found! - if (!indexItem) return - } - return indexItem - } - - /** - * Read claims for the passed CID and populate the cache. - * @param {import('multiformats').UnknownLink} cid - */ - async #readClaims (cid) { - if (this.#claimFetched.has(cid)) return - - const claims = await Claims.read(cid, { serviceURL: this.#serviceURL }) - for (const claim of claims) { - if (claim.type === 'assert/location' && claim.range?.length != null) { - this.#cache.set(cid, { - digest: cid.multihash, - site: { - location: claim.location.map(l => new URL(l)), - range: { offset: claim.range.offset, length: claim.range.length } - } - }) - continue - } - - // skip anything that is not a relation claim, since we know by - // our naming convention that our CAR files are named after their hash - // and we don't serve anything that we don't have in our own bucket. 
- if (claim.type !== 'assert/relation') continue - - // export the blocks from the claim - may include the CARv2 indexes - const blocks = [...claim.export()] - - // each part is a tuple of CAR CID (content) & CARv2 index CID (includes) - for (const { content, includes } of claim.parts) { - if (!isCARLink(content)) continue - if (!includes) continue - - /** @type {{ cid: import('multiformats').UnknownLink, bytes: Uint8Array }|undefined} */ - let block = blocks.find(b => b.cid.toString() === includes.content.toString()) - - // if the index is not included in the claim, it should be in CARPARK - if (!block && includes.parts?.length) { - const obj = await this.#bucket.get(`${includes.parts[0]}/${includes.parts[0]}.car`) - if (!obj) continue - const blocks = await CAR.decode(new Uint8Array(await obj.arrayBuffer())) - block = blocks.find(b => b.cid.toString() === includes.content.toString()) - } - if (!block) continue - - const entries = await decodeIndex(content, block.bytes) - for (const entry of entries) { - const entryCid = Link.create(raw.code, entry.multihash) - // do not overwrite an existing LocatedIndexEntry - if (!this.#cache.has(entryCid)) { - this.#cache.set(entryCid, entry) - } - } - } - break - } - this.#claimFetched.set(cid, true) - } -} - -/** - * @param {import('multiformats').Link} cid - * @returns {cid is import('cardex/api').CARLink} - */ -const isCARLink = cid => cid.code === CAR.code - -/** - * Read a MultihashIndexSorted index for the passed origin CAR and return a - * list of IndexEntry. - * @param {import('cardex/api').CARLink} origin - * @param {Uint8Array} bytes - */ -const decodeIndex = async (origin, bytes) => { - const entries = [] - const readable = new ReadableStream({ - pull (controller) { - controller.enqueue(bytes) - controller.close() - } - }) - const reader = MultihashIndexSortedReader.createReader({ reader: readable.getReader() }) - while (true) { - const { done, value } = await reader.read() - if (done) break - entries.push(/** @type {NotLocatedIndexEntry} */({ origin, ...value })) - } - return entries -} diff --git a/src/lib/dag-index/entry.js b/src/lib/dag-index/entry.js deleted file mode 100644 index 28669a1..0000000 --- a/src/lib/dag-index/entry.js +++ /dev/null @@ -1,25 +0,0 @@ -/** - * An index entry is "located" if a content claim has specified it's location - * i.e. it is of type `LocatedIndexEntry`. - * - * @param {import('./api.js').IndexEntry} entry - * @returns {entry is import('./api.js').LocatedIndexEntry} - */ -export const isLocated = entry => 'site' in entry - -/** - * Convert an index entry into a list of URL+byterange for requesting the - * content. - * - * @typedef {{ url: URL, range: import('dagula').AbsoluteRange }} Candidate - * @param {import('./api.js').IndexEntry} entry - * @param {import('dagula').RangeOptions} [options] - * @returns {Candidate[]} - */ -export const toRequestCandidates = (entry, options) => { - if (!isLocated(entry)) return [] - const first = entry.site.range.offset + (options?.range?.[0] ?? 0) - const last = entry.site.range.offset + (options?.range?.[1] ?? 
(entry.site.range.length - 1)) - const range = /** @type {import('dagula').AbsoluteRange} */ ([first, last]) - return entry.site.location.map(url => ({ url, range })) -} diff --git a/src/middleware.js b/src/middleware.js index 0c7ba1c..89cde4b 100644 --- a/src/middleware.js +++ b/src/middleware.js @@ -4,9 +4,10 @@ import { composeMiddleware } from '@web3-storage/gateway-lib/middleware' import { CarReader } from '@ipld/car' import { parseCid, HttpError, toIterable } from '@web3-storage/gateway-lib/util' import { base32 } from 'multiformats/bases/base32' +import * as BatchingFetcher from '@web3-storage/blob-fetcher/fetcher/batching' +import * as ContentClaimsLocator from '@web3-storage/blob-fetcher/locator/content-claims' import { BatchingR2Blockstore } from './lib/blockstore.js' import { version } from '../package.json' -import { ContentClaimsIndex } from './lib/dag-index/content-claims.js' import { MultiCarIndex, StreamingCarIndex } from './lib/dag-index/car.js' import { CachingBucket, asSimpleBucket } from './lib/bucket.js' import { MAX_CAR_BYTES_IN_MEMORY, CAR_CODE } from './constants.js' @@ -56,27 +57,55 @@ export function withCarBlockHandler (handler) { } /** - * Creates a dagula instance backed by the R2 blockstore backed by content claims. + * Creates a dagula instance backed by content claims. * * @type {import('@web3-storage/gateway-lib').Middleware} */ export function withContentClaimsDagula (handler) { return async (request, env, ctx) => { const { dataCid } = ctx - const index = new ContentClaimsIndex(asSimpleBucket(env.CARPARK), { + const locator = ContentClaimsLocator.create({ serviceURL: env.CONTENT_CLAIMS_SERVICE_URL ? new URL(env.CONTENT_CLAIMS_SERVICE_URL) : undefined }) - const found = await index.get(dataCid) - if (!found) { - // fallback to old index sources and dagula fallback - return composeMiddleware( - withIndexSources, - withDagulaFallback - )(handler)(request, env, ctx) + const locRes = await locator.locate(dataCid.multihash) + if (locRes.error) { + if (locRes.error.name === 'NotFound') { + // fallback to old index sources and dagula fallback + return composeMiddleware( + withIndexSources, + withDagulaFallback + )(handler)(request, env, ctx) + } + throw new Error(`failed to locate: ${dataCid}`, { cause: locRes.error }) } - const blockstore = new BatchingR2Blockstore(env.CARPARK, index) - const dagula = new Dagula(blockstore) + const fetcher = BatchingFetcher.create(locator) + const dagula = new Dagula({ + async get (cid) { + const res = await fetcher.fetch(cid.multihash) + return res.ok ? { cid, bytes: await res.ok.bytes() } : undefined + }, + async stream (cid, options) { + const res = await fetcher.fetch(cid.multihash, options) + // In Miniflare, the response data is `structuredClone`'d - this + // causes the underlying `ArrayBuffer` to become "detached" and + // all Uint8Array views are reset to zero! So after the first + // chunk is sent, any additional chunks that are views on the + // same `ArrayBuffer` become Uint8Array(0), instead of the + // content they're supposed to contain. + // @ts-expect-error `MINIFLARE` is not a property of `globalThis` + if (globalThis.MINIFLARE && res.ok) { + return res.ok.stream().pipeThrough(new TransformStream({ + transform: (chunk, controller) => controller.enqueue(chunk.slice()) + })) + } + return res.ok ? res.ok.stream() : undefined + }, + async stat (cid) { + const res = await locator.locate(cid.multihash) + return res.ok ? 
{ size: res.ok.site[0].range.length } : undefined + } + }) return handler(request, env, { ...ctx, blocks: dagula, dag: dagula, unixfs: dagula }) } } diff --git a/test/helpers/bucket.js b/test/helpers/bucket.js index ace09f2..2a7bfc3 100644 --- a/test/helpers/bucket.js +++ b/test/helpers/bucket.js @@ -1,30 +1,29 @@ import http from 'node:http' -import { Readable } from 'node:stream' -import { pipeline } from 'node:stream/promises' -import * as PublicBucket from '@web3-storage/public-r2-bucket' +import * as PublicBucket from '@web3-storage/public-bucket/server/node' /** - * @typedef {{ close: () => void, getCallCount: () => number, resetCallCount: () => void, url: URL }} MockBucketService + * @typedef {import('@web3-storage/public-bucket').Bucket} Bucket + * @typedef {{ + * url: URL + * close: () => void + * getCallCount: () => number + * resetCallCount: () => void + * }} MockBucketService */ -/** @param {import('@cloudflare/workers-types').R2Bucket} bucket */ +/** + * @param {Bucket} bucket + * @returns {Promise} + */ export const mockBucketService = async (bucket) => { let callCount = 0 - const getCallCount = () => callCount - const resetCallCount = () => { - callCount = 0 - } + const resetCallCount = () => { callCount = 0 } - const handler = toNodeHttpHandler(PublicBucket.handler, { BUCKET: bucket }) - - const server = http.createServer(async (request, response) => { + const handler = PublicBucket.createHandler({ bucket }) + const server = http.createServer((request, response) => { callCount++ - if (!['GET', 'HEAD'].includes(request.method ?? 'GET')) { - response.writeHead(405) - return response.end() - } - await handler(request, response) + handler(request, response) }) await new Promise(resolve => server.listen(resolve)) const close = () => { @@ -34,41 +33,5 @@ export const mockBucketService = async (bucket) => { // @ts-expect-error const { port } = server.address() const url = new URL(`http://127.0.0.1:${port}`) - return { close, port, getCallCount, resetCallCount, url } -} - -/** - * @template E - * @template C - * @param {(request: Request, env: E, ctx?: C) => Promise} handler - * @param {E} env - * @param {C} [ctx] - */ -const toNodeHttpHandler = (handler, env, ctx) => { - /** @type {import('node:http').RequestListener} */ - return async (req, res) => { - const url = new URL(req.url || '', `http://${req.headers.host}`) - const headers = new Headers() - for (let i = 0; i < req.rawHeaders.length; i += 2) { - headers.append(req.rawHeaders[i], req.rawHeaders[i + 1]) - } - const { method } = req - const body = - /** @type {ReadableStream|undefined} */ - (['GET', 'HEAD'].includes(method ?? '') ? 
undefined : Readable.toWeb(req)) - const request = new Request(url, { method, headers, body }) - - const response = await handler(request, env, ctx) - - res.statusCode = response.status - res.statusMessage = response.statusText - response.headers.forEach((v, k) => res.setHeader(k, v)) - if (!response.body) { - res.end() - return - } - - // @ts-expect-error - await pipeline(Readable.fromWeb(response.body), res) - } + return { close, getCallCount, resetCallCount, url } } diff --git a/test/helpers/content-claims.js b/test/helpers/content-claims.js index 705e127..5d88ae0 100644 --- a/test/helpers/content-claims.js +++ b/test/helpers/content-claims.js @@ -2,15 +2,8 @@ import http from 'node:http' import { Writable } from 'node:stream' import { CARReaderStream, CARWriterStream } from 'carstream' -import * as raw from 'multiformats/codecs/raw' -import * as Block from 'multiformats/block' import { sha256 } from 'multiformats/hashes/sha2' -import { identity } from 'multiformats/hashes/identity' -import { blake2b256 } from '@multiformats/blake2/blake2b' import * as Link from 'multiformats/link' -import * as pb from '@ipld/dag-pb' -import * as cbor from '@ipld/dag-cbor' -import * as json from '@ipld/dag-json' import { Map as LinkMap } from 'lnmap' import { Assert } from '@web3-storage/content-claims/capability' import * as ed25519 from '@ucanto/principal/ed25519' @@ -19,104 +12,16 @@ import { CAR_CODE } from '../../src/constants.js' /** * @typedef {import('carstream/api').Block & { children: import('multiformats').UnknownLink[] }} RelationIndexData * @typedef {Map} Claims - * @typedef {{ setClaims: (c: Claims) => void, close: () => void, port: number, signer: import('@ucanto/interface').Signer, getCallCount: () => number, resetCallCount: () => void }} MockClaimsService + * @typedef {{ + * url: URL + * close: () => void + * signer: import('@ucanto/interface').Signer + * setClaims: (c: Claims) => void + * getCallCount: () => number + * resetCallCount: () => void + * }} MockClaimsService */ -const Decoders = { - [raw.code]: raw, - [pb.code]: pb, - [cbor.code]: cbor, - [json.code]: json -} - -const Hashers = { - [identity.code]: identity, - [sha256.code]: sha256, - [blake2b256.code]: blake2b256 -} - -/** - * @param {import('@ucanto/interface').Signer} signer - * @param {import('multiformats').UnknownLink} dataCid - * @param {import('cardex/api').CARLink} carCid - * @param {ReadableStream} carStream CAR file data - * @param {import('multiformats').Link} indexCid - * @param {import('cardex/api').CARLink} indexCarCid - */ -export const generateClaims = async (signer, dataCid, carCid, carStream, indexCid, indexCarCid) => { - /** @type {Claims} */ - const claims = new LinkMap() - - // partition claim for the data CID - claims.set(dataCid, [ - await encode(Assert.partition.invoke({ - issuer: signer, - audience: signer, - with: signer.did(), - nb: { - content: dataCid, - parts: [carCid] - } - })) - ]) - - /** @type {Map} */ - const indexData = new LinkMap() - - await carStream - .pipeThrough(new CARReaderStream()) - .pipeTo(new WritableStream({ - async write ({ cid, bytes }) { - const decoder = Decoders[cid.code] - if (!decoder) throw Object.assign(new Error(`missing decoder: ${cid.code}`), { code: 'ERR_MISSING_DECODER' }) - - const hasher = Hashers[cid.multihash.code] - if (!hasher) throw Object.assign(new Error(`missing hasher: ${cid.multihash.code}`), { code: 'ERR_MISSING_HASHER' }) - - const block = await Block.decode({ bytes, codec: decoder, hasher }) - indexData.set(cid, { cid, bytes, children: 
[...block.links()].map(([, cid]) => cid) }) - } - })) - - for (const [cid, { children }] of indexData) { - const invocation = Assert.relation.invoke({ - issuer: signer, - audience: signer, - with: signer.did(), - nb: { - content: cid, - children, - parts: [{ - content: carCid, - includes: { - content: indexCid, - parts: [indexCarCid] - } - }] - } - }) - - const blocks = claims.get(cid) ?? [] - blocks.push(await encode(invocation)) - claims.set(cid, blocks) - } - - // partition claim for the index - claims.set(indexCid, [ - await encode(Assert.partition.invoke({ - issuer: signer, - audience: signer, - with: signer.did(), - nb: { - content: indexCid, - parts: [indexCarCid] - } - })) - ]) - - return claims -} - /** * @param {import('@ucanto/interface').Signer} signer * @param {import('multiformats').Link} shard @@ -175,6 +80,7 @@ const encode = async invocation => { return { cid: Link.create(CAR_CODE, await sha256.digest(bytes.ok)), bytes: bytes.ok } } +/** @returns {Promise} */ export const mockClaimsService = async () => { let callCount = 0 /** @type {Claims} */ @@ -182,9 +88,7 @@ export const mockClaimsService = async () => { /** @param {Claims} s */ const setClaims = s => { claims = s } const getCallCount = () => callCount - const resetCallCount = () => { - callCount = 0 - } + const resetCallCount = () => { callCount = 0 } const server = http.createServer(async (req, res) => { callCount++ @@ -208,5 +112,6 @@ export const mockClaimsService = async () => { } // @ts-expect-error const { port } = server.address() - return { setClaims, close, port, signer: await ed25519.generate(), getCallCount, resetCallCount } + const url = new URL(`http://127.0.0.1:${port}`) + return { setClaims, close, url, signer: await ed25519.generate(), getCallCount, resetCallCount } } diff --git a/test/index.spec.js b/test/index.spec.js index 48357a0..fd725c6 100644 --- a/test/index.spec.js +++ b/test/index.spec.js @@ -1,6 +1,5 @@ import { describe, before, beforeEach, after, it } from 'node:test' import assert from 'node:assert' -import { Buffer } from 'node:buffer' import { randomBytes } from 'node:crypto' import { Miniflare } from 'miniflare' import { equals } from 'uint8arrays' @@ -10,10 +9,10 @@ import { sha256 } from 'multiformats/hashes/sha2' import * as raw from 'multiformats/codecs/raw' import { Map as LinkMap } from 'lnmap' import { CARReaderStream } from 'carstream' -import * as ByteRanges from 'byteranges' +import { MultipartByteRangeDecoder, decodePartHeader, getBoundary } from 'multipart-byte-range/decoder' import { Builder, toBlobKey } from './helpers/builder.js' import { MAX_CAR_BYTES_IN_MEMORY } from '../src/constants.js' -import { generateClaims, generateBlockLocationClaims, mockClaimsService, generateLocationClaim } from './helpers/content-claims.js' +import { generateBlockLocationClaims, mockClaimsService, generateLocationClaim } from './helpers/content-claims.js' import { mockBucketService } from './helpers/bucket.js' describe('freeway', () => { @@ -33,7 +32,7 @@ describe('freeway', () => { miniflare = new Miniflare({ bindings: { - CONTENT_CLAIMS_SERVICE_URL: `http://127.0.0.1:${claimsService.port}` + CONTENT_CLAIMS_SERVICE_URL: claimsService.url.toString() }, scriptPath: 'dist/worker.mjs', packagePath: true, @@ -188,53 +187,6 @@ describe('freeway', () => { assert(equals(input[0].content, output)) }) - it('should use content claims', async () => { - const input = [{ path: 'sargo.tar.xz', content: randomBytes(MAX_CAR_BYTES_IN_MEMORY + 1) }] - // no dudewhere or satnav so only content claims can 
satisfy the request - const { dataCid, carCids, indexes } = await builder.add(input, { - dudewhere: false, - satnav: false - }) - - const carpark = await miniflare.getR2Bucket('CARPARK') - const res = await carpark.get(`${carCids[0]}/${carCids[0]}.car`) - assert(res) - - // @ts-expect-error nodejs ReadableStream does not implement ReadableStream interface correctly - const claims = await generateClaims(claimsService.signer, dataCid, carCids[0], res.body, indexes[0].cid, indexes[0].carCid) - claimsService.setClaims(claims) - - const res1 = await miniflare.dispatchFetch(`http://localhost:8787/ipfs/${dataCid}/${input[0].path}`) - if (!res1.ok) assert.fail(`unexpected response: ${await res1.text()}`) - - const output = new Uint8Array(await res1.arrayBuffer()) - assert(equals(input[0].content, output)) - }) - - it('should use content claims by default', async () => { - const input = [{ path: 'sargo.tar.xz', content: randomBytes(MAX_CAR_BYTES_IN_MEMORY + 1) }] - // no dudewhere or satnav so only content claims can satisfy the request - const { dataCid, carCids, indexes } = await builder.add(input, { - dudewhere: true, - satnav: true - }) - - const carpark = await miniflare.getR2Bucket('CARPARK') - const res = await carpark.get(`${carCids[0]}/${carCids[0]}.car`) - assert(res) - - // @ts-expect-error nodejs ReadableStream does not implement ReadableStream interface correctly - const claims = await generateClaims(claimsService.signer, dataCid, carCids[0], res.body, indexes[0].cid, indexes[0].carCid) - claimsService.setClaims(claims) - - const res1 = await miniflare.dispatchFetch(`http://localhost:8787/ipfs/${dataCid}/${input[0].path}`) - if (!res1.ok) assert.fail(`unexpected response: ${await res1.text()}`) - - const output = new Uint8Array(await res1.arrayBuffer()) - assert(equals(input[0].content, output)) - assert.equal(claimsService.getCallCount(), 2) - }) - it('should use location content claim', async () => { const input = [{ path: 'sargo.tar.xz', content: randomBytes(MAX_CAR_BYTES_IN_MEMORY + 1) }] // no dudewhere or satnav so only content claims can satisfy the request @@ -422,7 +374,7 @@ describe('freeway', () => { await res.body .pipeThrough(new CARReaderStream()) .pipeTo(new WritableStream({ - async write ({ cid, bytes, blockOffset, blockLength }) { + async write ({ bytes, blockOffset, blockLength }) { const res = await miniflare.dispatchFetch(`http://localhost:8787/ipfs/${blobCids[0]}?format=raw`, { headers: { Range: `bytes=${blockOffset}-${blockOffset + blockLength - 1}` @@ -466,19 +418,23 @@ describe('freeway', () => { } }) assert(res1.ok) + assert(res1.body) - const contentType = res1.headers.get('Content-Type') - assert(contentType) - - const boundary = contentType.replace('multipart/byteranges; boundary=', '') - const body = Buffer.from(await res1.arrayBuffer()) - - const parts = ByteRanges.parse(body, boundary) - assert.equal(parts.length, blocks.length) + const boundary = getBoundary(new Headers([...res1.headers.entries()])) + assert(boundary) - for (let i = 0; i < parts.length; i++) { - assert.equal(parts[i].type, 'application/vnd.ipld.raw') - assert(equals(parts[i].octets, blocks[i]?.bytes)) - } + let partsCount = 0 + await /** @type {ReadableStream} */ (res1.body) + .pipeThrough(new MultipartByteRangeDecoder(boundary)) + .pipeTo(new WritableStream({ + write (part) { + const block = blocks[partsCount] + const range = [block.blockOffset, block.blockOffset + block.blockLength - 1] + const headers = decodePartHeader(part.header) + assert.equal(headers.get('content-type'), 
'application/vnd.ipld.raw') + assert.equal(headers.get('content-range'), `bytes ${range[0]}-${range[1]}/${input[0].content.length}`) + partsCount++ + } + })) }) })
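Note: the following is a minimal sketch (not part of the patch) of how the new read path composes, using only the `@web3-storage/blob-fetcher` calls that appear in the `src/middleware.js` hunk above. The claims service URL and the `readBlock` helper are illustrative assumptions.

```js
// Sketch of the locator + batching fetcher wiring used in src/middleware.js.
// The serviceURL value and the readBlock helper are illustrative assumptions.
import * as ContentClaimsLocator from '@web3-storage/blob-fetcher/locator/content-claims'
import * as BatchingFetcher from '@web3-storage/blob-fetcher/fetcher/batching'

// The locator resolves a multihash to a location (URL + byte range) by
// reading location content claims from the claims service.
const locator = ContentClaimsLocator.create({
  serviceURL: new URL('https://claims.web3.storage') // assumed endpoint
})

// The batching fetcher coalesces block reads into HTTP (multipart) byte
// range requests against the located URLs.
const fetcher = BatchingFetcher.create(locator)

/** @param {import('multiformats').UnknownLink} cid */
export const readBlock = async cid => {
  const res = await fetcher.fetch(cid.multihash)
  return res.ok ? { cid, bytes: await res.ok.bytes() } : undefined
}
```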