Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 163 additions & 1 deletion docs/src/main/paradox/common/sse-support.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,168 @@ Scala
Java
: @@snip [EventStreamMarshallingTest.java](/http-tests/src/test/java/org/apache/pekko/http/javadsl/unmarshalling/sse/EventStreamUnmarshallingTest.java) { #event-stream-unmarshalling-example }

## Configuration

Apache Pekko HTTP provides several configuration options for Server-Sent Events handling:

### Message Size Limits

The SSE client parser has configurable limits to handle various message sizes:

```hocon
pekko.http.sse {
# The maximum size for parsing server-sent events. Set to 0 to disable limit entirely (unlimited).
max-event-size = 8192

# The maximum size for parsing lines of a server-sent event. Set to 0 to disable limit entirely (unlimited).
max-line-size = 4096
}
```

### Oversized Message Handling

Apache Pekko HTTP uses a two-stage parsing process for SSE streams, and oversized content can be handled at either stage:

1. **Line-level parsing**: Individual lines are checked against `max-line-size`
2. **Event-level parsing**: Complete events are limited to `max-event-size`

When SSE content exceeds the configured limits, Apache Pekko HTTP provides four handling strategies that can be configured separately for lines and events:

- **fail-stream** (default): Fails the stream with an error message
- **log-and-skip**: Logs a warning and skips the oversized content, continuing stream processing
- **truncate**: Logs an info message and handles oversized content appropriately, continuing processing
- **dead-letter**: Sends the oversized content to the dead letter queue, continuing processing

**Warning about truncate strategy**: For event-level truncation, the strategy drops entire lines that would exceed event size limits rather than truncating field values. This can change event semantics in unexpected ways when non-data fields (like `id:` or `event:`) are dropped. For predictable behavior, ensure that `id:` and `event:` fields appear before `data:` fields in your SSE events, or consider using `log-and-skip` or `dead-letter` strategies instead.

```hocon
pekko.http.sse {
# How to handle lines that exceed max-line-size limit
# Options:
# "fail-stream" - Fail the stream with an error message (default)
# "log-and-skip" - Log a warning and skip the oversized line, continuing stream processing
# "truncate" - Log an info message and truncate the line to max-line-size, continuing processing
# "dead-letter" - Send oversized line to the dead letter queue, continuing processing
oversized-line-handling = "fail-stream"

# How to handle events that exceed max-event-size limit
# Options:
# "fail-stream" - Fail the stream with an error message (default)
# "log-and-skip" - Log a warning and skip the oversized event, continuing stream processing
# "truncate" - Log an info message and drop lines that would exceed max-event-size, continuing processing
# "dead-letter" - Send oversized event to the dead letter queue, continuing processing
oversized-event-handling = "fail-stream"
}
```

#### Line vs Event Handling Examples

Line-level and event-level size limits are imposed separately and their behavior is different:
- Lines are parsed one line at a time. The limits and handling strategy are applied per line. Line length limits include
the SSE field names (`id: `, `data: `, `event: `, etc.).
- Events are built from successive `data:` lines. As each line is added to the built event, the event size limit is used
to short-circuit processing of the current and/or subsequent lines. This limit is generally meant to help prevent
runaway memory usage causing an application crash from a single (possible erroneous) message from the server.

Since line and event strategies can be configured independently, you can have different behaviors for each level. For example:
```hocon
pekko.http.sse {
oversized-line-handling = "truncate" # Truncate oversized lines
oversized-event-handling = "log-and-skip" # Skip oversized events entirely
}
```

**Example 1: Oversized Line in Multi-line Event**

Consider this SSE event with `max-line-size = 50`:

```
data: This is a normal line
data: This line is much too long and exceeds the configured max-line-size limit by a lot
data: Another normal line

```

With **log-and-skip** strategy:
- The oversized line gets skipped
- The resulting event contains only:
```
data: This is a normal line
data: Another normal line
```

With **truncate** strategy:
- The oversized line gets truncated to 50 characters
- The resulting event contains:
```
data: This is a normal line
data: This line is much too long and exceeds the c
data: Another normal line
```

With **dead-letter** strategy:
- The oversized line is sent to the dead letter queue as `OversizedSseLine(line: String)`
- The resulting event contains only:
```
data: This is a normal line
data: Another normal line
```

**Example 2: Event Exceeds max-event-size**

If the complete event (after line processing) exceeds `max-event-size`, the strategy applies to the entire event:

```
data: Line 1
data: Line 2
data: Line 3
[... many more lines causing total event size > max-event-size]
```

With **log-and-skip**: The entire event is logged and skipped
With **truncate**: The event is truncated by dropping the final line(s) that would exceed the limit, and the remaining event is emitted
With **dead-letter**: The oversized event is sent to dead letters as `OversizedSseEvent(event: ServerSentEvent)` and skipped from the stream

**Example 3: Event Size Exceeded During Line Parsing (Memory Protection)**

If while parsing individual lines, the accumulated event size would exceed `max-event-size`, the parser stops processing
additional lines for that event to prevent memory exhaustion:

```
data: Line 1 (fits)
data: Line 2 (fits)
data: Line 3 (would exceed max-event-size)
data: Line 4 (fits line limit)
```

With **log-and-skip**:
- The event with Line 3 is built and logged as oversized
- Lines 3 and 4 are both skipped until the empty line
- No event is emitted to the stream

With **truncate**:
- The event with lines 1 and 2 (without Line 3) is emitted
- Lines 3 and 4 are both skipped until the empty line

With **dead-letter**:
- The event with Line 3 is built and sent to dead letters as `OversizedSseEvent(event: ServerSentEvent)`
- Lines 3 and 4 are both skipped until the empty line
- No event is emitted to the stream

This behavior prevents the application from running out of memory when processing very large events, as the parser
immediately stops accumulating data once the size limit would be exceeded.

#### Processing Order

1. **First**: Individual lines are processed against `max-line-size`
2. **Then**: Each line is added to the event builder, checking against `max-event-size` during accumulation

This means an event can have individual lines handled by the line-level strategy, then be subject to event-level
handling during building as lines are accumulated.

For applications that need to handle very large messages (like blockchain data or detailed JSON payloads), consider
setting `max-line-size = 0` and `max-event-size = 0` to disable limits entirely, or use one of the non-failing handling modes.

Notice that if you are looking for a resilient way to permanently subscribe to an event stream,
Apache Pekko Connectors provides the [EventSource](https://pekko.apache.org/docs/pekko-connectors/current/sse.html)
Apache Pekko Connectors provides the [EventSource](https://pekko.apache.org/docs/pekko-connectors/current/sse.html)
connector which reconnects automatically with the id of the last seen event.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.http.javadsl.settings;

import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
import static org.junit.Assert.*;

public class OversizedSseStrategySimpleTest extends JUnitSuite {

@Test
public void testEnumValues() {
// Simple test that the enum values exist and can be converted
OversizedSseStrategy failStream = OversizedSseStrategy.FailStream;
OversizedSseStrategy logAndSkip = OversizedSseStrategy.LogAndSkip;
OversizedSseStrategy truncate = OversizedSseStrategy.Truncate;
OversizedSseStrategy deadLetter = OversizedSseStrategy.DeadLetter;

assertNotNull("FailStream should not be null", failStream);
assertNotNull("LogAndSkip should not be null", logAndSkip);
assertNotNull("Truncate should not be null", truncate);
assertNotNull("DeadLetter should not be null", deadLetter);
}

@Test
public void testFromScala() {
// Test that fromScala method works
OversizedSseStrategy strategy =
OversizedSseStrategy.fromScala(
org.apache.pekko.http.scaladsl.settings.OversizedSseStrategy.FailStream$.MODULE$);
assertEquals("Should convert from Scala FailStream", OversizedSseStrategy.FailStream, strategy);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.http.javadsl.unmarshalling.sse;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.http.javadsl.settings.OversizedSseStrategy;
import org.apache.pekko.http.javadsl.settings.ServerSentEventSettings;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;

import java.util.concurrent.TimeUnit;

import static org.junit.Assert.*;

public class EventStreamUnmarshallingOversizedTest extends JUnitSuite {

private static ActorSystem system;

@BeforeClass
public static void setup() {
system = ActorSystem.create();
}

@AfterClass
public static void teardown() throws Exception {
system.terminate();
system.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS);
}

@Test
public void testOversizedStrategyEnum() {
// Test that the Java enum can be used with the settings
ServerSentEventSettings settings =
ServerSentEventSettings.create(system)
.withLineLength(50)
.withOversizedLineStrategy(OversizedSseStrategy.FailStream);

assertEquals("Should have correct line length", 50, settings.maxLineSize());
assertEquals(
"Should have FailStream line strategy",
OversizedSseStrategy.FailStream,
settings.getOversizedLineStrategyEnum());

// Test all strategies can be set
settings = settings.withOversizedLineStrategy(OversizedSseStrategy.LogAndSkip);
assertEquals(
"Should have LogAndSkip line strategy",
OversizedSseStrategy.LogAndSkip,
settings.getOversizedLineStrategyEnum());

settings = settings.withOversizedLineStrategy(OversizedSseStrategy.Truncate);
assertEquals(
"Should have Truncate line strategy",
OversizedSseStrategy.Truncate,
settings.getOversizedLineStrategyEnum());

settings = settings.withOversizedLineStrategy(OversizedSseStrategy.DeadLetter);
assertEquals(
"Should have DeadLetter line strategy",
OversizedSseStrategy.DeadLetter,
settings.getOversizedLineStrategyEnum());
}

@Test
public void testOversizedLineStrategyStringCompatibility() {
// Test that the string-based API works for line strategies
ServerSentEventSettings settings =
ServerSentEventSettings.create(system).withOversizedLineStrategy("log-and-skip");

assertEquals(
"Should have log-and-skip line strategy string",
"log-and-skip",
settings.getOversizedLineStrategy());
assertEquals(
"Should have LogAndSkip line strategy enum",
OversizedSseStrategy.LogAndSkip,
settings.getOversizedLineStrategyEnum());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

package org.apache.pekko.http
package scaladsl
package settings

import org.apache.pekko.http.scaladsl.settings.OversizedSseStrategy
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

final class OversizedSseStrategySpec extends AnyWordSpec with Matchers {

"OversizedSseStrategy" should {
"parse valid string values correctly" in {
OversizedSseStrategy.fromString("fail-stream") shouldBe OversizedSseStrategy.FailStream
OversizedSseStrategy.fromString("log-and-skip") shouldBe OversizedSseStrategy.LogAndSkip
OversizedSseStrategy.fromString("truncate") shouldBe OversizedSseStrategy.Truncate
OversizedSseStrategy.fromString("dead-letter") shouldBe OversizedSseStrategy.DeadLetter
}

"throw IllegalArgumentException for invalid string values" in {
val exception = intercept[IllegalArgumentException] {
OversizedSseStrategy.fromString("invalid-strategy")
}
exception.getMessage should include("Invalid oversized-message-handling: 'invalid-strategy'")
exception.getMessage should include("Valid options are: fail-stream, log-and-skip, truncate, dead-letter")
}

"convert strategy objects back to strings correctly" in {
OversizedSseStrategy.toString(OversizedSseStrategy.FailStream) shouldBe "fail-stream"
OversizedSseStrategy.toString(OversizedSseStrategy.LogAndSkip) shouldBe "log-and-skip"
OversizedSseStrategy.toString(OversizedSseStrategy.Truncate) shouldBe "truncate"
OversizedSseStrategy.toString(OversizedSseStrategy.DeadLetter) shouldBe "dead-letter"
}

"handle case-sensitive strings" in {
intercept[IllegalArgumentException] {
OversizedSseStrategy.fromString("FAIL-STREAM")
}
intercept[IllegalArgumentException] {
OversizedSseStrategy.fromString("Fail-Stream")
}
}

"handle empty and null strings" in {
intercept[IllegalArgumentException] {
OversizedSseStrategy.fromString("")
}
intercept[IllegalArgumentException] {
OversizedSseStrategy.fromString(null)
}
}
}
}
Loading
Loading