Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: support for multiple hash types #9

Merged
merged 6 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions bin.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,24 @@ import sade from 'sade'
import Conf from 'conf'
import { CID } from 'multiformats/cid'
import { Block } from 'multiformats/block'
import { sha256 } from 'multiformats/hashes/sha2'
import { blake2b256 } from '@multiformats/blake2/blake2b'
import { Readable } from 'stream'
import { pipeline } from 'stream/promises'
import { CarWriter } from '@ipld/car'
import { TimeoutController } from 'timeout-abort-controller'
import { Dagula } from './index.js'
import { getLibp2p } from './p2p.js'
import { getLibp2p, fromNetwork } from './p2p.js'
import archy from 'archy'

const pkg = JSON.parse(fs.readFileSync(new URL('./package.json', import.meta.url)).toString())
const TIMEOUT = 10_000

// Includes Filecoin native blake2b hasher in CLI
const hashers = {
[sha256.code]: sha256,
[blake2b256.code]: blake2b256
}

const config = new Conf({
projectName: 'dagula',
projectVersion: pkg.version,
Expand All @@ -42,7 +49,7 @@ cli.command('block get <cid>')
.action(async (cid, { peer, timeout }) => {
const controller = new TimeoutController(timeout)
const libp2p = await getLibp2p()
const dagula = await Dagula.fromNetwork(libp2p, { peer })
const dagula = await fromNetwork(libp2p, { peer, hashers })
try {
const block = await dagula.getBlock(cid, { signal: controller.signal })
process.stdout.write(block.bytes)
Expand All @@ -60,7 +67,7 @@ cli.command('get <cid>')
cid = CID.parse(cid)
const controller = new TimeoutController(timeout)
const libp2p = await getLibp2p()
const dagula = await Dagula.fromNetwork(libp2p, { peer })
const dagula = await fromNetwork(libp2p, { peer, hashers })
const { writer, out } = CarWriter.create(cid)
try {
let error
Expand Down Expand Up @@ -91,7 +98,7 @@ cli.command('unixfs get <path>')
.action(async (path, { peer, timeout }) => {
const controller = new TimeoutController(timeout)
const libp2p = await getLibp2p()
const dagula = await Dagula.fromNetwork(libp2p, { peer })
const dagula = await fromNetwork(libp2p, { peer, hashers })
try {
const entry = await dagula.getUnixfs(path, { signal: controller.signal })
if (entry.type === 'directory') throw new Error(`${path} is a directory`)
Expand All @@ -118,7 +125,7 @@ cli.command('ls <cid>')
cid = CID.parse(cid)
const controller = new TimeoutController(timeout)
const libp2p = await getLibp2p()
const dagula = await Dagula.fromNetwork(libp2p, { peer })
const dagula = await fromNetwork(libp2p, { peer, hashers })
try {
for await (const block of dagula.get(cid, { signal: controller.signal })) {
controller.reset()
Expand All @@ -138,7 +145,7 @@ cli.command('tree <cid>')
cid = CID.parse(cid)
const controller = new TimeoutController(timeout)
const libp2p = await getLibp2p()
const dagula = await Dagula.fromNetwork(libp2p, { peer })
const dagula = await fromNetwork(libp2p, { peer, hashers })
// build up the tree, starting with the root
/** @type {archy.Data} */
const root = { label: cid.toString(), nodes: [] }
Expand Down
36 changes: 30 additions & 6 deletions bitswap-fetcher.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import defer from 'p-defer'
import { pipe } from 'it-pipe'
import * as lp from 'it-length-prefixed'
import { sha256 } from 'multiformats/hashes/sha2'
import { base58btc } from 'multiformats/bases/base58'
import debug from 'debug'
import { Entry, Message, BlockPresenceType } from './message.js'
import * as Prefix from './prefix.js'
import { Hashers } from './defaults.js'

/** @typedef {import('./index').Block} Block */

Expand All @@ -21,12 +22,16 @@ export class BitswapFetcher {
#wantlist = []
/** @type {number} */
#outstandingWants = 0
/** @type {import('./index').MultihashHashers} */
#hashers

/**
* @param {() => Promise<import('@libp2p/interface-connection').Stream>} newStream
* @param {{ hashers?: import('./index').MultihashHashers }} [options]
*/
constructor (newStream) {
constructor (newStream, options = {}) {
this.#newStream = newStream
this.#hashers = options.hashers || Hashers
this.handler = this.handler.bind(this)
}

Expand Down Expand Up @@ -86,6 +91,11 @@ export class BitswapFetcher {
throw options.signal.reason || abortError()
}

// ensure we can hash the data when we receive the block
if (!this.#hashers[cid.multihash.code]) {
throw new Error(`missing hasher: ${cid.multihash.code} for wanted block: ${cid}`)
}

const key = base58btc.encode(cid.multihash.bytes)
const keyWants = this.#wants.get(key)
/** @type {import('p-defer').DeferredPromise<Block | undefined>} */
Expand Down Expand Up @@ -124,11 +134,22 @@ export class BitswapFetcher {
for await (const data of source) {
const message = Message.decode(data.subarray())
log('incoming message with %d blocks and %d presences', message.blocks.length, message.blockPresences.length)
for (const { data } of message.blocks) {
const hash = await sha256.digest(data)
for (const { prefix: prefixBytes, data } of message.blocks) {
const prefix = Prefix.decode(prefixBytes)
const hasher = this.#hashers[prefix.multihash.code]
if (!hasher) {
// hasher presence for a wanted block has been checked before
// request so this must have been sent in error
log('missing hasher %s', prefix.multihash.code)
continue
}
const hash = await hasher.digest(data)
const key = base58btc.encode(hash.bytes)
const keyWants = this.#wants.get(key)
if (!keyWants) continue
if (!keyWants) {
log('got unwanted block %s', key)
continue
}
log('got block for wanted multihash %s', key)
this.#wants.delete(key)
this.#outstandingWants--
Expand All @@ -140,7 +161,10 @@ export class BitswapFetcher {
if (presence.type !== BlockPresenceType.DontHave) continue
const key = base58btc.encode(presence.cid.multihash.bytes)
const keyWants = this.#wants.get(key)
if (!keyWants) continue
if (!keyWants) {
log('got unwanted block presence: %s', key)
continue
}
log('don\'t have wanted multihash %s', key)
this.#wants.delete(key)
this.#outstandingWants--
Expand Down
18 changes: 18 additions & 0 deletions defaults.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import * as raw from 'multiformats/codecs/raw'
import * as dagPb from '@ipld/dag-pb'
import * as dagCbor from '@ipld/dag-cbor'
import * as dagJson from '@ipld/dag-json'
import { sha256 } from 'multiformats/hashes/sha2'

/** @type {import('./index').BlockDecoders} */
export const Decoders = {
[raw.code]: raw,
[dagPb.code]: dagPb,
[dagCbor.code]: dagCbor,
[dagJson.code]: dagJson
}

/** @type {import('./index').MultihashHashers} */
export const Hashers = {
[sha256.code]: sha256
}
15 changes: 8 additions & 7 deletions index.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { BlockDecoder } from 'multiformats/codecs/interface'
import type { MultihashHasher } from 'multiformats/hashes/interface'
import type { CID } from 'multiformats'
import type { UnixFSEntry } from 'ipfs-unixfs-exporter'
import type { Multiaddr } from '@multiformats/multiaddr'
Expand All @@ -11,6 +12,10 @@ export interface BlockDecoders {
[code: number]: BlockDecoder<any, any>
}

export interface MultihashHashers {
[code: number]: MultihashHasher<any>
}

export interface Block {
cid: CID
bytes: Uint8Array
Expand Down Expand Up @@ -41,11 +46,11 @@ export interface IDagula {
/**
* Emit nodes for all path segements and get UnixFS files and directories
*/
walkUnixfsPath (path: CID|string, options?: AbortOptions): Promise<UnixFSEntry>
walkUnixfsPath (path: CID|string, options?: AbortOptions): AsyncIterableIterator<UnixFSEntry>
}

export declare class Dagula implements IDagula {
constructor (blockstore: Blockstore, options?: { decoders?: BlockDecoders })
constructor (blockstore: Blockstore, options?: { decoders?: BlockDecoders, hashers?: MultihashHashers })
/**
* Get a complete DAG.
*/
Expand All @@ -61,9 +66,5 @@ export declare class Dagula implements IDagula {
/**
* Emit nodes for all path segements and get UnixFS files and directories
*/
walkUnixfsPath (path: CID|string, options?: AbortOptions): Promise<UnixFSEntry>
/**
* Create a new Dagula instance from the passed libp2p Network interface.
*/
static fromNetwork (network: Network, options?: { decoders?: BlockDecoders, peer?: Multiaddr|string }): Promise<Dagula>
walkUnixfsPath (path: CID|string, options?: AbortOptions): AsyncIterableIterator<UnixFSEntry>
}
85 changes: 35 additions & 50 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,68 +1,31 @@
import { multiaddr } from '@multiformats/multiaddr'
import debug from 'debug'
import { CID } from 'multiformats/cid'
import * as raw from 'multiformats/codecs/raw'
import * as dagPb from '@ipld/dag-pb'
import * as dagCbor from '@ipld/dag-cbor'
import * as dagJson from '@ipld/dag-json'
import * as Block from 'multiformats/block'
import { sha256 as hasher } from 'multiformats/hashes/sha2'
import { exporter, walkPath } from 'ipfs-unixfs-exporter'
import { transform } from 'streaming-iterables'
import { BitswapFetcher } from './bitswap-fetcher.js'

/**
* @typedef {import('./index').Blockstore} Blockstore
* @typedef {import('./index').BlockDecoders} BlockDecoders
* @typedef {import('./index').Block} Block
*/

const BITSWAP_PROTOCOL = '/ipfs/bitswap/1.2.0'
const DEFAULT_PEER = multiaddr('/dns4/elastic.dag.house/tcp/443/wss/p2p/bafzbeibhqavlasjc7dvbiopygwncnrtvjd2xmryk5laib7zyjor6kf3avm')
import { Decoders, Hashers } from './defaults.js'

const log = debug('dagula')

/** @type {BlockDecoders} */
const Decoders = {
[raw.code]: raw,
[dagPb.code]: dagPb,
[dagCbor.code]: dagCbor,
[dagJson.code]: dagJson
}

export class Dagula {
/** @type {Blockstore} */
/** @type {import('./index').Blockstore} */
#blockstore

/** @type {BlockDecoders} */
/** @type {import('./index').BlockDecoders} */
#decoders
/** @type {import('./index').MultihashHashers} */
#hashers

/**
* @param {Blockstore} blockstore
* @param {{ decoders?: BlockDecoders }} [options]
* @param {import('./index').Blockstore} blockstore
* @param {{
* decoders?: import('./index').BlockDecoders,
* hashers?: import('./index').MultihashHashers
* }} [options]
*/
constructor (blockstore, options = {}) {
this.#blockstore = blockstore
this.#decoders = options.decoders || Decoders
}

/**
* @param {import('./index').Network} network
* @param {{ decoders?: BlockDecoders, peer?: import('@multiformats/multiaddr').Multiaddr }} [options]
*/
static async fromNetwork (network, options = {}) {
const peer = (typeof options.peer === 'string' ? multiaddr(options.peer) : options.peer) || DEFAULT_PEER
const bitswap = new BitswapFetcher(async () => {
log('new stream to %s', peer)
// @ts-ignore
const stream = await network.dialProtocol(peer, BITSWAP_PROTOCOL, { lazy: true })
return stream
})

// incoming blocks
await network.handle(BITSWAP_PROTOCOL, bitswap.handler)

return new Dagula(bitswap, options)
this.#hashers = options.hashers || Hashers
}

/**
Expand All @@ -72,11 +35,24 @@ export class Dagula {
async * get (cid, options = {}) {
cid = typeof cid === 'string' ? CID.parse(cid) : cid
log('getting DAG %s', cid)

/** @type {AbortController[]} */
let aborters = []
const { signal } = options
signal?.addEventListener('abort', () => aborters.forEach(a => a.abort(signal.reason)))

let cids = [cid]
while (true) {
log('fetching %d CIDs', cids.length)
const fetchBlocks = transform(cids.length, async cid => {
return this.getBlock(cid, { signal: options.signal })
if (signal) {
const aborter = new AbortController()
aborters.push(aborter)
const block = await this.getBlock(cid, { signal: aborter.signal })
aborters = aborters.filter(a => a !== aborter)
return block
}
return this.getBlock(cid)
})
const nextCids = []
for await (const { cid, bytes } of fetchBlocks(cids)) {
Expand All @@ -85,8 +61,16 @@ export class Dagula {
yield { cid, bytes }
throw new Error(`unknown codec: ${cid.code}`)
}
const hasher = this.#hashers[cid.multihash.code]
if (!hasher) {
yield { cid, bytes }
throw new Error(`unknown multihash codec: ${cid.multihash.code}`)
}
log('decoding block %s', cid)
const block = await Block.decode({ bytes, codec: decoder, hasher })
// bitswap-fetcher _must_ verify hashes on receipt of a block, but we
// cannot guarantee the blockstore passed is a bitswap so cannot use
// createUnsafe here.
const block = await Block.create({ bytes, cid, codec: decoder, hasher })
yield block
for (const [, cid] of block.links()) {
nextCids.push(cid)
Expand Down Expand Up @@ -149,6 +133,7 @@ export class Dagula {
}
}

// @ts-ignore exporter requires Blockstore but only uses `get`
yield * walkPath(path, blockstore, { signal: options.signal })
}
}
8 changes: 2 additions & 6 deletions message.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { CID } from 'multiformats/cid'
import * as Prefix from './prefix.js'
import * as gen from './gen/message.js'

const MAX_PRIORITY = Math.pow(2, 31) - 1
Expand Down Expand Up @@ -129,12 +130,7 @@ export class Block {
*/
constructor (prefixOrCid, data) {
if (prefixOrCid instanceof CID) {
prefixOrCid = new Uint8Array([
prefixOrCid.version,
prefixOrCid.code,
prefixOrCid.multihash.bytes[0],
prefixOrCid.multihash.bytes[1]
])
prefixOrCid = Prefix.encode(prefixOrCid)
}

this.prefix = prefixOrCid
Expand Down
9 changes: 8 additions & 1 deletion p2p.d.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import { Libp2p } from 'libp2p'
import type { Libp2p } from 'libp2p'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Network, BlockDecoders, IDagula, MultihashHashers } from './index'

/**
* Create and start a default p2p networking stack (libp2p) with generated peer ID.
*/
export declare function getLibp2p (): Promise<Libp2p>

/**
* Create a new Dagula instance from the passed libp2p Network interface.
*/
export declare function fromNetwork (network: Network, options?: { decoders?: BlockDecoders, hashers?: MultihashHashers, peer?: Multiaddr|string }): Promise<IDagula>
Loading