Skip to content

Commit

Permalink
feat: consume content claims (#65)
Browse files Browse the repository at this point in the history
Consumes relation claims from `https://claims.web3.storage` in order to
serve data.
  • Loading branch information
Alan Shaw authored Jul 17, 2023
1 parent accee0d commit 28ca299
Show file tree
Hide file tree
Showing 16 changed files with 7,968 additions and 14,099 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ The freeway currently works with the following R2 buckets:
* `CARPARK` - CAR file storage area. Key format `<CAR_CID>/<CAR_CID>.car`
* `SATNAV` - Indexes of block offsets within CARs. Key format `<CAR_CID>/<CAR_CID>.car.idx`, index format [`MultihashIndexSorted`](https://ipld.io/specs/transport/car/carv2/#format-0x0401-multihashindexsorted).
* `DUDEWHERE` - Mapping of root data CIDs to CAR CID(s). Key format `<DATA_CID>/<CAR_CID>`.
* `BLOCKLY` - Block+link [multi-indexes](https://github.com/web3-storage/specs/blob/73c386b999cf30fb648987ff9dce0516c1d91137/CARv2%20MultiIndex.md). Key format `<base58(BLOCK_MULTIHASH)>/.idx`.

How it works:

Expand Down
21,065 changes: 7,428 additions & 13,637 deletions package-lock.json

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
"@ipld/dag-cbor": "^9.0.0",
"@ipld/dag-json": "^10.0.1",
"@ipld/dag-pb": "^4.0.2",
"@web3-storage/content-claims": "^2.1.1",
"@web3-storage/gateway-lib": "^3.2.4",
"cardex": "^2.2.2",
"cardex": "^2.3.1",
"chardet": "^1.5.0",
"dagula": "^7.0.0",
"lnmap": "^1.0.1",
"magic-bytes.js": "^1.0.12",
"mrmime": "^1.0.1",
"multiformats": "^11.0.2",
Expand All @@ -39,9 +41,11 @@
},
"devDependencies": {
"@cloudflare/workers-types": "^4.20230628.0",
"@ucanto/principal": "^8.0.0",
"ava": "^5.2.0",
"carbites": "^1.0.6",
"esbuild": "^0.17.11",
"carstream": "^1.0.2",
"esbuild": "^0.18.13",
"files-from-path": "^0.2.6",
"ipfs-car": "^0.9.2",
"miniflare": "^2.14.0",
Expand Down
3 changes: 2 additions & 1 deletion scripts/r2-put.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
import { R2Bucket } from '@miniflare/r2'
import { FileStorage } from '@miniflare/storage-file'
import { getFilesFromPath } from 'files-from-path'
import { Builder } from '../test/helpers.js'
import { Builder } from '../test/helpers/builder.js'

const bucketNames = ['CARPARK', 'SATNAV', 'DUDEWHERE']
const buckets = bucketNames.map(b => new R2Bucket(new FileStorage(`./.mf/r2/${b}`)))
// @ts-expect-error import('@miniflare/r2').R2Bucket does not satisfy import('@cloudflare/workers-types').R2Bucket interface 🙈
const builder = new Builder(buckets[0], buckets[1], buckets[2])

const paths = process.argv.slice(2).filter(p => p !== '--no-wrap')
Expand Down
2 changes: 1 addition & 1 deletion src/bindings.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ export interface Environment {
CARPARK: R2Bucket
DUDEWHERE: R2Bucket
SATNAV: R2Bucket
BLOCKLY: KVNamespace
MAX_SHARDS: string
CONTENT_CLAIMS_SERVICE_URL?: string
}

/**
Expand Down
4 changes: 1 addition & 3 deletions src/lib/blockstore.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import { OrderedCarBlockBatcher } from './block-batch.js'

/**
* @typedef {import('multiformats').UnknownLink} UnknownLink
* @typedef {import('cardex/api').IndexItem} IndexEntry
* @typedef {string} MultihashString
* @typedef {import('dagula').Block} Block
* @typedef {import('@cloudflare/workers-types').R2Bucket} R2Bucket
*/
Expand All @@ -22,7 +20,7 @@ const MAX_ENCODED_BLOCK_LENGTH = (1024 * 1024 * 2) + 39 + 61
export class R2Blockstore {
/**
* @param {R2Bucket} dataBucket
* @param {import('./car-index').CarIndex} index
* @param {import('./dag-index/api.js').Index} index
*/
constructor (dataBucket, index) {
this._dataBucket = dataBucket
Expand Down
11 changes: 11 additions & 0 deletions src/lib/dag-index/api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { UnknownLink } from 'multiformats/link'
import { MultihashIndexItem } from 'cardex/multihash-index-sorted/api'
import { CARLink } from 'cardex/api'

/**
 * An index entry locating a block: a multihash index item extended with the
 * CID of the CAR file (origin) the block resides in.
 */
export interface IndexEntry extends MultihashIndexItem {
  origin: CARLink
}

/**
 * A DAG index. Resolves a CID to the entry locating its block, or
 * `undefined` when the CID is unknown to the index.
 */
export interface Index {
  get (c: UnknownLink): Promise<IndexEntry|undefined>
}
90 changes: 7 additions & 83 deletions src/lib/car-index.js → src/lib/dag-index/car.js
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
import * as raw from 'multiformats/codecs/raw'
import { base58btc } from 'multiformats/bases/base58'
import { UniversalReader } from 'cardex/universal'
import { MultiIndexReader } from 'cardex/multi-index'
import defer from 'p-defer'

/**
* @typedef {import('multiformats').UnknownLink} UnknownLink
* @typedef {import('cardex/multi-index/api').MultiIndexItem & import('cardex/multihash-index-sorted/api').MultihashIndexItem} IndexEntry
* @typedef {import('./api').IndexEntry} IndexEntry
* @typedef {import('multiformats').ToString<import('multiformats').MultihashDigest, 'z'>} MultihashString
* @typedef {{ get: (c: UnknownLink) => Promise<IndexEntry|undefined> }} CarIndex
* @typedef {import('./api').Index} Index
*/

export class MultiCarIndex {
/** @type {CarIndex[]} */
/** @type {Index[]} */
#idxs

constructor () {
this.#idxs = []
}

/**
* @param {CarIndex} index
* @param {Index} index
*/
addIndex (index) {
this.#idxs.push(index)
Expand Down Expand Up @@ -52,10 +50,10 @@ export class MultiCarIndex {
}

/**
* @implements {CarIndex}
* @implements {Index}
*/
export class StreamingCarIndex {
/** @type {import('../bindings').IndexSource} */
/** @type {import('../../bindings').IndexSource} */
#source

/** @type {Map<MultihashString, IndexEntry>} */
Expand All @@ -70,7 +68,7 @@ export class StreamingCarIndex {
/** @type {Error?} */
#buildError = null

/** @param {import('../bindings').IndexSource} source */
/** @param {import('../../bindings').IndexSource} source */
constructor (source) {
this.#source = source
this.#buildIndex()
Expand Down Expand Up @@ -154,77 +152,3 @@ export class StreamingCarIndex {
* @returns {import('multiformats').ToString<import('multiformats').MultihashDigest, 'z'>}
*/
const mhToString = mh => base58btc.encode(mh.bytes)

/**
 * A DAG index backed by "blockly" multi-indexes stored in Cloudflare KV.
 * Keys are base58btc-encoded block multihashes (`<base58(multihash)>/.idx`)
 * and values are CARv2 MultiIndexes covering the block and its links.
 */
export class BlocklyIndex {
  /** Storage where indexes live. */
  #store
  /** Cached index entries, keyed by base58btc-encoded multihash. */
  #cache
  /** Base58btc multihash keys of indexes that have been read. */
  #indexes

  /**
   * @param {import('@cloudflare/workers-types').KVNamespace} indexStore
   */
  constructor (indexStore) {
    this.#store = indexStore
    /** @type {Map<MultihashString, IndexEntry>} */
    this.#cache = new Map()
    this.#indexes = new Set()
  }

  /**
   * Lookup the index entry for the passed CID, reading indexes from the
   * store as needed. Resolves `undefined` when no entry can be found.
   * @param {UnknownLink} cid
   */
  async get (cid) {
    const key = mhToString(cid.multihash)

    // get the index data for this CID (CAR CID & offset)
    let indexItem = this.#cache.get(key)

    // read the index for _this_ CID to get the index data for its _links_.
    //
    // when we get to the bottom of the tree (raw blocks), we want to be able
    // to send back the index information without having to read an index for
    // each leaf. We can only do that if we read the index for the parent now.
    if (indexItem) {
      // we found the index data! ...if this CID is raw, then it has no links
      // and no more index information to discover, so don't read the index.
      if (cid.code !== raw.code) {
        await this.#readIndex(cid)
      }
    } else {
      // we did not find the index data! ...probably the DAG root.
      // this time we read the index to get the root block index information
      // _as well as_ the link index information.
      await this.#readIndex(cid)
      // seeing as we just read the index for this CID we _should_ have some
      // index information for it now.
      indexItem = this.#cache.get(key)
      // if not then, well, it's not found!
      if (!indexItem) return
    }
    return { cid, ...indexItem }
  }

  /**
   * Read the index for the passed CID and populate the cache.
   * No-op if the index was already read or is absent from the store.
   * @param {import('multiformats').UnknownLink} cid
   */
  async #readIndex (cid) {
    const key = mhToString(cid.multihash)
    if (this.#indexes.has(key)) return

    // indexes are stored under the key `<base58(multihash)>/.idx`
    const res = await this.#store.get(`${key}/.idx`, { type: 'stream' })
    if (!res) return

    const reader = MultiIndexReader.createReader({ reader: res.getReader() })
    while (true) {
      const { done, value } = await reader.read()
      if (done) break

      if (!('multihash' in value)) throw new Error('not MultihashIndexSorted')
      const entry = /** @type {IndexEntry} */(value)
      this.#cache.set(mhToString(entry.multihash), entry)
    }
    this.#indexes.add(key)
  }
}
142 changes: 142 additions & 0 deletions src/lib/dag-index/content-claims.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/* global ReadableStream */
import * as Link from 'multiformats/link'
import * as raw from 'multiformats/codecs/raw'
import * as Claims from '@web3-storage/content-claims/client'
import { MultihashIndexSortedReader } from 'cardex/multihash-index-sorted'
import { Map as LinkMap } from 'lnmap'
import { CAR_CODE } from '../../constants'

/**
* @typedef {import('multiformats').UnknownLink} UnknownLink
* @typedef {import('./api').IndexEntry} IndexEntry
* @typedef {import('./api').Index} Index
*/

/** @implements {Index} */
export class ContentClaimsIndex {
  /**
   * Cached index entries, keyed by block CID.
   * @type {Map<UnknownLink, IndexEntry>}
   */
  #entries
  /**
   * CIDs for which a content claims request has already been made.
   *
   * Note: records _only_ CIDs that were explicitly queried. Reading a claim
   * may add CIDs to `this.#entries` that we never requested claims for, so
   * the entry cache cannot double as this record.
   *
   * Note: a Map rather than a Set so we benefit from the key cache that
   * `lnmap` provides and avoid duplicating base58 encoded multihash keys.
   * @type {Map<UnknownLink, true>}
   */
  #fetched
  /**
   * URL of the content claims service.
   * @type {URL|undefined}
   */
  #serviceURL

  /**
   * @param {{ serviceURL?: URL }} [options]
   */
  constructor (options) {
    this.#entries = new LinkMap()
    this.#fetched = new LinkMap()
    this.#serviceURL = options?.serviceURL
  }

  /**
   * Resolve the index entry (CAR CID & offset) for the passed CID, fetching
   * content claims when necessary.
   * @param {UnknownLink} cid
   * @returns {Promise<IndexEntry | undefined>}
   */
  async get (cid) {
    const cached = this.#entries.get(cid)
    if (cached) {
      // A cache hit for a non-raw CID means it has links. Read the claims
      // for it now so entries for its children are cached before they are
      // requested - leaf (raw) blocks then never need their own claims read.
      if (cid.code !== raw.code) {
        await this.#readClaims(cid)
      }
      return cached
    }
    // Cache miss - read claims to obtain the index entry for this CID
    // (and, at the same time, entries for its links).
    await this.#readClaims(cid)
    // having just read the claims for this CID we _should_ have an entry
    // now - if not, it's simply not found.
    return this.#entries.get(cid)
  }

  /**
   * Fetch claims for the passed CID and add any index entries they carry to
   * the cache. No-op when claims were already fetched for this CID.
   * @param {import('multiformats').UnknownLink} cid
   */
  async #readClaims (cid) {
    if (this.#fetched.has(cid)) return

    for (const claim of await Claims.read(cid, { serviceURL: this.#serviceURL })) {
      // Only relation claims are useful here: by our naming convention CAR
      // files are named after their hash and we don't serve anything that is
      // not in our own bucket.
      if (claim.type !== 'assert/relation') continue

      // the blocks exported with the claim - should include CARv2 indexes
      const claimBlocks = [...claim.export()]

      // each part pairs a CAR CID (content) with a CARv2 index CID (includes)
      for (const part of claim.parts) {
        if (!isCARLink(part.content)) continue

        const idxBlock = claimBlocks.find(b => b.cid.toString() === part.includes.toString())
        if (!idxBlock) continue

        for (const entry of await decodeIndex(part.content, idxBlock.bytes)) {
          this.#entries.set(Link.create(raw.code, entry.multihash), entry)
        }
      }
    }
    this.#fetched.set(cid, true)
  }
}

/**
 * Check whether the codec of the passed link is the CAR codec.
 * @param {import('multiformats').Link} cid
 * @returns {cid is import('cardex/api').CARLink}
 */
function isCARLink (cid) {
  return cid.code === CAR_CODE
}

/**
 * Read a MultihashIndexSorted index for the passed origin CAR and return a
 * list of IndexEntry, each tagged with the origin CAR CID.
 * @param {import('cardex/api').CARLink} origin
 * @param {Uint8Array} bytes
 */
const decodeIndex = async (origin, bytes) => {
  // wrap the raw bytes in a single-chunk stream for the cardex reader
  const source = new ReadableStream({
    start (controller) {
      controller.enqueue(bytes)
      controller.close()
    }
  })
  const reader = MultihashIndexSortedReader.createReader({ reader: source.getReader() })
  /** @type {IndexEntry[]} */
  const items = []
  let cursor = await reader.read()
  while (!cursor.done) {
    items.push(/** @type {IndexEntry} */({ origin, ...cursor.value }))
    cursor = await reader.read()
  }
  return items
}
9 changes: 6 additions & 3 deletions src/middleware.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import { CarReader } from '@ipld/car'
import { parseCid, HttpError, toIterable } from '@web3-storage/gateway-lib/util'
import { BatchingR2Blockstore } from './lib/blockstore.js'
import { version } from '../package.json'
import { BlocklyIndex, MultiCarIndex, StreamingCarIndex } from './lib/car-index.js'
import { ContentClaimsIndex } from './lib/dag-index/content-claims.js'
import { MultiCarIndex, StreamingCarIndex } from './lib/dag-index/car.js'
import { CachingBucket, asSimpleBucket } from './lib/bucket.js'
import { MAX_CAR_BYTES_IN_MEMORY, CAR_CODE } from './constants.js'

Expand Down Expand Up @@ -96,7 +97,7 @@ export function withIndexSources (handler) {
}))

if (indexSources.length > maxShards) {
console.warn('exceeds maximum DAG shards') // fallback to blockly
console.warn('exceeds maximum DAG shards') // fallback to content claims
indexSources = []
}

Expand Down Expand Up @@ -142,7 +143,9 @@ export function withDagula (handler) {
blockstore = new BatchingR2Blockstore(env.CARPARK, index)
}
} else {
const index = new BlocklyIndex(env.BLOCKLY)
const index = new ContentClaimsIndex({
serviceURL: env.CONTENT_CLAIMS_SERVICE_URL ? new URL(env.CONTENT_CLAIMS_SERVICE_URL) : undefined
})
const found = await index.get(dataCid)
if (!found) throw new HttpError('missing index', { status: 404 })
blockstore = new BatchingR2Blockstore(env.CARPARK, index)
Expand Down
Loading

0 comments on commit 28ca299

Please sign in to comment.