Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wallet: fix missing tx when rescan with filter update #519

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions lib/node/http.js
Original file line number Diff line number Diff line change
Expand Up @@ -523,11 +523,14 @@ class HTTP extends Server {
socket.hook('set filter', (...args) => {
const valid = new Validator(args);
const data = valid.buf(0);
const height = valid.i32(1, 0);

if (!data)
throw new Error('Invalid parameter.');

socket.filter = BloomFilter.decode(data);
socket.filterUpdated = true;
socket.filterUpdatedHeight = height;

return null;
});
Expand Down Expand Up @@ -772,15 +775,29 @@ class HTTP extends Server {
*/

async scan(socket, start) {
await this.node.scan(start, socket.filter, (entry, txs) => {
const block = entry.encode();
const raw = [];
try {
await this.node.scan(start, socket.filter, (entry, txs) => {
if (socket.filterUpdated) {
throw Error('filter updated');
}

for (const tx of txs)
raw.push(tx.encode());
const block = entry.encode();
const raw = [];

for (const tx of txs)
raw.push(tx.encode());

return socket.call('block rescan', block, raw);
});
} catch (e) {
if (e.message === 'filter updated') {
socket.filterUpdated = false;
return this.scan(socket, socket.filterUpdatedHeight - 1);
}

throw e;
}

return socket.call('block rescan', block, raw);
});
return null;
}
}
Expand Down
6 changes: 4 additions & 2 deletions lib/wallet/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ class WalletClient extends NodeClient {
return super.sendClaim(claim.encode());
}

async setFilter(filter) {
return super.setFilter(filter.encode());
async setFilter(filter, height) {
const filterBuf = filter.encode();
assert(Buffer.isBuffer(filterBuf));
return this.call('set filter', filterBuf, height);
}

async rescan(start) {
Expand Down
5 changes: 3 additions & 2 deletions lib/wallet/wallet.js
Original file line number Diff line number Diff line change
Expand Up @@ -4151,17 +4151,18 @@ class Wallet extends EventEmitter {
*/

async _add(tx, block) {
let derived = [];
const details = await this.txdb.add(tx, block);

if (details) {
const derived = await this.syncOutputDepth(tx);
derived = await this.syncOutputDepth(tx);
if (derived.length > 0) {
this.wdb.emit('address', this, derived);
this.emit('address', derived);
}
}

return details;
return {details, derived};
}

/**
Expand Down
31 changes: 24 additions & 7 deletions lib/wallet/walletdb.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class WalletDB extends EventEmitter {

// Address and outpoint filter.
this.filter = new BloomFilter();
this.filterUpdated = false;

this.init();
}
Expand Down Expand Up @@ -195,6 +196,8 @@ class WalletDB extends EventEmitter {
this.state.height,
this.state.startHeight);

await this.migrateChange();

const wallet = await this.ensure({
id: 'primary'
});
Expand All @@ -206,8 +209,6 @@ class WalletDB extends EventEmitter {
wallet.id, wallet.wid, addr.toString(this.network));

this.primary = wallet;

await this.migrateChange();
}

/**
Expand Down Expand Up @@ -597,12 +598,15 @@ class WalletDB extends EventEmitter {
* @returns {Promise}
*/

syncFilter() {
async syncFilter() {
const height = this.state.height;
this.logger.info('Sending filter to server (%dmb).',
this.filter.size / 8 / (1 << 20));

this.filterSent = true;
return this.client.setFilter(this.filter);
this.filterUpdated = false;

return this.client.setFilter(this.filter, height);
}

/**
Expand Down Expand Up @@ -2152,8 +2156,15 @@ class WalletDB extends EventEmitter {
total += 1;
}

// Sync the state to the new tip.
await this.setTip(tip);
// Make sure to update the filter with the node
// server and the p2p network.
if (this.filterUpdated) {
await this.syncFilter();
await this.rollback(this.state.height - 1);
} else {
// Sync the state to the new tip.
await this.setTip(tip);
}
} finally {
this.confirming = false;
}
Expand Down Expand Up @@ -2309,12 +2320,18 @@ class WalletDB extends EventEmitter {

assert(wallet);

if (await wallet.add(tx, block)) {
const {details, derived} = await wallet.add(tx, block);

if (details) {
this.logger.info(
'Added transaction to wallet in WalletDB: %s (%d).',
wallet.id, wid);
result = true;
}

if (derived.length > 0) {
this.filterUpdated = true;
}
}

if (!result)
Expand Down
164 changes: 164 additions & 0 deletions test/node-sync-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/* eslint-env mocha */
/* eslint prefer-arrow-callback: "off" */

'use strict';

const assert = require('bsert');
const FullNode = require('../lib/node/fullnode');
const WalletNode = require('../lib/wallet/node');
const {forValue} = require('./util/common');
const WalletKey = require('../lib/wallet/walletkey');

describe('Node Sync', function() {
this.timeout(60000);

const mnemonic = [
'abandon', 'abandon', 'abandon', 'abandon',
'abandon', 'abandon', 'abandon', 'abandon',
'abandon', 'abandon', 'abandon', 'about'
].join(' ');

const ports = {
p2p: 49331,
node: 49332,
wallet: 49333,
rs: 49334,
ns: 49335
};

const lookahead = 10;

let node, node2 = null;
let node2wallet = null;
let wdb, wdb2 = null;
let wallet, wallet2 = null;

before(async () => {
/**
* Setup initial nodes and wallets.
*/

node = new FullNode({
memory: true,
apiKey: 'foo',
network: 'regtest',
workers: true,
workersSize: 2,
bip37: true,
plugins: [require('../lib/wallet/plugin')],
listen: true,
port: ports.p2p,
httpPort: ports.node,
rsPort: ports.rs,
nsPort: ports.ns,
env: {
'BCOIN_WALLET_HTTP_PORT': ports.wallet.toString()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might need to be changed to HSD_WALLET_HTTP_PORT :-)

}
});

await node.open();

node2 = new FullNode({
memory: true,
apiKey: 'foo',
network: 'regtest',
workers: true,
workersSize: 2,
port: ports.p2p + 5,
httpPort: ports.node + 5,
rsPort: ports.rs + 5,
nsPort: ports.ns + 5,
only: [`127.0.0.1:${ports.p2p}`]
});

await node2.open();

node2wallet = new WalletNode({
httpPort: ports.wallet + 5,
nodePort: ports.node + 5,
nodeApiKey: 'foo',
network: 'regtest'
});

await node2wallet.open();

/**
* Generate blocks and transactions.
*/

await node.connect();

// Prepare the miner and wallet.
const {miner, chain} = node;
wdb = node.require('walletdb').wdb;
wallet = await wdb.create();
miner.addAddress(await wallet.receiveAddress());

// Mature the initial coins to use for the
// use in generating the test case.
for (let i = 0; i < 200; i++) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in hsd the coinbase maturity for regtest is much smaller so we can reduce this here.

regtest.coinbaseMaturity = 2;

const block = await miner.cpu.mineBlock();
assert(await chain.add(block));
}

assert.strictEqual(chain.height, 200);

// Prepare the full node wallet.
wdb2 = node2wallet.wdb;
wallet2 = await wdb2.create({mnemonic});

let index = 0;

// Generate several blocks of transactions for
// the identical wallets.
for (let b = 0; b < 5; b++) {
const account = await wallet2.getAccount(0);
assert.equal(account.lookahead, lookahead);
let count = 0;

// Include more transactions than the lookahead
// within the block. The filter will need to be updated
// and re-download the same block.
while (count < lookahead + 1) {
const branch = 0;
const key = account.accountKey.derive(branch).derive(index);
const ring = WalletKey.fromHD(account, key, branch, index);
const spvaddr = ring.getAddress();

await wallet.send({
subtractFee: true,
outputs: [{
address: spvaddr,
value: 10000
}]
});

index += 1;
count += 1;
}

const block = await miner.mineBlock();
assert(await chain.add(block));
}
});

after(async () => {
await node.close();
await node2wallet.close();
await node2.close();
});

it('should sync with node and wallet (full)', async () => {
await node2.connect();
await node2.startSync();

await forValue(wdb2, 'height', 205);
await forValue(node2.chain, 'height', 205);

await new Promise(r => setTimeout(() => r(), 3000));

const bal = await wallet2.getBalance();
assert.equal(bal.tx, 5 * lookahead + 5);
assert.equal(bal.coin, 5 * lookahead + 5);
});
});