Skip to content

Commit

Permalink
crash detection, close_out, close_in signal the id and if the arc is …
Browse files Browse the repository at this point in the history
…failed
  • Loading branch information
folkvir committed Nov 2, 2018
1 parent 54a0afa commit 4dbb496
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 50 deletions.
11 changes: 7 additions & 4 deletions examples/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ let g = new sigma({ // eslint-disable-line
container: 'network',
type: 'canvas'
},
settings: Object.assign(sigma.settings, {
settings: Object.assign(sigma.settings, { // eslint-disable-line
defaultEdgeType: 'curvedArrow',
minArrowSize: 10,
scalingMode: 'inside',
sideMargin: 0.5
}) // eslint-disable-line
}) // eslint-disable-line
const moc = false
localStorage.debug = 'n2n:direct'
localStorage.debug = 'n2n:direct' // eslint-disable-line

const a = createNode('a', 0, 0)
const b = createNode('b', 1, 0)
Expand Down Expand Up @@ -56,8 +56,11 @@ function createNode (name, x, y) {
g.refresh()
}
})
node.on('close', (id, outview) => {
console.log('%s closes a con: ', node.id, id, outview)
node.on('close_in', (id, fail) => {
console.log('%s closes an inview connection with %s, fail: %s', node.id, id, fail)
})
node.on('close_out', (id, fail) => {
console.log('%s closes an outview connection with %s, fail: %s', node.id, id, fail)
})
node.on('receive', (id, message) => {
console.log('%s receive a message from %s:', node.id, id, message)
Expand Down
3 changes: 3 additions & 0 deletions lib/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ module.exports = {
OCC_DEC: 'o:d' // when we have to decrease the occurence on the inview
},
n2n: {
DISCONNECT: 'disc',
INC_IN: 'inc:in',
DEC_IN: 'dec:in',
CONNECT_TO_US: 'c:2:u',
DIRECT_TO: 'd:2',
DIRECT_BACK: 'd:b',
Expand Down
3 changes: 2 additions & 1 deletion lib/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module.exports = {
N2N: require('./main'),
sockets: require('./sockets'),
signaling: require('./signaling')
signaling: require('./signaling'),
errors: require('./errors')
}
176 changes: 131 additions & 45 deletions lib/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,16 @@ class N2N extends EventEmitter {
reject(errors.peerNotFound(peerId))
} else {
this.livingOutview.get(peerId).occurences++
this._signalConnect(peerId, true)
resolve()
this.send(this.options.n2n.protocol, peerId, {
type: events.n2n.INC_IN
}).then(() => {
this._signalConnect(peerId, true)
resolve()
}).catch(e => {
console.warn('[%s] cannot send INC_IN message to %s', this.id, peerId)
this._signalConnect(peerId, true)
resolve()
})
}
})
}
Expand All @@ -396,18 +404,75 @@ class N2N extends EventEmitter {
throw new Error('lock cannot be higher than the number of occurences when a deletion is performed.')
} else if (p.occurences > 0 && p.occurences > p.lock) {
this.livingOutview.get(peerId).occurences--
if (this.livingOutview.get(peerId).occurences === 0) {
this._signalDisconnect(peerId, true)
return p.socket.disconnect()
} else {
this._signalDisconnect(peerId, true) // signal disconnect
}
return this.send(this.options.n2n.protocol, peerId, {
type: events.n2n.DEC_IN
}).then(() => {
if (this.livingOutview.get(peerId).occurences === 0) {
this._signalDisconnect(peerId, true, false)
return this.send(this.options.n2n.protocol, peerId, {
type: events.n2n.DISCONNECT
}).then(() => {
return p.socket.disconnect()
}).catch(e => {
return p.socket.disconnect()
})
} else {
this._signalDisconnect(peerId, true, false) // signal disconnect
}
}).catch(e => {
console.warn('[%s] cannot send the message to %s', this.id, peerId)
if (this.livingOutview.get(peerId).occurences === 0) {
this._signalDisconnect(peerId, true, false)
return this.send(this.options.n2n.protocol, peerId, {
type: events.n2n.DISCONNECT
}).then(() => {
return p.socket.disconnect()
}).catch(e => {
return p.socket.disconnect()
})
} else {
this._signalDisconnect(peerId, true, false) // signal disconnect
}
})
} else {
throw new Error('PLEASE REPORT: decreaseOccOutview')
}
}
}

