Skip to content

Commit

Permalink
Merge pull request #15 from mconf/tcp-adjust
Browse files Browse the repository at this point in the history
Rework TCP fragmentation handling to see if it fixes anything
  • Loading branch information
prlanzarin authored Sep 13, 2019
2 parents fbd0f19 + 0b54804 commit 8071576
Showing 1 changed file with 62 additions and 34 deletions.
96 changes: 62 additions & 34 deletions src/TransportTCP.js
Original file line number Diff line number Diff line change
Expand Up @@ -168,18 +168,30 @@ Transport.prototype = {
(this.reconnection_attempts === 0)?1:this.reconnection_attempts);

this.server = net.createServer((socket) => {
socket.pendingSegments = [];
socket.pendingSegmentsBuffer = new Buffer(0);
socket.setKeepAlive(true);
socket.on('end', (e) => {
this.logger.log(`TCP socket ended for connection ${socket.callIndex || 'Unknown'}`);
transport.onClientSocketClose(socket, e);
if (this.sockets[socket.callIndex]) {
this.logger.log(`TCP socket ended for connection ${socket.callIndex || 'Unknown'}`);
transport.onClientSocketClose(socket, e);
delete this.sockets[socket.callIndex];
}
});

socket.on('close', (e) => {
if (this.sockets[socket.callIndex]) {
this.logger.log(`TCP socket closed for connection ${socket.callIndex || 'Unknown'}`);
transport.onClientSocketClose(socket, e);
delete this.sockets[socket.callIndex];
}
});

const onSocketError = (e) => {
this.logger.log("TCP socket returned error " + e + " and will close");
if (socket.callIndex) {
this.logger.log(`TCP socket ended for connection ${socket.callIndex}`);
this.logger.log(`TCP socket errored for connection ${socket.callIndex}`);
transport.onClientSocketClose(socket, e);
socket.destroy();
if (this.sockets[socket.callIndex]) {
delete this.sockets[socket.callIndex];
}
Expand All @@ -188,10 +200,9 @@ Transport.prototype = {

socket.on('error', onSocketError.bind(this));

socket.on('data', function(e) {
let msg = e.toString();
socket.on('data', function(data) {
transport.onMessage({
data: msg,
data,
socket
});
});
Expand Down Expand Up @@ -268,20 +279,12 @@ Transport.prototype = {
*/
onMessage: function(args) {
let { data, socket } = args;
let dataString = data.toString();
let pendingSegments;
var messages = [], transaction;

if (socket.pendingSegments.length > 0) {
let pendingSegments = '';
socket.pendingSegments.forEach(s => {
pendingSegments = `${pendingSegments}${s}`;
});
socket.pendingSegments = [];
pendingSegments = pendingSegments.trim();
data = `${pendingSegments}${data}`;
}

// CRLF Keep Alive response from server. Ignore it.
if(data === '\r\n') {
if(dataString === '\r\n') {
SIP.Timers.clearTimeout(this.keepAliveTimeout);
this.keepAliveTimeout = null;

Expand All @@ -292,58 +295,75 @@ Transport.prototype = {
return;
}

if (socket.pendingSegmentsBuffer.length > 0) {
pendingSegments = Buffer.concat([socket.pendingSegmentsBuffer, data]);
} else {
pendingSegments = data;
}

socket.pendingSegmentsBuffer = pendingSegments;

// TCP binary message.
else if (typeof data !== 'string') {
if (typeof pendingSegments !== 'string') {
try {
data = String.fromCharCode.apply(null, new Uint8Array(data));
pendingSegments = String.fromCharCode.apply(null, new Uint8Array(pendingSegments));
} catch(evt) {
this.logger.warn('received TCP binary message failed to be converted into string, message discarded');
this.logger.warn('received TCP binary message failed to be converted into string, message discarded', evt);
return;
}

if (this.ua.configuration.traceSip === true) {
this.logger.log('received TCP binary message:\n\n' + data + '\n');
this.logger.log('received TCP binary message:\n\n' + pendingSegments + '\n');
}
}

// TCP text message.
else {
} else {
// TCP text message.
if (this.ua.configuration.traceSip === true) {
this.logger.log('received TCP text message:\n\n' + data + '\n');
this.logger.log('received TCP text message:\n\n' + pendingSegments + '\n');
}
}

let endOfStream = false;
while (!endOfStream) {
let fragment;
fragment = SIP.Parser.parseMessage(data, this.ua);
fragment = SIP.Parser.parseMessage(pendingSegments, this.ua);

// End of UA session. Clear segments buffer and skip
if(this.ua.status === SIP.UA.C.STATUS_USER_CLOSED && fragment instanceof SIP.IncomingRequest) {
socket.pendingSegmentsBuffer = new Buffer(0);
endOfStream = true;
return;
}

if (fragment == null && data.length > 0) {
socket.pendingSegments.push(data);
// Invalid/incomplete fragment which might be completed due to fragmentation.
// Keep the pendingSegmentsBuffer as it is and skip.
if (fragment == null && pendingSegments.length > 0) {
endOfStream = true;
break;
}

// There is a fragment (which indicates a valid SIP message), but the Content-Length
// header does not match with the whole fragment size, so we skip and leave
// it to reprocess when a new segment comes through the socket
if (SIP.Utils.str_utf8_length(fragment.body) < fragment.getHeader('content-length')) {
socket.pendingSegments.push(data);
endOfStream = true;
break;
}


if (fragment) {
messages.push(fragment);
data = data.slice(fragment.currentLength);
pendingSegments = pendingSegments.slice(fragment.currentLength);
}
if (data.length === 0) {
if (pendingSegments.length === 0) {
endOfStream = true;
}
}

if (pendingSegments.length === 0) {
socket.pendingSegmentsBuffer = new Buffer(0);
} else {
socket.pendingSegmentsBuffer = Buffer.from(pendingSegments, 'utf8');
}

messages.forEach(message => {
// Do some sanity check
if(SIP.sanityCheck(message, this.ua, this)) {
Expand All @@ -355,6 +375,14 @@ Transport.prototype = {
const callIndex = `${message.call_id}|${message.from_tag}`;
socket.callIndex = callIndex;
this.sockets[callIndex] = socket;
// Get that socket, hook it up to the session termination
// to close it. Noice.
let sessionToWatch = this.ua.findSession(message);
if (sessionToWatch) {
sessionToWatch.once('terminated', () => {
socket.end();
});
}
}
break;
default:
Expand Down

0 comments on commit 8071576

Please sign in to comment.