Skip to content

Commit

Permalink
Rework TCP fragmentation handling to see if it fixes anything
Browse files Browse the repository at this point in the history
Properly end the socket when the session which it is binded to terminates
  • Loading branch information
prlanzarin committed Sep 11, 2019
1 parent a5d2dfe commit 0b54804
Showing 1 changed file with 62 additions and 34 deletions.
96 changes: 62 additions & 34 deletions src/TransportTCP.js
Original file line number Diff line number Diff line change
Expand Up @@ -168,18 +168,30 @@ Transport.prototype = {
(this.reconnection_attempts === 0)?1:this.reconnection_attempts);

this.server = net.createServer((socket) => {
socket.pendingSegments = [];
socket.pendingSegmentsBuffer = new Buffer(0);
socket.setKeepAlive(true);
socket.on('end', (e) => {
this.logger.log(`TCP socket ended for connection ${socket.callIndex || 'Unknown'}`);
transport.onClientSocketClose(socket, e);
if (this.sockets[socket.callIndex]) {
this.logger.log(`TCP socket ended for connection ${socket.callIndex || 'Unknown'}`);
transport.onClientSocketClose(socket, e);
delete this.sockets[socket.callIndex];
}
});

socket.on('close', (e) => {
if (this.sockets[socket.callIndex]) {
this.logger.log(`TCP socket closed for connection ${socket.callIndex || 'Unknown'}`);
transport.onClientSocketClose(socket, e);
delete this.sockets[socket.callIndex];
}
});

const onSocketError = (e) => {
this.logger.log("TCP socket returned error " + e + " and will close");
if (socket.callIndex) {
this.logger.log(`TCP socket ended for connection ${socket.callIndex}`);
this.logger.log(`TCP socket errored for connection ${socket.callIndex}`);
transport.onClientSocketClose(socket, e);
socket.destroy();
if (this.sockets[socket.callIndex]) {
delete this.sockets[socket.callIndex];
}
Expand All @@ -188,10 +200,9 @@ Transport.prototype = {

socket.on('error', onSocketError.bind(this));

socket.on('data', function(e) {
let msg = e.toString();
socket.on('data', function(data) {
transport.onMessage({
data: msg,
data,
socket
});
});
Expand Down Expand Up @@ -268,20 +279,12 @@ Transport.prototype = {
*/
onMessage: function(args) {
let { data, socket } = args;
let dataString = data.toString();
let pendingSegments;
var messages = [], transaction;

if (socket.pendingSegments.length > 0) {
let pendingSegments = '';
socket.pendingSegments.forEach(s => {
pendingSegments = `${pendingSegments}${s}`;
});
socket.pendingSegments = [];
pendingSegments = pendingSegments.trim();
data = `${pendingSegments}${data}`;
}

// CRLF Keep Alive response from server. Ignore it.
if(data === '\r\n') {
if(dataString === '\r\n') {
SIP.Timers.clearTimeout(this.keepAliveTimeout);
this.keepAliveTimeout = null;

Expand All @@ -292,58 +295,75 @@ Transport.prototype = {
return;
}

if (socket.pendingSegmentsBuffer.length > 0) {
pendingSegments = Buffer.concat([socket.pendingSegmentsBuffer, data]);
} else {
pendingSegments = data;
}

socket.pendingSegmentsBuffer = pendingSegments;

// TCP binary message.
else if (typeof data !== 'string') {
if (typeof pendingSegments !== 'string') {
try {
data = String.fromCharCode.apply(null, new Uint8Array(data));
pendingSegments = String.fromCharCode.apply(null, new Uint8Array(pendingSegments));
} catch(evt) {
this.logger.warn('received TCP binary message failed to be converted into string, message discarded');
this.logger.warn('received TCP binary message failed to be converted into string, message discarded', evt);
return;
}

if (this.ua.configuration.traceSip === true) {
this.logger.log('received TCP binary message:\n\n' + data + '\n');
this.logger.log('received TCP binary message:\n\n' + pendingSegments + '\n');
}
}

// TCP text message.
else {
} else {
// TCP text message.
if (this.ua.configuration.traceSip === true) {
this.logger.log('received TCP text message:\n\n' + data + '\n');
this.logger.log('received TCP text message:\n\n' + pendingSegments + '\n');
}
}

let endOfStream = false;
while (!endOfStream) {
let fragment;
fragment = SIP.Parser.parseMessage(data, this.ua);
fragment = SIP.Parser.parseMessage(pendingSegments, this.ua);

// End of UA session. Clear segments buffer and skip
if(this.ua.status === SIP.UA.C.STATUS_USER_CLOSED && fragment instanceof SIP.IncomingRequest) {
socket.pendingSegmentsBuffer = new Buffer(0);
endOfStream = true;
return;
}

if (fragment == null && data.length > 0) {
socket.pendingSegments.push(data);
// Invalid/incomplete fragment which might be completed due to fragmentation.
// Keep the pendingSegmentsBuffer as it is and skip.
if (fragment == null && pendingSegments.length > 0) {
endOfStream = true;
break;
}

// There is a fragment (which indicates a valid SIP message), but the Content-Length
// header does not match with the whole fragment size, so we skip and leave
// it to reprocess when a new segment comes through the socket
if (SIP.Utils.str_utf8_length(fragment.body) < fragment.getHeader('content-length')) {
socket.pendingSegments.push(data);
endOfStream = true;
break;
}


if (fragment) {
messages.push(fragment);
data = data.slice(fragment.currentLength);
pendingSegments = pendingSegments.slice(fragment.currentLength);
}
if (data.length === 0) {
if (pendingSegments.length === 0) {
endOfStream = true;
}
}

if (pendingSegments.length === 0) {
socket.pendingSegmentsBuffer = new Buffer(0);
} else {
socket.pendingSegmentsBuffer = Buffer.from(pendingSegments, 'utf8');
}

messages.forEach(message => {
// Do some sanity check
if(SIP.sanityCheck(message, this.ua, this)) {
Expand All @@ -355,6 +375,14 @@ Transport.prototype = {
const callIndex = `${message.call_id}|${message.from_tag}`;
socket.callIndex = callIndex;
this.sockets[callIndex] = socket;
// Get that socket, hook it up to the session termination
// to close it. Noice.
let sessionToWatch = this.ua.findSession(message);
if (sessionToWatch) {
sessionToWatch.once('terminated', () => {
socket.end();
});
}
}
break;
default:
Expand Down

0 comments on commit 0b54804

Please sign in to comment.