Skip to content

Commit

Permalink
Watch downloads
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Aug 10, 2020
1 parent 86a4247 commit c8cf5c3
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 0 deletions.
26 changes: 26 additions & 0 deletions lib/sessions/hypercore.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ module.exports = class HypercoreSession {
if (this._sessionState.hasResource('@hypercore/close-' + id)) {
this._sessionState.deleteResource('@hypercore/close-' + id)
}
if (this._sessionState.hasResource('@hypercore/download-' + id)) {
this._sessionState.deleteResource('@hypercore/download-' + id)
}
const downloadSet = this._downloads.get(id)
if (!downloadSet) return
for (const resourceId of downloadSet) {
Expand Down Expand Up @@ -208,4 +211,27 @@ module.exports = class HypercoreSession {
core[LOCK] = null
this._sessionState.deleteResource(LOCK)
}

async watchDownloads ({ id }) {
if (this._sessionState.hasResource('@hypercore/download-' + id)) {
return
}
const core = this._sessionState.getCore(id)
const downloadListener = seq => {
this._client.hypercore.onDownloadNoReply({
id,
seq
})
}
core.on('download', downloadListener)
this._sessionState.addResource('@hypercore/download-' + id, null, () => {
core.removeListener('download', downloadListener)
})
}

async unwatchDownloads ({ id }) {
if (this._sessionState.hasResource('@hypercore/download-' + id)) {
this._sessionState.deleteResource('@hypercore/download-' + id)
}
}
}
68 changes: 68 additions & 0 deletions test/networked.js
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,74 @@ test('can send on a hypercore extension', async t => {
t.end()
})

test('can watch downloads and appends', async t => {
try {
const { clients, cleanup } = await createMany(2)

const client1 = clients[0]
const client2 = clients[1]
const corestore1 = client1.corestore()
const corestore2 = client2.corestore()

const core1 = corestore1.get()
await core1.ready()
await core1.append(Buffer.from('zero', 'utf8'))
await core1.append(Buffer.from('one', 'utf8'))
await core1.append(Buffer.from('two', 'utf8'))
await client1.network.configure(core1.discoveryKey, { announce: true, lookup: true, flush: true })

const core2 = corestore2.get(core1.key)
await core2.watchDownloads()
await core2.ready()

let downloads = 0
let appends = 0
core2.on('download', () => (downloads++))
core2.on('append', () => (appends++))

await client2.network.configure(core1.discoveryKey, { announce: false, lookup: true })

let downloadPromise = watchDownloadPromise(core2, 2)
let block = await core2.get(2)
t.same(block.toString('utf8'), 'two')
await downloadPromise

downloadPromise = watchDownloadPromise(core2, 0)
block = await core2.get(0)
t.same(block.toString('utf8'), 'zero')
await downloadPromise

t.equal(downloads, 2, 'download count correct')
t.equal(appends, 1, 'append count correct')

await core1.append(Buffer.from('three', 'utf8'))
await core1.append(Buffer.from('four', 'utf8'))
await core2.update({})
t.equal(appends, 2, 'append counter after update correct')

downloadPromise = watchDownloadPromise(core2, 4)
await core2.download(4)
await downloadPromise
t.equal(downloads, 3, 'download counter after download correct')

await core2.unwatchDownloads()
await core2.get(3)
t.equal(downloads, 3, 'download counter does not increase after unwatch')

await cleanup()
t.end()
} catch (e) { console.error(e) }
})

function watchDownloadPromise (core, expectedSeq) {
return new Promise((resolve, reject) => {
core.once('download', seq => {
if (seq === expectedSeq) resolve()
else reject(new Error('Expected ' + expectedSeq + ', found ' + seq))
})
})
}

function delay (ms) {
return new Promise(resolve => setTimeout(resolve, ms))
}

0 comments on commit c8cf5c3

Please sign in to comment.