diff --git a/bin/bcoin b/bin/bcoin index 840958d7b..56842efa5 100755 --- a/bin/bcoin +++ b/bin/bcoin @@ -43,6 +43,9 @@ for arg in "$@"; do --daemon) daemon=1 ;; + --neutrino) + cmd='neutrino' + ;; --spv) cmd='spvnode' ;; diff --git a/bin/neutrino b/bin/neutrino new file mode 100755 index 000000000..516f4207f --- /dev/null +++ b/bin/neutrino @@ -0,0 +1,42 @@ +#!/usr/bin/env node + +'use strict'; + +console.log('Starting bcoin'); +process.title = 'bcoin'; +const Neutrino = require('../lib/node/neutrino'); + +const node = new Neutrino({ + file: true, + argv: true, + env: true, + logFile: true, + logConsole: true, + logLevel: 'debug', + db: 'leveldb', + memory: false, + workers: true, + loader: require +}); + +(async () => { + await node.ensure(); + await node.open(); + await node.connect(); + node.startSync(); + + node.on('full', () => { + console.log('Full node'); + }); +})().catch((err) => { + console.error(err.stack); + process.exit(1); +}); + +process.on('unhandledRejection', (err, promise) => { + throw err; +}); + +process.on('SIGINT', async () => { + await node.close(); +}); diff --git a/lib/bcoin-browser.js b/lib/bcoin-browser.js index 1f7254be8..8b2d46cb5 100644 --- a/lib/bcoin-browser.js +++ b/lib/bcoin-browser.js @@ -89,6 +89,7 @@ bcoin.node = require('./node'); bcoin.Node = require('./node/node'); bcoin.FullNode = require('./node/fullnode'); bcoin.SPVNode = require('./node/spvnode'); +bcoin.Neutrino = require('./node/neutrino'); // Primitives bcoin.primitives = require('./primitives'); diff --git a/lib/bcoin.js b/lib/bcoin.js index 3e795f6f6..d8ae7e565 100644 --- a/lib/bcoin.js +++ b/lib/bcoin.js @@ -108,7 +108,7 @@ bcoin.define('node', './node'); bcoin.define('Node', './node/node'); bcoin.define('FullNode', './node/fullnode'); bcoin.define('SPVNode', './node/spvnode'); - +bcoin.define('Neutrino', './node/neutrino'); // Primitives bcoin.define('primitives', './primitives'); bcoin.define('Address', './primitives/address'); diff --git a/lib/blockchain/chain.js b/lib/blockchain/chain.js index 38201fe8e..ef55aeea4 100644 --- a/lib/blockchain/chain.js +++ b/lib/blockchain/chain.js @@ -23,6 +23,8 @@ const ChainEntry = require('./chainentry'); const CoinView = require('../coins/coinview'); const Script = require('../script/script'); const {VerifyError} = require('../protocol/errors'); +const {filters} = require('../blockstore/common'); +const {filtersByVal} = require('../net/common'); const thresholdStates = common.thresholdStates; /** diff --git a/lib/blockchain/chaindb.js b/lib/blockchain/chaindb.js index cb91accaa..a49cc75b2 100644 --- a/lib/blockchain/chaindb.js +++ b/lib/blockchain/chaindb.js @@ -46,6 +46,7 @@ class ChainDB { this.state = new ChainState(); this.pending = null; this.current = null; + this.neutrinoState = null; this.cacheHash = new LRU(this.options.entryCache, null, BufferMap); this.cacheHeight = new LRU(this.options.entryCache); @@ -90,6 +91,11 @@ class ChainDB { this.logger.info('ChainDB successfully initialized.'); } + if (this.options.neutrino) { + if (!this.neutrinoState) + this.neutrinoState = await this.getNeutrinoState(); + } + this.logger.info( 'Chain State: hash=%h tx=%d coin=%d value=%s.', this.state.tip, @@ -1670,6 +1676,29 @@ class ChainDB { b.put(layout.O.encode(), flags.toRaw()); return b.write(); } + + /** + * Get Neutrino State + * @returns {Promise} - Returns neutrino state + */ + + async getNeutrinoState() { + const data = await this.db.get(layout.N.encode()); + if (!data) + return new NeutrinoState(); + return NeutrinoState.fromRaw(data); + } + + /** + * Save Neutrino State + * @returns {void} + */ + async saveNeutrinoState() { + const state = this.neutrinoState.toRaw(); + const b = this.db.batch(); + b.put(layout.N.encode(), state); + return b.write(); + } } /** @@ -1952,6 +1981,28 @@ function fromU32(num) { return data; } +class NeutrinoState { + constructor() { // TODO: do we add support for multiple filters? + this.headerHeight = 0; + this.filterHeight = 0; + } + + toRaw() { + const bw = bio.write(8); + bw.writeU32(this.headerHeight); + bw.writeU32(this.filterHeight); + return bw.render(); + } + + static fromRaw(data) { + const state = new NeutrinoState(); + const br = bio.read(data); + state.headersHeight = br.readU32(); + state.filterHeight = br.readU32(); + return state; + } +} + /* * Expose */ diff --git a/lib/blockchain/layout.js b/lib/blockchain/layout.js index 337f95900..2877c7d82 100644 --- a/lib/blockchain/layout.js +++ b/lib/blockchain/layout.js @@ -14,6 +14,8 @@ const bdb = require('bdb'); * O -> chain options * R -> tip hash * D -> versionbits deployments + * N -> Neutrino Status + * F[hash] -> filterHeader * e[hash] -> entry * h[hash] -> height * H[height] -> hash @@ -33,6 +35,8 @@ const layout = { O: bdb.key('O'), R: bdb.key('R'), D: bdb.key('D'), + N: bdb.key('N'), + F: bdb.key('H', ['hash256']), e: bdb.key('e', ['hash256']), h: bdb.key('h', ['hash256']), H: bdb.key('H', ['uint32']), diff --git a/lib/indexer/filterindexer.js b/lib/indexer/filterindexer.js index 97265253b..ce25df7e6 100644 --- a/lib/indexer/filterindexer.js +++ b/lib/indexer/filterindexer.js @@ -85,6 +85,48 @@ class FilterIndexer extends Indexer { this.put(layout.f.encode(hash), gcsFilter.hash()); } + /** + * save filter header + * @param {Hash} blockHash + * @param {Hash} filterHeader + * @param {Hash} filterHash + * @returns {Promise} + */ + + async saveFilterHeader(blockHash, filterHeader, filterHash) { + assert(blockHash); + assert(filterHeader); + assert(filterHash); + + const filter = new Filter(); + filter.header = filterHeader; + + await this.blocks.writeFilter(blockHash, filter.toRaw(), this.filterType); + console.log(layout.f.encode(blockHash)); + this.put(layout.f.encode(blockHash), filterHash); + } + + /** + * Save filter + * @param {Hash} blockHash + * @param {BasicFilter} basicFilter + * @param {Hash} filterHeader + * @returns {Promise} + */ + + async saveFilter(blockHash, basicFilter, filterHeader) { + assert(blockHash); + assert(basicFilter); + assert(filterHeader); + + const filter = new Filter(); + filter.filter = basicFilter.toRaw(); + filter.header = filterHeader; + + await this.blocks.writeFilter(blockHash, filter.toRaw(), this.filterType); + this.put(layout.f.encode(blockHash), basicFilter.hash()); + } + /** * Prune compact filters. * @private diff --git a/lib/indexer/indexer.js b/lib/indexer/indexer.js index 97d85f76b..68f8487f9 100644 --- a/lib/indexer/indexer.js +++ b/lib/indexer/indexer.js @@ -76,6 +76,7 @@ class Indexer extends EventEmitter { */ put(key, value) { + console.log('put', key, value.toString('hex')); this.batch.put(key, value); } diff --git a/lib/net/peer.js b/lib/net/peer.js index dac2e265d..154c83e52 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -1009,6 +1009,12 @@ class Peer extends EventEmitter { case packetTypes.GETHEADERS: this.request(packetTypes.HEADERS, timeout * 2); break; + case packetTypes.GETCFHEADERS: + this.request(packetTypes.CFHEADERS, timeout); + break; + case packetTypes.GETCFILTERS: + this.request(packetTypes.CFILTER, timeout); + break; case packetTypes.GETDATA: this.request(packetTypes.DATA, timeout * 2); break; @@ -1751,6 +1757,26 @@ class Peer extends EventEmitter { this.send(packet); } + /** + * @param {Number} filterType - `0` = basic + * @param {Number} startHeight - Height to start at. + * @param {Hash} stopHash - Hash to stop at. + * @returns {void} + * @description Send `getcfilters` to peer. + */ + sendGetCFilters(filterType, startHeight, stopHash) { + const packet = new packets.GetCFiltersPacket( + filterType, + startHeight, + stopHash); + + this.logger.debug( + 'Sending getcfilters (type=%d, startHeight=%d, stopHash=%h).', + filterType, startHeight, stopHash); + + this.send(packet); + } + /** * Send `cfheaders` to peer. * @param {Number} filterType @@ -1773,6 +1799,27 @@ class Peer extends EventEmitter { this.send(packet); } + /** + * @param {Number} filterType + * @param {Number} startHeight + * @param {Hash} stopHash + * @returns {void} + * @description Send `getcfheaders` to peer. + */ + + sendGetCFHeaders(filterType, startHeight, stopHash) { + const packet = new packets.GetCFHeadersPacket( + filterType, + startHeight, + stopHash); + + this.logger.debug( + 'Sending getcfheaders (type=%d, start=%h, stop=%h).', + filterType, startHeight, stopHash); + + this.send(packet); + } + /** * send `cfcheckpt` to peer. * @param {Number} filterType @@ -1793,6 +1840,25 @@ class Peer extends EventEmitter { this.send(packet); } + /** + * Send `getcfcheckpt` to peer. + * @param {Number} filterType + * @param {Hash} stopHash + * @returns {void} + */ + + sendGetCFCheckpt(filterType, stopHash) { + const packet = new packets.GetCFCheckptPacket( + filterType, + stopHash); + + this.logger.debug( + 'Sending getcfcheckpt (type=%d, stop=%h).', + filterType, stopHash); + + this.send(packet); + } + /** * Send `mempool` to peer. */ diff --git a/lib/net/pool.js b/lib/net/pool.js index 6af141ac9..2a1bd7d9e 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -35,6 +35,7 @@ const packetTypes = packets.types; const scores = HostList.scores; const {inspectSymbol} = require('../utils'); const {consensus} = require('../protocol'); +const BasicFilter = require('../golomb/basicFilter'); /** * Pool @@ -79,7 +80,7 @@ class Pool extends EventEmitter { this.pendingRefill = null; this.checkpoints = false; - this.neutrino = false; + this.neutrino = this.options.neutrino; this.headerChain = new List(); this.headerNext = null; this.headerTip = null; @@ -216,6 +217,8 @@ class Pool extends EventEmitter { const tip = this.chain.tip; if (this.options.neutrino) { this.headerChain.push(new HeaderEntry(tip.hash, tip.height)); + this.cfHeaderChain = new List(); + this.cfHeaderChain.push(new CFHeaderEntry(consensus.ZERO_HASH, 0)); return; } if (tip.height < this.network.lastCheckpoint) { @@ -711,6 +714,46 @@ class Pool extends EventEmitter { this.compactBlocks.clear(); } + /** + * Start the filters headers sync. + */ + + async startFilterHeadersSync() { + this.logger.info('Starting filter headers sync (%s).', + this.chain.options.network); + if (!this.opened || !this.connected) + return; + + const startHeight = this.chain.db.neutrinoState.headerHeight + 1; + const chainHeight = await this.chain.tip.height; + const stopHeight = chainHeight > 2000 ? 2000 : chainHeight; + const stopHash = await this.chain.getHash(stopHeight); + this.peers.load.sendGetCFHeaders( + common.FILTERS.BASIC, + startHeight, + stopHash); + } + + /** + * Start the filters sync. + */ + + async startFiltersSync() { + this.logger.info('Starting filter sync (%s).', + this.chain.options.network); + if (!this.opened || !this.connected) + return; + + const startHeight = this.chain.db.neutrinoState.filterHeight + 1; + const chainHeight = await this.chain.tip.height; + const stopHeight = chainHeight > 1000 ? 1000 : chainHeight; + const stopHash = await this.chain.getHash(stopHeight); + this.peers.load.sendGetCFilters( + common.FILTERS.BASIC, + startHeight, + stopHash); + } + /** * Start the headers sync using getHeaders messages. * @private @@ -849,6 +892,8 @@ class Pool extends EventEmitter { peer.blockTime = Date.now(); if (this.options.neutrino) { peer.sendGetHeaders(locator); + if (!this.syncing) + this.startFilterHeadersSync(); return true; } if (this.checkpoints) { @@ -1230,6 +1275,12 @@ class Pool extends EventEmitter { case packetTypes.GETCFCHECKPT: await this.handleGetCFCheckpt(peer, packet); break; + case packetTypes.CFHEADERS: + await this.handleCFHeaders(peer, packet); + break; + case packetTypes.CFILTER: + await this.handleCFilter(peer, packet); + break; case packetTypes.GETBLOCKS: await this.handleGetBlocks(peer, packet); break; @@ -1281,8 +1332,6 @@ class Pool extends EventEmitter { case packetTypes.BLOCKTXN: await this.handleBlockTxn(peer, packet); break; - case packetTypes.CFILTER: - case packetTypes.CFHEADERS: case packetTypes.CFCHECKPT: case packetTypes.UNKNOWN: await this.handleUnknown(peer, packet); @@ -1674,6 +1723,14 @@ class Pool extends EventEmitter { if (this.checkpoints) return; + if (this.options.neutrino) { + const locator = await this.chain.getLocator(); + this.sendLocator(locator, peer); + if (!this.syncing) + this.startFilterHeadersSync(); + return; + } + this.logger.debug( 'Received %d block hashes from peer (%s).', hashes.length, @@ -2048,6 +2105,87 @@ class Pool extends EventEmitter { peer.sendCFCheckpt(packet.filterType, packet.stopHash, filterHeaders); } + /** + * Handle peer `cfheaders` packet. + * @method + * @private + * @param {Peer} peer + * @param {CFHeadersPacket} packet + */ + + async handleCFHeaders(peer, packet) { + if (!this.chain.synced) + return; + if (!this.options.neutrino) + return; + + this.logger.info('Received cfheaders (%s).', peer.hostname()); + const filterType = packet.filterType; + const stopHash = packet.stopHash; + let previousFilterHeader = packet.previousFilterHeader; + const filterHashes = packet.filterHashes; + let blockHeight = await this.chain.getHeight(stopHash) + - filterHashes.length; + const stopHeight = await this.chain.getHeight(stopHash); + + for (const filterHash of filterHashes) { + const basicFilter = new BasicFilter(); + basicFilter._hash = filterHash; + const filterHeader = basicFilter.header(previousFilterHeader); + const lastFilterHeader = this.cfHeaderChain.tail; + const cfHeaderEntry = new CFHeaderEntry( + filterHash, lastFilterHeader.height + 1); + this.cfHeaderChain.push(cfHeaderEntry); + // todo: verify the filterHeader + const blockHash = await this.chain.getHash(blockHeight); + const indexer = this.getFilterIndexer(filtersByVal[filterType]); + await indexer.saveFilterHeader(blockHash, filterHeader, filterHash); + previousFilterHeader = filterHeader; + // todo: add a function for this in chain.js + this.chain.db.neutrinoState.headerHeight = blockHeight; + blockHeight++; + } + await this.chain.db.saveNeutrinoState(); + console.log(this.headerChain.tail.height, stopHeight); + if (this.headerChain.tail.height <= stopHeight) { + this.logger.info('CFHeaders sync complete.'); + this.emit('cfheaders'); + } else { + const nextStopHeight = stopHeight + 2000 < this.chain.height + ? stopHeight + 2000 : this.chain.height; + const nextStopHash = await this.chain.getHash(nextStopHeight); + peer.sendGetCFHeaders(filterType, stopHeight + 1, nextStopHash); + } + } + + /** + * Handle peer `cfilter` packet. + * @method + * @private + * @param {Peer} peer + * @param {CFilterPacket} packet + * @returns {Promise} + */ + + async handleCFilter(peer, packet) { + if (!this.chain.synced) + return; + if (!this.options.neutrino) + return; + + this.logger.info('Received cfilter (%s).', peer.hostname()); + const filterType = packet.filterType; + const blockHash = packet.blockHash; + const filterBytes = packet.filterBytes; + const indexer = this.getFilterIndexer(filtersByVal[filterType]); + const basicFilter = new BasicFilter().fromRaw(filterBytes); + const filterHeader = indexer.getFilterHeader(blockHash); + await indexer.saveFilter(blockHash, basicFilter, filterHeader); + this.chain.db.neutrinoState.headerHeight = + await this.chain.getHeight(blockHash); + await this.chain.db.saveNeutrinoState(); + } + /** * Handle `getblocks` packet. * @method @@ -2179,7 +2317,6 @@ class Pool extends EventEmitter { const headers = packet.items; if (!this.checkpoints && !this.options.neutrino) - // todo add support for checkpoints return; if (!this.syncing) @@ -2255,7 +2392,7 @@ class Pool extends EventEmitter { peer.blockTime = Date.now(); // Request the blocks we just added. - if (checkpoint) { + if (checkpoint && !this.options.neutrino) { this.headerChain.shift(); this.resolveHeaders(peer); return; @@ -4555,6 +4692,20 @@ class HeaderEntry { } } +class CFHeaderEntry { + /** + * Create cfheader entry. + * @constructor + */ + + constructor(hash, height) { + this.hash = hash; + this.height = height; + this.prev = null; + this.next = null; + } +} + /* * Expose */ diff --git a/lib/node/neutrino.js b/lib/node/neutrino.js index 3f693dc19..684384b84 100644 --- a/lib/node/neutrino.js +++ b/lib/node/neutrino.js @@ -90,6 +90,7 @@ class Neutrino extends Node { chain: this.chain, prefix: this.config.prefix, checkpoints: true, + filterIndexers: this.filterIndexers, proxy: this.config.str('proxy'), onion: this.config.bool('onion'), upnp: this.config.bool('upnp'), @@ -163,6 +164,14 @@ class Neutrino extends Node { if (this.chain.height === 0) return; this.logger.info('Block Headers are fully synced'); + this.pool.startFilterHeadersSync(); + }); + + this.pool.on('cfheaders', () => { + if (this.chain.height === 0) + return; + this.logger.info('Filter Headers are fully synced'); + this.pool.startFiltersSync(); }); this.loadPlugins();