Skip to content

Commit

Permalink
adjust moc latencies upon messages, closes and connections
Browse files Browse the repository at this point in the history
  • Loading branch information
folkvir committed Nov 7, 2018
1 parent 0d7cd72 commit a9fb4be
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 47 deletions.
53 changes: 31 additions & 22 deletions bin/n2n-wrtc.bundle.js
Original file line number Diff line number Diff line change
Expand Up @@ -785,25 +785,26 @@ class N2N extends EventEmitter {
this._signalDisconnect(peerId, true, false)
return this.send(this.options.n2n.protocol, peerId, {
type: events.n2n.DISCONNECT
}).catch(e => {
console.warn('[%s] cannot send the message to %s', this.id, peerId, e)
}).then(() => {
return p.socket.disconnect(this.options.socket)
}).catch(e => {
return p.socket.disconnect()
})
} else {
this._signalDisconnect(peerId, true, false) // signal disconnect
return Promise.resolve()
}
}).catch(e => {
console.error(e)
console.warn('[%s] cannot send the message to %s', this.id, peerId)
if (this.livingOutview.get(peerId).occurences === 0) {
this._signalDisconnect(peerId, true, false)
return this.send(this.options.n2n.protocol, peerId, {
type: events.n2n.DISCONNECT
}).then(() => {
return p.socket.disconnect()
}).catch(e => {
return p.socket.disconnect()
console.warn('[%s] cannot send the message to %s', this.id, peerId, e)
}).then(() => {
return p.socket.disconnect(this.options.socket)
})
} else {
this._signalDisconnect(peerId, true, false) // signal disconnect
Expand Down Expand Up @@ -2284,10 +2285,7 @@ class DirectSignaling extends SignalingAPI {

const socket = this.parent.createNewSocket(this.parent.options.socket, peerId, true)
socket.on('error', (error) => {
this.parent._manageError(error, peerId, true, (e) => {
this._debug('[%s][%s][_connectToUs] receive an error during the connection %s...', this.parent.id, jobId, id, e)
reject(e)
}, 'direct')
this.parent._manageError(error, peerId, true, reject, 'direct')
})
socket.on(events.socket.EMIT_OFFER, (offer) => {
const off = {
Expand Down Expand Up @@ -3209,7 +3207,8 @@ class Manager {
}
this.manager = new Map()
this._options = {
latency: (send) => { setTimeout(send, 0) }
latency: 0,
connections: 50
}
debugManager('manager initialized')
}
Expand All @@ -3228,17 +3227,22 @@ class Manager {
connect (from, to) {
debugManager('peer connected from/to: ', from, to)
this.manager.get(to)._connectWith(from)
this.manager.get(from)._connectWith(to)
setTimeout(() => {
this.manager.get(from)._connectWith(to)
}, this._options.connections)
}
// @private
destroy (from, to) {
debugManager('peer disconnected from/to: ', from, to)
if (this.manager.get(from)) {
if (this.manager.has(from)) {
this.manager.get(from)._close()
}
if (this.manager.get(to)) {
this.manager.get(to)._close()
}
// disconnect the overside of the socket after one second
setTimeout(() => {
if (this.manager.has(to)) {
this.manager.get(to)._close()
}
}, this._options.connections)
}
// @private
send (from, to, msg, retry = 0) {
Expand All @@ -3247,9 +3251,12 @@ class Manager {
// @private
_send (from, to, msg, retry = 0) {
try {
if (!this.manager.has(from) || !this.manager.has(to)) throw new Error('need a (from) and (to) peer.')
this.manager.get(to).emit('data', msg)
this._statistics.message++
if (!this.manager.has(from) || !this.manager.has(to)) {
throw new Error('need a (from) and (to) peer.')
} else {
this._statistics.message++
this.manager.get(to)._receive(msg)
}
} catch (e) {
throw new Error('cannot send the message. perhaps your destination is not reachable.', e)
}
Expand Down Expand Up @@ -3284,14 +3291,16 @@ module.exports = class SimplePeerAbstract extends EventEmitter {
})
}
this._manager.set(this.id, this)
this.on('internal_close', () => {
this._manager.manager.delete(this.id)
})
}
// @private
static get manager () {
return manager
}
_receive (data) {
setTimeout(() => {
this.emit('data', data)
}, this._manager._options.latency)
}
send (data) {
if (!this.connectedWith) {
this.messageBuffer.push(data)
Expand Down Expand Up @@ -3327,7 +3336,7 @@ module.exports = class SimplePeerAbstract extends EventEmitter {
}
// @private
_close () {
this.emit('internal_close')
this._manager.manager.delete(this.id)
debugManager('[%s] is closed.', this.id)
this.emit('close')
}
Expand Down
2 changes: 1 addition & 1 deletion bin/n2n-wrtc.bundle.js.map

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion bin/n2n-wrtc.bundle.min.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion bin/n2n-wrtc.bundle.min.js.map

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions lib/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -555,25 +555,26 @@ class N2N extends EventEmitter {
this._signalDisconnect(peerId, true, false)
return this.send(this.options.n2n.protocol, peerId, {
type: events.n2n.DISCONNECT
}).catch(e => {
console.warn('[%s] cannot send the message to %s', this.id, peerId, e)
}).then(() => {
return p.socket.disconnect(this.options.socket)
}).catch(e => {
return p.socket.disconnect()
})
} else {
this._signalDisconnect(peerId, true, false) // signal disconnect
return Promise.resolve()
}
}).catch(e => {
console.error(e)
console.warn('[%s] cannot send the message to %s', this.id, peerId)
if (this.livingOutview.get(peerId).occurences === 0) {
this._signalDisconnect(peerId, true, false)
return this.send(this.options.n2n.protocol, peerId, {
type: events.n2n.DISCONNECT
}).then(() => {
return p.socket.disconnect()
}).catch(e => {
return p.socket.disconnect()
console.warn('[%s] cannot send the message to %s', this.id, peerId, e)
}).then(() => {
return p.socket.disconnect(this.options.socket)
})
} else {
this._signalDisconnect(peerId, true, false) // signal disconnect
Expand Down
5 changes: 1 addition & 4 deletions lib/signaling/direct.js
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,7 @@ class DirectSignaling extends SignalingAPI {

const socket = this.parent.createNewSocket(this.parent.options.socket, peerId, true)
socket.on('error', (error) => {
this.parent._manageError(error, peerId, true, (e) => {
this._debug('[%s][%s][_connectToUs] receive an error during the connection %s...', this.parent.id, jobId, id, e)
reject(e)
}, 'direct')
this.parent._manageError(error, peerId, true, reject, 'direct')
})
socket.on(events.socket.EMIT_OFFER, (offer) => {
const off = {
Expand Down
37 changes: 24 additions & 13 deletions lib/sockets/webrtc-moc.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ class Manager {
}
this.manager = new Map()
this._options = {
latency: (send) => { setTimeout(send, 0) }
latency: 0,
connections: 50
}
debugManager('manager initialized')
}
Expand All @@ -45,17 +46,22 @@ class Manager {
connect (from, to) {
debugManager('peer connected from/to: ', from, to)
this.manager.get(to)._connectWith(from)
this.manager.get(from)._connectWith(to)
setTimeout(() => {
this.manager.get(from)._connectWith(to)
}, this._options.connections)
}
// @private
destroy (from, to) {
debugManager('peer disconnected from/to: ', from, to)
if (this.manager.get(from)) {
if (this.manager.has(from)) {
this.manager.get(from)._close()
}
if (this.manager.get(to)) {
this.manager.get(to)._close()
}
// disconnect the overside of the socket after one second
setTimeout(() => {
if (this.manager.has(to)) {
this.manager.get(to)._close()
}
}, this._options.connections)
}
// @private
send (from, to, msg, retry = 0) {
Expand All @@ -64,9 +70,12 @@ class Manager {
// @private
_send (from, to, msg, retry = 0) {
try {
if (!this.manager.has(from) || !this.manager.has(to)) throw new Error('need a (from) and (to) peer.')
this.manager.get(to).emit('data', msg)
this._statistics.message++
if (!this.manager.has(from) || !this.manager.has(to)) {
throw new Error('need a (from) and (to) peer.')
} else {
this._statistics.message++
this.manager.get(to)._receive(msg)
}
} catch (e) {
throw new Error('cannot send the message. perhaps your destination is not reachable.', e)
}
Expand Down Expand Up @@ -101,14 +110,16 @@ module.exports = class SimplePeerAbstract extends EventEmitter {
})
}
this._manager.set(this.id, this)
this.on('internal_close', () => {
this._manager.manager.delete(this.id)
})
}
// @private
static get manager () {
return manager
}
_receive (data) {
setTimeout(() => {
this.emit('data', data)
}, this._manager._options.latency)
}
send (data) {
if (!this.connectedWith) {
this.messageBuffer.push(data)
Expand Down Expand Up @@ -144,7 +155,7 @@ module.exports = class SimplePeerAbstract extends EventEmitter {
}
// @private
_close () {
this.emit('internal_close')
this._manager.manager.delete(this.id)
debugManager('[%s] is closed.', this.id)
this.emit('close')
}
Expand Down
2 changes: 2 additions & 0 deletions tests/connect4u-null-peer-test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const N2N = require('../lib').N2N
const assert = require('assert')
const utils = require('../lib/utils')

describe('[N2N] Offline connection', function () {
this.timeout(2 * 60 * 1000)
Expand Down Expand Up @@ -30,6 +31,7 @@ describe('[N2N] Offline connection', function () {
assert.strictEqual(c.livingOutview.has(b.id), false)
assert.strictEqual(c.livingInview.get(b.id).occurences, 1)
await b.connect4u(null, c.id)
await utils.timeout(500)
assert.strictEqual(b.livingOutview.get(c.id).occurences, 2)
assert.strictEqual(c.livingOutview.has(b.id), false)
assert.strictEqual(c.livingInview.get(b.id).occurences, 2)
Expand Down
3 changes: 3 additions & 0 deletions tests/crash-test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const N2N = require('../lib').N2N
const assert = require('assert')
const utils = require('../lib/utils')

describe('[N2N] Crash test', function () {
this.timeout(10 * 1000)
Expand Down Expand Up @@ -31,6 +32,8 @@ describe('[N2N] Crash test', function () {
await a.connect(b)
await a.connect(b)
await a.connect(b)
// because the moc is enabled, and the code is synchronous we need to wait for messages
await utils.timeout(500)
return new Promise((resolve, reject) => {
let o = 0
const done = (occ) => {
Expand Down
4 changes: 4 additions & 0 deletions tests/n2n-test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const N2N = require('../lib').N2N
const assert = require('assert')
const utils = require('../lib/utils')
const socket = {
moc: true,
tricle: true
Expand Down Expand Up @@ -142,6 +143,7 @@ describe('N2N connection', function () {
assert.strictEqual(b.livingOutview.get(a.id).occurences, 1)
assert.strictEqual(b.livingOutview.get(a.id).lock, 0)
await a.disconnect()
await utils.timeout(500)
assert.strictEqual(a.getNeighboursIds().length, 0)
assert.strictEqual(b.getNeighboursIds().length, 1)
assert.strictEqual(a.livingInview.size, 1)
Expand Down Expand Up @@ -187,13 +189,15 @@ describe('N2N connection', function () {
assert.strictEqual(b.livingOutview.get(a.id).occurences, 1)
assert.strictEqual(b.livingOutview.get(a.id).lock, 0)
await a.disconnect()
await utils.timeout(500)
assert.strictEqual(a.getNeighboursIds().length, 0)
assert.strictEqual(b.getNeighboursIds().length, 1)
assert.strictEqual(a.livingInview.size, 1)
assert.strictEqual(a.livingOutview.size, 0)
assert.strictEqual(b.livingInview.size, 0)
assert.strictEqual(b.livingOutview.size, 1)
await b.disconnect()
await utils.timeout(500)
assert.strictEqual(a.getNeighboursIds().length, 0)
assert.strictEqual(b.getNeighboursIds().length, 0)
assert.strictEqual(a.livingInview.size, 0)
Expand Down

0 comments on commit a9fb4be

Please sign in to comment.