diff --git a/Sources/EventSource/EventSource.swift b/Sources/EventSource/EventSource.swift index 27b3697..bbba321 100644 --- a/Sources/EventSource/EventSource.swift +++ b/Sources/EventSource/EventSource.swift @@ -248,8 +248,7 @@ public actor EventSource { handleLine(line) } - // Send an empty line to trigger event dispatch for most test cases - // This matches the behavior expected by most tests + // Send an empty line to trigger event dispatch if !currentData.isEmpty || currentEventId != nil || currentEventType != nil { handleLine("") } @@ -271,13 +270,25 @@ public actor EventSource { /// The current state of the connection (connecting, open, or closed). public private(set) var readyState: ReadyState = .connecting + /// The maximum number of events to deliver when finalizing parsing. + /// + /// This limit prevents unbounded memory growth in edge cases where + /// a server sends a large burst of events just before closing the connection. + /// When the limit is reached, remaining events in the parser's queue are discarded. + /// + /// The default value of 100 should be sufficient for most use cases. + /// Increase this value if your application needs to buffer more events + /// during connection finalization. + public var maximumFinalizationEventCount: Int = 100 + // Backing storage for callbacks - private var _onOpenCallback: (@Sendable () -> Void)? - private var _onMessageCallback: (@Sendable (Event) -> Void)? - private var _onErrorCallback: (@Sendable (Swift.Error?) -> Void)? + private var _onOpenCallback: (@Sendable () async -> Void)? + private var _onMessageCallback: (@Sendable (Event) async -> Void)? + private var _onErrorCallback: (@Sendable (Swift.Error?) async -> Void)? /// The callback to invoke when the connection is opened. - nonisolated public var onOpen: (@Sendable () -> Void)? { + /// This callback is awaited before proceeding with reconnection or completion. + nonisolated public var onOpen: (@Sendable () async -> Void)? { get { fatalError("onOpen can only be set, not read") } set { if let newValue { @@ -287,7 +298,8 @@ public actor EventSource { } /// The callback to invoke when a message is received. - nonisolated public var onMessage: (@Sendable (Event) -> Void)? { + /// This callback is awaited before proceeding with reconnection or completion. + nonisolated public var onMessage: (@Sendable (Event) async -> Void)? { get { fatalError("onMessage can only be set, not read") } set { if let newValue { @@ -297,7 +309,8 @@ public actor EventSource { } /// The callback to invoke when an error occurs. - nonisolated public var onError: (@Sendable (Swift.Error?) -> Void)? { + /// This callback is awaited before proceeding with reconnection or completion. + nonisolated public var onError: (@Sendable (Swift.Error?) async -> Void)? { get { fatalError("onError can only be set, not read") } set { if let newValue { @@ -307,15 +320,15 @@ public actor EventSource { } // Actor-isolated setters - private func setOnOpenCallback(_ callback: @escaping @Sendable () -> Void) { + private func setOnOpenCallback(_ callback: @escaping @Sendable () async -> Void) { self._onOpenCallback = callback } - private func setOnMessageCallback(_ callback: @escaping @Sendable (Event) -> Void) { + private func setOnMessageCallback(_ callback: @escaping @Sendable (Event) async -> Void) { self._onMessageCallback = callback } - private func setOnErrorCallback(_ callback: @escaping @Sendable (Swift.Error?) -> Void) { + private func setOnErrorCallback(_ callback: @escaping @Sendable (Swift.Error?) async -> Void) { self._onErrorCallback = callback } @@ -394,7 +407,6 @@ public actor EventSource { currentRequest.setValue(lastEventId, forHTTPHeaderField: "Last-Event-ID") } - // Perform the HTTP request and get an asynchronous byte stream. let (byteStream, response) = try await session.bytes( for: currentRequest, delegate: nil @@ -404,53 +416,48 @@ public actor EventSource { if let httpResponse = response as? HTTPURLResponse { let status = httpResponse.statusCode if status != 200 { - // HTTP status not OK -> do not reconnect (per spec, fail the connection). throw EventSourceError.invalidHTTPStatus(status) } - // Check Content-Type header let contentType = httpResponse.value(forHTTPHeaderField: "Content-Type") if contentType?.lowercased().hasPrefix("text/event-stream") != true { throw EventSourceError.invalidContentType(contentType) } } else { - // Non-HTTP response (unlikely for URLSession) -> treat as error. throw EventSourceError.invalidHTTPStatus(0) } // Connection is established and content type is correct. - // Update state to `.open` and notify. readyState = .open - if let onOpen = _onOpenCallback { - onOpen() - } + await _onOpenCallback?() // Read the incoming byte stream and parse events. for try await byte in byteStream { - // If closed or task cancelled during streaming, break out. if Task.isCancelled || readyState == .closed { break } await parser.consume(byte) - // Retrieve all complete events available after processing this byte. while let event = await parser.getNextEvent() { - // Trigger onMessage callback for each event. - if let onMessage = _onMessageCallback { - onMessage(event) - } + await _onMessageCallback?(event) } } // End of stream reached (server closed connection). - await parser.finish() // finalize parsing, drop any partial event + await parser.finish() // finalize parsing, delivers any pending events to queue + + // Deliver any events that were queued during finish() + var eventsDelivered = 0 + let maxEvents = maximumFinalizationEventCount + while let event = await parser.getNextEvent(), eventsDelivered < maxEvents { + eventsDelivered += 1 + await _onMessageCallback?(event) + } // If not cancelled and still open, treat as a disconnection to reconnect. if !Task.isCancelled && readyState != .closed { // Notify an error event due to unexpected close, then loop to reconnect. - if let onError = _onErrorCallback { - onError(nil) // stream closed without error (will attempt reconnect) - } + await _onErrorCallback?(nil) // stream closed without error (will attempt reconnect) } else { // If cancelled or closed intentionally, break without reconnecting. break @@ -463,9 +470,7 @@ public actor EventSource { } // Notify error event. - if let onError = _onErrorCallback { - onError(error) - } + await _onErrorCallback?(error) // For HTTP status/content-type errors, break out (do not reconnect as per spec). if error is EventSourceError { diff --git a/Tests/EventSourceTests/AsyncEventsSequenceTests.swift b/Tests/EventSourceTests/AsyncEventsSequenceTests.swift index a1779af..da1b520 100644 --- a/Tests/EventSourceTests/AsyncEventsSequenceTests.swift +++ b/Tests/EventSourceTests/AsyncEventsSequenceTests.swift @@ -5,7 +5,7 @@ import Testing @Suite("AsyncEventsSequence Tests", .timeLimit(.minutes(1))) struct AsyncEventsSequenceTests { @Test("Basic event parsing from byte sequence") - func testBasicEventParsing() async throws { + func basicEventParsing() async throws { // Create a simple SSE data string let sseData = "data: hello\n\n" @@ -28,7 +28,7 @@ struct AsyncEventsSequenceTests { } @Test("Multiple events parsing") - func testMultipleEventsParsing() async throws { + func multipleEventsParsing() async throws { // Create a string with multiple SSE events let sseData = """ data: event1 @@ -55,7 +55,7 @@ struct AsyncEventsSequenceTests { } @Test("Event with all fields") - func testEventWithAllFields() async throws { + func eventWithAllFields() async throws { let sseData = """ id: 123 event: test @@ -75,7 +75,7 @@ struct AsyncEventsSequenceTests { } @Test("Empty sequence") - func testEmptySequence() async throws { + func emptySequence() async throws { let emptySequence = AsyncBytes("".utf8).events var iterator = emptySequence.makeAsyncIterator() let event = try await iterator.next() @@ -85,7 +85,7 @@ struct AsyncEventsSequenceTests { @Suite("Line Break Handling") struct LineBreakTests { @Test("CRLF line breaks") - func testCRLFLineBreaks() async throws { + func crlfLineBreaks() async throws { let sseData = "data: hello\r\n\r\n" var iterator = AsyncBytes(sseData.utf8).events.makeAsyncIterator() let event = try await iterator.next() @@ -95,7 +95,7 @@ struct AsyncEventsSequenceTests { } @Test("CR line breaks") - func testCRLineBreaks() async throws { + func crLineBreaks() async throws { let sseData = "data: hello\r\r" var iterator = AsyncBytes(sseData.utf8).events.makeAsyncIterator() let event = try await iterator.next() @@ -108,7 +108,7 @@ struct AsyncEventsSequenceTests { @Suite("Comment Handling") struct CommentTests { @Test("Comments in event stream") - func testComments() async throws { + func comments() async throws { let sseData = """ :comment line data: hello @@ -124,7 +124,7 @@ struct AsyncEventsSequenceTests { } @Test("Only comments") - func testOnlyComments() async throws { + func onlyComments() async throws { let sseData = """ :comment line 1 :comment line 2 @@ -140,7 +140,7 @@ struct AsyncEventsSequenceTests { @Suite("Multi-line Data") struct MultilineDataTests { @Test("Multiple data lines") - func testMultipleDataLines() async throws { + func multipleDataLines() async throws { let sseData = """ data: line1 data: line2 @@ -156,7 +156,7 @@ struct AsyncEventsSequenceTests { } @Test("Empty data line") - func testEmptyDataLine() async throws { + func emptyDataLine() async throws { // The SSE spec states that if there's a data: field with no value, // it should be treated as an empty string value, not as absent data let parser = EventSource.Parser() @@ -175,7 +175,7 @@ struct AsyncEventsSequenceTests { @Suite("Retry Field") struct RetryFieldTests { @Test("Only retry field - no event") - func testOnlyRetryField() async throws { + func onlyRetryField() async throws { let sseData = "retry: 1000\n\n" var iterator = AsyncBytes(sseData.utf8).events.makeAsyncIterator() @@ -184,7 +184,7 @@ struct AsyncEventsSequenceTests { } @Test("Retry with data") - func testRetryWithData() async throws { + func retryWithData() async throws { let sseData = """ retry: 1000 data: hello @@ -202,7 +202,7 @@ struct AsyncEventsSequenceTests { @Suite("Chunked Data") struct ChunkedDataTests { @Test("Chunked delivery simulation") - func testChunkedDelivery() async throws { + func chunkedDelivery() async throws { // Simulate chunked SSE data delivery let eventData = "id: 123\nevent: update\ndata: partial content\ndata: more content\n\n" @@ -231,7 +231,7 @@ struct AsyncEventsSequenceTests { } @Test("Event spanning multiple chunks") - func testEventSpanningChunks() async throws { + func eventSpanningChunks() async throws { // Create events with fixed size, non-recursive approach let eventData = "data: first event\n\ndata: second event\n\ndata: third event\n\n" @@ -261,7 +261,7 @@ struct AsyncEventsSequenceTests { } @Test("Integration with AsyncSequence operators") - func testAsyncSequenceIntegration() async throws { + func asyncSequenceIntegration() async throws { let sseData = "data: event1\n\ndata: event2\n\ndata: event3\n\n" // Convert to async sequence of bytes diff --git a/Tests/EventSourceTests/IntegrationTests.swift b/Tests/EventSourceTests/IntegrationTests.swift index c444f73..4f937fe 100644 --- a/Tests/EventSourceTests/IntegrationTests.swift +++ b/Tests/EventSourceTests/IntegrationTests.swift @@ -7,10 +7,8 @@ import Testing #endif #if swift(>=6.1) - /// Tests focused on real-world integration patterns for EventSource - @Suite("Integration Examples", .serialized) + @Suite("Integration Tests", .serialized) struct IntegrationTests { - /// Actor to safely manage shared state during tests actor ResponseState { private var completedText = "" private var eventCount = 0 @@ -41,1069 +39,898 @@ import Testing } } - /// Actor to manage state in a thread-safe way - actor ErrorTracker { - private var error: EventSourceError? - - func setError(_ newError: EventSourceError?) { - error = newError - } - - func getError() -> EventSourceError? { - return error - } - } - - /// Model for token streaming example struct TokenChunk: Codable, Equatable { let text: String let isComplete: Bool } - @Test("Decode token chunks from SSE stream", .mockURLSession) - func testDecodingTokenChunks() async throws { - // Define the test URL - let url = URL(string: "https://api.example.com/completions")! + @Suite("EventSource API Tests") + struct EventSourceAPITests { + @Test("Token streaming with EventSource", .mockURLSession) + func tokenStreaming() async throws { + let expectedText = "Hello there!" + let expectedEventCount = 3 - // Create mock SSE data with JSON payloads - let sseData = """ - data: {"text":"Hello, ","isComplete":false} + let url = URL(string: "https://api.example.com/completions")! - data: {"text":"world","isComplete":false} + let responseData = """ + data: {"text":"Hello ","isComplete":false} - data: {"text":"!","isComplete":true} + data: {"text":"there","isComplete":false} - """ + data: {"text":"!","isComplete":true} - // Set up the mock handler - await MockURLProtocol.setHandler { request in - #expect(request.url == url) - #expect(request.httpMethod == "GET") - - let response = HTTPURLResponse( - url: url, - statusCode: 200, - httpVersion: "HTTP/1.1", - headerFields: ["Content-Type": "text/event-stream"] - )! - - return (response, Data(sseData.utf8)) - } + """ - // Create session with mock protocol - let configuration = URLSessionConfiguration.ephemeral - configuration.protocolClasses = [MockURLProtocol.self] - let session = URLSession(configuration: configuration) + actor HandlerCallTracker { + private var callCount = 0 - // Create a decoder for parsing JSON data - let decoder = JSONDecoder() + func incrementAndGet() -> Int { + callCount += 1 + return callCount + } + } - // Track the full response - var completedText = "" - var receivedChunks: [TokenChunk] = [] + let handlerTracker = HandlerCallTracker() - // Perform request and get bytes - let request: URLRequest = URLRequest(url: url) - let (byteStream, response) = try await session.bytes(for: request) + await MockURLProtocol.setHandler { request in + let callNum = await handlerTracker.incrementAndGet() + + // Only respond to the first request to prevent reconnection + if callNum == 1 { + let response = HTTPURLResponse( + url: url, + statusCode: 200, + httpVersion: "HTTP/1.1", + headerFields: ["Content-Type": "text/event-stream"] + )! + + return (response, Data(responseData.utf8)) + } else { + // For reconnection attempts, return error to stop them + let response = HTTPURLResponse( + url: url, + statusCode: 500, + httpVersion: "HTTP/1.1", + headerFields: ["Content-Type": "text/plain"] + )! + return (response, Data()) + } + } - // Ensure response is valid - #expect((response as? HTTPURLResponse)?.statusCode == 200) - #expect( - (response as? HTTPURLResponse)?.value(forHTTPHeaderField: "Content-Type") - == "text/event-stream" - ) + // Create session with mock protocol and configuration + let configuration = URLSessionConfiguration.ephemeral + configuration.protocolClasses = [MockURLProtocol.self] - // Stream events asynchronously - for try await event in byteStream.events { - // Decode each chunk as it arrives - let chunk = try decoder.decode(TokenChunk.self, from: Data(event.data.utf8)) + // Create a request + var request = URLRequest(url: url) + request.setValue("text/event-stream", forHTTPHeaderField: "Accept") + request.setValue("no-cache", forHTTPHeaderField: "Cache-Control") - // Save the chunk for verification - receivedChunks.append(chunk) + // Create a decoder for parsing JSON data + let decoder = JSONDecoder() + let responseState = ResponseState() - // Add the new token to our result - completedText += chunk.text + // Create the EventSource with the custom configuration + let eventSource = EventSource(request: request, configuration: configuration) - // Check if the response is complete - if chunk.isComplete { - break + // Set up event handler + eventSource.onMessage = { event in + do { + // Decode the chunk + let chunk = try decoder.decode( + TokenChunk.self, + from: Data(event.data.utf8) + ) + + // Add to the completed text + await responseState.addText(chunk.text) + await responseState.incrementEventCount() + + // Check if complete + if chunk.isComplete { + await responseState.markComplete() + await eventSource.close() + } + } catch { + #expect(Bool(false), "Failed to decode event: \(error)") + await eventSource.close() + } } - } - // Verify the final result - #expect(completedText == "Hello, world!") - #expect(receivedChunks.count == 3) - #expect(receivedChunks[0] == TokenChunk(text: "Hello, ", isComplete: false)) - #expect(receivedChunks[1] == TokenChunk(text: "world", isComplete: false)) - #expect(receivedChunks[2] == TokenChunk(text: "!", isComplete: true)) - } + // Wait for completion or timeout + try await Task.sleep(for: .seconds(2)) + await eventSource.close() - @Test("Full API integration with POST request", .mockURLSession) - func testFullAPIIntegration() async throws { - // Define the test URL - let url = URL(string: "https://api.example.com/completions")! + // Verify the final result + #expect(await responseState.getText() == expectedText) + #expect(await responseState.getEventCount() == expectedEventCount) + #expect(await responseState.isCompleted() == true) + } - // Define the request payload - let requestPayload = """ - { - "prompt": "Write a greeting", - "max_tokens": 50 - } - """ + @Test("Reconnection behavior", .mockURLSession) + func reconnection() async throws { + // This test validates the EventSource reconnection behavior - // Create response data - let responseData = """ - data: {"text":"Hello ","isComplete":false} + // Define the test URL + let url = URL(string: "https://api.example.com/events")! - data: {"text":"there","isComplete":false} + // Track connection attempts + actor ConnectionCounter { + var connectionCount = 0 + var lastEventID: String? - data: {"text":"!","isComplete":true} + func incrementCount() { + connectionCount += 1 + } - """ + func setLastEventID(_ id: String?) { + lastEventID = id + } - // Set up the mock handler - await MockURLProtocol.setHandler { request in - #expect(request.url == url) - #expect(request.httpMethod == "POST") - #expect(request.value(forHTTPHeaderField: "Content-Type") == "application/json") - #expect( - request.value(forHTTPHeaderField: "Authorization")?.starts(with: "Bearer ") - == true - ) + func getCount() -> Int { + return connectionCount + } - // Verify request body - if let bodyData = request.httpBody { - let bodyString = String(data: bodyData, encoding: .utf8) - #expect(bodyString?.contains("prompt") == true) - #expect(bodyString?.contains("max_tokens") == true) + func getLastEventID() -> String? { + return lastEventID + } } - let response = HTTPURLResponse( - url: url, - statusCode: 200, - httpVersion: "HTTP/1.1", - headerFields: ["Content-Type": "text/event-stream"] - )! + let counter = ConnectionCounter() - return (response, Data(responseData.utf8)) - } + // Set up the mock handler to simulate network failures and reconnections + await MockURLProtocol.setHandler { request in + await counter.incrementCount() + let currentCount = await counter.getCount() - // Create session with mock protocol - let configuration = URLSessionConfiguration.ephemeral - configuration.protocolClasses = [MockURLProtocol.self] - let session = URLSession(configuration: configuration) - - // Set up the request - var request = URLRequest(url: url) - request.httpMethod = "POST" - request.setValue("application/json", forHTTPHeaderField: "Content-Type") - request.setValue("Bearer YOUR_API_KEY", forHTTPHeaderField: "Authorization") - request.httpBody = requestPayload.data(using: .utf8) - - // Process the stream asynchronously - let decoder = JSONDecoder() - var completedText = "" - - // Get the byte stream from URL session - let (byteStream, response) = try await session.bytes(for: request) - - // Ensure response is valid - #expect((response as? HTTPURLResponse)?.statusCode == 200) - #expect( - (response as? HTTPURLResponse)?.value(forHTTPHeaderField: "Content-Type") - == "text/event-stream" - ) - - // Stream events asynchronously - for try await event in byteStream.events { - // Decode each chunk as it arrives - let chunk = try decoder.decode(TokenChunk.self, from: Data(event.data.utf8)) - - // Add the new token to our result - completedText += chunk.text - - // Check if the response is complete - if chunk.isComplete { - break + // Check if the Last-Event-ID header is set correctly on reconnection + if currentCount > 1 { + let lastEventID = request.value(forHTTPHeaderField: "Last-Event-ID") + await counter.setLastEventID(lastEventID) + } + + // First connection - return an event with ID, then close connection + if currentCount == 1 { + let response = HTTPURLResponse( + url: url, + statusCode: 200, + httpVersion: "HTTP/1.1", + headerFields: ["Content-Type": "text/event-stream"] + )! + + // Return an event with ID and retry + return (response, Data("id: 123\nretry: 100\ndata: first message\n\n".utf8)) + } + // Second connection - return a server error to test retry + else if currentCount == 2 { + let response = HTTPURLResponse( + url: url, + statusCode: 500, + httpVersion: "HTTP/1.1", + headerFields: ["Content-Type": "text/plain"] + )! + + return (response, Data("Server Error".utf8)) + } + // Third connection - successful + else { + let response = HTTPURLResponse( + url: url, + statusCode: 200, + httpVersion: "HTTP/1.1", + headerFields: ["Content-Type": "text/event-stream"] + )! + + return (response, Data("id: 456\ndata: reconnected\n\n".utf8)) + } } - } - // Verify the final result - #expect(completedText == "Hello there!") - } + // Create session with mock protocol and configuration + let configuration = URLSessionConfiguration.ephemeral + configuration.protocolClasses = [MockURLProtocol.self] - @Test("Use EventSource for token streaming", .mockURLSession) - func testEventSourceForTokenStreaming() async throws { - let testTimeout: Duration = .seconds(5) - let expectedText = "Hello there!" - let expectedEventCount = 3 + // Create a request for the EventSource + var request = URLRequest(url: url) + request.setValue("text/event-stream", forHTTPHeaderField: "Accept") + request.setValue("no-cache", forHTTPHeaderField: "Cache-Control") - // Define the test URL - let url = URL(string: "https://api.example.com/completions")! + // Create the EventSource with the custom configuration + let eventSource = EventSource(request: request, configuration: configuration) - // Create response data - let responseData = """ - data: {"text":"Hello ","isComplete":false} + // Track received events + actor EventTracker { + var events: [EventSource.Event] = [] + var errors = 0 + var opens = 0 - data: {"text":"there","isComplete":false} + func addEvent(_ event: EventSource.Event) { + events.append(event) + } - data: {"text":"!","isComplete":true} + func incrementErrorCount() { + errors += 1 + } - """ + func incrementOpenCount() { + opens += 1 + } - // Set up the mock handler - await MockURLProtocol.setHandler { request in - let response = HTTPURLResponse( - url: url, - statusCode: 200, - httpVersion: "HTTP/1.1", - headerFields: ["Content-Type": "text/event-stream"] - )! + func getEventCount() -> Int { + return events.count + } - return (response, Data(responseData.utf8)) - } + func getEvents() -> [EventSource.Event] { + return events + } - // Create session with mock protocol and configuration - let configuration = URLSessionConfiguration.ephemeral - configuration.protocolClasses = [MockURLProtocol.self] + func getErrorCount() -> Int { + return errors + } - // Create a request - var request = URLRequest(url: url) - request.setValue("text/event-stream", forHTTPHeaderField: "Accept") - request.setValue("no-cache", forHTTPHeaderField: "Cache-Control") + func getOpenCount() -> Int { + return opens + } + } - // Create a decoder for parsing JSON data - let decoder = JSONDecoder() - let responseState = ResponseState() + let tracker = EventTracker() - // Use a proper async continuation pattern instead of polling - await withCheckedContinuation { (continuation: CheckedContinuation) in - // Create the EventSource with the custom configuration - let eventSource = EventSource(request: request, configuration: configuration) + // Set up the event handlers + eventSource.onOpen = { + Task { + await tracker.incrementOpenCount() + } + } - // Set up event handler immediately after creation eventSource.onMessage = { event in - do { - // Decode the chunk - let chunk = try decoder.decode(TokenChunk.self, from: Data(event.data.utf8)) - - Task { - // Add to the completed text - await responseState.addText(chunk.text) - await responseState.incrementEventCount() - - // Check if complete - if chunk.isComplete { - await responseState.markComplete() - await eventSource.close() - continuation.resume() - } - } - } catch { - #expect(Bool(false), "Failed to decode event: \(error)") - Task { - await eventSource.close() - continuation.resume() - } + Task { + await tracker.addEvent(event) } } - // Add timeout protection - Task { - try await Task.sleep(for: testTimeout) - await eventSource.close() - continuation.resume() + eventSource.onError = { _ in + Task { + await tracker.incrementErrorCount() + } } - } - // Verify the final result - #expect(await responseState.getText() == expectedText) - #expect(await responseState.getEventCount() == expectedEventCount) - #expect(await responseState.isCompleted() == true) - } + // Wait for a bit to allow for connection, error, and reconnection + try await Task.sleep(for: .milliseconds(500)) - @Test("EventSource reconnection", .mockURLSession) - func testEventSourceReconnection() async throws { - // This test validates the EventSource reconnection behavior + // Check the results + let connectionCount = await counter.getCount() + #expect(connectionCount >= 2, "Should have attempted at least 2 connections") - // Define the test URL - let url = URL(string: "https://api.example.com/events")! + let openCount = await tracker.getOpenCount() + #expect(openCount >= 1, "Should have opened at least once") - // Track connection attempts - actor ConnectionCounter { - var connectionCount = 0 - var lastEventID: String? + let errorCount = await tracker.getErrorCount() + #expect(errorCount >= 1, "Should have encountered at least one error") - func incrementCount() { - connectionCount += 1 - } + let events = await tracker.getEvents() + #expect(events.count >= 1, "Should have received at least one event") - func setLastEventID(_ id: String?) { - lastEventID = id + if let firstEvent = events.first { + #expect(firstEvent.id == "123") + #expect(firstEvent.data == "first message") } - func getCount() -> Int { - return connectionCount - } + // Check if Last-Event-ID was sent on reconnection + let lastEventID = await counter.getLastEventID() + #expect(lastEventID == "123", "Should reconnect with the last event ID") - func getLastEventID() -> String? { - return lastEventID - } + // Clean up + await eventSource.close() } - let counter = ConnectionCounter() - - // Set up the mock handler to simulate network failures and reconnections - await MockURLProtocol.setHandler { request in - await counter.incrementCount() - let currentCount = await counter.getCount() + @Test("Content-Type validation", .mockURLSession) + func contentTypeValidation() async throws { + let url = URL(string: "https://example.com/events")! - // Check if the Last-Event-ID header is set correctly on reconnection - if currentCount > 1 { - let lastEventID = request.value(forHTTPHeaderField: "Last-Event-ID") - await counter.setLastEventID(lastEventID) - } + actor ErrorTracker { + private var error: EventSourceError? - // First connection - return an event with ID, then close connection - if currentCount == 1 { - let response = HTTPURLResponse( - url: url, - statusCode: 200, - httpVersion: "HTTP/1.1", - headerFields: ["Content-Type": "text/event-stream"] - )! + func setError(_ newError: EventSourceError?) { + error = newError + } - // Return an event with ID and retry - return (response, Data("id: 123\nretry: 100\ndata: first message\n\n".utf8)) + func getError() -> EventSourceError? { + return error + } } - // Second connection - return a server error to test retry - else if currentCount == 2 { - let response = HTTPURLResponse( - url: url, - statusCode: 500, - httpVersion: "HTTP/1.1", - headerFields: ["Content-Type": "text/plain"] - )! - return (response, Data("Server Error".utf8)) - } - // Third connection - successful - else { + // Set up the mock handler to return incorrect content type + await MockURLProtocol.setHandler { request in let response = HTTPURLResponse( url: url, statusCode: 200, httpVersion: "HTTP/1.1", - headerFields: ["Content-Type": "text/event-stream"] + headerFields: ["Content-Type": "application/json"] )! - return (response, Data("id: 456\ndata: reconnected\n\n".utf8)) - } - } - - // Create session with mock protocol and configuration - let configuration = URLSessionConfiguration.ephemeral - configuration.protocolClasses = [MockURLProtocol.self] - - // Create a request for the EventSource - var request = URLRequest(url: url) - request.setValue("text/event-stream", forHTTPHeaderField: "Accept") - request.setValue("no-cache", forHTTPHeaderField: "Cache-Control") - - // Create the EventSource with the custom configuration - let eventSource = EventSource(request: request, configuration: configuration) - - // Track received events - actor EventTracker { - var events: [EventSource.Event] = [] - var errors = 0 - var opens = 0 - - func addEvent(_ event: EventSource.Event) { - events.append(event) - } - - func incrementErrorCount() { - errors += 1 - } - - func incrementOpenCount() { - opens += 1 + return (response, Data("{\"status\":\"ok\"}".utf8)) } - func getEventCount() -> Int { - return events.count - } + // Create session with mock protocol + let configuration = URLSessionConfiguration.ephemeral + configuration.protocolClasses = [MockURLProtocol.self] - func getEvents() -> [EventSource.Event] { - return events - } + var request = URLRequest(url: url) + request.setValue("text/event-stream", forHTTPHeaderField: "Accept") - func getErrorCount() -> Int { - return errors - } + // Create EventSource with the custom session + let eventSource = EventSource( + request: request, + configuration: configuration + ) - func getOpenCount() -> Int { - return opens + // Set up error handler + let errorTracker = ErrorTracker() + eventSource.onError = { error in + if let specificError = error as? EventSourceError { + Task { + await errorTracker.setError(specificError) + } + } } - } - let tracker = EventTracker() + // Wait for the error + try await Task.sleep(for: .milliseconds(100)) - // Set up the event handlers - eventSource.onOpen = { - Task { - await tracker.incrementOpenCount() + // Verify we got an invalid content type error + let receivedError = await errorTracker.getError() + #expect(receivedError != nil) + if case .invalidContentType = receivedError { + // Expected error + } else { + Issue.record( + "Expected invalidContentType error but got \(String(describing: receivedError))" + ) } - } - eventSource.onMessage = { event in - Task { - await tracker.addEvent(event) - } + // Clean up + await eventSource.close() } + } - eventSource.onError = { _ in - Task { - await tracker.incrementErrorCount() + #if !canImport(FoundationNetworking) + @Suite("URLSession.bytes() API Tests") + struct URLSessionBytesAPITests { + /// Helper to create a URL session with mock protocol handlers + func createMockSession() -> URLSession { + let configuration = URLSessionConfiguration.ephemeral + configuration.protocolClasses = [MockURLProtocol.self] + return URLSession(configuration: configuration) } - } - - // Wait for a bit to allow for connection, error, and reconnection - try await Task.sleep(for: .milliseconds(500)) - // Check the results - let connectionCount = await counter.getCount() - #expect(connectionCount >= 2, "Should have attempted at least 2 connections") + @Test("Decode token chunks from byte stream", .mockURLSession) + func decodeTokenChunks() async throws { + // Define the test URL + let url = URL(string: "https://api.example.com/completions")! - let openCount = await tracker.getOpenCount() - #expect(openCount >= 1, "Should have opened at least once") + // Create mock SSE data with JSON payloads + let sseData = """ + data: {"text":"Hello, ","isComplete":false} - let errorCount = await tracker.getErrorCount() - #expect(errorCount >= 1, "Should have encountered at least one error") + data: {"text":"world","isComplete":false} - let events = await tracker.getEvents() - #expect(events.count >= 1, "Should have received at least one event") + data: {"text":"!","isComplete":true} - if let firstEvent = events.first { - #expect(firstEvent.id == "123") - #expect(firstEvent.data == "first message") - } + """ - // Check if Last-Event-ID was sent on reconnection - let lastEventID = await counter.getLastEventID() - #expect(lastEventID == "123", "Should reconnect with the last event ID") + await MockURLProtocol.setHandler { request in + #expect(request.url == url) + #expect(request.httpMethod == "GET") - // Clean up - await eventSource.close() - } + let response = HTTPURLResponse( + url: url, + statusCode: 200, + httpVersion: "HTTP/1.1", + headerFields: ["Content-Type": "text/event-stream"] + )! - @Suite("AsyncEventsSequence with Mock URL Session Tests", .serialized) - struct MockURLProtocolTests { - /// Helper to create a URL session with mock protocol handlers - func createMockSession() -> URLSession { - let configuration = URLSessionConfiguration.ephemeral - configuration.protocolClasses = [MockURLProtocol.self] - return URLSession(configuration: configuration) - } - - @Test("Parse SSE Events from URLSession bytes", .mockURLSession) - func testParseEventsFromURLSession() async throws { - // Define the URL and expected event data - let url = URL(string: "https://example.com/events")! - let sseData = """ - data: event1 + return (response, Data(sseData.utf8)) + } - data: event2 + let session = createMockSession() - """ + // Create a decoder for parsing JSON data + let decoder = JSONDecoder() - // Set up the mock handler - await MockURLProtocol.setHandler { request in - #expect(request.url == url) - #expect(request.httpMethod == "GET") + // Track the full response + var completedText = "" + var receivedChunks: [TokenChunk] = [] - let response = HTTPURLResponse( - url: url, - statusCode: 200, - httpVersion: "HTTP/1.1", - headerFields: ["Content-Type": "text/event-stream"] - )! + let request: URLRequest = URLRequest(url: url) + let (byteStream, response) = try await session.bytes(for: request) - return (response, Data(sseData.utf8)) - } + // Ensure response is valid + #expect((response as? HTTPURLResponse)?.statusCode == 200) + #expect( + (response as? HTTPURLResponse)?.value(forHTTPHeaderField: "Content-Type") + == "text/event-stream" + ) - // Create session with mock protocol - let session = createMockSession() + // Stream events asynchronously + for try await event in byteStream.events { + // Decode each chunk as it arrives + let chunk = try decoder.decode(TokenChunk.self, from: Data(event.data.utf8)) - // Perform request and get bytes - let request: URLRequest = URLRequest(url: url) - let (byteStream, response) = try await session.bytes(for: request) + // Save the chunk for verification + receivedChunks.append(chunk) - // Validate response - #expect((response as? HTTPURLResponse)?.statusCode == 200) - #expect( - (response as? HTTPURLResponse)?.value(forHTTPHeaderField: "Content-Type") - == "text/event-stream" - ) + // Add the new token to our result + completedText += chunk.text - // Use the events extension to convert bytes to SSE events - let eventsSequence = byteStream.events + // Check if the response is complete + if chunk.isComplete { + break + } + } - // Collect the events - var events: [EventSource.Event] = [] - for try await event in eventsSequence { - events.append(event) + // Verify the final result + #expect(completedText == "Hello, world!") + #expect(receivedChunks.count == 3) + #expect(receivedChunks[0] == TokenChunk(text: "Hello, ", isComplete: false)) + #expect(receivedChunks[1] == TokenChunk(text: "world", isComplete: false)) + #expect(receivedChunks[2] == TokenChunk(text: "!", isComplete: true)) } - // Verify the events - #expect(events.count == 2) - #expect(events[0].data == "event1") - #expect(events[1].data == "event2") - } + @Test("POST request integration", .mockURLSession) + func postRequestIntegration() async throws { + // Define the test URL + let url = URL(string: "https://api.example.com/completions")! - @Test("Complex SSE event with all fields", .mockURLSession) - func testComplexSSEEvent() async throws { - // Define the URL and complex event data - let url = URL(string: "https://example.com/events")! - let sseData = """ - id: 1234 - event: update - data: {"name":"test","value":42} - retry: 3000 + // Define the request payload + let requestPayload = """ + { + "prompt": "Write a greeting", + "max_tokens": 50 + } + """ - """ + // Create response data + let responseData = """ + data: {"text":"Hello ","isComplete":false} - // Set up the mock handler - await MockURLProtocol.setHandler { request in - #expect(request.url == url) + data: {"text":"there","isComplete":false} - let response = HTTPURLResponse( - url: url, - statusCode: 200, - httpVersion: "HTTP/1.1", - headerFields: ["Content-Type": "text/event-stream"] - )! + data: {"text":"!","isComplete":true} - return (response, Data(sseData.utf8)) - } + """ - // Create session with mock protocol - let session = createMockSession() + await MockURLProtocol.setHandler { request in + #expect(request.url == url) + #expect(request.httpMethod == "POST") + #expect(request.value(forHTTPHeaderField: "Content-Type") == "application/json") + #expect( + request.value(forHTTPHeaderField: "Authorization")?.starts(with: "Bearer ") + == true + ) + + // Verify request body + if let bodyData = request.httpBody { + let bodyString = String(data: bodyData, encoding: .utf8) + #expect(bodyString?.contains("prompt") == true) + #expect(bodyString?.contains("max_tokens") == true) + } - // Perform request and get bytes - let request: URLRequest = URLRequest(url: url) - let (byteStream, _) = try await session.bytes(for: request) + let response = HTTPURLResponse( + url: url, + statusCode: 200, + httpVersion: "HTTP/1.1", + headerFields: ["Content-Type": "text/event-stream"] + )! - // Use the events extension - let eventsSequence = byteStream.events + return (response, Data(responseData.utf8)) + } - // Get just the first event - var iterator = eventsSequence.makeAsyncIterator() - let event = try await iterator.next() + let session = createMockSession() - // Verify the event fields - #expect(event != nil) - #expect(event?.id == "1234") - #expect(event?.event == "update") - #expect(event?.data == "{\"name\":\"test\",\"value\":42}") - #expect(event?.retry == 3000) - } + // Set up the request + var request = URLRequest(url: url) + request.httpMethod = "POST" + request.setValue("application/json", forHTTPHeaderField: "Content-Type") + request.setValue("Bearer YOUR_API_KEY", forHTTPHeaderField: "Authorization") + request.httpBody = requestPayload.data(using: .utf8) - @Test("Chunked delivery simulation with URLSession", .mockURLSession) - func testChunkedDeliveryWithURLSession() async throws { - // Define the URL - let url = URL(string: "https://example.com/events")! + // Process the stream asynchronously + let decoder = JSONDecoder() + var completedText = "" - // Create event data with multiple events - let sseData = "data: event1\n\ndata: event2\n\ndata: event3\n\n" + // Get the byte stream from URL session + let (byteStream, response) = try await session.bytes(for: request) - // Set up the mock handler - await MockURLProtocol.setHandler { request in - #expect(request.url == url) + // Ensure response is valid + #expect((response as? HTTPURLResponse)?.statusCode == 200) + #expect( + (response as? HTTPURLResponse)?.value(forHTTPHeaderField: "Content-Type") + == "text/event-stream" + ) - let response = HTTPURLResponse( - url: url, - statusCode: 200, - httpVersion: "HTTP/1.1", - headerFields: ["Content-Type": "text/event-stream"] - )! + // Stream events asynchronously + for try await event in byteStream.events { + // Decode each chunk as it arrives + let chunk = try decoder.decode(TokenChunk.self, from: Data(event.data.utf8)) - return (response, Data(sseData.utf8)) - } + // Add the new token to our result + completedText += chunk.text - // Create session with mock protocol - let session = createMockSession() + // Check if the response is complete + if chunk.isComplete { + break + } + } - // Perform request and get bytes - let request: URLRequest = URLRequest(url: url) - let (byteStream, _) = try await session.bytes(for: request) + // Verify the final result + #expect(completedText == "Hello there!") + } - // Use the events extension to convert bytes to SSE events - let eventsSequence = byteStream.events + @Test("Parse SSE events from byte stream", .mockURLSession) + func parseSSEEvents() async throws { + let url = URL(string: "https://example.com/events")! + let sseData = """ + data: event1 - // Collect the events - var events: [EventSource.Event] = [] - for try await event in eventsSequence { - events.append(event) - } + data: event2 - // Verify the events - #expect(events.count == 3) - #expect(events[0].data == "event1") - #expect(events[1].data == "event2") - #expect(events[2].data == "event3") - } + """ - @Test("HTTP error handling", .mockURLSession) - func testHTTPErrorHandling() async throws { - // Define the URL - let url = URL(string: "https://example.com/events")! + await MockURLProtocol.setHandler { request in + #expect(request.url == url) + #expect(request.httpMethod == "GET") - // Set up the mock handler to return an error - await MockURLProtocol.setHandler { request in - let response = HTTPURLResponse( - url: url, - statusCode: 404, - httpVersion: "HTTP/1.1", - headerFields: ["Content-Type": "text/plain"] - )! + let response = HTTPURLResponse( + url: url, + statusCode: 200, + httpVersion: "HTTP/1.1", + headerFields: ["Content-Type": "text/event-stream"] + )! - return (response, Data("Not Found".utf8)) - } + return (response, Data(sseData.utf8)) + } - // Create session with mock protocol - let session = createMockSession() + let session = createMockSession() - do { - // The URLSession bytes call should still succeed let request: URLRequest = URLRequest(url: url) let (byteStream, response) = try await session.bytes(for: request) - // But we can check the HTTP status code - #expect((response as? HTTPURLResponse)?.statusCode == 404) + // Ensure response is valid + #expect((response as? HTTPURLResponse)?.statusCode == 200) + #expect( + (response as? HTTPURLResponse)?.value(forHTTPHeaderField: "Content-Type") + == "text/event-stream" + ) - // No events should be parsed from this response let eventsSequence = byteStream.events - var iterator = eventsSequence.makeAsyncIterator() - let event = try await iterator.next() - - // Expect no events since content-type is not text/event-stream - #expect(event == nil) - } catch { - // We don't expect an exception here, the bytes call should succeed - // even with a 404 response - Issue.record("Unexpected error: \(error)") - } - } - @Test("Content-Type validation with EventSource", .mockURLSession) - func testContentTypeValidation() async throws { - // Define the URL - let url = URL(string: "https://example.com/events")! - - // Set up the mock handler to return incorrect content type - await MockURLProtocol.setHandler { request in - let response = HTTPURLResponse( - url: url, - statusCode: 200, - httpVersion: "HTTP/1.1", - headerFields: ["Content-Type": "application/json"] - )! + var events: [EventSource.Event] = [] + for try await event in eventsSequence { + events.append(event) + } - return (response, Data("{\"status\":\"ok\"}".utf8)) + #expect(events.count == 2) + #expect(events[0].data == "event1") + #expect(events[1].data == "event2") } - // Create session with mock protocol and test EventSource - let session = createMockSession() + @Test("Complex SSE event parsing", .mockURLSession) + func complexSSEEvent() async throws { + let url = URL(string: "https://example.com/events")! + let sseData = """ + id: msg-1 + event: update + data: {"status":"processing"} - var request = URLRequest(url: url) - request.setValue("text/event-stream", forHTTPHeaderField: "Accept") - - // Create EventSource with the custom session - let eventSource = EventSource( - request: request, - configuration: session.configuration - ) + id: msg-2 + event: complete + data: {"status":"done"} - // Set up error handler - let errorTracker = ErrorTracker() - eventSource.onError = { error in - if let specificError = error as? EventSourceError { - Task { - await errorTracker.setError(specificError) - } - } - } + """ - // Wait for the error - try await Task.sleep(for: .milliseconds(100)) + await MockURLProtocol.setHandler { request in + #expect(request.url == url) - // Verify we got an invalid content type error - let receivedError = await errorTracker.getError() - #expect(receivedError != nil) - if case .invalidContentType = receivedError { - // Expected error - } else { - Issue.record( - "Expected invalidContentType error but got \(String(describing: receivedError))" - ) - } + let response = HTTPURLResponse( + url: url, + statusCode: 200, + httpVersion: "HTTP/1.1", + headerFields: ["Content-Type": "text/event-stream"] + )! - // Clean up - await eventSource.close() - } + return (response, Data(sseData.utf8)) + } - @Test("Real-world streaming example", .mockURLSession) - func testRealWorldStreamingExample() async throws { - // Define the URL - let url = URL(string: "https://api.example.com/stream")! + let session = createMockSession() - // Set up the mock handler to return SSE data - await MockURLProtocol.setHandler { request in - let responseData = """ - data: {"text":"Hello, ","isComplete":false} - - data: {"text":"world!","isComplete":true} + let request: URLRequest = URLRequest(url: url) + let (byteStream, _) = try await session.bytes(for: request) - """ + let eventsSequence = byteStream.events - let response = HTTPURLResponse( - url: url, - statusCode: 200, - httpVersion: "HTTP/1.1", - headerFields: ["Content-Type": "text/event-stream"] - )! + // Collect the events + var events: [EventSource.Event] = [] + for try await event in eventsSequence { + events.append(event) + } - return (response, Data(responseData.utf8)) + #expect(events.count == 2) + #expect(events[0].id == "msg-1") + #expect(events[0].event == "update") + #expect(events[0].data == "{\"status\":\"processing\"}") + #expect(events[1].id == "msg-2") + #expect(events[1].event == "complete") + #expect(events[1].data == "{\"status\":\"done\"}") } - // Create session with mock protocol - let session = createMockSession() + @Test("Chunked delivery parsing", .mockURLSession) + func chunkedDelivery() async throws { + let url = URL(string: "https://example.com/events")! - // Create a decoder for parsing JSON data - let decoder = JSONDecoder() + // Create event data with multiple events + let sseData = "data: event1\n\ndata: event2\n\ndata: event3\n\n" - // Model for token streaming - struct TokenChunk: Codable { - let text: String - let isComplete: Bool - } + await MockURLProtocol.setHandler { request in + #expect(request.url == url) - // Simulating the README.md example - var completedText = "" + let response = HTTPURLResponse( + url: url, + statusCode: 200, + httpVersion: "HTTP/1.1", + headerFields: ["Content-Type": "text/event-stream"] + )! - // Perform request and get bytes - let request: URLRequest = URLRequest(url: url) - let (byteStream, response) = try await session.bytes(for: request) + return (response, Data(sseData.utf8)) + } - // Ensure response is valid - #expect((response as? HTTPURLResponse)?.statusCode == 200) - #expect( - (response as? HTTPURLResponse)?.value(forHTTPHeaderField: "Content-Type") - == "text/event-stream" - ) + let session = createMockSession() - // Stream events asynchronously - for try await event in byteStream.events { - // Decode each chunk as it arrives - let chunk = try decoder.decode(TokenChunk.self, from: Data(event.data.utf8)) + let request: URLRequest = URLRequest(url: url) + let (byteStream, _) = try await session.bytes(for: request) - // Add the new token to our result - completedText += chunk.text + let eventsSequence = byteStream.events - // Check if the response is complete - if chunk.isComplete { - break + // Collect the events + var events: [EventSource.Event] = [] + for try await event in eventsSequence { + events.append(event) } + + #expect(events.count == 3) + #expect(events[0].data == "event1") + #expect(events[1].data == "event2") + #expect(events[2].data == "event3") } - // Verify the final result - #expect(completedText == "Hello, world!") - } - } + @Test("HTTP error handling", .mockURLSession) + func httpErrorHandling() async throws { + let url = URL(string: "https://example.com/events")! - @Suite("URLSession Stream Simulation Tests", .serialized) - struct MockURLProtocolStreamTests { - /// Helper to create a URL session with mock protocol handlers - func createMockSession() -> URLSession { - let configuration = URLSessionConfiguration.ephemeral - configuration.protocolClasses = [MockURLProtocol.self] - return URLSession(configuration: configuration) - } + // Set up the mock handler to return an error + await MockURLProtocol.setHandler { request in + let response = HTTPURLResponse( + url: url, + statusCode: 404, + httpVersion: "HTTP/1.1", + headerFields: ["Content-Type": "text/plain"] + )! - /// Creates a chunked delivery handler that delivers SSE data byte-by-byte - /// with delays to simulate network streaming conditions - func createByteByByteHandler( - url: URL, - sseData: String, - delayBetweenBytes: Duration = .milliseconds(1) - ) -> @Sendable (URLRequest) async throws -> (HTTPURLResponse, Data) { - return { request in - #expect(request.url == url) - - // Create response for SSE - let response = HTTPURLResponse( - url: url, - statusCode: 200, - httpVersion: "HTTP/1.1", - headerFields: ["Content-Type": "text/event-stream"] - )! + return (response, Data("Not Found".utf8)) + } - // We're returning an empty data response because we'll manually deliver - // each byte through the loading system to simulate a true stream - let emptyData = Data() - - // Create a task that will stream the bytes - Task.detached { - // Start streaming bytes with some delay between them - let client = - request.value(forHTTPHeaderField: "_MockURLProtocolClient") - as? NSObjectProtocol - let selector = NSSelectorFromString("urlProtocol:didLoad:") - - // Reference to the protocol instance (need to do this via reflection) - guard - let protocolInstance = request.value( - forHTTPHeaderField: "_MockURLProtocolInstance" - ) as? NSObjectProtocol - else { - return - } + let session = createMockSession() - // Convert string to bytes - let dataBytes = [UInt8](sseData.utf8) + do { + // The URLSession bytes call should still succeed + let request: URLRequest = URLRequest(url: url) + let (byteStream, response) = try await session.bytes(for: request) - // Stream each byte with delay - for byte in dataBytes { - // Wait a bit between bytes - try? await Task.sleep(for: delayBetweenBytes) + // But we can check the HTTP status code + #expect((response as? HTTPURLResponse)?.statusCode == 404) - // Deliver a single byte of data - let byteData = Data([byte]) - _ = client?.perform(selector, with: protocolInstance, with: byteData) - } + // No events should be parsed from this response + let eventsSequence = byteStream.events + var iterator = eventsSequence.makeAsyncIterator() + let event = try await iterator.next() - // Finally, complete the loading - let finishSelector = NSSelectorFromString("urlProtocolDidFinishLoading:") - _ = client?.perform(finishSelector, with: protocolInstance) + // Expect no events since content-type is not text/event-stream + #expect(event == nil) + } catch { + // We don't expect an exception here, the bytes call should succeed + // even with a 404 response + Issue.record("Unexpected error: \(error)") } - - return (response, emptyData) } - } - @Test("Simulated Chunked Delivery Test", .mockURLSession) - func testSimulatedChunkedDelivery() async throws { - // Define the URL - let url = URL(string: "https://example.com/events")! + @Test("Real-world streaming example", .mockURLSession) + func realWorldStreaming() async throws { + let url = URL(string: "https://api.example.com/stream")! - // Create event data with multiple events - let eventChunks = [ - "data: event", - "1\n\ndata: ev", - "ent2\n\ndata", - ": event3\n\n", - ] + await MockURLProtocol.setHandler { request in + let responseData = """ + data: {"text":"Hello, ","isComplete":false} - // Set up the mock handler sequence - await MockURLProtocol.setHandler { request in - #expect(request.url == url) + data: {"text":"world!","isComplete":true} - let response = HTTPURLResponse( - url: url, - statusCode: 200, - httpVersion: "HTTP/1.1", - headerFields: ["Content-Type": "text/event-stream"] - )! + """ - // Implement our own chunked delivery simulation using standard - // MockURLProtocol behavior + let response = HTTPURLResponse( + url: url, + statusCode: 200, + httpVersion: "HTTP/1.1", + headerFields: ["Content-Type": "text/event-stream"] + )! - // Create client and protocol storage mechanism - actor DeliveryManager { - private var client: NSObjectProtocol? - private var protocolInstance: NSObjectProtocol? - private var chunks: [String] - private var currentChunkIndex = 0 + return (response, Data(responseData.utf8)) + } - init(chunks: [String]) { - self.chunks = chunks - } + let session = createMockSession() - func setClientAndProtocol( - client: NSObjectProtocol, - protocolInstance: NSObjectProtocol - ) { - self.client = client - self.protocolInstance = protocolInstance - } + // Create a decoder for parsing JSON data + let decoder = JSONDecoder() - func deliverNextChunk() async -> Bool { - guard let client = client, - let protocolInstance = protocolInstance, - currentChunkIndex < chunks.count - else { - return false - } + // Model for token streaming + struct TokenChunk: Codable { + let text: String + let isComplete: Bool + } - // Get the next chunk - let chunk = chunks[currentChunkIndex] - currentChunkIndex += 1 + // Simulating the README.md example + var completedText = "" - // Convert to Data - let chunkData = Data(chunk.utf8) + let request: URLRequest = URLRequest(url: url) + let (byteStream, response) = try await session.bytes(for: request) - // Deliver using the client's didLoad method - let selector = NSSelectorFromString("urlProtocol:didLoad:") - _ = client.perform(selector, with: protocolInstance, with: chunkData) + // Ensure response is valid + #expect((response as? HTTPURLResponse)?.statusCode == 200) + #expect( + (response as? HTTPURLResponse)?.value(forHTTPHeaderField: "Content-Type") + == "text/event-stream" + ) - return currentChunkIndex < chunks.count - } + // Stream events asynchronously + for try await event in byteStream.events { + // Decode each chunk as it arrives + let chunk = try decoder.decode(TokenChunk.self, from: Data(event.data.utf8)) - func finishLoading() { - guard let client = client, - let protocolInstance = protocolInstance - else { - return - } + // Add the new token to our result + completedText += chunk.text - // Call didFinishLoading - let selector = NSSelectorFromString("urlProtocolDidFinishLoading:") - _ = client.perform(selector, with: protocolInstance) + // Check if the response is complete + if chunk.isComplete { + break } } - // Return the initial response with empty data - // The actual data will be delivered in the background task - return (response, Data()) + // Verify the final result + #expect(completedText == "Hello, world!") } - // For this test, we'll use a simpler approach - just deliver the entire - // content at once, but still test chunking via ChunkedAsyncBytes - let fullContent = eventChunks.joined() + @Test("Simulated chunked delivery", .mockURLSession) + func simulatedChunkedDelivery() async throws { + let url = URL(string: "https://example.com/events")! - await MockURLProtocol.setHandler { request in - let response = HTTPURLResponse( - url: url, - statusCode: 200, - httpVersion: "HTTP/1.1", - headerFields: ["Content-Type": "text/event-stream"] - )! + // Create event data with multiple events + let eventChunks = [ + "data: event", + "1\n\ndata: ev", + "ent2\n\ndata", + ": event3\n\n", + ] - return (response, Data(fullContent.utf8)) - } + let fullContent = eventChunks.joined() - // Create session with mock protocol - let session = createMockSession() + await MockURLProtocol.setHandler { request in + let response = HTTPURLResponse( + url: url, + statusCode: 200, + httpVersion: "HTTP/1.1", + headerFields: ["Content-Type": "text/event-stream"] + )! - // Perform request and get bytes - let request: URLRequest = URLRequest(url: url) - let (byteStream, _) = try await session.bytes(for: request) + return (response, Data(fullContent.utf8)) + } + + let session = createMockSession() - // Use the events extension to convert bytes to SSE events - let eventsSequence = byteStream.events + let request: URLRequest = URLRequest(url: url) + let (byteStream, _) = try await session.bytes(for: request) - // Collect the events - var events: [EventSource.Event] = [] - for try await event in eventsSequence { - events.append(event) + let eventsSequence = byteStream.events + + // Collect the events + var events: [EventSource.Event] = [] + for try await event in eventsSequence { + events.append(event) + } + + #expect(events.count == 3) + #expect(events[0].data == "event1") + #expect(events[1].data == "event2") + #expect(events[2].data == "event3") } - // Verify the events - note that even though we delivered in chunks, - // the parser should have reconstructed the proper events - #expect(events.count == 3) - #expect(events[0].data == "event1") - #expect(events[1].data == "event2") - #expect(events[2].data == "event3") - } + @Test("Large event delivery", .mockURLSession) + func largeEventDelivery() async throws { + let url = URL(string: "https://example.com/events")! - @Test("Large event delivery", .mockURLSession) - func testLargeEventDelivery() async throws { - // Define the URL - let url = URL(string: "https://example.com/events")! + // Create a large event with 10,000 characters + let largePayload = String(repeating: "abcdefghij", count: 1000) + let sseData = "data: \(largePayload)\n\n" - // Create a large event with 10,000 characters - let largePayload = String(repeating: "abcdefghij", count: 1000) - let sseData = "data: \(largePayload)\n\n" + // Set up the mock handler + await MockURLProtocol.setHandler { request in + #expect(request.url == url) - // Set up the mock handler - await MockURLProtocol.setHandler { request in - #expect(request.url == url) + let response = HTTPURLResponse( + url: url, + statusCode: 200, + httpVersion: "HTTP/1.1", + headerFields: ["Content-Type": "text/event-stream"] + )! - let response = HTTPURLResponse( - url: url, - statusCode: 200, - httpVersion: "HTTP/1.1", - headerFields: ["Content-Type": "text/event-stream"] - )! + return (response, Data(sseData.utf8)) + } - return (response, Data(sseData.utf8)) - } + let session = createMockSession() - // Create session with mock protocol - let session = createMockSession() + let request: URLRequest = URLRequest(url: url) + let (byteStream, _) = try await session.bytes(for: request) - // Perform request and get bytes - let request: URLRequest = URLRequest(url: url) - let (byteStream, _) = try await session.bytes(for: request) + let eventsSequence = byteStream.events - // Use the events extension - let eventsSequence = byteStream.events + var iterator = eventsSequence.makeAsyncIterator() + let event = try await iterator.next() - // Get the event - var iterator = eventsSequence.makeAsyncIterator() - let event = try await iterator.next() + #expect(event != nil) + #expect(event?.data == largePayload) + } - // Verify the event - #expect(event != nil) - #expect(event?.data == largePayload) - } + @Test("Network conditions with irregular chunks", .mockURLSession) + func irregularChunks() async throws { + // Create event data - we'll break this up manually + let sseData = "data: event with spaces\n\ndata: another\ndata: line\n\n" - @Test("Simulating network conditions with ChunkedAsyncBytes", .mockURLSession) - func testNetworkConditionsWithChunked() async throws { - // Create event data - we'll break this up manually - let sseData = "data: event with spaces\n\ndata: another\ndata: line\n\n" + // Break the event data into chunks of different sizes + let allBytes = Array(sseData.utf8) + var chunks: [[UInt8]] = [] - // Break the event data into chunks of different sizes - let allBytes = Array(sseData.utf8) - var chunks: [[UInt8]] = [] + // Create irregular chunk sizes to simulate network conditions + var currentIndex = 0 + let chunkSizes = [3, 5, 10, 2, 7, 15, 20] - // Create irregular chunk sizes to simulate network conditions - var currentIndex = 0 - let chunkSizes = [3, 5, 10, 2, 7, 15, 20] + for size in chunkSizes { + if currentIndex >= allBytes.count { + break + } - for size in chunkSizes { - if currentIndex >= allBytes.count { - break + let end = min(currentIndex + size, allBytes.count) + chunks.append(Array(allBytes[currentIndex..=6.1) diff --git a/Tests/EventSourceTests/ParserTests.swift b/Tests/EventSourceTests/ParserTests.swift index b464f5f..e2fd079 100644 --- a/Tests/EventSourceTests/ParserTests.swift +++ b/Tests/EventSourceTests/ParserTests.swift @@ -4,13 +4,13 @@ import Testing @Suite("EventSource Parser Tests", .timeLimit(.minutes(1))) struct ParserTests { @Test("Empty stream produces no events") - func testEmptyStreamProducesNoEvents() async { + func emptyStreamProducesNoEvents() async { let events = await getEvents(from: "") #expect(events.isEmpty) } @Test("Comment only stream produces no events") - func testCommentOnlyStreamProducesNoEvents() async { + func commentOnlyStreamProducesNoEvents() async { let stream = ":comment line 1\n:comment line 2\r\n" let events = await getEvents(from: stream) #expect(events.isEmpty) @@ -19,7 +19,7 @@ struct ParserTests { @Suite("Single Line Event Tests") struct SingleLineEventTests { @Test("LF line breaks") - func testSingleLineEventLF() async { + func singleLineEventLF() async { let stream = "data: test data\n\n" let events = await getEvents(from: stream) #expect(events.count == 1) @@ -29,7 +29,7 @@ struct ParserTests { } @Test("CR line breaks") - func testSingleLineEventCR() async { + func singleLineEventCR() async { let stream = "data: test data\r\r" let events = await getEvents(from: stream) #expect(events.count == 1) @@ -37,7 +37,7 @@ struct ParserTests { } @Test("CRLF line breaks") - func testSingleLineEventCRLF() async { + func singleLineEventCRLF() async { let stream = "data: test data\r\n\r\n" let events = await getEvents(from: stream) #expect(events.count == 1) @@ -46,7 +46,7 @@ struct ParserTests { } @Test("Multi-line data") - func testMultiLineData() async { + func multiLineData() async { let stream = "data: line1\ndata: line2\n\n" let events = await getEvents(from: stream) #expect(events.count == 1) @@ -56,7 +56,7 @@ struct ParserTests { @Suite("Event Field Tests") struct EventFieldTests { @Test("Event with ID") - func testEventWithID() async { + func eventWithID() async { let stream = "id: 123\ndata: test data\n\n" let events = await getEvents(from: stream) #expect(events.count == 1) @@ -70,7 +70,7 @@ struct ParserTests { } @Test("Event with NUL in ID is ignored") - func testEventWithNULInIDIsIgnoredAndDoesNotSetLastEventID() async { + func eventWithNULInIDIsIgnoredAndDoesNotSetLastEventID() async { let stream = "id: abc\0def\ndata: test\n\n" // ID with NUL let parser = EventSource.Parser() @@ -101,7 +101,7 @@ struct ParserTests { } @Test("ID reset by empty ID line") - func testIDResetByEmptyIDLine() async { + func idResetByEmptyIDLine() async { let stream = "id: 123\ndata: event1\n\nid: \ndata: event2\n\n" let parser = EventSource.Parser() @@ -133,7 +133,7 @@ struct ParserTests { } @Test("Event with name") - func testEventWithName() async { + func eventWithName() async { let stream = "event: custom\ndata: test data\n\n" let events = await getEvents(from: stream) #expect(events.count == 1) @@ -142,7 +142,7 @@ struct ParserTests { } @Test("Retry field") - func testRetryField() async { + func retryField() async { let stream = "retry: 5000\ndata: test data\n\n" let parser = EventSource.Parser() // Manually consume and finish to inspect parser state before getting all events @@ -166,7 +166,7 @@ struct ParserTests { } @Test("Retry field only updates reconnection time") - func testRetryFieldOnlyUpdatesReconnectionTimeDoesNotDispatchEvent() async { + func retryFieldOnlyUpdatesReconnectionTimeDoesNotDispatchEvent() async { let stream = "retry: 1234\n\n" // Only retry, no data or other fields let parser = EventSource.Parser() let initialReconnectionTime = await parser.getReconnectionTime() @@ -182,7 +182,7 @@ struct ParserTests { } @Test("Invalid retry value") - func testInvalidRetryField() async { + func invalidRetryField() async { let parser = EventSource.Parser() let bytes = "retry: invalid\n\n".utf8 for byte in bytes { @@ -197,7 +197,7 @@ struct ParserTests { } @Test("Multiple events") - func testMultipleEvents() async { + func multipleEvents() async { let stream = "data: event1\n\nevent: custom\ndata: event2\nid: e2\n\n" let events = await getEvents(from: stream) #expect(events.count == 2) @@ -212,7 +212,7 @@ struct ParserTests { } @Test("Mixed line endings produce correct events") - func testMixedLineEndingsProduceCorrectEvents() async { + func mixedLineEndingsProduceCorrectEvents() async { let stream = "data: event1\r\r" + "data: event2\n\n" + "data: event3\r\n\r\n" + "id: e4\rdata: event4\r\r" // `id: e4` is one line, `data: event4` is next, then dispatch @@ -232,7 +232,7 @@ struct ParserTests { @Suite("Field Parsing Tests") struct FieldParsingTests { @Test("Colon space parsing") - func testColonSpaceParsing() async { + func colonSpaceParsing() async { // Case 1: "data:value" let stream1 = "data:value1\n\n" let events1 = await getEvents(from: stream1) @@ -253,7 +253,7 @@ struct ParserTests { } @Test("Unicode in data") - func testUnicodeInData() async { + func unicodeInData() async { // Adapted from https://github.com/launchdarkly/swift-eventsource/blob/193c097f324666691f71b49b1e70249ef21f9f62/Tests/UTF8LineParserTests.swift#L59 let unicodeData = "¯\\_(ツ)_/¯0️⃣🇺🇸Z̮̞̠͙͔ͅḀ̗̞͈̻̗Ḷ͙͎̯̹̞͓G̻O̭̗̮𝓯𝓸𝔁✅" let stream = "data: \(unicodeData)\n\n" @@ -263,7 +263,7 @@ struct ParserTests { } @Test("NUL character in data") - func testNULCharacterInData() async { + func nulCharacterInData() async { let dataWithNul = "hello\0world" let stream = "data: \(dataWithNul)\n\n" let events = await getEvents(from: stream) @@ -272,7 +272,7 @@ struct ParserTests { } @Test("Invalid UTF8 sequence becomes replacement character") - func testInvalidUTF8SequenceBecomesReplacementCharacter() async { + func invalidUTF8SequenceBecomesReplacementCharacter() async { // Create a byte array with an invalid UTF-8 sequence var invalidBytesStream: [UInt8] = [] // Add "data: test" as raw bytes @@ -305,21 +305,21 @@ struct ParserTests { } @Test("Field without colon is ignored") - func testFieldWithoutColonIsIgnored() async { + func fieldWithoutColonIsIgnored() async { let stream = "data test\n\n" let events = await getEvents(from: stream) #expect(events.isEmpty) } @Test("Field with empty name is ignored") - func testFieldWithEmptyNameIsIgnored() async { + func fieldWithEmptyNameIsIgnored() async { let stream = ": value\n\n" let events = await getEvents(from: stream) #expect(events.isEmpty) } @Test("Field with empty value") - func testFieldWithEmptyValue() async { + func fieldWithEmptyValue() async { let stream = "data:\n\n" let events = await getEvents(from: stream) #expect(events.count == 1) @@ -327,7 +327,7 @@ struct ParserTests { } @Test("Field with only spaces after colon") - func testFieldWithOnlySpacesAfterColon() async { + func fieldWithOnlySpacesAfterColon() async { let stream = "data: \n\n" let events = await getEvents(from: stream) #expect(events.count == 1) @@ -335,7 +335,7 @@ struct ParserTests { } @Test("Field with tab after colon") - func testFieldWithTabAfterColon() async { + func fieldWithTabAfterColon() async { let stream = "data:\tvalue\n\n" let events = await getEvents(from: stream) #expect(events.count == 1) @@ -343,7 +343,7 @@ struct ParserTests { } @Test("Field with multiple colons") - func testFieldWithMultipleColons() async { + func fieldWithMultipleColons() async { let stream = "data:value:with:colons\n\n" let events = await getEvents(from: stream) #expect(events.count == 1) @@ -351,7 +351,7 @@ struct ParserTests { } @Test("Field with leading and trailing spaces") - func testFieldWithLeadingAndTrailingSpaces() async { + func fieldWithLeadingAndTrailingSpaces() async { let stream = "data: value \n\n" let events = await getEvents(from: stream) #expect(events.count == 1) @@ -362,7 +362,7 @@ struct ParserTests { @Suite("Empty Field Tests") struct EmptyFieldTests { @Test("Empty data field dispatch") - func testEmptyDataFieldDispatch() async { + func emptyDataFieldDispatch() async { // An event with a "data" field that results in empty string data should still be dispatched. // "data" (field name with empty value) let stream1 = "data\n\n" @@ -378,7 +378,7 @@ struct ParserTests { } @Test("Fields without value are processed correctly") - func testFieldsWithoutValueAreProcessedCorrectly() async { + func fieldsWithoutValueAreProcessedCorrectly() async { // According to spec: // - "event" (no value): event type set to empty string. // - "data" (no value): data buffer gets an empty string. @@ -410,7 +410,7 @@ struct ParserTests { @Suite("Comment and Line Tests") struct CommentAndLineTests { @Test("Only comment lines and empty lines produce no events") - func testOnlyCommentLinesAndEmptyLinesProduceNoEvents() async { + func onlyCommentLinesAndEmptyLinesProduceNoEvents() async { // Includes dispatch-triggering empty lines but no data fields let stream = ":comment\n\n:another comment\r\n\r\n" let events = await getEvents(from: stream) @@ -421,7 +421,7 @@ struct ParserTests { } @Test("Line without colon is ignored") - func testLineWithoutColonIsIgnored() async { + func lineWithoutColonIsIgnored() async { let stream = "this is a bogus line\ndata: valid\n\nthis is also bogus\r\n" let events = await getEvents(from: stream) #expect(events.count == 1, "Only one event from the valid 'data: valid' line.") @@ -429,7 +429,7 @@ struct ParserTests { } @Test("Complex mixed comments and events") - func testMixedCommentsAndEvents() async { + func mixedCommentsAndEvents() async { let parser = EventSource.Parser() let bytes = """ :initial comment @@ -467,7 +467,7 @@ struct ParserTests { } @Test("Final line unterminated is processed by finish") - func testFinalLineUnterminatedIsProcessedByCurrentFinishLogic() async { + func finalLineUnterminatedIsProcessedByCurrentFinishLogic() async { // This test reflects the current behavior of Parser.finish(). // As noted, the SSE spec suggests an unterminated final block without a blank line should be discarded. // Your parser's finish() method currently dispatches such events. @@ -487,7 +487,7 @@ struct ParserTests { } @Test("Empty comment") - func testEmptyComment() async { + func emptyComment() async { let stream = ":\ndata: test\n\n" let events = await getEvents(from: stream) #expect(events.count == 1) @@ -495,7 +495,7 @@ struct ParserTests { } @Test("Comment with colon in body") - func testCommentWithColonInBody() async { + func commentWithColonInBody() async { let stream = ":comment:with:colons\ndata: test\n\n" let events = await getEvents(from: stream) #expect(events.count == 1) @@ -503,7 +503,7 @@ struct ParserTests { } @Test("Comment with leading space") - func testCommentWithLeadingSpace() async { + func commentWithLeadingSpace() async { let stream = ": comment with leading space\ndata: test\n\n" let events = await getEvents(from: stream) #expect(events.count == 1) @@ -511,7 +511,7 @@ struct ParserTests { } @Test("Multiple consecutive comments") - func testMultipleConsecutiveComments() async { + func multipleConsecutiveComments() async { let stream = ":comment1\n:comment2\n:comment3\ndata: test\n\n" let events = await getEvents(from: stream) #expect(events.count == 1) @@ -519,7 +519,7 @@ struct ParserTests { } @Test("Comment between data lines") - func testCommentBetweenDataLines() async { + func commentBetweenDataLines() async { let stream = "data: line1\n:comment\ndata: line2\n\n" let events = await getEvents(from: stream) #expect(events.count == 1) @@ -530,7 +530,7 @@ struct ParserTests { @Suite("BOM Tests") struct BOMTests { @Test("UTF8 BOM at stream start is handled") - func testUtf8BOMAtStreamStartIsHandled() async { + func utf8BOMAtStreamStartIsHandled() async { // Create a byte array with UTF-8 BOM followed by SSE content var fullStreamBytes: [UInt8] = [] // Add UTF-8 BOM @@ -565,7 +565,7 @@ struct ParserTests { @Suite("Retry Field") struct RetryFieldTests { @Test("Only retry field - no event") - func testOnlyRetryField() async throws { + func onlyRetryField() async throws { let sseData = "retry: 1000\n\n" var iterator = AsyncBytes(sseData.utf8).events.makeAsyncIterator() @@ -574,7 +574,7 @@ struct ParserTests { } @Test("Retry with data") - func testRetryWithData() async throws { + func retryWithData() async throws { let sseData = """ retry: 1000 data: hello @@ -589,7 +589,7 @@ struct ParserTests { } @Test("Retry with no space after colon") - func testRetryWithNoSpace() async { + func retryWithNoSpace() async { let stream = "retry:7000\ndata: test\n\n" let parser = EventSource.Parser() for byte in stream.utf8 { @@ -600,7 +600,7 @@ struct ParserTests { } @Test("Retry with non-numeric value") - func testRetryWithNonNumericValue() async { + func retryWithNonNumericValue() async { let stream = "retry: 7000L\ndata: test\n\n" let parser = EventSource.Parser() let initialReconnectionTime = await parser.getReconnectionTime() @@ -612,7 +612,7 @@ struct ParserTests { } @Test("Empty retry field") - func testEmptyRetryField() async { + func emptyRetryField() async { let stream = "retry\ndata: test\n\n" let parser = EventSource.Parser() let initialReconnectionTime = await parser.getReconnectionTime() @@ -624,7 +624,7 @@ struct ParserTests { } @Test("Retry with out of bounds value") - func testRetryWithOutOfBoundsValue() async { + func retryWithOutOfBoundsValue() async { let stream = "retry: 10000000000000000000000000\ndata: test\n\n" let parser = EventSource.Parser() let initialReconnectionTime = await parser.getReconnectionTime() @@ -636,7 +636,7 @@ struct ParserTests { } @Test("Retry persists across events") - func testRetryPersistsAcrossEvents() async { + func retryPersistsAcrossEvents() async { let stream = "retry: 7000\ndata: event1\n\nretry: 5000\ndata: event2\n\n" let parser = EventSource.Parser() for byte in stream.utf8 {