From e4453ebf28d4c9a85966e8e14a275d89dc8b2baf Mon Sep 17 00:00:00 2001 From: George Barnett Date: Tue, 29 Apr 2025 08:46:01 +0100 Subject: [PATCH 1/3] Make the ResponseAccumulator Sendable Motivation: The response accumulator is a delegate which must be sendable as it's passed across isolation domains. Modifications: - Make delegates have a sendable requirement - Make the response accumulator sendable Result: Delegates, and the response accumulator, are sendable --- Sources/AsyncHTTPClient/HTTPHandler.swift | 216 ++++++++++++---------- 1 file changed, 120 insertions(+), 96 deletions(-) diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index 432f0ff11..65ed85149 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -538,8 +538,12 @@ public final class ResponseAccumulator: HTTPClientResponseDelegate { } } - var history = [HTTPClient.RequestResponse]() - var state = State.idle + private struct MutableState: Sendable { + var history = [HTTPClient.RequestResponse]() + var state = State.idle + } + + private let state: NIOLockedValueBox let requestMethod: HTTPMethod let requestHost: String @@ -573,6 +577,7 @@ public final class ResponseAccumulator: HTTPClientResponseDelegate { self.requestMethod = request.method self.requestHost = request.host self.maxBodySize = maxBodySize + self.state = NIOLockedValueBox(MutableState()) } public func didVisitURL( @@ -580,100 +585,110 @@ public final class ResponseAccumulator: HTTPClientResponseDelegate { _ request: HTTPClient.Request, _ head: HTTPResponseHead ) { - self.history.append(.init(request: request, responseHead: head)) + self.state.withLockedValue { + $0.history.append(.init(request: request, responseHead: head)) + } } public func didReceiveHead(task: HTTPClient.Task, _ head: HTTPResponseHead) -> EventLoopFuture { - switch self.state { - case .idle: - if self.requestMethod != .HEAD, - let contentLength = head.headers.first(name: "Content-Length"), - let announcedBodySize = Int(contentLength), - announcedBodySize > self.maxBodySize - { - let error = ResponseTooBigError(maxBodySize: maxBodySize) - self.state = .error(error) - return task.eventLoop.makeFailedFuture(error) - } + self.state.withLockedValue { + switch $0.state { + case .idle: + if self.requestMethod != .HEAD, + let contentLength = head.headers.first(name: "Content-Length"), + let announcedBodySize = Int(contentLength), + announcedBodySize > self.maxBodySize + { + let error = ResponseTooBigError(maxBodySize: maxBodySize) + $0.state = .error(error) + return task.eventLoop.makeFailedFuture(error) + } - self.state = .head(head) - case .head: - preconditionFailure("head already set") - case .body: - preconditionFailure("no head received before body") - case .end: - preconditionFailure("request already processed") - case .error: - break + $0.state = .head(head) + case .head: + preconditionFailure("head already set") + case .body: + preconditionFailure("no head received before body") + case .end: + preconditionFailure("request already processed") + case .error: + break + } + return task.eventLoop.makeSucceededFuture(()) } - return task.eventLoop.makeSucceededFuture(()) } public func didReceiveBodyPart(task: HTTPClient.Task, _ part: ByteBuffer) -> EventLoopFuture { - switch self.state { - case .idle: - preconditionFailure("no head received before body") - case .head(let head): - guard part.readableBytes <= self.maxBodySize else { - let error = ResponseTooBigError(maxBodySize: self.maxBodySize) - self.state = .error(error) - return task.eventLoop.makeFailedFuture(error) - } - self.state = .body(head, part) - case .body(let head, var body): - let newBufferSize = body.writerIndex + part.readableBytes - guard newBufferSize <= self.maxBodySize else { - let error = ResponseTooBigError(maxBodySize: self.maxBodySize) - self.state = .error(error) - return task.eventLoop.makeFailedFuture(error) - } + self.state.withLockedValue { + switch $0.state { + case .idle: + preconditionFailure("no head received before body") + case .head(let head): + guard part.readableBytes <= self.maxBodySize else { + let error = ResponseTooBigError(maxBodySize: self.maxBodySize) + $0.state = .error(error) + return task.eventLoop.makeFailedFuture(error) + } + $0.state = .body(head, part) + case .body(let head, var body): + let newBufferSize = body.writerIndex + part.readableBytes + guard newBufferSize <= self.maxBodySize else { + let error = ResponseTooBigError(maxBodySize: self.maxBodySize) + $0.state = .error(error) + return task.eventLoop.makeFailedFuture(error) + } - // The compiler can't prove that `self.state` is dead here (and it kinda isn't, there's - // a cross-module call in the way) so we need to drop the original reference to `body` in - // `self.state` or we'll get a CoW. To fix that we temporarily set the state to `.end` (which - // has no associated data). We'll fix it at the bottom of this block. - self.state = .end - var part = part - body.writeBuffer(&part) - self.state = .body(head, body) - case .end: - preconditionFailure("request already processed") - case .error: - break + // The compiler can't prove that `self.state` is dead here (and it kinda isn't, there's + // a cross-module call in the way) so we need to drop the original reference to `body` in + // `self.state` or we'll get a CoW. To fix that we temporarily set the state to `.end` (which + // has no associated data). We'll fix it at the bottom of this block. + $0.state = .end + var part = part + body.writeBuffer(&part) + $0.state = .body(head, body) + case .end: + preconditionFailure("request already processed") + case .error: + break + } + return task.eventLoop.makeSucceededFuture(()) } - return task.eventLoop.makeSucceededFuture(()) } public func didReceiveError(task: HTTPClient.Task, _ error: Error) { - self.state = .error(error) + self.state.withLockedValue { + $0.state = .error(error) + } } public func didFinishRequest(task: HTTPClient.Task) throws -> Response { - switch self.state { - case .idle: - preconditionFailure("no head received before end") - case .head(let head): - return Response( - host: self.requestHost, - status: head.status, - version: head.version, - headers: head.headers, - body: nil, - history: self.history - ) - case .body(let head, let body): - return Response( - host: self.requestHost, - status: head.status, - version: head.version, - headers: head.headers, - body: body, - history: self.history - ) - case .end: - preconditionFailure("request already processed") - case .error(let error): - throw error + try self.state.withLockedValue { + switch $0.state { + case .idle: + preconditionFailure("no head received before end") + case .head(let head): + return Response( + host: self.requestHost, + status: head.status, + version: head.version, + headers: head.headers, + body: nil, + history: $0.history + ) + case .body(let head, let body): + return Response( + host: self.requestHost, + status: head.status, + version: head.version, + headers: head.headers, + body: body, + history: $0.history + ) + case .end: + preconditionFailure("request already processed") + case .error(let error): + throw error + } } } } @@ -709,8 +724,9 @@ public final class ResponseAccumulator: HTTPClientResponseDelegate { /// released together with the `HTTPTaskHandler` when channel is closed. /// Users of the library are not required to keep a reference to the /// object that implements this protocol, but may do so if needed. -public protocol HTTPClientResponseDelegate: AnyObject { - associatedtype Response +@preconcurrency +public protocol HTTPClientResponseDelegate: AnyObject, Sendable { + associatedtype Response: Sendable /// Called when the request head is sent. Will be called once. /// @@ -885,7 +901,7 @@ extension URL { } } -protocol HTTPClientTaskDelegate { +protocol HTTPClientTaskDelegate: Sendable { func fail(_ error: Error) } @@ -894,7 +910,8 @@ extension HTTPClient { /// /// Will be created by the library and could be used for obtaining /// `EventLoopFuture` of the execution or cancellation of the execution. - public final class Task { + @preconcurrency + public final class Task { /// The `EventLoop` the delegate will be executed on. public let eventLoop: EventLoop /// The `Logger` used by the `Task` for logging. @@ -902,22 +919,26 @@ extension HTTPClient { let promise: EventLoopPromise + struct State: Sendable { + var isCancelled: Bool + var taskDelegate: HTTPClientTaskDelegate? + } + + private let state: NIOLockedValueBox + var isCancelled: Bool { - self.lock.withLock { self._isCancelled } + self.state.withLockedValue { $0.isCancelled } } var taskDelegate: HTTPClientTaskDelegate? { get { - self.lock.withLock { self._taskDelegate } + self.state.withLockedValue { $0.taskDelegate } } set { - self.lock.withLock { self._taskDelegate = newValue } + self.state.withLockedValue { $0.taskDelegate = newValue } } } - private var _isCancelled: Bool = false - private var _taskDelegate: HTTPClientTaskDelegate? - private let lock = NIOLock() private let makeOrGetFileIOThreadPool: () -> NIOThreadPool /// The shared thread pool of a ``HTTPClient`` used for file IO. It is lazily created on first access. @@ -930,6 +951,7 @@ extension HTTPClient { self.promise = eventLoop.makePromise() self.logger = logger self.makeOrGetFileIOThreadPool = makeOrGetFileIOThreadPool + self.state = NIOLockedValueBox(State(isCancelled: false, taskDelegate: nil)) } static func failedTask( @@ -957,7 +979,8 @@ extension HTTPClient { /// - returns: The value of ``futureResult`` when it completes. /// - throws: The error value of ``futureResult`` if it errors. @available(*, noasync, message: "wait() can block indefinitely, prefer get()", renamed: "get()") - public func wait() throws -> Response { + @preconcurrency + public func wait() throws -> Response where Response: Sendable { try self.promise.futureResult.wait() } @@ -968,7 +991,8 @@ extension HTTPClient { /// - returns: The value of ``futureResult`` when it completes. /// - throws: The error value of ``futureResult`` if it errors. @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) - public func get() async throws -> Response { + @preconcurrency + public func get() async throws -> Response where Response: Sendable { try await self.promise.futureResult.get() } @@ -985,9 +1009,9 @@ extension HTTPClient { /// /// - Parameter error: the error that is used to fail the promise public func fail(reason error: Error) { - let taskDelegate = self.lock.withLock { () -> HTTPClientTaskDelegate? in - self._isCancelled = true - return self._taskDelegate + let taskDelegate = self.state.withLockedValue { state in + state.isCancelled = true + return state.taskDelegate } taskDelegate?.fail(error) @@ -1017,7 +1041,7 @@ internal struct TaskCancelEvent {} // MARK: - RedirectHandler -internal struct RedirectHandler { +internal struct RedirectHandler { let request: HTTPClient.Request let redirectState: RedirectState let execute: (HTTPClient.Request, RedirectState) -> HTTPClient.Task From 4125ff476609ed308f8a4ec94478546b3aaf5f00 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Wed, 30 Apr 2025 14:04:19 +0100 Subject: [PATCH 2/3] Remove an unchecked --- Sources/AsyncHTTPClient/HTTPHandler.swift | 41 ++++++++++------------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index 65ed85149..e64db75df 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -591,15 +591,22 @@ public final class ResponseAccumulator: HTTPClientResponseDelegate { } public func didReceiveHead(task: HTTPClient.Task, _ head: HTTPResponseHead) -> EventLoopFuture { - self.state.withLockedValue { + let responseTooBig: Bool + + if self.requestMethod != .HEAD, + let contentLength = head.headers.first(name: "Content-Length"), + let announcedBodySize = Int(contentLength), + announcedBodySize > self.maxBodySize { + responseTooBig = true + } else { + responseTooBig = false + } + + return self.state.withLockedValue { switch $0.state { case .idle: - if self.requestMethod != .HEAD, - let contentLength = head.headers.first(name: "Content-Length"), - let announcedBodySize = Int(contentLength), - announcedBodySize > self.maxBodySize - { - let error = ResponseTooBigError(maxBodySize: maxBodySize) + if responseTooBig { + let error = ResponseTooBigError(maxBodySize: self.maxBodySize) $0.state = .error(error) return task.eventLoop.makeFailedFuture(error) } @@ -910,8 +917,7 @@ extension HTTPClient { /// /// Will be created by the library and could be used for obtaining /// `EventLoopFuture` of the execution or cancellation of the execution. - @preconcurrency - public final class Task { + public final class Task: Sendable { /// The `EventLoop` the delegate will be executed on. public let eventLoop: EventLoop /// The `Logger` used by the `Task` for logging. @@ -939,14 +945,14 @@ extension HTTPClient { } } - private let makeOrGetFileIOThreadPool: () -> NIOThreadPool + private let makeOrGetFileIOThreadPool: @Sendable () -> NIOThreadPool /// The shared thread pool of a ``HTTPClient`` used for file IO. It is lazily created on first access. internal var fileIOThreadPool: NIOThreadPool { self.makeOrGetFileIOThreadPool() } - init(eventLoop: EventLoop, logger: Logger, makeOrGetFileIOThreadPool: @escaping () -> NIOThreadPool) { + init(eventLoop: EventLoop, logger: Logger, makeOrGetFileIOThreadPool: @escaping @Sendable () -> NIOThreadPool) { self.eventLoop = eventLoop self.promise = eventLoop.makePromise() self.logger = logger @@ -958,7 +964,7 @@ extension HTTPClient { eventLoop: EventLoop, error: Error, logger: Logger, - makeOrGetFileIOThreadPool: @escaping () -> NIOThreadPool + makeOrGetFileIOThreadPool: @escaping @Sendable () -> NIOThreadPool ) -> Task { let task = self.init( eventLoop: eventLoop, @@ -1017,15 +1023,6 @@ extension HTTPClient { taskDelegate?.fail(error) } - func succeed( - promise: EventLoopPromise?, - with value: Response, - delegateType: Delegate.Type, - closing: Bool - ) { - promise?.succeed(value) - } - func fail( with error: Error, delegateType: Delegate.Type @@ -1035,8 +1032,6 @@ extension HTTPClient { } } -extension HTTPClient.Task: @unchecked Sendable {} - internal struct TaskCancelEvent {} // MARK: - RedirectHandler From 1b741e49d5ebd2c365cd92003204038e0ec661cb Mon Sep 17 00:00:00 2001 From: George Barnett Date: Wed, 30 Apr 2025 14:25:20 +0100 Subject: [PATCH 3/3] format --- Sources/AsyncHTTPClient/HTTPHandler.swift | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index e64db75df..fdca88982 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -596,7 +596,8 @@ public final class ResponseAccumulator: HTTPClientResponseDelegate { if self.requestMethod != .HEAD, let contentLength = head.headers.first(name: "Content-Length"), let announcedBodySize = Int(contentLength), - announcedBodySize > self.maxBodySize { + announcedBodySize > self.maxBodySize + { responseTooBig = true } else { responseTooBig = false