Skip to content

Commit

Permalink
Fix handling of pipelined requests
Browse files Browse the repository at this point in the history
fixes #10
  • Loading branch information
dougwilson committed Oct 22, 2014
1 parent 807f09e commit 2792e5d
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ matrix:
- node_js: "0.11"
fast_finish: true
script: "npm run-script test-travis"
after_script: "test $TRAVIS_NODE_VERSION = '0.10' && npm install coveralls@2 && cat ./coverage/lcov.info | coveralls"
after_script: "npm install coveralls@2 && cat ./coverage/lcov.info | coveralls"
5 changes: 5 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
unreleased
==========

* Fix handling of pipelined requests

2.1.0 / 2014-08-16
==================

Expand Down
80 changes: 72 additions & 8 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ function isFinished(msg) {

if (typeof msg.finished === 'boolean') {
// OutgoingMessage
return Boolean(!socket || msg.finished || !socket.writable)
return Boolean(msg.finished || (socket && !socket.writable))
}

if (typeof msg.complete === 'boolean') {
Expand All @@ -74,6 +74,56 @@ function isFinished(msg) {
return undefined
}

/**
* Attach a finished listener to the message.
*
* @param {object} msg
* @param {function} callback
* @private
*/

function attachFinishedListener(msg, callback) {
var eeMsg
var eeSocket
var finished = false

function onFinish(error) {
eeMsg.cancel()
eeSocket.cancel()

finished = true
callback(error)
}

// finished on first message event
eeMsg = eeSocket = first([[msg, 'end', 'finish']], onFinish)

function onSocket(socket) {
// remove listener
msg.removeListener('socket', onSocket)

if (finished) return
if (eeMsg !== eeSocket) return

// finished on first socket event
eeSocket = first([[socket, 'error', 'close']], onFinish)
}

if (msg.socket) {
// socket already assigned
onSocket(msg.socket)
return
}

// wait for socket to be assigned
msg.on('socket', onSocket)

if (msg.socket === undefined) {
// node.js 0.8 patch
patchAssignSocket(msg, onSocket)
}
}

/**
* Attach the listener to the message.
*
Expand All @@ -84,17 +134,11 @@ function isFinished(msg) {

function attachListener(msg, listener) {
var attached = msg.__onFinished
var socket = msg.socket

// create a private single listener with queue
if (!attached || !attached.queue) {
attached = msg.__onFinished = createListener(msg)

// finished on first event
first([
[socket, 'error', 'close'],
[msg, 'end', 'finish'],
], attached)
attachFinishedListener(msg, attached)
}

attached.queue.push(listener)
Expand Down Expand Up @@ -125,3 +169,23 @@ function createListener(msg) {

return listener
}

/**
* Patch ServerResponse.prototype.assignSocket for node.js 0.8.
*
* @param {ServerResponse} res
* @param {function} callback
* @private
*/

function patchAssignSocket(res, callback) {
var assignSocket = res.assignSocket

if (typeof assignSocket !== 'function') return

// res.on('socket', callback) is broken in 0.8
res.assignSocket = function _assignSocket(socket) {
assignSocket.call(this, socket)
callback(socket)
}
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"license": "MIT",
"repository": "jshttp/on-finished",
"dependencies": {
"ee-first": "1.0.5"
"ee-first": "1.1.0"
},
"devDependencies": {
"istanbul": "0.3.2",
Expand Down
134 changes: 134 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,64 @@ describe('onFinished(res, listener)', function () {
})
})

describe('when requests pipelined', function () {
it('should fire for each request', function (done) {
var count = 0
var responses = []
var server = http.createServer(function (req, res) {
responses.push(res)

onFinished(res, function (err) {
assert.ifError(err)
assert.equal(responses[0], res)
responses.shift()

if (responses.length === 0) {
socket.end()
return
}

responses[0].end('response b')
})

onFinished(req, function (err) {
assert.ifError(err)

if (++count !== 2) {
return
}

assert.equal(responses.length, 2)
responses[0].end('response a')
})

if (responses.length === 1) {
// second request
writerequest(socket)
}

req.resume()
})
var socket

server.listen(function () {
var data = ''
socket = net.connect(this.address().port, function () {
writerequest(this)
})

socket.on('data', function (chunk) {
data += chunk.toString('binary')
})
socket.on('end', function () {
assert.ok(/response a/.test(data))
assert.ok(/response b/.test(data))
server.close(done)
})
})
})
})

describe('when response errors', function () {
it('should fire with error', function (done) {
var server = http.createServer(function (req, res) {
Expand Down Expand Up @@ -150,6 +208,82 @@ describe('isFinished(res)', function () {
sendget(server)
})

describe('when requests pipelined', function () {
it('should have correct state when socket shared', function (done) {
var count = 0
var responses = []
var server = http.createServer(function (req, res) {
responses.push(res)

onFinished(req, function (err) {
assert.ifError(err)

if (++count !== 2) {
return
}

assert.ok(!onFinished.isFinished(responses[0]))
assert.ok(!onFinished.isFinished(responses[1]))

responses[0].end()
responses[1].end()
socket.end()
server.close(done)
})

if (responses.length === 1) {
// second request
writerequest(socket)
}

req.resume()
})
var socket

server.listen(function () {
socket = net.connect(this.address().port, function () {
writerequest(this)
})
})
})

it('should handle aborted requests', function (done) {
var count = 0
var requests = 0
var server = http.createServer(function (req, res) {
requests++

onFinished(req, function (err) {
switch (++count) {
case 1:
assert.ifError(err)
// abort the socket
socket.on('error', noop)
socket.destroy()
break
case 2:
server.close(done)
break
}
})

req.resume()

if (requests === 1) {
// second request
writerequest(socket, true)
}
})
var socket

server.listen(function () {
socket = net.connect(this.address().port, function () {
writerequest(this)
})
})
})
})

describe('when response errors', function () {
it('should return true', function (done) {
var server = http.createServer(function (req, res) {
Expand Down

0 comments on commit 2792e5d

Please sign in to comment.