Skip to content

Commit 7e6f9cf

Browse files
authored
Make the ResponseAccumulator Sendable (#838)
Motivation: The response accumulator is a delegate which must be sendable as it's passed across isolation domains. Modifications: - Make delegates have a sendable requirement - Make the response accumulator sendable Result: Delegates, and the response accumulator, are sendable
1 parent c61298e commit 7e6f9cf

File tree

1 file changed

+130
-110
lines changed

1 file changed

+130
-110
lines changed

Sources/AsyncHTTPClient/HTTPHandler.swift

+130-110
Original file line numberDiff line numberDiff line change
@@ -538,8 +538,12 @@ public final class ResponseAccumulator: HTTPClientResponseDelegate {
538538
}
539539
}
540540

541-
var history = [HTTPClient.RequestResponse]()
542-
var state = State.idle
541+
private struct MutableState: Sendable {
542+
var history = [HTTPClient.RequestResponse]()
543+
var state = State.idle
544+
}
545+
546+
private let state: NIOLockedValueBox<MutableState>
543547
let requestMethod: HTTPMethod
544548
let requestHost: String
545549

@@ -573,107 +577,126 @@ public final class ResponseAccumulator: HTTPClientResponseDelegate {
573577
self.requestMethod = request.method
574578
self.requestHost = request.host
575579
self.maxBodySize = maxBodySize
580+
self.state = NIOLockedValueBox(MutableState())
576581
}
577582

578583
public func didVisitURL(
579584
task: HTTPClient.Task<HTTPClient.Response>,
580585
_ request: HTTPClient.Request,
581586
_ head: HTTPResponseHead
582587
) {
583-
self.history.append(.init(request: request, responseHead: head))
588+
self.state.withLockedValue {
589+
$0.history.append(.init(request: request, responseHead: head))
590+
}
584591
}
585592

