From 0b54804f2768ad57e4a9818873b55aa1e943b1a6 Mon Sep 17 00:00:00 2001 From: prlanzarin Date: Wed, 11 Sep 2019 20:37:10 +0000 Subject: [PATCH] Rework TCP fragmentation handling to see if it fixes anything Properly end the socket when the session which it is binded to terminates --- src/TransportTCP.js | 96 +++++++++++++++++++++++++++++---------------- 1 file changed, 62 insertions(+), 34 deletions(-) diff --git a/src/TransportTCP.js b/src/TransportTCP.js index 1dc5ae7c7..5b6932808 100644 --- a/src/TransportTCP.js +++ b/src/TransportTCP.js @@ -168,18 +168,30 @@ Transport.prototype = { (this.reconnection_attempts === 0)?1:this.reconnection_attempts); this.server = net.createServer((socket) => { - socket.pendingSegments = []; + socket.pendingSegmentsBuffer = new Buffer(0); socket.setKeepAlive(true); socket.on('end', (e) => { - this.logger.log(`TCP socket ended for connection ${socket.callIndex || 'Unknown'}`); - transport.onClientSocketClose(socket, e); + if (this.sockets[socket.callIndex]) { + this.logger.log(`TCP socket ended for connection ${socket.callIndex || 'Unknown'}`); + transport.onClientSocketClose(socket, e); + delete this.sockets[socket.callIndex]; + } + }); + + socket.on('close', (e) => { + if (this.sockets[socket.callIndex]) { + this.logger.log(`TCP socket closed for connection ${socket.callIndex || 'Unknown'}`); + transport.onClientSocketClose(socket, e); + delete this.sockets[socket.callIndex]; + } }); const onSocketError = (e) => { this.logger.log("TCP socket returned error " + e + " and will close"); if (socket.callIndex) { - this.logger.log(`TCP socket ended for connection ${socket.callIndex}`); + this.logger.log(`TCP socket errored for connection ${socket.callIndex}`); transport.onClientSocketClose(socket, e); + socket.destroy(); if (this.sockets[socket.callIndex]) { delete this.sockets[socket.callIndex]; } @@ -188,10 +200,9 @@ Transport.prototype = { socket.on('error', onSocketError.bind(this)); - socket.on('data', function(e) { - let msg = e.toString(); + socket.on('data', function(data) { transport.onMessage({ - data: msg, + data, socket }); }); @@ -268,20 +279,12 @@ Transport.prototype = { */ onMessage: function(args) { let { data, socket } = args; + let dataString = data.toString(); + let pendingSegments; var messages = [], transaction; - if (socket.pendingSegments.length > 0) { - let pendingSegments = ''; - socket.pendingSegments.forEach(s => { - pendingSegments = `${pendingSegments}${s}`; - }); - socket.pendingSegments = []; - pendingSegments = pendingSegments.trim(); - data = `${pendingSegments}${data}`; - } - // CRLF Keep Alive response from server. Ignore it. - if(data === '\r\n') { + if(dataString === '\r\n') { SIP.Timers.clearTimeout(this.keepAliveTimeout); this.keepAliveTimeout = null; @@ -292,58 +295,75 @@ Transport.prototype = { return; } + if (socket.pendingSegmentsBuffer.length > 0) { + pendingSegments = Buffer.concat([socket.pendingSegmentsBuffer, data]); + } else { + pendingSegments = data; + } + + socket.pendingSegmentsBuffer = pendingSegments; + // TCP binary message. - else if (typeof data !== 'string') { + if (typeof pendingSegments !== 'string') { try { - data = String.fromCharCode.apply(null, new Uint8Array(data)); + pendingSegments = String.fromCharCode.apply(null, new Uint8Array(pendingSegments)); } catch(evt) { - this.logger.warn('received TCP binary message failed to be converted into string, message discarded'); + this.logger.warn('received TCP binary message failed to be converted into string, message discarded', evt); return; } if (this.ua.configuration.traceSip === true) { - this.logger.log('received TCP binary message:\n\n' + data + '\n'); + this.logger.log('received TCP binary message:\n\n' + pendingSegments + '\n'); } - } - - // TCP text message. - else { + } else { + // TCP text message. if (this.ua.configuration.traceSip === true) { - this.logger.log('received TCP text message:\n\n' + data + '\n'); + this.logger.log('received TCP text message:\n\n' + pendingSegments + '\n'); } } let endOfStream = false; while (!endOfStream) { let fragment; - fragment = SIP.Parser.parseMessage(data, this.ua); + fragment = SIP.Parser.parseMessage(pendingSegments, this.ua); + + // End of UA session. Clear segments buffer and skip if(this.ua.status === SIP.UA.C.STATUS_USER_CLOSED && fragment instanceof SIP.IncomingRequest) { + socket.pendingSegmentsBuffer = new Buffer(0); endOfStream = true; return; } - if (fragment == null && data.length > 0) { - socket.pendingSegments.push(data); + // Invalid/incomplete fragment which might be completed due to fragmentation. + // Keep the pendingSegmentsBuffer as it is and skip. + if (fragment == null && pendingSegments.length > 0) { endOfStream = true; break; } + // There is a fragment (which indicates a valid SIP message), but the Content-Length + // header does not match with the whole fragment size, so we skip and leave + // it to reprocess when a new segment comes through the socket if (SIP.Utils.str_utf8_length(fragment.body) < fragment.getHeader('content-length')) { - socket.pendingSegments.push(data); endOfStream = true; break; } - if (fragment) { messages.push(fragment); - data = data.slice(fragment.currentLength); + pendingSegments = pendingSegments.slice(fragment.currentLength); } - if (data.length === 0) { + if (pendingSegments.length === 0) { endOfStream = true; } } + if (pendingSegments.length === 0) { + socket.pendingSegmentsBuffer = new Buffer(0); + } else { + socket.pendingSegmentsBuffer = Buffer.from(pendingSegments, 'utf8'); + } + messages.forEach(message => { // Do some sanity check if(SIP.sanityCheck(message, this.ua, this)) { @@ -355,6 +375,14 @@ Transport.prototype = { const callIndex = `${message.call_id}|${message.from_tag}`; socket.callIndex = callIndex; this.sockets[callIndex] = socket; + // Get that socket, hook it up to the session termination + // to close it. Noice. + let sessionToWatch = this.ua.findSession(message); + if (sessionToWatch) { + sessionToWatch.once('terminated', () => { + socket.end(); + }); + } } break; default: