Skip to content

Commit

Permalink
Merge pull request #325 from arthurbolsoni/feat/post_event
Browse files Browse the repository at this point in the history
feat: post_event implementation
  • Loading branch information
mariuz authored Mar 29, 2024
2 parents a1ad887 + a9c82e6 commit 027e72a
Show file tree
Hide file tree
Showing 6 changed files with 392 additions and 2 deletions.
1 change: 1 addition & 0 deletions lib/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ declare module 'node-firebird' {
sequentially(query: string, params: any[], rowCallback: SequentialCallback, callback: SimpleCallback, asArray?: boolean): Database;
drop(callback: SimpleCallback): void;
escape(value: any): string;
attachEvent(callback: any): this;
}

export interface Transaction {
Expand Down
75 changes: 75 additions & 0 deletions lib/wire/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -1883,4 +1883,79 @@ function bufferReader(buffer, max, writer, cb, beg, end) {
});
}

Connection.prototype.auxConnection = function (callback) {
var self = this;
if (self._isClosed)
return this.throwClosed(callback);
var msg = self._msg;
msg.pos = 0;
msg.addInt(Const.op_connect_request);
msg.addInt(1); // async
msg.addInt(self.dbhandle);
msg.addInt(0);
function cb(err, ret) {

if (err) {
doError(err, callback);
return;
}

var socket_info = {
family: ret.buffer.readInt16BE(0),
port: ret.buffer.readUInt16BE(2),
host: ret.buffer.readUInt8(4) + '.' + ret.buffer.readUInt8(5) + '.' + ret.buffer.readUInt8(6) + '.' + ret.buffer.readUInt8(7)
}

callback(undefined, socket_info);
}
this._queueEvent(cb);
}

Connection.prototype.queEvents = function (events, eventid, callback) {
var self = this;
if (this._isClosed)
return this.throwClosed(callback);
var msg = this._msg;
var blr = this._blr;
blr.pos = 0;
msg.pos = 0;
msg.addInt(Const.op_que_events);
msg.addInt(this.dbhandle);
// prepare EPB
blr.addByte(1) // epb_version
for (var event in events) {
var event_buffer = new Buffer(event, 'UTF8');
blr.addByte(event_buffer.length);
blr.addBytes(event_buffer);
blr.addInt32(events[event]);
}
msg.addBlr(blr); // epb
msg.addInt(0); // ast
msg.addInt(0); // args
msg.addInt(eventid);
this._queueEvent(callback);
}

Connection.prototype.closeEvents = function (eventid, callback) {
var self = this;
if (this._isClosed)
return this.throwClosed(callback);
var msg = self._msg;
msg.pos = 0;
msg.addInt(Const.op_cancel_events);
msg.addInt(self.dbhandle);
msg.addInt(eventid);

function cb(err, ret) {
if (err) {
doError(err, callback);
return;
}

callback(err);
}

this._queueEvent(cb);
}

module.exports = Connection;
36 changes: 34 additions & 2 deletions lib/wire/database.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
const Events = require('events');
const {doError} = require('../callback');
const {escape} = require('../utils');
const { doError } = require('../callback');
const { escape } = require('../utils');
const EventConnection = require('./eventConnection');
const FbEventManager = require('./fbEventManager');

/***************************************
*
Expand All @@ -11,6 +13,7 @@ const {escape} = require('../utils');
function Database(connection) {
this.connection = connection;
connection.db = this;
this.eventid = 1;
}

Database.prototype.__proto__ = Object.create(Events.EventEmitter.prototype, {
Expand Down Expand Up @@ -163,4 +166,33 @@ Database.prototype.drop = function(callback) {
return this.connection.dropDatabase(callback);
};

Database.prototype.attachEvent = function (callback) {
var self = this;
this.connection.auxConnection(function (err, socket_info) {

if (err) {
doError(err, callback);
return;
}

const eventConnection = new EventConnection(self.connection.host, socket_info.port, function (err) {
if (err) {
doError(err, callback);
return;
}

const evt = new FbEventManager(self, eventConnection, self.eventid++, function (err) {
if (err) {
doError(err, callback);
return;
}

callback(err, evt);
});
}, self);
});

return this;
}

module.exports = Database;
113 changes: 113 additions & 0 deletions lib/wire/eventConnection.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
const net = require('net');
const { XdrReader } = require('./serialize');
const DEFAULT_ENCODING = 'utf8';
const Const = require('./const');

var EventConnection = function (host, port, callback, db) {
var self = this;
this.db = db;
this.emgr = null;
this._isClosed = false;
this._isOpened = false;
this._socket = net.createConnection(port, host);
this._bind_events(host, port, callback);
this.error;
this.eventcallback;
};

EventConnection.prototype._bind_events = function (host, port, callback) {
var self = this;

self._socket.on('close', function () {

self._isClosed = true;
})

self._socket.on('error', function (e) {

self.error = e;
})

self._socket.on('connect', function () {
self._isClosed = false;
self._isOpened = true;
if (callback)
callback();
});

self._socket.on('data', function (data) {
var xdr, buf;

if (!self._xdr) {
xdr = new XdrReader(data);
} else {
xdr = self._xdr;
delete (self._xdr);
buf = Buffer.from(data.length + xdr.buffer.length);
xdr.buffer.copy(buf);
data.copy(buf, xdr.buffer.length);
xdr.buffer = buf;
}

try {
var item, op;
var op_pos = xdr.pos;
var tmp_event;
while (xdr.pos < xdr.buffer.length) {
do {
var r = xdr.readInt();
} while (r === Const.op_dummy);

switch (r) {
case Const.op_event:
xdr.readInt(); // db handle
var buf = xdr.readArray();
// first byte is always set to 1
tmp_event = {};
var lst_event = [];
var eventname = '';
var eventcount = 0;
var pos = 1;
while (pos < buf.length) {
var len = buf.readInt8(pos++);
eventname = buf.toString(DEFAULT_ENCODING, pos, pos + len);
var prevcount = self.emgr.events[eventname] || 0;
pos += len;
eventcount = buf.readInt32LE(pos);
tmp_event[eventname] = eventcount;
pos += 4;
if (prevcount !== eventcount)
lst_event.push({ name: eventname, count: eventcount });
}
xdr.readInt64(); // ignore AST INFO
var event_id = xdr.readInt();
// set the new count in global event hash
for (var evt in tmp_event) {
self.emgr.events[evt] = tmp_event[evt];
}
if (self.eventcallback)
return self.eventcallback(null, { eventid: event_id, events: lst_event });

default:
return cb(new Error('Unexpected:' + r));
}
}
} catch (err) {
if (err instanceof RangeError) { // incomplete packet case
xdr.buffer = xdr.buffer = xdr.buffer.slice(op_pos);
xdr.pos = 0;
self._xdr = xdr;
}
}
})
}

EventConnection.prototype.throwClosed = function (callback) {
var err = new Error('Event Connection is closed.');
this.db.emit('error', err);
if (callback)
callback(err);
return this;
};

module.exports = EventConnection;
98 changes: 98 additions & 0 deletions lib/wire/fbEventManager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
const Events = require('events');
const { doError } = require('../callback');


function FbEventManager(db, eventconnection, eventid, callback) {
this.db = db;
this.eventconnection = eventconnection;
this.events = {};
this.eventid = eventid;
this._createEventLoop(callback);
}

FbEventManager.prototype.__proto__ = Object.create(Events.EventEmitter.prototype, {
constructor: {
value: FbEventManager,
enumberable: false
}
});

FbEventManager.prototype._createEventLoop = function (callback) {
var self = this;
var cnx = this.db.connection;
this.eventconnection.emgr = this;
// create the loop
function loop(first) {
cnx.queEvents(self.events, self.eventid, function (err, ret) {
if (err) {
doError(err, callback);
return;
}
if (first)
callback();
})
}

this.eventconnection.eventcallback = function (err, ret) {
if (err || (self.eventid !== ret.eventid)) {
doError(err || new Error('Bad eventid'), callback);
return;
}

ret.events.forEach(function (event) {
self.emit('post_event', event.name, event.count)
})

loop(false);
}

loop(true);
}

FbEventManager.prototype._changeEvent = function (callback) {
var self = this;

self.db.connection.closeEvents(this.eventid, function (err) {
if (err) {
doError(err, callback);
return;
}

self.db.connection.queEvents(self.events, self.eventid, callback);
})
}

FbEventManager.prototype.registerEvent = function (events, callback) {
var self = this;

if (self.db.connection._isClosed || self.eventconnection._isClosed)
return self.eventconnection.throwClosed(callback);

events.forEach((event) => self.events[event] = self.events[event] || 0);
self._changeEvent(callback);
}

FbEventManager.prototype.unregisterEvent = function (events, callback) {
var self = this;

if (self.db.connection._isClosed || self.eventconnection._isClosed)
return self.eventconnection.throwClosed(callback);

events.forEach(function (event) { delete self.events[event] });
self._changeEvent(callback);
}

FbEventManager.prototype.close = function (callback) {
var self = this;

self.db.connection.closeEvents(this.eventid, function (err) {
if (err) {
doError(err, callback);
return;
}

self.eventconnection._socket.end();
});
}

module.exports = FbEventManager;
Loading

0 comments on commit 027e72a

Please sign in to comment.