@@ -32,14 +32,15 @@ extension HTTPClient {
         /// A streaming uploader.
         ///
         /// ``StreamWriter`` abstracts
-        public struct StreamWriter {
-            let closure: (IOData) -> EventLoopFuture<Void>
+        public struct StreamWriter: Sendable {
+            let closure: @Sendable (IOData) -> EventLoopFuture<Void>
 
             /// Create new ``HTTPClient/Body/StreamWriter``
             ///
             /// - parameters:
             ///     - closure: function that will be called to write actual bytes to the channel.
-            public init(closure: @escaping (IOData) -> EventLoopFuture<Void>) {
+            @preconcurrency
+            public init(closure: @escaping @Sendable (IOData) -> EventLoopFuture<Void>) {
                 self.closure = closure
             }
 
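For callers, the visible effect of this hunk is that the writer closure is now `@Sendable`, so everything it captures must be `Sendable` too. A minimal sketch of constructing a `StreamWriter` under the new signature; the `eventLoop` parameter, the helper name, and the no-op write are assumptions for illustration, not part of this patch:

```swift
import AsyncHTTPClient
import NIOCore

// Hypothetical helper: builds a StreamWriter whose @Sendable closure captures
// only Sendable state (here, just an EventLoop).
func makeNoOpWriter(on eventLoop: EventLoop) -> HTTPClient.Body.StreamWriter {
    HTTPClient.Body.StreamWriter { ioData in
        // The only captured value is `eventLoop`, which is Sendable, so this
        // closure satisfies the new `@Sendable (IOData) -> EventLoopFuture<Void>` type.
        print("would write \(ioData.readableBytes) bytes")
        return eventLoop.makeSucceededVoidFuture()
    }
}
```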
@@ -55,8 +56,8 @@ extension HTTPClient {
             func writeChunks<Bytes: Collection>(
                 of bytes: Bytes,
                 maxChunkSize: Int
-            ) -> EventLoopFuture<Void> where Bytes.Element == UInt8 {
-                // `StreamWriter` is has design issues, for example
+            ) -> EventLoopFuture<Void> where Bytes.Element == UInt8, Bytes: Sendable {
+                // `StreamWriter` has design issues, for example
                 // - https://github.com/swift-server/async-http-client/issues/194
                 // - https://github.com/swift-server/async-http-client/issues/264
                 // - We're not told the EventLoop the task runs on and the user is free to return whatever EL they
@@ -66,49 +67,52 @@ extension HTTPClient {
                 typealias Iterator = EnumeratedSequence<ChunksOfCountCollection<Bytes>>.Iterator
                 typealias Chunk = (offset: Int, element: ChunksOfCountCollection<Bytes>.Element)
 
-                func makeIteratorAndFirstChunk(
-                    bytes: Bytes
-                ) -> (
-                    iterator: NIOLockedValueBox<Iterator>,
-                    chunk: Chunk
-                )? {
-                    var iterator = bytes.chunks(ofCount: maxChunkSize).enumerated().makeIterator()
-                    guard let chunk = iterator.next() else {
-                        return nil
+                // HACK (again, we're not told the right EventLoop): Let's write 0 bytes to make the user tell us...
+                return self.write(.byteBuffer(ByteBuffer())).flatMapWithEventLoop { (_, loop) in
+                    func makeIteratorAndFirstChunk(
+                        bytes: Bytes
+                    ) -> (iterator: Iterator, chunk: Chunk)? {
+                        var iterator = bytes.chunks(ofCount: maxChunkSize).enumerated().makeIterator()
+                        guard let chunk = iterator.next() else {
+                            return nil
+                        }
+
+                        return (iterator, chunk)
                     }
 
-                    return (NIOLockedValueBox(iterator), chunk)
-                }
-
-                guard let (iterator, chunk) = makeIteratorAndFirstChunk(bytes: bytes) else {
-                    return self.write(IOData.byteBuffer(.init()))
-                }
+                    guard let iteratorAndChunk = makeIteratorAndFirstChunk(bytes: bytes) else {
+                        return loop.makeSucceededVoidFuture()
+                    }
 
-                @Sendable  // can't use closure here as we recursively call ourselves which closures can't do
-                func writeNextChunk(_ chunk: Chunk, allDone: EventLoopPromise<Void>) {
-                    if let nextElement = iterator.withLockedValue({ $0.next() }) {
-                        self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).map {
-                            let index = nextElement.offset
-                            if (index + 1) % 4 == 0 {
-                                // Let's not stack-overflow if the futures insta-complete which they at least in HTTP/2
-                                // mode.
-                                // Also, we must frequently return to the EventLoop because we may get the pause signal
-                                // from another thread. If we fail to do that promptly, we may balloon our body chunks
-                                // into memory.
-                                allDone.futureResult.eventLoop.execute {
-                                    writeNextChunk(nextElement, allDone: allDone)
+                    var iterator = iteratorAndChunk.0
+                    let chunk = iteratorAndChunk.1
+
+                    // can't use closure here as we recursively call ourselves which closures can't do
+                    func writeNextChunk(_ chunk: Chunk, allDone: EventLoopPromise<Void>) {
+                        let loop = allDone.futureResult.eventLoop
+                        loop.assertInEventLoop()
+
+                        if let (index, element) = iterator.next() {
+                            self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).hop(to: loop).assumeIsolated().map
+                            {
+                                if (index + 1) % 4 == 0 {
+                                    // Let's not stack-overflow if the futures insta-complete which they at least in HTTP/2
+                                    // mode.
+                                    // Also, we must frequently return to the EventLoop because we may get the pause signal
+                                    // from another thread. If we fail to do that promptly, we may balloon our body chunks
+                                    // into memory.
+                                    allDone.futureResult.eventLoop.assumeIsolated().execute {
+                                        writeNextChunk((offset: index, element: element), allDone: allDone)
+                                    }
+                                } else {
+                                    writeNextChunk((offset: index, element: element), allDone: allDone)
                                 }
-                            } else {
-                                writeNextChunk(nextElement, allDone: allDone)
-                            }
-                        }.cascadeFailure(to: allDone)
-                    } else {
-                        self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).cascade(to: allDone)
+                            }.nonisolated().cascadeFailure(to: allDone)
+                        } else {
+                            self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).cascade(to: allDone)
+                        }
                     }
-                }
 
-                // HACK (again, we're not told the right EventLoop): Let's write 0 bytes to make the user tell us...
-                return self.write(.byteBuffer(ByteBuffer())).flatMapWithEventLoop { (_, loop) in
                     let allDone = loop.makePromise(of: Void.self)
                     writeNextChunk(chunk, allDone: allDone)
                     return allDone.futureResult
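The rewritten `writeChunks` leans on a general NIO pattern: recurse on future completion, but hop back onto the `EventLoop` every few chunks so that futures which complete immediately (as they can in HTTP/2 mode) cannot grow the call stack without bound, and so pause signals from other threads are seen promptly. A standalone sketch of that pattern using plain NIO types and assumed names, not the actual `StreamWriter` internals:

```swift
import NIOCore

// Hypothetical helper demonstrating the recursion-with-periodic-hop pattern:
// write `chunks` one after another, bouncing back onto the EventLoop every
// fourth chunk so instantly-completing futures keep the stack shallow.
func writeAll(
    _ chunks: [ByteBuffer],
    using write: @escaping (ByteBuffer) -> EventLoopFuture<Void>,
    on eventLoop: EventLoop
) -> EventLoopFuture<Void> {
    let allDone = eventLoop.makePromise(of: Void.self)

    func writeNext(_ index: Int) {
        guard index < chunks.count else {
            allDone.succeed(())
            return
        }
        write(chunks[index]).whenComplete { result in
            switch result {
            case .failure(let error):
                allDone.fail(error)
            case .success:
                if (index + 1) % 4 == 0 {
                    // Re-enter the EventLoop instead of recursing directly so a
                    // chain of already-completed futures stays shallow on the stack.
                    eventLoop.execute { writeNext(index + 1) }
                } else {
                    writeNext(index + 1)
                }
            }
        }
    }

    writeNext(0)
    return allDone.futureResult
}
```

The `% 4` cadence mirrors the comment in the diff: returning to the `EventLoop` regularly also gives a pause signal from another thread a chance to take effect before more chunks are buffered into memory.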