586593
public func didReceiveHead(task: HTTPClient.Task<Response>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
587-
switch self.state {
588-
case .idle:
589-
if self.requestMethod != .HEAD,
590-
let contentLength = head.headers.first(name: "Content-Length"),
591-
let announcedBodySize = Int(contentLength),
592-
announcedBodySize > self.maxBodySize
593-
{
594-
let error = ResponseTooBigError(maxBodySize: maxBodySize)
595-
self.state = .error(error)
596-
return task.eventLoop.makeFailedFuture(error)
597-
}
594+
let responseTooBig: Bool
598595

599-
self.state = .head(head)
600-
case .head:
601-
preconditionFailure("head already set")
602-
case .body:
603-
preconditionFailure("no head received before body")
604-
case .end:
605-
preconditionFailure("request already processed")
606-
case .error:
607-
break
596+
if self.requestMethod != .HEAD,
597+
let contentLength = head.headers.first(name: "Content-Length"),
598+
let announcedBodySize = Int(contentLength),
599+
announcedBodySize > self.maxBodySize
600+
{
601+
responseTooBig = true
602+
} else {
603+
responseTooBig = false
604+
}
605+
606+
return self.state.withLockedValue {
607+
switch $0.state {
608+
case .idle:
609+
if responseTooBig {
610+
let error = ResponseTooBigError(maxBodySize: self.maxBodySize)
611+
$0.state = .error(error)
612+
return task.eventLoop.makeFailedFuture(error)
613+
}
614+
615+
$0.state = .head(head)
616+
case .head:
617+
preconditionFailure("head already set")
618+
case .body:
619+
preconditionFailure("no head received before body")
620+
case .end:
621+
preconditionFailure("request already processed")
622+
case .error:
623+
break
624+
}
625+
return task.eventLoop.makeSucceededFuture(())
608626
}
609-
return task.eventLoop.makeSucceededFuture(())
610627
}
611628

612629
public func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ part: ByteBuffer) -> EventLoopFuture<Void> {
613-
switch self.state {
614-
case .idle:
615-
preconditionFailure("no head received before body")
616-
case .head(let head):
617-
guard part.readableBytes <= self.maxBodySize else {
618-
let error = ResponseTooBigError(maxBodySize: self.maxBodySize)
619-
self.state = .error(error)
620-
return task.eventLoop.makeFailedFuture(error)
621-
}
622-
self.state = .body(head, part)
623-
case .body(let head, var body):
624-
let newBufferSize = body.writerIndex + part.readableBytes
625-
guard newBufferSize <= self.maxBodySize else {
626-
let error = ResponseTooBigError(maxBodySize: self.maxBodySize)
627-
self.state = .error(error)
628-
return task.eventLoop.makeFailedFuture(error)
629-
}
630+
self.state.withLockedValue {
631+
switch $0.state {
632+
case .idle:
633+
preconditionFailure("no head received before body")
634+
case .head(let head):
635+
guard part.readableBytes <= self.maxBodySize else {
636+
let error = ResponseTooBigError(maxBodySize: self.maxBodySize)
637+
$0.state = .error(error)
638+
return task.eventLoop.makeFailedFuture(error)
639+
}
640+
$0.state = .body(head, part)
641+
case .body(let head, var body):
642+
let newBufferSize = body.writerIndex + part.readableBytes
643+
guard newBufferSize <= self.maxBodySize else {
644+
let error = ResponseTooBigError(maxBodySize: self.maxBodySize)
645+
$0.state = .error(error)
646+
return task.eventLoop.makeFailedFuture(error)
647+
}
630648

631-
// The compiler can't prove that `self.state` is dead here (and it kinda isn't, there's
632-
// a cross-module call in the way) so we need to drop the original reference to `body` in
633-
// `self.state` or we'll get a CoW. To fix that we temporarily set the state to `.end` (which
634-
// has no associated data). We'll fix it at the bottom of this block.
635-
self.state = .end
636-
var part = part
637-
body.writeBuffer(&part)
638-
self.state = .body(head, body)
639-
case .end:
640-
preconditionFailure("request already processed")
641-
case .error:
642-
break
649+
// The compiler can't prove that `self.state` is dead here (and it kinda isn't, there's
650+
// a cross-module call in the way) so we need to drop the original reference to `body` in
651+
// `self.state` or we'll get a CoW. To fix that we temporarily set the state to `.end` (which
652+
// has no associated data). We'll fix it at the bottom of this block.
653+
$0.state = .end
654+
var part = part
655+
body.writeBuffer(&part)
656+
$0.state = .body(head, body)
657+
case .end:
658+
preconditionFailure("request already processed")
659+
case .error:
660+
break
661+
}
662+
return task.eventLoop.makeSucceededFuture(())
643663
}
644-
return task.eventLoop.makeSucceededFuture(())
645664
}
646665

647666
public func didReceiveError(task: HTTPClient.Task<Response>, _ error: Error) {
648-
self.state = .error(error)
667+
self.state.withLockedValue {
668+
$0.state = .error(error)
669+
}
649670
}
650671

651672
public func didFinishRequest(task: HTTPClient.Task<Response>) throws -> Response {
652-
switch self.state {
653-
case .idle:
654-
preconditionFailure("no head received before end")
655-
case .head(let head):
656-
return Response(
657-
host: self.requestHost,
658-
status: head.status,
659-
version: head.version,
660-
headers: head.headers,
661-
body: nil,
662-
history: self.history
663-
)
664-
case .body(let head, let body):
665-
return Response(
666-
host: self.requestHost,
667-
status: head.status,
668-
version: head.version,
669-
headers: head.headers,
670-
body: body,
671-
history: self.history
672-
)
673-
case .end:
674-
preconditionFailure("request already processed")
675-
case .error(let error):
676-
throw error
673+
try self.state.withLockedValue {
674+
switch $0.state {
675+
case .idle:
676+
preconditionFailure("no head received before end")
677+
case .head(let head):
678+
return Response(
679+
host: self.requestHost,
680+
status: head.status,
681+
version: head.version,
682+
headers: head.headers,
683+
body: nil,
684+
history: $0.history
685+
)
686+
case .body(let head, let body):
687+
return Response(
688+
host: self.requestHost,
689+
status: head.status,
690+
version: head.version,
691+
headers: head.headers,
692+
body: body,
693+
history: $0.history
694+
)
695+
case .end:
696+
preconditionFailure("request already processed")
697+
case .error(let error):
698+
throw error
699+
}
677700
}
678701
}
679702
}
@@ -709,8 +732,9 @@ public final class ResponseAccumulator: HTTPClientResponseDelegate {
709732
/// released together with the `HTTPTaskHandler` when channel is closed.
710733
/// Users of the library are not required to keep a reference to the
711734
/// object that implements this protocol, but may do so if needed.
712-
public protocol HTTPClientResponseDelegate: AnyObject {
713-
associatedtype Response
735+
@preconcurrency
736+
public protocol HTTPClientResponseDelegate: AnyObject, Sendable {
737+
associatedtype Response: Sendable
714738

715739
/// Called when the request head is sent. Will be called once.
716740
///
@@ -885,7 +909,7 @@ extension URL {
885909
}
886910
}
887911

888-
protocol HTTPClientTaskDelegate {
912+
protocol HTTPClientTaskDelegate: Sendable {
889913
func fail(_ error: Error)
890914
}
891915

@@ -894,49 +918,54 @@ extension HTTPClient {
894918
///
895919
/// Will be created by the library and could be used for obtaining
896920
/// `EventLoopFuture<Response>` of the execution or cancellation of the execution.
897-
public final class Task<Response> {
921+
public final class Task<Response>: Sendable {
898922
/// The `EventLoop` the delegate will be executed on.
899923
public let eventLoop: EventLoop
900924
/// The `Logger` used by the `Task` for logging.
901925
public let logger: Logger // We are okay to store the logger here because a Task is for only one request.
902926

903927
let promise: EventLoopPromise<Response>
904928

929+
struct State: Sendable {
930+
var isCancelled: Bool
931+
var taskDelegate: HTTPClientTaskDelegate?
932+
}
933+
934+
private let state: NIOLockedValueBox<State>
935+
905936
var isCancelled: Bool {
906-
self.lock.withLock { self._isCancelled }
937+
self.state.withLockedValue { $0.isCancelled }
907938
}
908939

909940
var taskDelegate: HTTPClientTaskDelegate? {
910941
get {
911-
self.lock.withLock { self._taskDelegate }
942+
self.state.withLockedValue { $0.taskDelegate }
912943
}
913944
set {
914-
self.lock.withLock { self._taskDelegate = newValue }
945+
self.state.withLockedValue { $0.taskDelegate = newValue }
915946
}
916947
}
917948

918-
private var _isCancelled: Bool = false
919-
private var _taskDelegate: HTTPClientTaskDelegate?
920-
private let lock = NIOLock()
921-
private let makeOrGetFileIOThreadPool: () -> NIOThreadPool
949+
private let makeOrGetFileIOThreadPool: @Sendable () -> NIOThreadPool
922950

923951
/// The shared thread pool of a ``HTTPClient`` used for file IO. It is lazily created on first access.
924952
internal var fileIOThreadPool: NIOThreadPool {
925953
self.makeOrGetFileIOThreadPool()
926954
}
927955

928-
init(eventLoop: EventLoop, logger: Logger, makeOrGetFileIOThreadPool: @escaping () -> NIOThreadPool) {
956+
init(eventLoop: EventLoop, logger: Logger, makeOrGetFileIOThreadPool: @escaping @Sendable () -> NIOThreadPool) {
929957
self.eventLoop = eventLoop
930958
self.promise = eventLoop.makePromise()
931959
self.logger = logger
932960
self.makeOrGetFileIOThreadPool = makeOrGetFileIOThreadPool
961+
self.state = NIOLockedValueBox(State(isCancelled: false, taskDelegate: nil))
933962
}
934963

935964
static func failedTask(
936965
eventLoop: EventLoop,
937966
error: Error,
938967
logger: Logger,
939-
makeOrGetFileIOThreadPool: @escaping () -> NIOThreadPool
968+
makeOrGetFileIOThreadPool: @escaping @Sendable () -> NIOThreadPool
940969
) -> Task<Response> {
941970
let task = self.init(
942971
eventLoop: eventLoop,
@@ -957,7 +986,8 @@ extension HTTPClient {
957986
/// - returns: The value of ``futureResult`` when it completes.
958987
/// - throws: The error value of ``futureResult`` if it errors.
959988
@available(*, noasync, message: "wait() can block indefinitely, prefer get()", renamed: "get()")
960-
public func wait() throws -> Response {
989+
@preconcurrency
990+
public func wait() throws -> Response where Response: Sendable {
961991
try self.promise.futureResult.wait()
962992
}
963993

@@ -968,7 +998,8 @@ extension HTTPClient {
968998
/// - returns: The value of ``futureResult`` when it completes.
969999
/// - throws: The error value of ``futureResult`` if it errors.
9701000
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
971-
public func get() async throws -> Response {
1001+
@preconcurrency
1002+
public func get() async throws -> Response where Response: Sendable {
9721003
try await self.promise.futureResult.get()
9731004
}
9741005

@@ -985,23 +1016,14 @@ extension HTTPClient {
9851016
///
9861017
/// - Parameter error: the error that is used to fail the promise
9871018
public func fail(reason error: Error) {
988-
let taskDelegate = self.lock.withLock { () -> HTTPClientTaskDelegate? in
989-
self._isCancelled = true
990-
return self._taskDelegate
1019+
let taskDelegate = self.state.withLockedValue { state in
1020+
state.isCancelled = true
1021+
return state.taskDelegate
9911022
}
9921023

9931024
taskDelegate?.fail(error)
9941025
}
9951026

996-
func succeed<Delegate: HTTPClientResponseDelegate>(
997-
promise: EventLoopPromise<Response>?,
998-
with value: Response,
999-
delegateType: Delegate.Type,
1000-
closing: Bool
1001-
) {
1002-
promise?.succeed(value)
1003-
}
1004-
10051027
func fail<Delegate: HTTPClientResponseDelegate>(
10061028
with error: Error,
10071029
delegateType: Delegate.Type
@@ -1011,13 +1033,11 @@ extension HTTPClient {
10111033
}
10121034
}
10131035

1014-
extension HTTPClient.Task: @unchecked Sendable {}
1015-
10161036
internal struct TaskCancelEvent {}
10171037

10181038
// MARK: - RedirectHandler
10191039

1020-
internal struct RedirectHandler<ResponseType> {
1040+
internal struct RedirectHandler<ResponseType: Sendable> {
10211041
let request: HTTPClient.Request
10221042
let redirectState: RedirectState
10231043
let execute: (HTTPClient.Request, RedirectState) -> HTTPClient.Task<ResponseType>

0 commit comments

Comments
 (0)