Skip to content

Commit

Permalink
feat(blob-fetcher): use updated blob fetcher (#124)
Browse files Browse the repository at this point in the history
# Goals

Read sharded dag indexes properly, even in the absence of location
claims

# Implementation

- use updated blob-fetcher
- pass values to enable carpark fallback
- add two tests to verify:
- that normal index shard fetching works in the presence of location
claims
- that in the absence of location claims it can fill in the missing
values.
  • Loading branch information
hannahhoward authored Nov 1, 2024
1 parent 847829b commit 90bb605
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 39 deletions.
16 changes: 8 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
"author": "Alan Shaw",
"license": "Apache-2.0 OR MIT",
"dependencies": {
"@web3-storage/blob-fetcher": "^2.2.0",
"@web3-storage/blob-fetcher": "^2.3.1",
"@web3-storage/gateway-lib": "^5.1.2",
"dagula": "^8.0.0",
"http-range-parse": "^1.0.0",
Expand Down
4 changes: 4 additions & 0 deletions src/middleware/withLocator.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ export function withLocator (handler) {
const locator = ContentClaimsLocator.create({
serviceURL: env.CONTENT_CLAIMS_SERVICE_URL
? new URL(env.CONTENT_CLAIMS_SERVICE_URL)
: undefined,
carpark: env.CARPARK,
carparkPublicBucketURL: env.CARPARK_PUBLIC_BUCKET_URL
? new URL(env.CARPARK_PUBLIC_BUCKET_URL)
: undefined
})
return handler(request, env, { ...ctx, locator })
Expand Down
3 changes: 3 additions & 0 deletions src/middleware/withLocator.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ import {
Environment as MiddlewareEnvironment,
Context as MiddlewareContext,
} from '@web3-storage/gateway-lib'
import { R2Bucket } from '@cloudflare/workers-types'

export interface LocatorEnvironment extends MiddlewareEnvironment {
CONTENT_CLAIMS_SERVICE_URL?: string
CARPARK: R2Bucket
CARPARK_PUBLIC_BUCKET_URL?: string
}

export interface LocatorContext extends MiddlewareContext {
Expand Down
18 changes: 4 additions & 14 deletions test/helpers/bucket.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,27 @@
import http from 'node:http'
import * as PublicBucket from '@web3-storage/public-bucket/server/node'

/**
* @typedef {import('@web3-storage/public-bucket').Bucket} Bucket
* @typedef {{
* url: URL
* close: () => void
* getCallCount: () => number
* resetCallCount: () => void
* }} MockBucketService
*/

/**
* @param {Bucket} bucket
* @param {import('node:http').Server} server
* @returns {Promise<MockBucketService>}
*/
export const mockBucketService = async (bucket) => {
export const mockBucketService = async (bucket, server) => {
let callCount = 0
const getCallCount = () => callCount
const resetCallCount = () => { callCount = 0 }

const handler = PublicBucket.createHandler({ bucket })
const server = http.createServer((request, response) => {
server.on('request', (request, response) => {
callCount++
handler(request, response)
})
await new Promise(resolve => server.listen(resolve))
const close = () => {
server.closeAllConnections()
server.close()
}
// @ts-expect-error
const { port } = server.address()
const url = new URL(`http://127.0.0.1:${port}`)
return { close, getCallCount, resetCallCount, url }
return { getCallCount, resetCallCount }
}
18 changes: 18 additions & 0 deletions test/helpers/content-claims.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,24 @@ export const generateLocationClaim = async (signer, content, location, offset, l
return await encode(invocation)
}

/**
 * Create an index claim stating that `index` is the index for `content`,
 * and encode it to a block.
 *
 * The claim is self-issued: `signer` acts as both issuer and audience, and
 * its DID is used as the `with` resource of the invocation.
 *
 * @param {import('@ucanto/interface').Signer} signer
 * @param {import('multiformats').UnknownLink} content
 * @param {import('multiformats').Link} index
 */
export const generateIndexClaim = async (signer, content, index) => {
  const caveats = { content, index }
  const claim = Assert.index.invoke({
    issuer: signer,
    audience: signer,
    with: signer.did(),
    nb: caveats
  })
  const encoded = await encode(claim)
  return encoded
}

/**
* Encode a claim to a block.
* @param {import('@ucanto/interface').IPLDViewBuilder<import('@ucanto/interface').Delegation>} invocation
Expand Down
130 changes: 114 additions & 16 deletions test/miniflare/freeway.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import { Map as LinkMap } from 'lnmap'
import { CARReaderStream } from 'carstream'
import { MultipartByteRangeDecoder, decodePartHeader, getBoundary } from 'multipart-byte-range/decoder'
import { Builder, toBlobKey } from '../helpers/builder.js'
import { generateBlockLocationClaims, mockClaimsService, generateLocationClaim } from '../helpers/content-claims.js'
import { generateBlockLocationClaims, mockClaimsService, generateLocationClaim, generateIndexClaim } from '../helpers/content-claims.js'
import { mockBucketService } from '../helpers/bucket.js'

import { fromShardArchives } from '@web3-storage/blob-index/util'
import { CAR_CODE } from '../../src/constants.js'
import http from 'node:http'
/** @import { Block, Position } from 'carstream' */

/**
Expand All @@ -31,26 +33,36 @@ describe('freeway', () => {
let builder
/** @type {import('../helpers/content-claims.js').MockClaimsService} */
let claimsService
/** @type {import('miniflare').ReplaceWorkersTypes<import('@cloudflare/workers-types/experimental').R2Bucket>} */
let bucket
/** @type {http.Server} */
let server
/** @type {import('../helpers/bucket.js').MockBucketService} */
let bucketService

/** @type {URL} */
let url
before(async () => {
claimsService = await mockClaimsService()

server = http.createServer()
await new Promise((resolve) => server.listen(resolve))
// @ts-expect-error
const { port } = server.address()
url = new URL(`http://127.0.0.1:${port}`)
miniflare = new Miniflare({
bindings: {
CONTENT_CLAIMS_SERVICE_URL: claimsService.url.toString()
CONTENT_CLAIMS_SERVICE_URL: claimsService.url.toString(),
CARPARK_PUBLIC_BUCKET_URL: url.toString()
},
scriptPath: 'dist/worker.mjs',
modules: true,
compatibilityDate: '2023-06-17',
r2Buckets: ['CARPARK']
})

const bucket = await miniflare.getR2Bucket('CARPARK')
bucket = await miniflare.getR2Bucket('CARPARK')
bucketService = await mockBucketService(
/** @type {import('@web3-storage/public-bucket').Bucket} */
(bucket)
(bucket), server
)
builder = new Builder(bucket)
})
Expand All @@ -63,7 +75,8 @@ describe('freeway', () => {

after(() => {
claimsService.close()
bucketService.close()
server.closeAllConnections()
server.close()
miniflare.dispose()
})

Expand All @@ -72,7 +85,7 @@ describe('freeway', () => {
const { root, shards } = await builder.add(input)

for (const shard of shards) {
const location = new URL(toBlobKey(shard.multihash), bucketService.url)
const location = new URL(toBlobKey(shard.multihash), url)
const res = await fetch(location)
assert(res.body)
const claims = await generateBlockLocationClaims(claimsService.signer, shard, res.body, location)
Expand All @@ -85,6 +98,91 @@ describe('freeway', () => {
assertBlobEqual(input, await res.blob())
})

// Happy path for index-based lookup: the claims service holds a location claim
// for every shard, a location claim for the index archive, and an index claim
// linking the root to that index.
it('should get a file through a sharded dag index', async () => {
const input = new Blob([randomBytes(256)])
const { root, shards } = await builder.add(input)
/** @type {Uint8Array[]} */
const archives = []
/** @type {import('../helpers/content-claims.js').Claims} */
const claims = new LinkMap()
// fetch every shard archive from the bucket server and build a location claim for it
for (const shard of shards) {
const location = new URL(toBlobKey(shard.multihash), url)
const res = await fetch(location)
assert(res.body)
const shardContents = new Uint8Array(await res.arrayBuffer())
archives.push(shardContents)
const blocks = claims.get(shard) || []
// the claim covers the whole shard: offset 0, length of the fetched bytes
blocks.push(await generateLocationClaim(claimsService.signer, shard, location, 0, shardContents.length))
claims.set(shard, blocks)
}
// build sharded dag index
const index = await fromShardArchives(root, archives)
const indexArchive = await index.archive()
assert(indexArchive.ok)
// the index archive is addressed by the sha256 digest of its bytes
const digest = await sha256.digest(indexArchive.ok)
const indexLink = Link.create(CAR_CODE, digest)

// store sharded dag index
await bucket.put(toBlobKey(digest), indexArchive.ok)

// generate location claim for the index
const blocks = claims.get(indexLink) || []
const location = new URL(toBlobKey(indexLink.multihash), url)
blocks.push(await generateLocationClaim(claimsService.signer, indexLink, location, 0, indexArchive.ok.length))
claims.set(indexLink, blocks)

// generate index claim
const indexBlocks = claims.get(root) || []
indexBlocks.push(await generateIndexClaim(claimsService.signer, root, indexLink))
claims.set(root, indexBlocks)

claimsService.addClaims(claims)

// request the root through the worker and check the bytes round-trip
const res = await miniflare.dispatchFetch(`http://localhost:8787/ipfs/${root}`)
if (!res.ok) assert.fail(`unexpected response: ${await res.text()}`)

assertBlobEqual(input, await res.blob())
})

// Only an index claim is published here: no location claims are generated for
// the shards or for the index archive. The worker is still expected to serve
// the file.
// NOTE(review): presumably this works via the carpark fallback configured
// through CARPARK / CARPARK_PUBLIC_BUCKET_URL — confirm against blob-fetcher.
it('should get a file through a sharded dag index, even without location claims for shards', async () => {
const input = new Blob([randomBytes(256)])
const { root, shards } = await builder.add(input)
/** @type {Uint8Array[]} */
const archives = []
/** @type {import('../helpers/content-claims.js').Claims} */
const claims = new LinkMap()
// fetch all shard archives (needed to build the index); deliberately no
// location claims are generated for them
for (const shard of shards) {
const location = new URL(toBlobKey(shard.multihash), url)
const res = await fetch(location)
assert(res.body)
const shardContents = new Uint8Array(await res.arrayBuffer())
archives.push(shardContents)
}
// build sharded dag index
const index = await fromShardArchives(root, archives)
const indexArchive = await index.archive()
assert(indexArchive.ok)
// the index archive is addressed by the sha256 digest of its bytes
const digest = await sha256.digest(indexArchive.ok)
const indexLink = Link.create(CAR_CODE, digest)

// store sharded dag index
await bucket.put(toBlobKey(digest), indexArchive.ok)

// generate index claim
const indexBlocks = claims.get(root) || []
indexBlocks.push(await generateIndexClaim(claimsService.signer, root, indexLink))
claims.set(root, indexBlocks)

claimsService.addClaims(claims)

// request the root through the worker and check the bytes round-trip
const res = await miniflare.dispatchFetch(`http://localhost:8787/ipfs/${root}`)
if (!res.ok) assert.fail(`unexpected response: ${await res.text()}`)

assertBlobEqual(input, await res.blob())
})

it('should get a file in a directory', async () => {
const input = [
new File([randomBytes(256)], 'data.txt'),
Expand All @@ -93,7 +191,7 @@ describe('freeway', () => {
const { root, shards } = await builder.add(input)

for (const shard of shards) {
const location = new URL(toBlobKey(shard.multihash), bucketService.url)
const location = new URL(toBlobKey(shard.multihash), url)
const res = await fetch(location)
assert(res.body)
const claims = await generateBlockLocationClaims(claimsService.signer, shard, res.body, location)
Expand All @@ -111,7 +209,7 @@ describe('freeway', () => {
const { root, shards } = await builder.add(input)

for (const shard of shards) {
const location = new URL(toBlobKey(shard.multihash), bucketService.url)
const location = new URL(toBlobKey(shard.multihash), url)
const res = await fetch(location)
assert(res.body)
const claims = await generateBlockLocationClaims(claimsService.signer, shard, res.body, location)
Expand All @@ -129,7 +227,7 @@ describe('freeway', () => {
const { root, shards } = await builder.add(input)

for (const shard of shards) {
const location = new URL(toBlobKey(shard.multihash), bucketService.url)
const location = new URL(toBlobKey(shard.multihash), url)
const res = await fetch(location)
assert(res.body)
const claims = await generateBlockLocationClaims(claimsService.signer, shard, res.body, location)
Expand Down Expand Up @@ -261,7 +359,7 @@ describe('freeway', () => {
const blobKey = toBlobKey(root.multihash)
await carpark.put(blobKey, input)

const location = new URL(blobKey, bucketService.url)
const location = new URL(blobKey, url)
const claim = await generateLocationClaim(claimsService.signer, root, location, 0, input.length)
claimsService.addClaims(new LinkMap([[root, [claim]]]))

Expand All @@ -286,7 +384,7 @@ describe('freeway', () => {
const blobKey = toBlobKey(cid.multihash)
await carpark.put(blobKey, input)

const location = new URL(blobKey, bucketService.url)
const location = new URL(blobKey, url)
const claim = await generateLocationClaim(claimsService.signer, cid, location, 0, input.length)
claimsService.addClaims(new LinkMap([[cid, [claim]]]))

Expand All @@ -307,7 +405,7 @@ describe('freeway', () => {
const input = [new File([randomBytes(1024 * 1024 * 5)], 'sargo.tar.xz')]
const { shards } = await builder.add(input)

const location = new URL(toBlobKey(shards[0].multihash), bucketService.url)
const location = new URL(toBlobKey(shards[0].multihash), url)
const claim = await generateLocationClaim(claimsService.signer, shards[0], location, 0, input[0].size)
claimsService.addClaims(new LinkMap([[shards[0], [claim]]]))

Expand Down Expand Up @@ -341,7 +439,7 @@ describe('freeway', () => {
const input = [new File([randomBytes(1024 * 1024 * 5)], 'sargo.tar.xz')]
const { shards } = await builder.add(input)

const location = new URL(toBlobKey(shards[0].multihash), bucketService.url)
const location = new URL(toBlobKey(shards[0].multihash), url)
const claim = await generateLocationClaim(claimsService.signer, shards[0], location, 0, input[0].size)
claimsService.addClaims(new LinkMap([[shards[0], [claim]]]))

Expand Down
Loading

0 comments on commit 90bb605

Please sign in to comment.