diff --git a/lib/sessions/hypercore.js b/lib/sessions/hypercore.js index 8c7470b..a61da9b 100644 --- a/lib/sessions/hypercore.js +++ b/lib/sessions/hypercore.js @@ -17,6 +17,9 @@ module.exports = class HypercoreSession { if (this._sessionState.hasResource('@hypercore/close-' + id)) { this._sessionState.deleteResource('@hypercore/close-' + id) } + if (this._sessionState.hasResource('@hypercore/download-' + id)) { + this._sessionState.deleteResource('@hypercore/download-' + id) + } const downloadSet = this._downloads.get(id) if (!downloadSet) return for (const resourceId of downloadSet) { @@ -208,4 +211,27 @@ module.exports = class HypercoreSession { core[LOCK] = null this._sessionState.deleteResource(LOCK) } + + async watchDownloads ({ id }) { + if (this._sessionState.hasResource('@hypercore/download-' + id)) { + return + } + const core = this._sessionState.getCore(id) + const downloadListener = seq => { + this._client.hypercore.onDownloadNoReply({ + id, + seq + }) + } + core.on('download', downloadListener) + this._sessionState.addResource('@hypercore/download-' + id, null, () => { + core.removeListener('download', downloadListener) + }) + } + + async unwatchDownloads ({ id }) { + if (this._sessionState.hasResource('@hypercore/download-' + id)) { + this._sessionState.deleteResource('@hypercore/download-' + id) + } + } } diff --git a/test/networked.js b/test/networked.js index 239d1f5..c8b091a 100644 --- a/test/networked.js +++ b/test/networked.js @@ -459,6 +459,67 @@ test('can read a live stream', async t => { t.end() }) +test('can watch downloads and appends', async t => { + const { clients, cleanup } = await createMany(2) + + const client1 = clients[0] + const client2 = clients[1] + const corestore1 = client1.corestore() + const corestore2 = client2.corestore() + + const core1 = corestore1.get() + await core1.ready() + await core1.append(Buffer.from('zero', 'utf8')) + await core1.append(Buffer.from('one', 'utf8')) + await core1.append(Buffer.from('two', 'utf8')) + await client1.network.configure(core1.discoveryKey, { announce: true, lookup: true, flush: true }) + + const core2 = corestore2.get(core1.key) + await core2.ready() + + let downloads = 0 + let appends = 0 + core2.on('download', () => (downloads++)) + core2.on('append', () => (appends++)) + + await client2.network.configure(core1.discoveryKey, { announce: false, lookup: true }) + + let downloadPromise = watchDownloadPromise(core2, 2) + let block = await core2.get(2) + t.same(block.toString('utf8'), 'two') + await downloadPromise + + downloadPromise = watchDownloadPromise(core2, 0) + block = await core2.get(0) + t.same(block.toString('utf8'), 'zero') + await downloadPromise + + t.equal(downloads, 2, 'download count correct') + t.equal(appends, 1, 'append count correct') + + await core1.append(Buffer.from('three', 'utf8')) + await core1.append(Buffer.from('four', 'utf8')) + await core2.update({}) + t.equal(appends, 2, 'append counter after update correct') + + downloadPromise = watchDownloadPromise(core2, 4) + await core2.download(4) + await downloadPromise + t.equal(downloads, 3, 'download counter after download correct') + + await cleanup() + t.end() +}) + +function watchDownloadPromise (core, expectedSeq) { + return new Promise((resolve, reject) => { + core.once('download', seq => { + if (seq === expectedSeq) resolve() + else reject(new Error('Expected ' + expectedSeq + ', found ' + seq)) + }) + }) +} + function delay (ms) { return new Promise(resolve => setTimeout(resolve, ms)) }