Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 37 additions & 32 deletions Sources/EventSource/EventSource.swift
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@ public actor EventSource {
handleLine(line)
}

// Send an empty line to trigger event dispatch for most test cases
// This matches the behavior expected by most tests
// Send an empty line to trigger event dispatch
if !currentData.isEmpty || currentEventId != nil || currentEventType != nil {
handleLine("")
}
Expand All @@ -271,13 +270,25 @@ public actor EventSource {
/// The current state of the connection (connecting, open, or closed).
public private(set) var readyState: ReadyState = .connecting

/// The maximum number of events to deliver when finalizing parsing.
///
/// This limit prevents unbounded memory growth in edge cases where
/// a server sends a large burst of events just before closing the connection.
/// When the limit is reached, remaining events in the parser's queue are discarded.
///
/// The default value of 100 should be sufficient for most use cases.
/// Increase this value if your application needs to buffer more events
/// during connection finalization.
public var maximumFinalizationEventCount: Int = 100

// Backing storage for callbacks
private var _onOpenCallback: (@Sendable () -> Void)?
private var _onMessageCallback: (@Sendable (Event) -> Void)?
private var _onErrorCallback: (@Sendable (Swift.Error?) -> Void)?
private var _onOpenCallback: (@Sendable () async -> Void)?
private var _onMessageCallback: (@Sendable (Event) async -> Void)?
private var _onErrorCallback: (@Sendable (Swift.Error?) async -> Void)?

/// The callback to invoke when the connection is opened.
nonisolated public var onOpen: (@Sendable () -> Void)? {
/// This callback is awaited before proceeding with reconnection or completion.
nonisolated public var onOpen: (@Sendable () async -> Void)? {
get { fatalError("onOpen can only be set, not read") }
set {
if let newValue {
Expand All @@ -287,7 +298,8 @@ public actor EventSource {
}

/// The callback to invoke when a message is received.
nonisolated public var onMessage: (@Sendable (Event) -> Void)? {
/// This callback is awaited before proceeding with reconnection or completion.
nonisolated public var onMessage: (@Sendable (Event) async -> Void)? {
get { fatalError("onMessage can only be set, not read") }
set {
if let newValue {
Expand All @@ -297,7 +309,8 @@ public actor EventSource {
}

/// The callback to invoke when an error occurs.
nonisolated public var onError: (@Sendable (Swift.Error?) -> Void)? {
/// This callback is awaited before proceeding with reconnection or completion.
nonisolated public var onError: (@Sendable (Swift.Error?) async -> Void)? {
get { fatalError("onError can only be set, not read") }
set {
if let newValue {
Expand All @@ -307,15 +320,15 @@ public actor EventSource {
}

// Actor-isolated setters
private func setOnOpenCallback(_ callback: @escaping @Sendable () -> Void) {
private func setOnOpenCallback(_ callback: @escaping @Sendable () async -> Void) {
self._onOpenCallback = callback
}

private func setOnMessageCallback(_ callback: @escaping @Sendable (Event) -> Void) {
private func setOnMessageCallback(_ callback: @escaping @Sendable (Event) async -> Void) {
self._onMessageCallback = callback
}

private func setOnErrorCallback(_ callback: @escaping @Sendable (Swift.Error?) -> Void) {
private func setOnErrorCallback(_ callback: @escaping @Sendable (Swift.Error?) async -> Void) {
self._onErrorCallback = callback
}

Expand Down Expand Up @@ -394,7 +407,6 @@ public actor EventSource {
currentRequest.setValue(lastEventId, forHTTPHeaderField: "Last-Event-ID")
}

// Perform the HTTP request and get an asynchronous byte stream.
let (byteStream, response) = try await session.bytes(
for: currentRequest,
delegate: nil
Expand All @@ -404,53 +416,48 @@ public actor EventSource {
if let httpResponse = response as? HTTPURLResponse {
let status = httpResponse.statusCode
if status != 200 {
// HTTP status not OK -> do not reconnect (per spec, fail the connection).
throw EventSourceError.invalidHTTPStatus(status)
}
// Check Content-Type header
let contentType = httpResponse.value(forHTTPHeaderField: "Content-Type")
if contentType?.lowercased().hasPrefix("text/event-stream") != true {
throw EventSourceError.invalidContentType(contentType)
}
} else {
// Non-HTTP response (unlikely for URLSession) -> treat as error.
throw EventSourceError.invalidHTTPStatus(0)
}

// Connection is established and content type is correct.
// Update state to `.open` and notify.
readyState = .open
if let onOpen = _onOpenCallback {
onOpen()
}
await _onOpenCallback?()

// Read the incoming byte stream and parse events.
for try await byte in byteStream {
// If closed or task cancelled during streaming, break out.
if Task.isCancelled || readyState == .closed {
break
}

await parser.consume(byte)

// Retrieve all complete events available after processing this byte.
while let event = await parser.getNextEvent() {
// Trigger onMessage callback for each event.
if let onMessage = _onMessageCallback {
onMessage(event)
}
await _onMessageCallback?(event)
}
}

// End of stream reached (server closed connection).
await parser.finish() // finalize parsing, drop any partial event
await parser.finish() // finalize parsing, delivers any pending events to queue

// Deliver any events that were queued during finish()
var eventsDelivered = 0
let maxEvents = maximumFinalizationEventCount
while let event = await parser.getNextEvent(), eventsDelivered < maxEvents {
eventsDelivered += 1
await _onMessageCallback?(event)
}

// If not cancelled and still open, treat as a disconnection to reconnect.
if !Task.isCancelled && readyState != .closed {
// Notify an error event due to unexpected close, then loop to reconnect.
if let onError = _onErrorCallback {
onError(nil) // stream closed without error (will attempt reconnect)
}
await _onErrorCallback?(nil) // stream closed without error (will attempt reconnect)
} else {
// If cancelled or closed intentionally, break without reconnecting.
break
Expand All @@ -463,9 +470,7 @@ public actor EventSource {
}

// Notify error event.
if let onError = _onErrorCallback {
onError(error)
}
await _onErrorCallback?(error)

// For HTTP status/content-type errors, break out (do not reconnect as per spec).
if error is EventSourceError {
Expand Down
30 changes: 15 additions & 15 deletions Tests/EventSourceTests/AsyncEventsSequenceTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import Testing
@Suite("AsyncEventsSequence Tests", .timeLimit(.minutes(1)))
struct AsyncEventsSequenceTests {
@Test("Basic event parsing from byte sequence")
func testBasicEventParsing() async throws {
func basicEventParsing() async throws {
// Create a simple SSE data string
let sseData = "data: hello\n\n"

Expand All @@ -28,7 +28,7 @@ struct AsyncEventsSequenceTests {
}

@Test("Multiple events parsing")
func testMultipleEventsParsing() async throws {
func multipleEventsParsing() async throws {
// Create a string with multiple SSE events
let sseData = """
data: event1
Expand All @@ -55,7 +55,7 @@ struct AsyncEventsSequenceTests {
}

@Test("Event with all fields")
func testEventWithAllFields() async throws {
func eventWithAllFields() async throws {
let sseData = """
id: 123
event: test
Expand All @@ -75,7 +75,7 @@ struct AsyncEventsSequenceTests {
}

@Test("Empty sequence")
func testEmptySequence() async throws {
func emptySequence() async throws {
let emptySequence = AsyncBytes("".utf8).events
var iterator = emptySequence.makeAsyncIterator()
let event = try await iterator.next()
Expand All @@ -85,7 +85,7 @@ struct AsyncEventsSequenceTests {
@Suite("Line Break Handling")
struct LineBreakTests {
@Test("CRLF line breaks")
func testCRLFLineBreaks() async throws {
func crlfLineBreaks() async throws {
let sseData = "data: hello\r\n\r\n"
var iterator = AsyncBytes(sseData.utf8).events.makeAsyncIterator()
let event = try await iterator.next()
Expand All @@ -95,7 +95,7 @@ struct AsyncEventsSequenceTests {
}

@Test("CR line breaks")
func testCRLineBreaks() async throws {
func crLineBreaks() async throws {
let sseData = "data: hello\r\r"
var iterator = AsyncBytes(sseData.utf8).events.makeAsyncIterator()
let event = try await iterator.next()
Expand All @@ -108,7 +108,7 @@ struct AsyncEventsSequenceTests {
@Suite("Comment Handling")
struct CommentTests {
@Test("Comments in event stream")
func testComments() async throws {
func comments() async throws {
let sseData = """
:comment line
data: hello
Expand All @@ -124,7 +124,7 @@ struct AsyncEventsSequenceTests {
}

@Test("Only comments")
func testOnlyComments() async throws {
func onlyComments() async throws {
let sseData = """
:comment line 1
:comment line 2
Expand All @@ -140,7 +140,7 @@ struct AsyncEventsSequenceTests {
@Suite("Multi-line Data")
struct MultilineDataTests {
@Test("Multiple data lines")
func testMultipleDataLines() async throws {
func multipleDataLines() async throws {
let sseData = """
data: line1
data: line2
Expand All @@ -156,7 +156,7 @@ struct AsyncEventsSequenceTests {
}

@Test("Empty data line")
func testEmptyDataLine() async throws {
func emptyDataLine() async throws {
// The SSE spec states that if there's a data: field with no value,
// it should be treated as an empty string value, not as absent data
let parser = EventSource.Parser()
Expand All @@ -175,7 +175,7 @@ struct AsyncEventsSequenceTests {
@Suite("Retry Field")
struct RetryFieldTests {
@Test("Only retry field - no event")
func testOnlyRetryField() async throws {
func onlyRetryField() async throws {
let sseData = "retry: 1000\n\n"

var iterator = AsyncBytes(sseData.utf8).events.makeAsyncIterator()
Expand All @@ -184,7 +184,7 @@ struct AsyncEventsSequenceTests {
}

@Test("Retry with data")
func testRetryWithData() async throws {
func retryWithData() async throws {
let sseData = """
retry: 1000
data: hello
Expand All @@ -202,7 +202,7 @@ struct AsyncEventsSequenceTests {
@Suite("Chunked Data")
struct ChunkedDataTests {
@Test("Chunked delivery simulation")
func testChunkedDelivery() async throws {
func chunkedDelivery() async throws {
// Simulate chunked SSE data delivery
let eventData = "id: 123\nevent: update\ndata: partial content\ndata: more content\n\n"

Expand Down Expand Up @@ -231,7 +231,7 @@ struct AsyncEventsSequenceTests {
}

@Test("Event spanning multiple chunks")
func testEventSpanningChunks() async throws {
func eventSpanningChunks() async throws {
// Create events with fixed size, non-recursive approach
let eventData = "data: first event\n\ndata: second event\n\ndata: third event\n\n"

Expand Down Expand Up @@ -261,7 +261,7 @@ struct AsyncEventsSequenceTests {
}

@Test("Integration with AsyncSequence operators")
func testAsyncSequenceIntegration() async throws {
func asyncSequenceIntegration() async throws {
let sseData = "data: event1\n\ndata: event2\n\ndata: event3\n\n"

// Convert to async sequence of bytes
Expand Down
Loading