Skip to content

Commit

Permalink
node: add fullLock option to the interactive rescan.
Browse files Browse the repository at this point in the history
Interactive rescan by default does per block scan lock. This enables
parallel rescans, as well as chain sync while rescan is in progress. But
in specific cases, it may be more beneficial to stop the node from
syncing while the rescan is in progress.
  • Loading branch information
nodech committed Jan 10, 2024
1 parent 43d049e commit ac1f9c6
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 19 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ process and allows parallel rescans.
- `compactInterval` - what is the current compaction interval config.
- `nextCompaction` - when will the next compaction trigger after restart.
- `lastCompaction` - when was the last compaction run.
- Introduce `scan interactive` hook (start, filter)
- Introduce `scan interactive` hook (start, filter, fullLock)

### Node HTTP Client:
- Introduce `scanInteractive` method that starts interactive rescan.
Expand Down
40 changes: 37 additions & 3 deletions lib/blockchain/chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -2273,9 +2273,39 @@ class Chain extends AsyncEmitter {
* @param {BloomFilter} filter - Starting bloom filter containing tx,
* address and name hashes.
* @param {Function} iter - Iterator.
* @param {Boolean} [fullLock=false]
* @returns {Promise}
*/

async scanInteractive(start, filter, iter, fullLock = false) {
if (fullLock) {
const unlock = await this.locker.lock();
try {
// We lock the whole chain, no longer lock per block scan.
return await this._scanInteractive(start, filter, iter, false);
} catch (e) {
this.logger.debug('Scan(interactive) errored. Error: %s', e.message);
throw e;
} finally {
unlock();
}
}

return this._scanInteractive(start, filter, iter, true);
}

/**
* Interactive scan the blockchain for transactions containing specified
* address hashes. Allows repeat and abort.
* @param {Hash|Number} start - Block hash or height to start at.
* @param {BloomFilter} filter - Starting bloom filter containing tx,
* address and name hashes.
* @param {Function} iter - Iterator.
* @param {Boolean} [lockPerScan=true] - if we should lock per block scan.
* @returns {Promise}
*/

async scanInteractive(start, filter, iter) {
async _scanInteractive(start, filter, iter, lockPerScan = true) {
if (start == null)
start = this.network.genesis.hash;

Expand All @@ -2287,7 +2317,10 @@ class Chain extends AsyncEmitter {
let hash = start;

while (hash != null) {
const unlock = await this.locker.lock();
let unlock;

if (lockPerScan)
unlock = await this.locker.lock();

try {
const {entry, txs} = await this.db.scanBlock(hash, filter);
Expand Down Expand Up @@ -2333,7 +2366,8 @@ class Chain extends AsyncEmitter {
this.logger.debug('Scan(interactive) errored. Error: %s', e.message);
throw e;
} finally {
unlock();
if (lockPerScan)
unlock();
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions lib/client/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -339,16 +339,17 @@ class NodeClient extends Client {
* Rescan for any missed transactions. (Interactive)
* @param {Number|Hash} start - Start block.
* @param {BloomFilter} [filter]
* @param {Boolean} [fullLock=false]
* @returns {Promise}
*/

rescanInteractive(start, filter = null) {
rescanInteractive(start, filter = null, fullLock = false) {
if (start == null)
start = 0;

assert(typeof start === 'number' || Buffer.isBuffer(start));

return this.call('rescan interactive', start, filter);
return this.call('rescan interactive', start, filter, fullLock);
}
}

Expand Down
6 changes: 4 additions & 2 deletions lib/node/fullnode.js
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,13 @@ class FullNode extends Node {
* @param {Number|Hash} start - Start block.
* @param {BloomFilter} filter
* @param {Function} iter - Iterator.
* @param {Boolean} [fullLock=false] - lock the whole chain instead of per
* scan.
* @returns {Promise}
*/

scanInteractive(start, filter, iter) {
return this.chain.scanInteractive(start, filter, iter);
scanInteractive(start, filter, iter, fullLock = false) {
return this.chain.scanInteractive(start, filter, iter, fullLock);
}

/**
Expand Down
8 changes: 5 additions & 3 deletions lib/node/http.js
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,7 @@ class HTTP extends Server {
const valid = new Validator(args);
const start = valid.uintbhash(0);
const rawFilter = valid.buf(1);
const fullLock = valid.bool(2, false);
let filter = socket.filter;

if (start == null)
Expand All @@ -720,7 +721,7 @@ class HTTP extends Server {
if (rawFilter)
filter = BloomFilter.fromRaw(rawFilter);

return this.scanInteractive(socket, start, filter);
return this.scanInteractive(socket, start, filter, fullLock);
});
}

Expand Down Expand Up @@ -859,10 +860,11 @@ class HTTP extends Server {
* @param {WebSocket} socket
* @param {Hash} start
* @param {BloomFilter} filter
* @param {Boolean} [fullLock=false]
* @returns {Promise}
*/

async scanInteractive(socket, start, filter) {
async scanInteractive(socket, start, filter, fullLock = false) {
const iter = async (entry, txs) => {
const block = entry.encode();
const raw = [];
Expand Down Expand Up @@ -921,7 +923,7 @@ class HTTP extends Server {
};

try {
await this.node.scanInteractive(start, filter, iter);
await this.node.scanInteractive(start, filter, iter, fullLock);
} catch (err) {
return socket.call('block rescan interactive abort', err.message);
}
Expand Down
96 changes: 88 additions & 8 deletions test/node-rescan-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -399,20 +399,57 @@ describe('Node Rescan Interactive API', function() {
node.scanInteractive(startHeight, null, getIter(counter2))
]);

assert.strictEqual(counter1.count, 10);
assert.strictEqual(counter2.count, 10);
assert.strictEqual(counter1.count, RESCAN_DEPTH);
assert.strictEqual(counter2.count, RESCAN_DEPTH);

// Chain gets locked per block, so we should see alternating events.
// Chain gets locked per block by default, so we should see alternating events.
// Because they start in parallel, but id1 starts first they will be
// getting events in alternating older (first one gets lock, second waits,
// second gets lock, first waits, etc.)
for (let i = 0; i < 10; i++) {
for (let i = 0; i < RESCAN_DEPTH; i++) {
assert.strictEqual(events[i].id, 1);
assert.strictEqual(events[i + 1].id, 2);
i++;
}
});

it('should rescan in series', async () => {
const {node} = nodeCtx;
const startHeight = nodeCtx.height - RESCAN_DEPTH + 1;

const events = [];
const getIter = (counterObj) => {
return async (entry, txs) => {
assert.strictEqual(entry.height, startHeight + counterObj.count);
assert.strictEqual(txs.length, 4);

events.push({ ...counterObj });
counterObj.count++;

return {
type: scanActions.NEXT
};
};
};

const counter1 = { id: 1, count: 0 };
const counter2 = { id: 2, count: 0 };
await Promise.all([
node.scanInteractive(startHeight, null, getIter(counter1), true),
node.scanInteractive(startHeight, null, getIter(counter2), true)
]);

assert.strictEqual(counter1.count, RESCAN_DEPTH);
assert.strictEqual(counter2.count, RESCAN_DEPTH);

// We lock the whole chain for this test, so we should see events
// from one to other.
for (let i = 0; i < RESCAN_DEPTH; i++) {
assert.strictEqual(events[i].id, 1);
assert.strictEqual(events[i + RESCAN_DEPTH].id, 2);
}
});

describe('HTTP', function() {
let client = null;

Expand Down Expand Up @@ -456,7 +493,7 @@ describe('Node Rescan Interactive API', function() {
filter = test.filter.encode();

await client.rescanInteractive(startHeight, filter);
assert.strictEqual(count, 10);
assert.strictEqual(count, RESCAN_DEPTH);

count = 0;
if (test.filter)
Expand Down Expand Up @@ -757,20 +794,63 @@ describe('Node Rescan Interactive API', function() {
client2.rescanInteractive(startHeight)
]);

assert.strictEqual(counter1.count, 10);
assert.strictEqual(counter2.count, 10);
assert.strictEqual(counter1.count, RESCAN_DEPTH);
assert.strictEqual(counter2.count, RESCAN_DEPTH);

// Chain gets locked per block, so we should see alternating events.
// Because they start in parallel, but id1 starts first they will be
// getting events in alternating older (first one gets lock, second waits,
// second gets lock, first waits, etc.)
for (let i = 0; i < 10; i++) {
for (let i = 0; i < RESCAN_DEPTH; i++) {
assert.strictEqual(events[i].id, 1);
assert.strictEqual(events[i + 1].id, 2);
i++;
}
});

it('should rescan in series', async () => {
const client2 = nodeCtx.nodeClient();
await client2.open();

const startHeight = nodeCtx.height - RESCAN_DEPTH + 1;
const events = [];
const counter1 = { id: 1, count: 0 };
const counter2 = { id: 2, count: 0 };

const getIter = (counterObj) => {
return async (rawEntry, rawTXs) => {
const [entry, txs] = parseBlock(rawEntry, rawTXs);
assert.strictEqual(entry.height, startHeight + counterObj.count);
assert.strictEqual(txs.length, 4);

events.push({ ...counterObj });
counterObj.count++;

return {
type: scanActions.NEXT
};
};
};

client.hook('block rescan interactive', getIter(counter1));
client2.hook('block rescan interactive', getIter(counter2));

await Promise.all([
client.rescanInteractive(startHeight, null, true),
client2.rescanInteractive(startHeight, null, true)
]);

assert.strictEqual(counter1.count, RESCAN_DEPTH);
assert.strictEqual(counter2.count, RESCAN_DEPTH);

// We lock the whole chain for this test, so we should see events
// from one to other.
for (let i = 0; i < RESCAN_DEPTH; i++) {
assert.strictEqual(events[i].id, 1);
assert.strictEqual(events[i + RESCAN_DEPTH].id, 2);
}
});

// Make sure the client closing does not cause the chain locker to get
// indefinitely locked. (https://github.com/bcoin-org/bsock/pull/11)
it('should stop rescan when client closes', async () => {
Expand Down

0 comments on commit ac1f9c6

Please sign in to comment.