Skip to content

Commit

Permalink
broadcast updated to FIFO broadcast
Browse files Browse the repository at this point in the history
  • Loading branch information
folkvir committed Oct 2, 2018
1 parent 472d940 commit 0d6ef2b
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 14 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ Easy use of WebRTC Networks with embedded network management and simple communic
## Features

**Communication primitives**:
- Causal Broadcast (to all peers in your network, with anti entropy not enabled by default)
- Broadcast (to all peers in your network, with an anti-entropy mechanism not enabled by default). We ensure single delivery and a causal relation between 2 consecutive messages from a same site.
- Unicast (to one direct neighbor)
- Multicast (to one or several direct neighbors)
- Streaming over our Causal Broadcast and Unicast
- Streaming over our Broadcast and Unicast
- Multiple communication channel per network

We only support Data Channel for the moment.
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions src/foglet.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ const DEFAULT_OPTIONS = () => {
return {
verbose: true, // want some logs ? switch to false otherwise
rps: {
type: 'spray-wrtc',
type: 'cyclon',
options: {
protocol: 'foglet-example-rps', // foglet running on the protocol foglet-example, defined for spray-wrtc
webrtc: { // add WebRTC options
trickle: true, // enable trickle (divide offers in multiple small offers sent by pieces)
config: {iceServers: []} // define iceServers in non local instance
},
timeout: 60 * 1000, // spray-wrtc timeout before definitively close a WebRTC connection.
timeout: 5 * 1000, // spray-wrtc timeout before definitively close a WebRTC connection.
pendingTimeout: 60 * 1000,
delta: 60 * 1000, // spray-wrtc shuffle interval
maxPeers: 5,
Expand All @@ -64,6 +64,7 @@ const DEFAULT_OPTIONS = () => {
},
overlays: [
// {
// name: 'yourOverlayName' // required to the network using the overlay function of the foglet instance
// class: YourOverlayClass,
// options: {
// delta: 10 * 1000,
Expand Down Expand Up @@ -102,7 +103,7 @@ const DEFAULT_OPTIONS = () => {
* // let's create a simple application that send message in broadcast
* const foglet = new Foglet({
* rps: {
* type: 'spray-wrtc', // we choose Spray as a our RPS
* type: 'cyclon', // we choose Spray as a our RPS
* options: {
* protocol: 'my-awesome-broadcast-application', // the name of the protocol run by our app
* webrtc: { // some WebRTC options
Expand Down
37 changes: 29 additions & 8 deletions src/network/communication/broadcast/broadcast.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/*
This broadcast implementation is clearly inspired from https://github.com/Chat-Wane/CausalBroadcastDefinition
This is a causal broadcast customizable, if you want to specifiy
This is a broadcast customizable, if you want to specifiy
Ensure single delivery and causality between 2 consecutive messages from a single site
*/
'use strict'

Expand Down Expand Up @@ -76,14 +77,34 @@ class Broadcast extends AbstractBroadcast {
* @return {boolean}
*/
send (message, id, isReady = undefined) {
const a = id || this._causality.increment()
const broadcastMessage = messages.BroadcastMessage(this._protocol, a, isReady, message)
const messageId = id || this._causality.increment()
if (messageId.e !== this._causality.local.e) {
throw new Error('The id of the identifier need to be equal to: ' + this._causality.local.e)
} else if (messageId.c < this._causality.local.v) {
throw new Error('Cant send the message because the identifier has a counter lower than our local counter: need to be equal to ' + this._causality.local.v + 1)
} else if (messageId.c > this._causality.local.v + 1) {
throw new Error('Cant send the message because the identifier has a counter higher than the counter accepted: need to be equal to ' + this._causality.local.v + 1)
}
let rdy = isReady
if (!rdy) {
// if the counter is higher than one, it means that we already send messages on the network
if (messageId.c > 1) {
rdy = {
e: messageId.e,
c: messageId.c - 1
}
}
}
const broadcastMessage = this._createBroadcastMessage(message, messageId, rdy)
// #2 register the message in the structure
this._causality.incrementFrom(a)

this._causality.incrementFrom(messageId)
// #3 send the message to the neighborhood
this._sendAll(broadcastMessage)
return a
return messageId
}

_createBroadcastMessage (message, id, isReady) {
return messages.BroadcastMessage(this._protocol, id, isReady, message)
}

/**
Expand Down Expand Up @@ -179,8 +200,8 @@ class Broadcast extends AbstractBroadcast {

default: {
if (!this._shouldStopPropagation(message)) {
// #1 register the operation
// maintain `this._buffer` sorted to search in O(log n)
// #1 register the operation
// maintain `this._buffer` sorted to search in O(log n)
const index = sortedIndexBy(this._buffer, message, formatID)
this._buffer.splice(index, 0, message)
// #2 deliver
Expand Down
41 changes: 41 additions & 0 deletions tests/foglet-communication-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,45 @@ describe('Foglet High-level communication', function () {
}, 2000)
}).catch(done)
})
it('should receive broadcasted weirdly ordered messages in a 3 peers network (1-3-2-4) (second test)', function (done) {
const foglets = utils.buildFog(Foglet, 3)
const f1 = foglets[0]
const f2 = foglets[1]
const f3 = foglets[2]

let cptA = 0
let cptB = 0
const results = [ '1', '3', '2', '4' ]
const totalResult = 8
const check = utils.doneAfter(totalResult, () => {
utils.clearFoglets(foglets).then(() => done())
})

utils.pathConnect(foglets, 2000).then(() => {
f2.onBroadcast((id, message) => {
assert.equal(id, f1.outViewID)
assert.equal(message, results[cptA])
cptA++
check()
})

f3.onBroadcast((id, message) => {
assert.equal(id, f1.outViewID)
assert.equal(message, results[cptB])
cptB++
check()
})

setTimeout(() => {
const id1 = f1.overlay().communication.broadcast._causality.increment()
const id2 = f1.overlay().communication.broadcast._causality.increment()
const id3 = f1.overlay().communication.sendBroadcast('3', null, id1)
f1.overlay().communication.sendBroadcast('4', null, id2)
setTimeout(() => {
f1.overlay().communication.broadcast._sendAll(f1.overlay().communication.broadcast._createBroadcastMessage('1', id1, null))
f1.overlay().communication.broadcast._sendAll(f1.overlay().communication.broadcast._createBroadcastMessage('2', id2, id3))
}, 2000)
}, 2000)
}).catch(done)
})
})

0 comments on commit 0d6ef2b

Please sign in to comment.