Skip to content

Commit

Permalink
Merge PR #856 from 'nodech/interactive-scan'
Browse files Browse the repository at this point in the history
  • Loading branch information
nodech committed Dec 15, 2023
2 parents 349d203 + 140265e commit 419924b
Show file tree
Hide file tree
Showing 21 changed files with 1,730 additions and 355 deletions.
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,16 @@
**When upgrading to this version of hsd, you must pass `--wallet-migrate=3` when
you run it for the first time.**

### Primitives
- TX Changes:
- tx.test no longer updates the filter.
- Introduce TX.testAndMaybeUpdate method for potentially updating filter while
testing. (old tx.test)

### Node Changes
Add support for the interactive rescan, that allows more control over rescan
process and allows parallel rescans.

#### Node HTTP API
- `GET /` or `getInfo()` now has more properties:
- `treeRootHeight` - height at which the block txns are accumulated
Expand All @@ -22,6 +31,13 @@ you run it for the first time.**
- `compactInterval` - what is the current compaction interval config.
- `nextCompaction` - when will the next compaction trigger after restart.
- `lastCompaction` - when was the last compaction run.
- Introduce `scan interactive` hook (start, filter)

### Node HTTP Client:
- Introduce `scanInteractive` method that starts interactive rescan.
- expects ws hook for `block rescan interactive` params `rawEntry, rawTXs`
that returns scanAction object.
- expects ws hook for `block rescan interactive abort` param `message`.

### Wallet Changes
#### Configuration
Expand Down
75 changes: 74 additions & 1 deletion lib/blockchain/chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const {OwnershipProof} = require('../covenants/ownership');
const AirdropProof = require('../primitives/airdropproof');
const {CriticalError} = require('../errors');
const thresholdStates = common.thresholdStates;
const scanActions = common.scanActions;
const {states} = NameState;

