Commit fc9be41

SSE Client Oversized Messages Handling (#744)
* Add options for how to handle SSE client consumer oversized messages with tests and documentation
* Java API and formatting.
* Pass oversized message to dead letter queue with a type for pattern matching
* License headers, markdown whitespace, restore test, since annotation
* Separate oversized strategies for lines and events. Fix compilation in 2.12.
* binary compat, scala 3 compat. restore original default size limit values
* more binary compat. docs cleanup
* Explicitly name size variable as counting bytes.
1 parent 5c2b6fc commit fc9be41

18 files changed, +1732 -36 lines changed

docs/src/main/paradox/common/sse-support.md

Lines changed: 163 additions & 1 deletion
@@ -56,6 +56,168 @@ Scala
Java
: @@snip [EventStreamMarshallingTest.java](/http-tests/src/test/java/org/apache/pekko/http/javadsl/unmarshalling/sse/EventStreamUnmarshallingTest.java) { #event-stream-unmarshalling-example }

## Configuration

Apache Pekko HTTP provides several configuration options for Server-Sent Events handling:

### Message Size Limits

The SSE client parser enforces configurable size limits on whole events and on individual lines:

```hocon
pekko.http.sse {
  # The maximum size for parsing server-sent events. Set to 0 to disable limit entirely (unlimited).
  max-event-size = 8192

  # The maximum size for parsing lines of a server-sent event. Set to 0 to disable limit entirely (unlimited).
  max-line-size = 4096
}
```

### Oversized Message Handling

Apache Pekko HTTP uses a two-stage parsing process for SSE streams, and oversized content can be handled at either stage:

1. **Line-level parsing**: Individual lines are checked against `max-line-size`
2. **Event-level parsing**: Complete events are limited to `max-event-size`

When SSE content exceeds the configured limits, Apache Pekko HTTP provides four handling strategies that can be configured separately for lines and events:

- **fail-stream** (default): Fails the stream with an error message
- **log-and-skip**: Logs a warning and skips the oversized content, continuing stream processing
- **truncate**: Logs an info message and either truncates an oversized line to `max-line-size` or drops the lines that would exceed `max-event-size`, continuing processing
- **dead-letter**: Sends the oversized content to the dead letter queue, continuing processing

**Warning about truncate strategy**: For event-level truncation, the strategy drops entire lines that would exceed event size limits rather than truncating field values. This can change event semantics in unexpected ways when non-data fields (like `id:` or `event:`) are dropped. For predictable behavior, ensure that `id:` and `event:` fields appear before `data:` fields in your SSE events, or consider using `log-and-skip` or `dead-letter` strategies instead.

```hocon
pekko.http.sse {
  # How to handle lines that exceed max-line-size limit
  # Options:
  #   "fail-stream" - Fail the stream with an error message (default)
  #   "log-and-skip" - Log a warning and skip the oversized line, continuing stream processing
  #   "truncate" - Log an info message and truncate the line to max-line-size, continuing processing
  #   "dead-letter" - Send oversized line to the dead letter queue, continuing processing
  oversized-line-handling = "fail-stream"

  # How to handle events that exceed max-event-size limit
  # Options:
  #   "fail-stream" - Fail the stream with an error message (default)
  #   "log-and-skip" - Log a warning and skip the oversized event, continuing stream processing
  #   "truncate" - Log an info message and drop lines that would exceed max-event-size, continuing processing
  #   "dead-letter" - Send oversized event to the dead letter queue, continuing processing
  oversized-event-handling = "fail-stream"
}
```
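
The option strings above are matched exactly: the spec included in this commit shows that `"FAIL-STREAM"`, the empty string and `null` are all rejected with an `IllegalArgumentException` that lists the valid options. As a rough, library-independent illustration of that kind of lookup, the following sketch uses a hypothetical `StrategySketch` enum. It is not Pekko HTTP's `OversizedSseStrategy`, and the exact error text is an assumption.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical stand-in for illustration only; not Pekko HTTP's OversizedSseStrategy.
enum StrategySketch {
  FAIL_STREAM("fail-stream"),
  LOG_AND_SKIP("log-and-skip"),
  TRUNCATE("truncate"),
  DEAD_LETTER("dead-letter");

  private final String configName;

  StrategySketch(String configName) {
    this.configName = configName;
  }

  String configName() {
    return configName;
  }

  // Case-sensitive lookup: differently cased names, "" and null are all rejected.
  static StrategySketch fromConfig(String value) {
    for (StrategySketch s : values()) {
      if (s.configName.equals(value)) {
        return s;
      }
    }
    String valid =
        Arrays.stream(values()).map(StrategySketch::configName).collect(Collectors.joining(", "));
    // The message wording here is an assumption made for this sketch.
    throw new IllegalArgumentException(
        "Invalid strategy: '" + value + "'. Valid options are: " + valid);
  }

  public static void main(String[] args) {
    // A valid name resolves to its constant.
    System.out.println(fromConfig("truncate"));
    try {
      fromConfig("Truncate"); // wrong case, so this throws
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Rejecting unknown values at lookup time means a typo in the configuration surfaces as an immediate error rather than as a silent fallback to the default strategy.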

#### Line vs Event Handling Examples

Line-level and event-level size limits are imposed separately and their behavior is different:

- Lines are parsed one line at a time. The limits and handling strategy are applied per line. Line length limits include
  the SSE field names (`id: `, `data: `, `event: `, etc.).
- Events are built from successive `data:` lines. As each line is added to the built event, the event size limit is used
  to short-circuit processing of the current and/or subsequent lines. This limit is generally meant to help prevent
  runaway memory usage causing an application crash from a single (possibly erroneous) message from the server.

Since line and event strategies can be configured independently, you can have different behaviors for each level. For example:
```hocon
pekko.http.sse {
  oversized-line-handling = "truncate"  # Truncate oversized lines
  oversized-event-handling = "log-and-skip"  # Skip oversized events entirely
}
```

**Example 1: Oversized Line in Multi-line Event**

Consider this SSE event with `max-line-size = 50`:

```
data: This is a normal line
data: This line is much too long and exceeds the configured max-line-size limit by a lot
data: Another normal line

```

With **log-and-skip** strategy:
- The oversized line gets skipped
- The resulting event contains only:
  ```
  data: This is a normal line
  data: Another normal line
  ```

With **truncate** strategy:
- The oversized line gets truncated to 50 characters
- The resulting event contains:
  ```
  data: This is a normal line
  data: This line is much too long and exceeds the c
  data: Another normal line
  ```

With **dead-letter** strategy:
- The oversized line is sent to the dead letter queue as `OversizedSseLine(line: String)`
- The resulting event contains only:
  ```
  data: This is a normal line
  data: Another normal line
  ```
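
The line-level outcomes in Example 1 can be reproduced with a small stand-alone model. This is only a sketch: `OversizedLineSketch` and its members are hypothetical names, a plain list stands in for the dead letter queue, and line size is measured with `String.length()`, which equals the byte count only for ASCII input such as the example above. It does not reflect how the Pekko HTTP parser is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative model only; names are hypothetical and this is not Pekko HTTP's parser.
final class OversizedLineSketch {
  enum Strategy { FAIL_STREAM, LOG_AND_SKIP, TRUNCATE, DEAD_LETTER }

  // Lines diverted by the dead-letter strategy (a list stands in for the dead letter queue).
  final List<String> deadLetters = new ArrayList<>();

  // Applies the line-level strategy to every line; maxLineSize == 0 means "no limit".
  // Assumption: size is String.length(), which matches bytes only for ASCII text.
  List<String> process(List<String> lines, int maxLineSize, Strategy strategy) {
    List<String> kept = new ArrayList<>();
    for (String line : lines) {
      boolean oversized = maxLineSize > 0 && line.length() > maxLineSize;
      if (!oversized) {
        kept.add(line);
        continue;
      }
      switch (strategy) {
        case FAIL_STREAM:
          throw new IllegalStateException("line exceeds max-line-size of " + maxLineSize);
        case LOG_AND_SKIP:
          System.err.println("skipping oversized line");
          break;
        case TRUNCATE:
          // The field name ("data: ") counts towards the limit.
          kept.add(line.substring(0, maxLineSize));
          break;
        case DEAD_LETTER:
          deadLetters.add(line);
          break;
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    List<String> event = Arrays.asList(
        "data: This is a normal line",
        "data: This line is much too long and exceeds the configured max-line-size limit by a lot",
        "data: Another normal line");
    OversizedLineSketch sketch = new OversizedLineSketch();
    System.out.println(sketch.process(event, 50, Strategy.TRUNCATE));
    System.out.println(sketch.process(event, 50, Strategy.LOG_AND_SKIP));
  }
}
```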

**Example 2: Event Exceeds max-event-size**

If the complete event (after line processing) exceeds `max-event-size`, the strategy applies to the entire event:

```
data: Line 1
data: Line 2
data: Line 3
[... many more lines causing total event size > max-event-size]
```

- With **log-and-skip**: The entire event is logged and skipped
- With **truncate**: The event is truncated by dropping the final line(s) that would exceed the limit, and the remaining event is emitted
- With **dead-letter**: The oversized event is sent to dead letters as `OversizedSseEvent(event: ServerSentEvent)` and skipped from the stream

**Example 3: Event Size Exceeded During Line Parsing (Memory Protection)**

If, while parsing individual lines, the accumulated event size would exceed `max-event-size`, the parser stops processing
additional lines for that event to prevent memory exhaustion:

```
data: Line 1 (fits)
data: Line 2 (fits)
data: Line 3 (would exceed max-event-size)
data: Line 4 (fits line limit)
```

With **log-and-skip**:
- The event with Line 3 is built and logged as oversized
- Lines 3 and 4 are both skipped until the empty line
- No event is emitted to the stream

With **truncate**:
- The event with lines 1 and 2 (without Line 3) is emitted
- Lines 3 and 4 are both skipped until the empty line

With **dead-letter**:
- The event with Line 3 is built and sent to dead letters as `OversizedSseEvent(event: ServerSentEvent)`
- Lines 3 and 4 are both skipped until the empty line
- No event is emitted to the stream

This behavior prevents the application from running out of memory when processing very large events, as the parser
immediately stops accumulating data once the size limit would be exceeded.
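
The short-circuiting described in Example 3 can be modelled in a few lines. The sketch below is an assumption-laden illustration, not the parser's actual code: `OversizedEventSketch` is a hypothetical name, the event size is approximated as the sum of the line lengths, and a plain list stands in for the dead letter queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Illustrative model only; names and size accounting are assumptions, not Pekko HTTP's parser.
final class OversizedEventSketch {
  enum Strategy { FAIL_STREAM, LOG_AND_SKIP, TRUNCATE, DEAD_LETTER }

  // Events diverted by the dead-letter strategy (a list stands in for the dead letter queue).
  final List<List<String>> deadLetters = new ArrayList<>();

  // Builds one event from the lines that precede the terminating empty line.
  // Assumption: event size is the sum of line lengths; maxEventSize == 0 means "no limit".
  Optional<List<String>> buildEvent(List<String> lines, int maxEventSize, Strategy strategy) {
    List<String> event = new ArrayList<>();
    int size = 0;
    for (String line : lines) {
      if (maxEventSize > 0 && size + line.length() > maxEventSize) {
        // Stop accumulating: later lines of this event are never buffered.
        switch (strategy) {
          case FAIL_STREAM:
            throw new IllegalStateException("event exceeds max-event-size of " + maxEventSize);
          case TRUNCATE:
            return Optional.of(event); // emit only the lines that fit
          case DEAD_LETTER:
            event.add(line); // the diverted event includes the offending line
            deadLetters.add(event);
            return Optional.empty();
          default: // LOG_AND_SKIP
            System.err.println("skipping oversized event");
            return Optional.empty();
        }
      }
      event.add(line);
      size += line.length();
    }
    return Optional.of(event);
  }

  public static void main(String[] args) {
    List<String> lines =
        Arrays.asList("data: Line 1", "data: Line 2", "data: Line 3", "data: Line 4");
    OversizedEventSketch sketch = new OversizedEventSketch();
    // With a limit of 30, the third 12-character line would push the total past the limit.
    System.out.println(sketch.buildEvent(lines, 30, Strategy.TRUNCATE));
    System.out.println(sketch.buildEvent(lines, 30, Strategy.LOG_AND_SKIP));
  }
}
```

Because the check happens before a line is appended, the buffered event never grows by more than one line beyond the limit, which is the memory-protection property the example describes.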

#### Processing Order

1. **First**: Individual lines are processed against `max-line-size`
2. **Then**: Each line is added to the event builder, checking against `max-event-size` during accumulation

This means an event can have individual lines handled by the line-level strategy, then be subject to event-level
handling during building as lines are accumulated.

For applications that need to handle very large messages (like blockchain data or detailed JSON payloads), consider
setting `max-line-size = 0` and `max-event-size = 0` to disable limits entirely, or use one of the non-failing handling modes.

Notice that if you are looking for a resilient way to permanently subscribe to an event stream,
Apache Pekko Connectors provides the [EventSource](https://pekko.apache.org/docs/pekko-connectors/current/sse.html)
connector which reconnects automatically with the id of the last seen event.
Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.pekko.http.javadsl.settings;

import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
import static org.junit.Assert.*;

public class OversizedSseStrategySimpleTest extends JUnitSuite {

  @Test
  public void testEnumValues() {
    // Simple test that the enum values exist and can be converted
    OversizedSseStrategy failStream = OversizedSseStrategy.FailStream;
    OversizedSseStrategy logAndSkip = OversizedSseStrategy.LogAndSkip;
    OversizedSseStrategy truncate = OversizedSseStrategy.Truncate;
    OversizedSseStrategy deadLetter = OversizedSseStrategy.DeadLetter;

    assertNotNull("FailStream should not be null", failStream);
    assertNotNull("LogAndSkip should not be null", logAndSkip);
    assertNotNull("Truncate should not be null", truncate);
    assertNotNull("DeadLetter should not be null", deadLetter);
  }

  @Test
  public void testFromScala() {
    // Test that fromScala method works
    OversizedSseStrategy strategy =
        OversizedSseStrategy.fromScala(
            org.apache.pekko.http.scaladsl.settings.OversizedSseStrategy.FailStream$.MODULE$);
    assertEquals("Should convert from Scala FailStream", OversizedSseStrategy.FailStream, strategy);
  }
}
Lines changed: 96 additions & 0 deletions
@@ -0,0 +1,96 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.pekko.http.javadsl.unmarshalling.sse;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.http.javadsl.settings.OversizedSseStrategy;
import org.apache.pekko.http.javadsl.settings.ServerSentEventSettings;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;

import java.util.concurrent.TimeUnit;

import static org.junit.Assert.*;

public class EventStreamUnmarshallingOversizedTest extends JUnitSuite {

  private static ActorSystem system;

  @BeforeClass
  public static void setup() {
    system = ActorSystem.create();
  }

  @AfterClass
  public static void teardown() throws Exception {
    system.terminate();
    system.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS);
  }

  @Test
  public void testOversizedStrategyEnum() {
    // Test that the Java enum can be used with the settings
    ServerSentEventSettings settings =
        ServerSentEventSettings.create(system)
            .withLineLength(50)
            .withOversizedLineStrategy(OversizedSseStrategy.FailStream);

    assertEquals("Should have correct line length", 50, settings.maxLineSize());
    assertEquals(
        "Should have FailStream line strategy",
        OversizedSseStrategy.FailStream,
        settings.getOversizedLineStrategyEnum());

    // Test all strategies can be set
    settings = settings.withOversizedLineStrategy(OversizedSseStrategy.LogAndSkip);
    assertEquals(
        "Should have LogAndSkip line strategy",
        OversizedSseStrategy.LogAndSkip,
        settings.getOversizedLineStrategyEnum());

    settings = settings.withOversizedLineStrategy(OversizedSseStrategy.Truncate);
    assertEquals(
        "Should have Truncate line strategy",
        OversizedSseStrategy.Truncate,
        settings.getOversizedLineStrategyEnum());

    settings = settings.withOversizedLineStrategy(OversizedSseStrategy.DeadLetter);
    assertEquals(
        "Should have DeadLetter line strategy",
        OversizedSseStrategy.DeadLetter,
        settings.getOversizedLineStrategyEnum());
  }

  @Test
  public void testOversizedLineStrategyStringCompatibility() {
    // Test that the string-based API works for line strategies
    ServerSentEventSettings settings =
        ServerSentEventSettings.create(system).withOversizedLineStrategy("log-and-skip");

    assertEquals(
        "Should have log-and-skip line strategy string",
        "log-and-skip",
        settings.getOversizedLineStrategy());
    assertEquals(
        "Should have LogAndSkip line strategy enum",
        OversizedSseStrategy.LogAndSkip,
        settings.getOversizedLineStrategyEnum());
  }
}
Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * license agreements; and to You under the Apache License, version 2.0:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * This file is part of the Apache Pekko project, which was derived from Akka.
 */

package org.apache.pekko.http
package scaladsl
package settings

import org.apache.pekko.http.scaladsl.settings.OversizedSseStrategy
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

final class OversizedSseStrategySpec extends AnyWordSpec with Matchers {

  "OversizedSseStrategy" should {
    "parse valid string values correctly" in {
      OversizedSseStrategy.fromString("fail-stream") shouldBe OversizedSseStrategy.FailStream
      OversizedSseStrategy.fromString("log-and-skip") shouldBe OversizedSseStrategy.LogAndSkip
      OversizedSseStrategy.fromString("truncate") shouldBe OversizedSseStrategy.Truncate
      OversizedSseStrategy.fromString("dead-letter") shouldBe OversizedSseStrategy.DeadLetter
    }

    "throw IllegalArgumentException for invalid string values" in {
      val exception = intercept[IllegalArgumentException] {
        OversizedSseStrategy.fromString("invalid-strategy")
      }
      exception.getMessage should include("Invalid oversized-message-handling: 'invalid-strategy'")
      exception.getMessage should include("Valid options are: fail-stream, log-and-skip, truncate, dead-letter")
    }

    "convert strategy objects back to strings correctly" in {
      OversizedSseStrategy.toString(OversizedSseStrategy.FailStream) shouldBe "fail-stream"
      OversizedSseStrategy.toString(OversizedSseStrategy.LogAndSkip) shouldBe "log-and-skip"
      OversizedSseStrategy.toString(OversizedSseStrategy.Truncate) shouldBe "truncate"
      OversizedSseStrategy.toString(OversizedSseStrategy.DeadLetter) shouldBe "dead-letter"
    }

    "handle case-sensitive strings" in {
      intercept[IllegalArgumentException] {
        OversizedSseStrategy.fromString("FAIL-STREAM")
      }
      intercept[IllegalArgumentException] {
        OversizedSseStrategy.fromString("Fail-Stream")
      }
    }

    "handle empty and null strings" in {
      intercept[IllegalArgumentException] {
        OversizedSseStrategy.fromString("")
      }
      intercept[IllegalArgumentException] {
        OversizedSseStrategy.fromString(null)
      }
    }
  }
}
