Skip to content
This repository has been archived by the owner on Nov 23, 2021. It is now read-only.

Commit

Permalink
Honor &stopProcessingBody flag, with tests (#51)
Browse files Browse the repository at this point in the history
Issue #48
  • Loading branch information
carlbrown authored and seabaylea committed Sep 22, 2017
1 parent 327e890 commit 280caad
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 38 deletions.
69 changes: 44 additions & 25 deletions Sources/HTTP/HTTPStreamingParser.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,26 @@ public class StreamingParser: HTTPResponseWriter {
/// Protocol that we use to send data (and status info) back to the Network layer
public var parserConnector: ParserConnecting?

///Flag to track whether our handler has told us not to call it anymore
private let _shouldStopProcessingBodyLock = DispatchSemaphore(value: 1)
private var _shouldStopProcessingBody: Bool = false
private var shouldStopProcessingBody: Bool {
get {
_shouldStopProcessingBodyLock.wait()
defer {
_shouldStopProcessingBodyLock.signal()
}
return _shouldStopProcessingBody
}
set {
_shouldStopProcessingBodyLock.wait()
defer {
_shouldStopProcessingBodyLock.signal()
}
_shouldStopProcessingBody = newValue
}
}

var lastCallBack = CallbackRecord.idle
var lastHeaderName: String?
var parsedHeaders = HTTPHeaders()
Expand Down Expand Up @@ -223,10 +243,10 @@ public class StreamingParser: HTTPResponseWriter {
func messageCompleted() -> Int32 {
let didChangeState = processCurrentCallback(.messageCompleted)
if let chunkHandler = self.httpBodyProcessingCallback, didChangeState {
var stop = false
var dummy = false //We're sending `.end`, which means processing is stopping anyway, so the bool here is pointless
switch chunkHandler {
case .processBody(let handler):
handler(.end, &stop)
handler(.end, &dummy)
case .discardBody:
done()
}
Expand Down Expand Up @@ -279,21 +299,25 @@ public class StreamingParser: HTTPResponseWriter {
func bodyReceived(data: UnsafePointer<Int8>?, length: Int) -> Int32 {
processCurrentCallback(.bodyReceived)
guard let data = data else { return 0 }
if shouldStopProcessingBody {
return 0
}
data.withMemoryRebound(to: UInt8.self, capacity: length) { (ptr) -> Void in
let buff = UnsafeBufferPointer<UInt8>(start: ptr, count: length)
let chunk = DispatchData(bytes: buff)
if let chunkHandler = self.httpBodyProcessingCallback {
var stop = false
var finished = false
while !stop && !finished {
switch chunkHandler {
switch chunkHandler {
case .processBody(let handler):
handler(.chunk(data: chunk, finishedProcessing: {
finished = true
}), &stop)
//OK, this sucks. We can't access the value of the `inout` inside this block
// due to exclusivity. Which means that if we were to pass a local variable, we'd
// have to put a semaphore or something up here to wait for the block to be done before
// we could get its value and pass that on to the instance variable. So instead, we're
// just passing in a pointer to the internal ivar. But that ivar can't be modified in
// more than one place, so we have to put a semaphore around it to prevent that.
_shouldStopProcessingBodyLock.wait()
handler(.chunk(data: chunk, finishedProcessing: {self._shouldStopProcessingBodyLock.signal()}), &_shouldStopProcessingBody)
case .discardBody:
finished = true
}
break
}
}
}
Expand Down Expand Up @@ -330,7 +354,7 @@ public class StreamingParser: HTTPResponseWriter {
}

public func writeHeader(status: HTTPResponseStatus, headers: HTTPHeaders, completion: @escaping (Result) -> Void) {
// TODO call completion()

guard !headersWritten else {
return
}
Expand Down Expand Up @@ -425,7 +449,6 @@ public class StreamingParser: HTTPResponseWriter {
}

self.parserConnector?.queueSocketWrite(dataToWrite)

completion(.ok)
}

Expand All @@ -446,20 +469,16 @@ public class StreamingParser: HTTPResponseWriter {
self.headersWritten = false
self.httpBodyProcessingCallback = nil
self.upgradeRequested = false
self.shouldStopProcessingBody = false

let closeAfter = {
if self.clientRequestedKeepAlive {
self.keepAliveUntil = Date(timeIntervalSinceNow: StreamingParser.keepAliveTimeout).timeIntervalSinceReferenceDate
self.parserConnector?.responseComplete()
} else {
self.parserConnector?.closeWriter()
}
//Note: This used to be passed into the completion block that `Result` used to have
// But since that block was removed, we're calling it directly
if self.clientRequestedKeepAlive {
self.keepAliveUntil = Date(timeIntervalSinceNow: StreamingParser.keepAliveTimeout).timeIntervalSinceReferenceDate
self.parserConnector?.responseComplete()
} else {
self.parserConnector?.closeWriter()
}

// FIXME I do not understand what code written here before was meant to do
// If it was about delayed closure invocation then it couldn't work either
// Here is the equivalent code
closeAfter()
completion(.ok)
}

Expand Down
17 changes: 10 additions & 7 deletions Sources/HTTP/PoCSocket/PoCSocketConnectionListener.swift
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,26 @@ public class PoCSocketConnectionListener: ParserConnecting {
_errorOccurred = newValue
}
}

///Largest number of bytes we're willing to allocate for a Read
// it's an anti-heartbleed-type paranoia check
private var maxReadLength: Int = 1048576

/// initializer
///
/// - Parameters:
/// - socket: thin PoCSocket wrapper around system calls
/// - parser: Manager of the CHTTPParser library
internal init(socket: PoCSocket, parser: StreamingParser, readQueue: DispatchQueue, writeQueue: DispatchQueue) {
internal init(socket: PoCSocket, parser: StreamingParser, readQueue: DispatchQueue, writeQueue: DispatchQueue, maxReadLength: Int = 0) {
self.socket = socket
socketFD = socket.socketfd
socketReaderQueue = readQueue
socketWriterQueue = writeQueue
self.parser = parser
parser.parserConnector = self
if maxReadLength > 0 {
self.maxReadLength = maxReadLength
}
}

/// Check if socket is still open. Used to decide whether it should be closed/pruned after timeout
Expand Down Expand Up @@ -202,15 +209,11 @@ public class PoCSocketConnectionListener: ParserConnecting {

var length = 1 //initial value

///Largest number of bytes we're willing to allocate for a Read
// it's an anti-heartbleed-type paranoia check
let maxReadCount: Int = 1048576

do {
if strongSelf.socket?.socketfd ?? -1 > 0 {
var maxLength: Int = Int(strongSelf.readerSource?.data ?? 0)
if (maxLength > maxReadCount) || (maxLength <= 0) {
maxLength = maxReadCount
if (maxLength > strongSelf.maxReadLength) || (maxLength <= 0) {
maxLength = strongSelf.maxReadLength
}
var readBuffer: UnsafeMutablePointer<Int8> = UnsafeMutablePointer<Int8>.allocate(capacity: maxLength)
length = try strongSelf.socket?.socketRead(into: &readBuffer, maxLength:maxLength) ?? -1
Expand Down
3 changes: 2 additions & 1 deletion Sources/HTTP/PoCSocket/PoCSocketSimpleServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class PoCSocketSimpleServer: CurrentConnectionCounting {
public func start(port: Int = 0,
queueCount: Int = 0,
acceptCount: Int = 0,
maxReadLength: Int = 1048576,
handler: @escaping HTTPRequestHandler) throws {

// Don't let a signal generated by a broken socket kill the server
Expand Down Expand Up @@ -111,7 +112,7 @@ public class PoCSocketSimpleServer: CurrentConnectionCounting {
let streamingParser = StreamingParser(handler: handler, connectionCounter: self)
let readQueue = readQueues[listenerCount % self.queueMax]
let writeQueue = writeQueues[listenerCount % self.queueMax]
let listener = PoCSocketConnectionListener(socket: clientSocket, parser: streamingParser, readQueue:readQueue, writeQueue: writeQueue)
let listener = PoCSocketConnectionListener(socket: clientSocket, parser: streamingParser, readQueue:readQueue, writeQueue: writeQueue, maxReadLength: maxReadLength)
listenerCount += 1
acceptSemaphore.wait()
acceptQueue.async { [weak listener] in
Expand Down
37 changes: 37 additions & 0 deletions Tests/HTTPTests/Helpers/AbortAndSendHelloHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// This source file is part of the Swift.org Server APIs open source project
//
// Copyright (c) 2017 Swift Server API project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See http://swift.org/LICENSE.txt for license information
//

import Foundation
import HTTP

/// Simple `HTTPRequestHandler` that prints "Hello, World" as per K&R
class AbortAndSendHelloHandler: HTTPRequestHandling {

var chunkCalledCount=0
var chunkLength=0

func handle(request: HTTPRequest, response: HTTPResponseWriter ) -> HTTPBodyProcessing {
//Assume the router gave us the right request - at least for now
response.writeHeader(status: .ok, headers: [.transferEncoding: "chunked", "X-foo": "bar"])
return .processBody { (chunk, stop) in
switch chunk {
case .chunk(let data, let finishedProcessing):
stop = true
self.chunkCalledCount += 1
self.chunkLength += data.count
finishedProcessing()
case .end:
response.writeBody("Hello, World!")
response.done()
default:
stop = true /* don't call us anymore */
response.abort()
}
}
}
}
22 changes: 22 additions & 0 deletions Tests/HTTPTests/Helpers/UnchunkedHelloWorldHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// This source file is part of the Swift.org Server APIs open source project
//
// Copyright (c) 2017 Swift Server API project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See http://swift.org/LICENSE.txt for license information
//

import Foundation
import HTTP

/// Simple `HTTPRequestHandler` that prints "Hello, World" as per K&R
class UnchunkedHelloWorldHandler: HTTPRequestHandling {
func handle(request: HTTPRequest, response: HTTPResponseWriter ) -> HTTPBodyProcessing {
//Assume the router gave us the right request - at least for now
let responseString = "Hello, World!"
response.writeHeader(status: .ok, headers: [.contentLength: "\(responseString.lengthOfBytes(using: .utf8))"])
response.writeBody(responseString)
response.done()
return .discardBody
}
}
70 changes: 65 additions & 5 deletions Tests/HTTPTests/ServerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ class ServerTests: XCTestCase {
XCTAssertNotNil(response)
let headers = response?.allHeaderFields ?? ["": ""]
let connectionHeader: String = headers["Connection"] as? String ?? ""
let keepAliveHeader = headers["Keep-Alive"]
let keepAliveHeader = headers["Connection"]
XCTAssertEqual(connectionHeader, "Keep-Alive", "No Keep-Alive Connection")
XCTAssertNotNil(keepAliveHeader)
XCTAssertNotNil(responseBody, "No Keep-Alive Header")
Expand All @@ -320,7 +320,7 @@ class ServerTests: XCTestCase {
XCTAssertNotNil(response2)
let headers = response2?.allHeaderFields ?? ["": ""]
let connectionHeader: String = headers["Connection"] as? String ?? ""
let keepAliveHeader = headers["Keep-Alive"]
let keepAliveHeader = headers["Connection"]
XCTAssertEqual(connectionHeader, "Keep-Alive", "No Keep-Alive Connection")
XCTAssertNotNil(keepAliveHeader, "No Keep-Alive Header")
XCTAssertEqual(server.connectionCount, 2)
Expand All @@ -339,7 +339,7 @@ class ServerTests: XCTestCase {
XCTAssertNotNil(response)
let headers = response?.allHeaderFields ?? ["": ""]
let connectionHeader: String = headers["Connection"] as? String ?? ""
let keepAliveHeader = headers["Keep-Alive"]
let keepAliveHeader = headers["Connection"]
XCTAssertEqual(connectionHeader, "Keep-Alive", "No Keep-Alive Connection")
XCTAssertNotNil(keepAliveHeader, "No Keep-Alive Header")
XCTAssertEqual(server.connectionCount, 3)
Expand Down Expand Up @@ -371,6 +371,9 @@ class ServerTests: XCTestCase {
func testRequestLargeEchoEndToEnd() {
let receivedExpectation = self.expectation(description: "Received web response \(#function)")

//Use a small chunk size to make sure that we're testing multiple HTTPBodyHandler calls
let chunkSize = 1024

// Get a file we know exists
let executableURL = URL(fileURLWithPath: CommandLine.arguments[0])
let testExecutableData: Data
Expand All @@ -392,9 +395,9 @@ class ServerTests: XCTestCase {

let testData = Data(testDataLong)

let server = HTTPServer()
let server = PoCSocketSimpleServer()
do {
try server.start(port: 0, handler: EchoHandler().handle)
try server.start(port: 0, maxReadLength: chunkSize, handler: EchoHandler().handle)
let session = URLSession(configuration: URLSessionConfiguration.default)
let url = URL(string: "http://localhost:\(server.port)/echo")!
print("Test \(#function) on port \(server.port)")
Expand All @@ -421,6 +424,62 @@ class ServerTests: XCTestCase {
XCTFail("Error listening on port \(0): \(error). Use server.failed(callback:) to handle")
}
}

func testRequestLargePostHelloWorld() {
let receivedExpectation = self.expectation(description: "Received web response \(#function)")

//Use a small chunk size to make sure that we stop after one HTTPBodyHandler call
let chunkSize = 1024

// Get a file we know exists
let executableURL = URL(fileURLWithPath: CommandLine.arguments[0])
let testExecutableData: Data

do {
testExecutableData = try Data(contentsOf: executableURL)
} catch {
XCTFail("Could not create Data from contents of \(executableURL)")
return
}

//Make sure there's data there
XCTAssertNotNil(testExecutableData)

let executableLength = testExecutableData.count

let server = PoCSocketSimpleServer()
do {
let testHandler = AbortAndSendHelloHandler()
try server.start(port: 0, maxReadLength: chunkSize, handler: testHandler.handle)
let session = URLSession(configuration: URLSessionConfiguration.default)
let url = URL(string: "http://localhost:\(server.port)/echo")!
print("Test \(#function) on port \(server.port)")
var request = URLRequest(url: url)
request.httpMethod = "POST"
let uploadTask = session.uploadTask(with: request, fromFile: executableURL) { (responseBody, rawResponse, error) in
let response = rawResponse as? HTTPURLResponse
XCTAssertNil(error, "\(error!.localizedDescription)")
XCTAssertNotNil(response)
XCTAssertNotNil(responseBody)
XCTAssertEqual(Int(HTTPResponseStatus.ok.code), response?.statusCode ?? 0)
XCTAssertEqual("Hello, World!", String(data: responseBody ?? Data(), encoding: .utf8) ?? "Nil")
XCTAssertEqual(Int(testHandler.chunkCalledCount), 1)
XCTAssertLessThan(testHandler.chunkLength, executableLength, "Should have written less than the length of the file")
XCTAssertEqual(Int(testHandler.chunkLength), chunkSize)
receivedExpectation.fulfill()
}
uploadTask.resume()
self.waitForExpectations(timeout: 10) { (error) in
if let error = error {
XCTFail("\(error)")
}
}
server.stop()
} catch {
XCTFail("Error listening on port \(0): \(error). Use server.failed(callback:) to handle")
}
}


static var allTests = [
("testEcho", testEcho),
Expand All @@ -433,5 +492,6 @@ class ServerTests: XCTestCase {
("testRequestEchoEndToEnd", testRequestEchoEndToEnd),
("testRequestKeepAliveEchoEndToEnd", testRequestKeepAliveEchoEndToEnd),
("testRequestLargeEchoEndToEnd", testRequestLargeEchoEndToEnd),
("testRequestLargePostHelloWorld", testRequestLargePostHelloWorld),
]
}

0 comments on commit 280caad

Please sign in to comment.