Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternative Presence Implementation from Propeller Factory #285

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
c89c8d9
Removing the effective 'shattering' of remotely received ops.
jeremysf Aug 13, 2018
f191eef
Add 'publish' hook to sharedb to allow modifying the op before it is …
Aug 29, 2018
e56e0e3
pass correct context to publish trigger
Aug 29, 2018
894bf7e
prevent invert attempt on error since 'rich-text' subtype does not su…
Aug 30, 2018
c2d8b68
use newer json0 in sharedb fork
Aug 30, 2018
f749c99
use 1.1.0 json0 explicitly instead of referring to github commit
Aug 30, 2018
212873b
prelim add presense transmit
sidhu663 Mar 25, 2019
1b908a4
update connection.js to transmit presence alongside op
sidhu663 Mar 25, 2019
2387957
Add sendPresence method to client
sidhu663 Mar 25, 2019
32641bb
add sendPresence to client/doc
sidhu663 Mar 25, 2019
9b40a52
use source id from the correct place
sidhu663 Mar 25, 2019
4c53d85
Fix sendPresence unknown reference
sidhu663 Mar 25, 2019
717733a
pass through presence data through op submit
sidhu663 Mar 25, 2019
9297aa0
correctly call submit with the .. correct parameters
sidhu663 Mar 25, 2019
afefc13
attach presence when sending op in agent.js
sidhu663 Mar 25, 2019
8c7df46
fix referencing global this instead of request 'this' in submit-request
sidhu663 Mar 26, 2019
9edd885
update behavior of send presence to take into account inflight ops
sidhu663 Mar 26, 2019
b9636bd
Add support for receiving presence
sidhu663 Mar 26, 2019
82dff80
fix unknown reference
sidhu663 Mar 26, 2019
5d58406
correctly persist presence when delaying send'
sidhu663 Mar 27, 2019
4cb14a8
transform incoming presense by inflight op as well if available; do n…
sidhu663 Mar 29, 2019
9f4a3af
avoid fetching database ops when transforming presence if presence al…
sidhu663 Apr 3, 2019
3786df6
declare variable before using it ...
sidhu663 Apr 3, 2019
8c66251
retry submit op on commit error as well as commit failure
sidhu663 Apr 4, 2019
97b2358
transmit a connection closed message when an agent disconnects
sidhu663 Apr 8, 2019
a37ed29
rename action to notifyUnsubscribe; notify unsubscribe on either unsu…
sidhu663 Apr 8, 2019
18d9e3d
emit notifyUnsubscribe message with collection and doc id; add unsub …
sidhu663 Apr 8, 2019
0211fac
inject the src param into the presence object upon receipt
sidhu663 Apr 9, 2019
d8305a0
fix not returning after broadcasting the notify unsub message
sidhu663 Apr 9, 2019
fe76e78
attach src info to presence that piggybacks on ops
sidhu663 Apr 9, 2019
d6ccf47
only create presence object if presence received
sidhu663 Apr 9, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 95 additions & 6 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ Agent.prototype.close = function(err) {

Agent.prototype._cleanup = function() {
this.closed = true;

// Clean up doc subscription streams
for (var collection in this.subscribedDocs) {
var docs = this.subscribedDocs[collection];
for (var id in docs) {
this.backend.notifyUnsubscribe(this.clientId, collection, id)
var stream = docs[id];
stream.destroy();
}
Expand Down Expand Up @@ -98,6 +98,17 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) {
console.error('Doc subscription stream error', collection, id, data.error);
return;
}
if (data.a === 'pr') {
// Received a presense publish from another client
if (data.src !== agent.clientId) agent.send(data);
return
}
if (data.a === 'nus') {
// Another client has unsubscribed from this document
if (data.src !== agent.clientId) agent.send(data)
return
}

if (agent._isOwnOp(collection, data)) return;
agent._sendOp(collection, id, data);
});
Expand Down Expand Up @@ -176,6 +187,7 @@ Agent.prototype._sendOp = function(collection, id, op) {
if (op.op) message.op = op.op;
if (op.create) message.create = op.create;
if (op.del) message.del = true;
if (op.pr) message.pr = op.pr

this.send(message);
};
Expand Down Expand Up @@ -299,7 +311,10 @@ Agent.prototype._handleMessage = function(request, callback) {
case 'op':
var op = this._createOp(request);
if (!op) return callback({code: 4000, message: 'Invalid op message'});
return this._submit(request.c, request.d, op, callback);
return this._submit(request.c, request.d, op, request.pr, callback);
case 'pr': {
return this._sendPresence(request, callback)
}
case 'nf':
return this._fetchSnapshot(request.c, request.d, request.v, callback);
default:
Expand Down Expand Up @@ -496,6 +511,70 @@ Agent.prototype._subscribe = function(collection, id, version, callback) {
});
};


Agent.prototype._sendPresence = function(request, callback) {
const collection = request.c
const id = request.d
const version = request.v
const source = request.src || this.clientId
const from = version
const backend = this.backend
const agent = this.agent
// Send a special projection so that getSnapshot knows to return all fields.
// With a null projection, it strips document metadata
var fields = {$submit: true};

backend.db.getSnapshot(collection, id, fields, null, function(err, snapshot) {
if (err) return callback(err);

if (version === snapshot.v) {
// Send presense to other clients
// We don't need to transform because there is no newer version of the document to transform to
var message = {
a: 'pr',
c: collection,
d: id,
v: version,
src: source,
pr: request.pr
};

backend.sendPresence(message, function(err) {
if (err) return callback(err);
});
} else {
// The version of the presence doesn't match the snapshot so fetch the ops
// and give middleware chance to transform the presence
backend.db.getOpsToSnapshot(collection, id, from, snapshot, null, function(err, ops) {
if (err) return callback(err);

if (ops.length !== snapshot.v - from) {
return callback(request.missingOpsError());
}

request.opsToSnapshot = ops
backend.trigger('transformPresence', agent, request, function(err) {
if (err) return callback(err);

var message = {
a: 'pr',
c: collection,
d: id,
v: version,
src: source,
pr: request.pr
};

// Send presense to other clients
backend.sendPresence(message, function(err) {
if (err) return callback(err);
});
})
})
}
})
}

Agent.prototype._subscribeBulk = function(collection, versions, callback) {
var agent = this;
this.backend.subscribeBulk(this, collection, versions, function(err, streams, snapshotMap) {
Expand All @@ -516,7 +595,10 @@ Agent.prototype._unsubscribe = function(collection, id, callback) {
// stream or an inflight subscribing state
var docs = this.subscribedDocs[collection];
var stream = docs && docs[id];
if (stream) stream.destroy();
if (stream) {
stream.destroy();
this.backend.notifyUnsubscribe(this.clientId, collection, id)
}
process.nextTick(callback);
};

Expand All @@ -526,14 +608,17 @@ Agent.prototype._unsubscribeBulk = function(collection, ids, callback) {
for (var i = 0; i < ids.length; i++) {
var id = ids[i];
var stream = docs[id];
if (stream) stream.destroy();
if (stream) {
this.backend.notifyUnsubscribe(this.clientId, collection, id)
stream.destroy();
}
}
process.nextTick(callback);
};

Agent.prototype._submit = function(collection, id, op, callback) {
Agent.prototype._submit = function(collection, id, op, presence, callback) {
var agent = this;
this.backend.submit(this, collection, id, op, null, function(err, ops) {
this.backend.submit(this, collection, id, op, presence, null, function(err, ops) {
// Message to acknowledge the op was successfully submitted
var ack = {src: op.src, seq: op.seq, v: op.v};
if (err) {
Expand Down Expand Up @@ -588,3 +673,7 @@ Agent.prototype._createOp = function(request) {
Agent.prototype._fetchSnapshot = function (collection, id, version, callback) {
this.backend.fetchSnapshot(this, collection, id, version, callback);
};

Agent.prototype.missingOpsError = function() {
return {code: 5019, message: 'Presence send failed. DB missing ops needed to transform it up to the current snapshot version'};
};
20 changes: 18 additions & 2 deletions lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,10 @@ Backend.prototype.trigger = function(action, agent, request, callback) {
// Submit an operation on the named collection/docname. op should contain a
// {op:}, {create:} or {del:} field. It should probably contain a v: field (if
// it doesn't, it defaults to the current version).
Backend.prototype.submit = function(agent, index, id, op, options, callback) {
Backend.prototype.submit = function(agent, index, id, op, presence, options, callback) {
var err = ot.checkOp(op);
if (err) return callback(err);
var request = new SubmitRequest(this, agent, index, id, op, options);
var request = new SubmitRequest(this, agent, index, id, op, presence, options);
var backend = this;
backend.trigger(backend.MIDDLEWARE_ACTIONS.submit, agent, request, function(err) {
if (err) return callback(err);
Expand All @@ -239,6 +239,22 @@ Backend.prototype.submit = function(agent, index, id, op, options, callback) {
});
};

Backend.prototype.sendPresence = function(presence, callback) {
var channels = [ this.getDocChannel(presence.c, presence.d) ];
this.pubsub.publish(channels, presence, callback);
};

Backend.prototype.notifyUnsubscribe = function(clientId, collectionId, docId, callback) {
const message = {
a: 'nus',
c: collectionId,
d: docId,
src: clientId
}

this.pubsub.publish([this.getDocChannel(collectionId, docId)], message, callback)
}

Backend.prototype._sanitizeOp = function(agent, projection, collection, id, op, callback) {
if (projection) {
try {
Expand Down
21 changes: 20 additions & 1 deletion lib/client/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,14 @@ Connection.prototype.handleMessage = function(message) {
var doc = this.getExisting(message.c, message.d);
if (doc) doc._handleOp(err, message);
return;

case 'pr':
var doc = this.getExisting(message.c, message.d);
if (doc) doc._handlePresence(err, message)
return;
case 'nus':
var doc = this.getExisting(message.c, message.d)
if (doc) doc._handleUnsubscribeNotification(err, message)
return;
default:
console.warn('Ignoring unrecognized message', message);
}
Expand Down Expand Up @@ -418,9 +425,21 @@ Connection.prototype.sendOp = function(doc, op) {
if (op.op) message.op = op.op;
if (op.create) message.create = op.create;
if (op.del) message.del = op.del;
if (op.pr) message.pr = op.pr
this.send(message);
};

Connection.prototype.sendPresence = function(doc, presence) {
var message = {
a: 'pr',
c: doc.collection,
d: doc.id,
v: doc.version,
src: this.id,
pr: presence
};
this.send(message);
}

/**
* Sends a message down the socket
Expand Down
Loading