/**
* Decrease the occurences of our inview id
* @param {String} id identifier of the peer we want to decrease the occurence
* @return {void}
*/
_decreaseInview (id) {
if (this.livingInview.has(id)) {
this.livingInview.get(id).occurences--
this._signalDisconnect(id, false, false)
}
}
/**
* Increase the occurences of our inview id
* @param {String} id identifier of the peer we want to decrease the occurence
* @return {void}
*/
_increaseInview (id) {
if (this.livingInview.has(id)) {
this.livingInview.get(id).occurences++
}
}

/**
* Disconnect the living inview socket corresponding to the id provided.
* @param {String} id identifier of the peer
* @return {void}
*/
_disconnectInview (id) {
if (this.livingInview.has(id)) {
this.livingInview.get(id).socket.disconnect()
}
}

/**
* Simulate a crash by disconnecting all sockets from inview/outview
* @return {void}
Expand All @@ -431,13 +496,6 @@ class N2N extends EventEmitter {
createNewSocket (options, id, outview = false, timeout = this.options.n2n.timeout) {
const newSocket = new this.options.n2n.SocketClass(options)
const sid = newSocket.socketId
// const s = {
// from: this.id,
// to: id,
// socket: newSocket
// }
// this._pending.set(sid, s)
// this._all.set(sid, s)
this._debug('[%s] new socket created: %s with timeout', this.id, newSocket.socketId, timeout)
const tout = setTimeout(() => {
// deletion, the sid need to be the same as declared... otherwise report this error.
Expand Down Expand Up @@ -541,19 +599,21 @@ class N2N extends EventEmitter {
if (p.socket.socketId === socketId) {
this._debug('[%s] close outview: ', this.id, peerId, outview, p)
for (let i = 0; i < (p.occurences); ++i) {
this._signalDisconnect(peerId, outview)
this._signalDisconnect(peerId, outview, true)
}
this._deleteLiving(peerId, outview)
} // else, nothing to do
} else if (!outview && this.livingInview.has(peerId)) {
const p = this.livingInview.get(peerId)
if (p.socket.socketId === socketId) {
this._signalDisconnect(peerId, outview)
for (let i = 0; i < (p.occurences); ++i) {
this._signalDisconnect(peerId, outview, true)
}
this._debug('[%s] close inview: ', this.id, peerId, outview)
this._deleteLiving(peerId, outview)
}
} else {
console.log('[socket does not exist] Connection closed', peerId)
console.warn('[socket does not exist] Connection closed', peerId)
}
}

Expand Down Expand Up @@ -611,13 +671,14 @@ class N2N extends EventEmitter {
* @description Signal when an arc is closed
* @param {string} id Id of the peer of the arc
* @param {Boolean} outview Is an inview or an outview arc
* @param {Boolean} if the arc is a failed arc or an arc that has been well disconnected
* @return {void}
*/
_signalDisconnect (id, outview) {
_signalDisconnect (id, outview, fail = false) {
if (outview) {
this.emit('close_out', id, outview)
this.emit('close_out', id, fail)
} else {
this.emit('close_in', id, outview)
this.emit('close_in', id, fail)
}
}

Expand Down Expand Up @@ -730,42 +791,67 @@ class N2N extends EventEmitter {
}

_receive (id, message) {
try {
if (message && 'type' in message && 'id' in message && message.type === events.n2n.CONNECT_TO_US) {
console.log(id, message)
switch (message.type) {
case events.n2n.DISCONNECT:
this._disconnectInview(id)
break
case events.n2n.DEC_IN:
this._decreaseInview(id)
break
case events.n2n.INC_IN:
this._increaseInview(id)
break
case events.n2n.CONNECT_TO_US:
this.signaling.direct._connectToUs(message)
} else if (message && 'type' in message && 'response' in message && message.type === events.n2n.RESPONSE) {
break
case events.n2n.RESPONSE:
this.events.emit(message.jobId, message)
} else if (message && 'type' in message && message.type === events.n2n.DIRECT_TO) {
break
case events.n2n.DIRECT_TO:
this.signaling.direct.receiveOffer(message)
} else if (message && 'type' in message && message.type === events.n2n.DIRECT_BACK) {
break
case events.n2n.DIRECT_BACK:
this.signaling.direct.receiveOffer(message)
} else if (message && 'type' in message && message.type === events.n2n.bridgeIO.BRIDGE) {
break
case events.n2n.bridgeIO.BRIDGE:
this.signaling.bridgeIO._bridge(id, message)
} else if (message && 'type' in message && message.type === events.n2n.bridgeIO.BRIDGE_FORWARD) {
break
case events.n2n.bridgeIO.BRIDGE_FORWARD:
this.signaling.bridgeIO.forward(message)
} else if (message && 'type' in message && message.type === events.n2n.bridgeIO.BRIDGE_FORWARD_BACK) {
break
case events.n2n.bridgeIO.BRIDGE_FORWARD_BACK:
this.signaling.bridgeIO.forwardBack(message)
} else if (message && 'type' in message && message.type === events.n2n.bridgeIO.BRIDGE_FORWARD_RESPONSE) {
break
case events.n2n.bridgeIO.BRIDGE_FORWARD_RESPONSE:
this.signaling.bridgeIO.receiveOffer(message)
} else if (message && 'type' in message && message.type === events.n2n.bridgeOO.BRIDGE) {
this.signaling.bridgeOO._bridge(id, message)
} else if (message && 'type' in message && message.type === events.n2n.bridgeOO.BRIDGE_FORWARD) {
this.signaling.bridgeOO.forward(message)
} else if (message && 'type' in message && message.type === events.n2n.bridgeOO.BRIDGE_FORWARD_BACK) {
this.signaling.bridgeOO.forwardBack(message)
} else if (message && 'type' in message && message.type === events.n2n.bridgeOO.BRIDGE_FORWARD_RESPONSE) {
this.signaling.bridgeOO.receiveOffer(message)
} else if (message && 'type' in message && message.type === events.n2n.bridgeOI.BRIDGE) {
break
case events.n2n.bridgeOI.BRIDGE:
this.signaling.bridgeOI._bridge(id, message)
} else if (message && 'type' in message && message.type === events.n2n.bridgeOI.BRIDGE_FORWARD) {
break
case events.n2n.bridgeOI.BRIDGE_FORWARD:
this.signaling.bridgeOI.forward(message)
} else if (message && 'type' in message && message.type === events.n2n.bridgeOI.BRIDGE_FORWARD_BACK) {
break
case events.n2n.bridgeOI.BRIDGE_FORWARD_BACK:
this.signaling.bridgeOI.forwardBack(message)
} else if (message && 'type' in message && message.type === events.n2n.bridgeOI.BRIDGE_FORWARD_RESPONSE) {
break
case events.n2n.bridgeOI.BRIDGE_FORWARD_RESPONSE:
this.signaling.bridgeOI.receiveOffer(message)
}
} catch (e) {
console.error('An error here? hum please report...', e)
break
case events.n2n.bridgeOO.BRIDGE:
this.signaling.bridgeOO._bridge(id, message)
break
case events.n2n.bridgeOO.BRIDGE_FORWARD:
this.signaling.bridgeOO.forward(message)
break
case events.n2n.bridgeOO.BRIDGE_FORWARD_BACK:
this.signaling.bridgeOO.forwardBack(message)
break
case events.n2n.bridgeOO.BRIDGE_FORWARD_RESPONSE:
this.signaling.bridgeOO.receiveOffer(message)
break
default:
throw new Error('case not handled.')
}
}
}
Expand Down

0 comments on commit 4dbb496

Please sign in to comment.