const {
Expand Down Expand Up @@ -2250,7 +2251,7 @@ class Chain extends AsyncEmitter {

/**
* Scan the blockchain for transactions containing specified address hashes.
* @param {Hash} start - Block hash to start at.
* @param {Hash|Number} start - Block hash or height to start at.
* @param {Bloom} filter - Bloom filter containing tx and address hashes.
* @param {Function} iter - Iterator.
* @returns {Promise}
Expand All @@ -2265,6 +2266,78 @@ class Chain extends AsyncEmitter {
}
}

/**
* Interactive scan the blockchain for transactions containing specified
* address hashes. Allows repeat and abort.
* @param {Hash|Number} start - Block hash or height to start at.
* @param {BloomFilter} filter - Starting bloom filter containing tx,
* address and name hashes.
* @param {Function} iter - Iterator.
*/

async scanInteractive(start, filter, iter) {
if (start == null)
start = this.network.genesis.hash;

if (typeof start === 'number')
this.logger.info('Scanning(interactive) from height %d.', start);
else
this.logger.info('Scanning(interactive) from block %x.', start);

let hash = start;

while (hash != null) {
const unlock = await this.locker.lock();

try {
const {entry, txs} = await this.db.scanBlock(hash, filter);

const action = await iter(entry, txs);

if (!action || typeof action !== 'object')
throw new Error('Did not get proper action');

switch (action.type) {
case scanActions.REPEAT: {
break;
}
case scanActions.REPEAT_SET: {
// try again with updated filter.
filter = action.filter;
break;
}
case scanActions.REPEAT_ADD: {
if (!filter)
throw new Error('No filter set.');

for (const chunk of action.chunks)
filter.add(chunk);
break;
}
case scanActions.NEXT: {
const next = await this.getNext(entry);
hash = next && next.hash;
break;
}
case scanActions.ABORT: {
this.logger.info('Scan(interactive) aborted at %x (%d).',
entry.hash, entry.height);
throw new Error('scan request aborted.');
}
default:
this.logger.debug('Scan(interactive) aborting. Unknown action: %d',
action.type);
throw new Error('Unknown action.');
}
} catch (e) {
this.logger.debug('Scan(interactive) errored. Error: %s', e.message);
throw e;
} finally {
unlock();
}
}
}

/**
* Add a block to the chain, perform all necessary verification.
* @param {Block} block
Expand Down
47 changes: 46 additions & 1 deletion lib/blockchain/chaindb.js
Original file line number Diff line number Diff line change
Expand Up @@ -1600,7 +1600,7 @@ class ChainDB {
entry.hash, entry.height);

for (const tx of block.txs) {
if (tx.test(filter))
if (tx.testAndMaybeUpdate(filter))
txs.push(tx);
}

Expand All @@ -1612,6 +1612,51 @@ class ChainDB {
this.logger.info('Finished scanning %d blocks.', total);
}

/**
* Interactive scans block checks.
* @param {Hash|Number} blockID - Block hash or height to start at.
* @param {BloomFilter} [filter] - Starting bloom filter containing tx,
* address and name hashes.
* @returns {Promise}
*/

async scanBlock(blockID, filter) {
assert(blockID != null);

const entry = await this.getEntry(blockID);

if (!entry)
throw new Error('Could not find entry.');

if (!await this.isMainChain(entry))
throw new Error('Cannot rescan an alternate chain.');

const block = await this.getBlock(entry.hash);

if (!block)
throw new Error('Block not found.');

this.logger.info(
'Scanning block %x (%d)',
entry.hash, entry.height);

let txs = [];

if (!filter) {
txs = block.txs;
} else {
for (const tx of block.txs) {
if (tx.testAndMaybeUpdate(filter))
txs.push(tx);
}
}

return {
entry,
txs
};
}

/**
* Save an entry to the database and optionally
* connect it as the tip. Note that this method
Expand Down
15 changes: 15 additions & 0 deletions lib/blockchain/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,18 @@ exports.flags = {
exports.flags.DEFAULT_FLAGS = 0
| exports.flags.VERIFY_POW
| exports.flags.VERIFY_BODY;

/**
* Interactive scan actions.
* @enum {Number}
* @default
*/

exports.scanActions = {
NONE: 0,
ABORT: 1,
NEXT: 2,
REPEAT_SET: 3,
REPEAT_ADD: 4,
REPEAT: 5
};
16 changes: 16 additions & 0 deletions lib/client/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,22 @@ class NodeClient extends Client {

return this.call('rescan', start);
}

/**
* Rescan for any missed transactions. (Interactive)
* @param {Number|Hash} start - Start block.
* @param {BloomFilter} [filter]
* @returns {Promise}
*/

rescanInteractive(start, filter = null) {
if (start == null)
start = 0;

assert(typeof start === 'number' || Buffer.isBuffer(start));

return this.call('rescan interactive', start, filter);
}
}

/*
Expand Down
2 changes: 1 addition & 1 deletion lib/net/peer.js
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ class Peer extends EventEmitter {
// Check the peer's bloom
// filter if they're using spv.
if (this.spvFilter) {
if (!tx.test(this.spvFilter))
if (!tx.testAndMaybeUpdate(this.spvFilter))
continue;
}

Expand Down
14 changes: 13 additions & 1 deletion lib/node/fullnode.js
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ class FullNode extends Node {
/**
* Rescan for any missed transactions.
* @param {Number|Hash} start - Start block.
* @param {Bloom} filter
* @param {BloomFilter} filter
* @param {Function} iter - Iterator.
* @returns {Promise}
*/
Expand All @@ -364,6 +364,18 @@ class FullNode extends Node {
return this.chain.scan(start, filter, iter);
}

/**
* Interactive rescan for any missed transactions.
* @param {Number|Hash} start - Start block.
* @param {BloomFilter} filter
* @param {Function} iter - Iterator.
* @returns {Promise}
*/

scanInteractive(start, filter, iter) {
return this.chain.scanInteractive(start, filter, iter);
}

/**
* Broadcast a transaction.
* @param {TX|Block|Claim|AirdropProof} item
Expand Down
94 changes: 93 additions & 1 deletion lib/node/http.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const TX = require('../primitives/tx');
const Claim = require('../primitives/claim');
const Address = require('../primitives/address');
const Network = require('../protocol/network');
const scanActions = require('../blockchain/common').scanActions;
const pkg = require('../pkg');

/**
Expand Down Expand Up @@ -706,6 +707,21 @@ class HTTP extends Server {

return this.scan(socket, start);
});

socket.hook('rescan interactive', (...args) => {
const valid = new Validator(args);
const start = valid.uintbhash(0);
const rawFilter = valid.buf(1);
let filter = socket.filter;

if (start == null)
throw new Error('Invalid parameter.');

if (rawFilter)
filter = BloomFilter.fromRaw(rawFilter);

return this.scanInteractive(socket, start, filter);
});
}

/**
Expand Down Expand Up @@ -813,7 +829,7 @@ class HTTP extends Server {
if (!socket.filter)
return false;

return tx.test(socket.filter);
return tx.testAndMaybeUpdate(socket.filter);
}

/**
Expand All @@ -834,6 +850,82 @@ class HTTP extends Server {

return socket.call('block rescan', block, raw);
});

return null;
}

/**
* Scan using a socket's filter (interactive).
* @param {WebSocket} socket
* @param {Hash} start
* @param {BloomFilter} filter
* @returns {Promise}
*/

async scanInteractive(socket, start, filter) {
const iter = async (entry, txs) => {
const block = entry.encode();
const raw = [];

for (const tx of txs)
raw.push(tx.encode());

const action = await socket.call('block rescan interactive', block, raw);
const valid = new Validator(action);
const actionType = valid.i32('type');

switch (actionType) {
case scanActions.NEXT:
case scanActions.ABORT:
case scanActions.REPEAT: {
return {
type: actionType
};
}
case scanActions.REPEAT_SET: {
// NOTE: This is operation is on the heavier side,
// because it sends the whole Filter that can be quite
// big depending on the situation.
// NOTE: In HTTP Context REPEAT_SET wont modify socket.filter
// but instead setup new one for the rescan. Further REPEAT_ADDs will
// modify this filter instead of the socket.filter.
const rawFilter = valid.buf('filter');
let filter = null;

if (rawFilter != null)
filter = BloomFilter.fromRaw(rawFilter);

return {
type: scanActions.REPEAT_SET,
filter: filter
};
}
case scanActions.REPEAT_ADD: {
// NOTE: This operation depending on the filter
// that was provided can be either modifying the
// socket.filter or the filter provided by REPEAT_SET.
const chunks = valid.array('chunks');

if (!chunks)
throw new Error('Invalid parameter.');

return {
type: scanActions.REPEAT_ADD,
chunks: chunks
};
}

default:
throw new Error('Unknown action.');
}
};

try {
await this.node.scanInteractive(start, filter, iter);
} catch (err) {
return socket.call('block rescan interactive abort', err.message);
}

return null;
}
}
Expand Down
Loading

0 comments on commit 419924b

Please sign in to comment.