diff --git a/lib/node/http.js b/lib/node/http.js index d0e266e65..914c998ac 100644 --- a/lib/node/http.js +++ b/lib/node/http.js @@ -523,11 +523,14 @@ class HTTP extends Server { socket.hook('set filter', (...args) => { const valid = new Validator(args); const data = valid.buf(0); + const height = valid.i32(1, 0); if (!data) throw new Error('Invalid parameter.'); socket.filter = BloomFilter.decode(data); + socket.filterUpdated = true; + socket.filterUpdatedHeight = height; return null; }); @@ -772,15 +775,29 @@ class HTTP extends Server { */ async scan(socket, start) { - await this.node.scan(start, socket.filter, (entry, txs) => { - const block = entry.encode(); - const raw = []; + try { + await this.node.scan(start, socket.filter, (entry, txs) => { + if (socket.filterUpdated) { + throw Error('filter updated'); + } - for (const tx of txs) - raw.push(tx.encode()); + const block = entry.encode(); + const raw = []; + + for (const tx of txs) + raw.push(tx.encode()); + + return socket.call('block rescan', block, raw); + }); + } catch (e) { + if (e.message === 'filter updated') { + socket.filterUpdated = false; + return this.scan(socket, socket.filterUpdatedHeight - 1); + } + + throw e; + } - return socket.call('block rescan', block, raw); - }); return null; } } diff --git a/lib/wallet/client.js b/lib/wallet/client.js index 15ccf2b9c..034f8d89f 100644 --- a/lib/wallet/client.js +++ b/lib/wallet/client.js @@ -69,8 +69,10 @@ class WalletClient extends NodeClient { return super.sendClaim(claim.encode()); } - async setFilter(filter) { - return super.setFilter(filter.encode()); + async setFilter(filter, height) { + const filterBuf = filter.encode(); + assert(Buffer.isBuffer(filterBuf)); + return this.call('set filter', filterBuf, height); } async rescan(start) { diff --git a/lib/wallet/wallet.js b/lib/wallet/wallet.js index 5fe6181df..a7966e624 100644 --- a/lib/wallet/wallet.js +++ b/lib/wallet/wallet.js @@ -4151,17 +4151,18 @@ class Wallet extends EventEmitter { */ async _add(tx, block) { + let derived = []; const details = await this.txdb.add(tx, block); if (details) { - const derived = await this.syncOutputDepth(tx); + derived = await this.syncOutputDepth(tx); if (derived.length > 0) { this.wdb.emit('address', this, derived); this.emit('address', derived); } } - return details; + return {details, derived}; } /** diff --git a/lib/wallet/walletdb.js b/lib/wallet/walletdb.js index e61e836f5..5b76faae1 100644 --- a/lib/wallet/walletdb.js +++ b/lib/wallet/walletdb.js @@ -80,6 +80,7 @@ class WalletDB extends EventEmitter { // Address and outpoint filter. this.filter = new BloomFilter(); + this.filterUpdated = false; this.init(); } @@ -195,6 +196,8 @@ class WalletDB extends EventEmitter { this.state.height, this.state.startHeight); + await this.migrateChange(); + const wallet = await this.ensure({ id: 'primary' }); @@ -206,8 +209,6 @@ class WalletDB extends EventEmitter { wallet.id, wallet.wid, addr.toString(this.network)); this.primary = wallet; - - await this.migrateChange(); } /** @@ -597,12 +598,15 @@ class WalletDB extends EventEmitter { * @returns {Promise} */ - syncFilter() { + async syncFilter() { + const height = this.state.height; this.logger.info('Sending filter to server (%dmb).', this.filter.size / 8 / (1 << 20)); this.filterSent = true; - return this.client.setFilter(this.filter); + this.filterUpdated = false; + + return this.client.setFilter(this.filter, height); } /** @@ -2152,8 +2156,15 @@ class WalletDB extends EventEmitter { total += 1; } - // Sync the state to the new tip. - await this.setTip(tip); + // Make sure to update the filter with the node + // server and the p2p network. + if (this.filterUpdated) { + await this.syncFilter(); + await this.rollback(this.state.height - 1); + } else { + // Sync the state to the new tip. + await this.setTip(tip); + } } finally { this.confirming = false; } @@ -2309,12 +2320,18 @@ class WalletDB extends EventEmitter { assert(wallet); - if (await wallet.add(tx, block)) { + const {details, derived} = await wallet.add(tx, block); + + if (details) { this.logger.info( 'Added transaction to wallet in WalletDB: %s (%d).', wallet.id, wid); result = true; } + + if (derived.length > 0) { + this.filterUpdated = true; + } } if (!result) diff --git a/test/node-sync-test.js b/test/node-sync-test.js new file mode 100644 index 000000000..c64c273fc --- /dev/null +++ b/test/node-sync-test.js @@ -0,0 +1,164 @@ +/* eslint-env mocha */ +/* eslint prefer-arrow-callback: "off" */ + +'use strict'; + +const assert = require('bsert'); +const FullNode = require('../lib/node/fullnode'); +const WalletNode = require('../lib/wallet/node'); +const {forValue} = require('./util/common'); +const WalletKey = require('../lib/wallet/walletkey'); + +describe('Node Sync', function() { + this.timeout(60000); + + const mnemonic = [ + 'abandon', 'abandon', 'abandon', 'abandon', + 'abandon', 'abandon', 'abandon', 'abandon', + 'abandon', 'abandon', 'abandon', 'about' + ].join(' '); + + const ports = { + p2p: 49331, + node: 49332, + wallet: 49333, + rs: 49334, + ns: 49335 + }; + + const lookahead = 10; + + let node, node2 = null; + let node2wallet = null; + let wdb, wdb2 = null; + let wallet, wallet2 = null; + + before(async () => { + /** + * Setup initial nodes and wallets. + */ + + node = new FullNode({ + memory: true, + apiKey: 'foo', + network: 'regtest', + workers: true, + workersSize: 2, + bip37: true, + plugins: [require('../lib/wallet/plugin')], + listen: true, + port: ports.p2p, + httpPort: ports.node, + rsPort: ports.rs, + nsPort: ports.ns, + env: { + 'BCOIN_WALLET_HTTP_PORT': ports.wallet.toString() + } + }); + + await node.open(); + + node2 = new FullNode({ + memory: true, + apiKey: 'foo', + network: 'regtest', + workers: true, + workersSize: 2, + port: ports.p2p + 5, + httpPort: ports.node + 5, + rsPort: ports.rs + 5, + nsPort: ports.ns + 5, + only: [`127.0.0.1:${ports.p2p}`] + }); + + await node2.open(); + + node2wallet = new WalletNode({ + httpPort: ports.wallet + 5, + nodePort: ports.node + 5, + nodeApiKey: 'foo', + network: 'regtest' + }); + + await node2wallet.open(); + + /** + * Generate blocks and transactions. + */ + + await node.connect(); + + // Prepare the miner and wallet. + const {miner, chain} = node; + wdb = node.require('walletdb').wdb; + wallet = await wdb.create(); + miner.addAddress(await wallet.receiveAddress()); + + // Mature the initial coins to use for the + // use in generating the test case. + for (let i = 0; i < 200; i++) { + const block = await miner.cpu.mineBlock(); + assert(await chain.add(block)); + } + + assert.strictEqual(chain.height, 200); + + // Prepare the full node wallet. + wdb2 = node2wallet.wdb; + wallet2 = await wdb2.create({mnemonic}); + + let index = 0; + + // Generate several blocks of transactions for + // the identical wallets. + for (let b = 0; b < 5; b++) { + const account = await wallet2.getAccount(0); + assert.equal(account.lookahead, lookahead); + let count = 0; + + // Include more transactions than the lookahead + // within the block. The filter will need to be updated + // and re-download the same block. + while (count < lookahead + 1) { + const branch = 0; + const key = account.accountKey.derive(branch).derive(index); + const ring = WalletKey.fromHD(account, key, branch, index); + const spvaddr = ring.getAddress(); + + await wallet.send({ + subtractFee: true, + outputs: [{ + address: spvaddr, + value: 10000 + }] + }); + + index += 1; + count += 1; + } + + const block = await miner.mineBlock(); + assert(await chain.add(block)); + } + }); + + after(async () => { + await node.close(); + await node2wallet.close(); + await node2.close(); + }); + + it('should sync with node and wallet (full)', async () => { + await node2.connect(); + await node2.startSync(); + + await forValue(wdb2, 'height', 205); + await forValue(node2.chain, 'height', 205); + + await new Promise(r => setTimeout(() => r(), 3000)); + + const bal = await wallet2.getBalance(); + assert.equal(bal.tx, 5 * lookahead + 5); + assert.equal(bal.coin, 5 * lookahead + 5); + }); +});