Skip to content

Commit 85fd4cb

Browse files
author
Alan Shaw
authored
feat: upgrade cardex (#45)
Upgrade to the new version of cardex, which has a web stream-y API similar to https://github.com/ipld/js-unixfs.
1 parent 5aefdd7 commit 85fd4cb

File tree

5 files changed

+34
-30
lines changed

5 files changed

+34
-30
lines changed

package-lock.json

Lines changed: 8 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
"@ipld/dag-json": "^10.0.1",
2929
"@ipld/dag-pb": "^4.0.2",
3030
"@web3-storage/gateway-lib": "^3.1.1",
31-
"cardex": "^1.0.1",
31+
"cardex": "^2.1.0",
3232
"chardet": "^1.5.0",
3333
"dagula": "^7.0.0",
3434
"magic-bytes.js": "^1.0.12",

src/lib/blockstore.js

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
import { readBlockHead, asyncIterableReader } from '@ipld/car/decoder'
22
import { base58btc } from 'multiformats/bases/base58'
33
import defer from 'p-defer'
4-
import { toIterable } from '@web3-storage/gateway-lib/util'
54
import { MultiCarIndex, StreamingCarIndex } from './car-index.js'
65
import { OrderedCarBlockBatcher } from './block-batch.js'
76

87
/**
98
* @typedef {import('multiformats').CID} CID
10-
* @typedef {import('cardex/mh-index-sorted').IndexEntry} IndexEntry
9+
* @typedef {import('cardex/api').IndexItem} IndexEntry
1110
* @typedef {string} MultihashString
1211
* @typedef {import('dagula').Block} Block
1312
* @typedef {import('../bindings').R2Bucket} R2Bucket
@@ -31,14 +30,14 @@ export class R2Blockstore {
3130
this._dataBucket = dataBucket
3231
this._idx = new MultiCarIndex()
3332
for (const carCid of carCids) {
34-
this._idx.addIndex(carCid, new StreamingCarIndex((async function * () {
33+
this._idx.addIndex(carCid, new StreamingCarIndex(async () => {
3534
const idxPath = `${carCid}/${carCid}.car.idx`
3635
const idxObj = await indexBucket.get(idxPath)
3736
if (!idxObj) {
3837
throw Object.assign(new Error(`index not found: ${carCid}`), { code: 'ERR_MISSING_INDEX' })
3938
}
40-
yield * toIterable(idxObj.body)
41-
})()))
39+
return idxObj.body
40+
}))
4241
}
4342
}
4443

src/lib/car-index.js

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import { base58btc } from 'multiformats/bases/base58'
2-
import { MultihashIndexSortedReader } from 'cardex'
2+
import { UniversalReader } from 'cardex/universal'
33
import defer from 'p-defer'
44

55
/**
66
* @typedef {import('multiformats').CID} CID
7-
* @typedef {import('cardex/mh-index-sorted').IndexEntry} IndexEntry
7+
* @typedef {import('cardex/multihash-index-sorted/api').MultihashIndexItem} IndexEntry
88
* @typedef {string} MultihashString
99
* @typedef {{ get: (c: CID) => Promise<IndexEntry|undefined> }} CarIndex
1010
*/
@@ -67,19 +67,22 @@ export class StreamingCarIndex {
6767
/** @type {Error?} */
6868
#buildError = null
6969

70-
/** @param {AsyncIterable<Uint8Array>} stream */
71-
constructor (stream) {
72-
this.#buildIndex(stream)
70+
/** @param {() => Promise<ReadableStream<Uint8Array>>} fetchIndex */
71+
constructor (fetchIndex) {
72+
this.#buildIndex(fetchIndex)
7373
}
7474

75-
/** @param {AsyncIterable<Uint8Array>} stream */
76-
async #buildIndex (stream) {
77-
console.log('building index')
75+
/** @param {() => Promise<ReadableStream<Uint8Array>>} fetchIndex */
76+
async #buildIndex (fetchIndex) {
7877
this.#building = true
7978
try {
80-
const idxReader = MultihashIndexSortedReader.fromIterable(stream)
81-
for await (const entry of idxReader.entries()) {
82-
if (!entry.multihash) throw new Error('missing entry multihash')
79+
const stream = await fetchIndex()
80+
const idxReader = UniversalReader.createReader({ reader: stream.getReader() })
81+
while (true) {
82+
const { done, value } = await idxReader.read()
83+
if (done) break
84+
85+
const entry = /** @type {IndexEntry} */(value)
8386
const key = mhToKey(entry.multihash.bytes)
8487

8588
// set this value in the index so any future requests for this key get
@@ -100,7 +103,7 @@ export class StreamingCarIndex {
100103

101104
// signal we are done building the index
102105
this.#building = false
103-
console.log('finished building index')
106+
104107
// resolve any keys in the promised index as "not found" - we're done
105108
// building so they will not get resolved otherwise.
106109
for (const [key, promises] of this.#promisedIdx.entries()) {

test/helpers.js

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
/* global TransformStream */
12
import { pack } from 'ipfs-car/pack'
23
import { CID } from 'multiformats/cid'
34
import { sha256 } from 'multiformats/hashes/sha2'
@@ -40,14 +41,15 @@ export class Builder {
4041
*/
4142
async #writeIndex (cid, bytes) {
4243
const indexer = await CarIndexer.fromBytes(bytes)
43-
const { writer, out: index } = MultihashIndexSortedWriter.create()
44+
const { readable, writable } = new TransformStream()
45+
const writer = MultihashIndexSortedWriter.createWriter({ writer: writable.getWriter() })
4446

4547
for await (const entry of indexer) {
46-
writer.put(entry)
48+
writer.add(entry.cid, entry.offset)
4749
}
4850
writer.close()
49-
const indexBytes = concat(await collect(index))
50-
await this.#satnav.put(`${cid}/${cid}.car.idx`, indexBytes)
51+
// @ts-expect-error node web stream is not web stream
52+
await this.#satnav.put(`${cid}/${cid}.car.idx`, readable)
5153
}
5254

5355
/**

0 commit comments

Comments
 (0)