SSE Client Oversized Messages Handling #744
…with tests and documentation
That's true; at work we configured it to 5MB.
Thanks, but it seems the Java API is missing. It is needed if we want to build a client like Spring AI.
.../scala/org/apache/pekko/http/scaladsl/unmarshalling/sse/EventStreamParserOversizedSpec.scala
http/src/main/scala/org/apache/pekko/http/scaladsl/settings/OversizedSseStrategy.scala
...sts/src/test/java/org/apache/pekko/http/javadsl/settings/OversizedSseStrategySimpleTest.java
http-tests/src/test/scala/org/apache/pekko/http/scaladsl/unmarshalling/sse/LineParserSpec.scala
http/src/main/scala/org/apache/pekko/http/javadsl/settings/ServerSentEventSettings.scala
This needs sbt javafmtAll and scalafmt.
Scala 2.12 build has
Can you fix the import and try to work out what causes the compile issue, which doesn't seem to happen in the Scala 2.13 build?
This PR is targeting |
Pending CI runs/success, I believe everything else for the PR is addressed and it should be mergeable.
http/src/main/scala/org/apache/pekko/http/scaladsl/unmarshalling/sse/LineParser.scala
This looks like an awesome contribution: useful goal, reasonable implementation, great tests. I have a few minor comments to double-check, but this looks very close to being mergeable to me.
* Fail the stream with an IllegalStateException when an oversized message is encountered. This is
* the default behavior to maintain backward compatibility.
*/
FailStream,
I think the convention for Java enums is still FAIL_STREAM, right? Or is that outdated? For example compared to https://github.com/apache/pekko/blob/main/stream/src/main/java/org/apache/pekko/stream/javadsl/AsPublisher.java ?
My thought on style choices like that is to prioritize consistency with the other code in the repo. Looks like there is some variation throughout all Pekko libraries, though consistent usage of UPPERCASE in pekko streams (like the AsPublisher example you linked to). But in this pekko-http repo, a quick check looks like CamelCase is used everywhere in the production code (e.g. Coder, SameSite), and the UPPERCASE notation is used in a test (e.g. MyRole). That makes me think it fits best in the local scope of this repository to keep it as CamelCase. But no strong opinion here.
My thought on style choices like that is to prioritize consistency with the other code in the repo.
I agree!
I don't see clear guidance in https://github.com/apache/pekko/blob/main/CONTRIBUTING.md or https://pekko.apache.org/docs/pekko/current/typed/style-guide.html . Javafmt doesn't seem to have a preference either. I agree it's already inconsistent, and while I'd prefer a consistent style across all Pekko projects, following the 'local convention' here now seems sensible.
builder.appendData(line)
val event = builder.build()
failStage(new IllegalStateException(
s"Oversized SSE Event ${event.id.fold("") { id => s"at ID: $id " }}" +
Can the id ever be user-controlled here? Perhaps to be safe it might make sense to use an ExceptionWithErrorInfo here, where the id is in the 'detail' rather than the 'summary'.
Hmm, I'm not sure what you mean by "user-controlled". The exception would occur in the SSE client. The id value comes from the SSE id: field which is data sent from the server. The meaning of the ID field from the SSE spec is that it "sets the last event ID string," which means it identifies and distinguishes each semantically-whole SSE event (which will be spread across multiple lines), and is used for tracking events and resuming connections (e.g. browsers automatically send Last-Event-ID header on reconnect). The spec only limits it to a UTF-8 string of non-null characters. No length constraint (hence the need for a line limit which is the topic of this PR). In general, it's usually a counter that behaves like the numeric offset in Kafka.
I'm not really familiar with the intended use of ExceptionWithErrorInfo and its docstring didn't help me much. Looking through the code, I see 13 total uses. Those uses look like they're all for processing HTTP entities and contexts on the server-side. I don't really have an opinion on whether that's the right exception choice to use here because I don't understand the intention of that exception class.
The choice of failing the stage with an IllegalStateException is actually just preserving what the code used to do previously and keeps the default behavior (and exception type) unless the user opts into one of the new choices. I'm happy to change it though if there's clear guidance.
The scenario would be an application that connects to 'untrusted' SSE servers and exposes the result somewhere, including showing an error based on the exception. As a 'defense in depth' mechanism, it would be good to avoid including attacker-controllable input in that error, but it would be good for diagnostics if it's possible to have it in the logs. For that reason we have exceptions in the ExceptionWithErrorInfo hierarchy, which allow splitting the message into summary and 'details', where only the 'summary' is exposed by default, and 'details' can contain additional (possibly attacker-controlled) details. This is briefly documented at https://github.com/apache/pekko-http/blob/main/http-core/src/main/scala/org/apache/pekko/http/scaladsl/model/ErrorInfo.scala#L25 and https://pekko.apache.org/docs/pekko-http/current/routing-dsl/exception-handling.html#including-sensitive-data-in-exceptions
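To make the summary/detail idea concrete, here is a minimal, self-contained sketch in plain Java. It deliberately does not use the real ExceptionWithErrorInfo or ErrorInfo classes: SplitMessageException, OversizedEventErrors and formatForLog are hypothetical names made up for illustration, and the message wording is only an assumption.

```java
// Hypothetical illustration only: this is NOT pekko-http's ExceptionWithErrorInfo API.
final class SplitMessageException extends RuntimeException {
    private final String detail;

    SplitMessageException(String summary, String detail) {
        super(summary); // getMessage() exposes only the summary
        this.detail = detail;
    }

    /** Full text intended for logs; may contain server-supplied (untrusted) data. */
    String formatForLog() {
        return detail.isEmpty() ? getMessage() : getMessage() + ": " + detail;
    }
}

final class OversizedEventErrors {
    /** Builds the error for an oversized event; eventId may be null when the event has no id. */
    static SplitMessageException oversized(String eventId, int maxSize) {
        String summary = "Oversized SSE event (limit " + maxSize + ")";
        String detail = eventId == null ? "" : "at ID: " + eventId;
        return new SplitMessageException(summary, detail);
    }
}
```

With this shape, code that shows getMessage() to an end user never echoes the server-supplied id, while a log statement can still call formatForLog() to get it.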
The choice of failing the stage with an IllegalStateException is actually just preserving what the code used to do previously and keeps the default behavior (and exception type) unless the user opts into one of the new choices. I'm happy to change it though if there's clear guidance.
That makes sense (though the original exception didn't include the id...)
I guess perhaps it's somewhat far-fetched in this case, I'm ok with a 'traditional' exception.
Related tiny PR to avoid swallowing the error message in the SSE connector is here: apache/pekko-connectors#1205
SSE is needed for working with AI :)
yeah. but watch out. 😞
*
* @since 1.3.0
*/
public enum OversizedSseStrategy {
How about adding a getInstance method to the Scala OversizedSseStrategy? That would keep it consistent with Pekko; otherwise, two OversizedSseStrategy types will show up in the IDE.
Do we have any precedent for that elsewhere? I think it's OK to have two; people will recognize the javadsl/scaladsl package names.
See Pekko-core org.apache.pekko.stream.OverflowStrategy
right, so that's not in a javadsl/scaladsl package but moves it to the common parent package. https://github.com/apache/pekko/blob/main/CONTRIBUTING.md#java-apis-in-pekko does not provide a clear preference, and I don't think I have a clear preference either.
Lgtm
LGTM, Thanks!!!
Thanks, any improvement can come up later.
Problem
The current SSE (Server-Sent Events) client implementation fails its stream every time a message arrives that exceeds the size limits. When a Pekko SSE client connects to a Pekko SSE server that serves a message larger than the size limit, the client fails the stream, tries to reconnect and resume at the same (oversized) message, and fails again. The client falls into an infinite connection retry loop. The oversized message error is swallowed by the sse-connector (another PR for the pekko-connectors library will follow), so the user never knows what is happening. The observed behavior appears as if the stream is still active and just gets stuck at the message that preceded the oversized message.
Solution
This PR introduces two new configuration values, pekko.http.sse.oversized-line-handling and pekko.http.sse.oversized-event-handling, which give the user additional options for how the SSE client should handle receiving a message sized over the configured limit. Four options are available for each:
fail-stream: Terminates the stream when an oversized line/event is encountered (this is the previous behavior and the new default behavior)
log-and-skip: Logs the oversized line/event as a warning, skips it, and continues processing the stream
truncate: Truncates the oversized line/event to fit within limits and continues (event truncation is by-line, for safety reasons)
dead-letter: Sends the oversized line/event to the dead letter queue (truncated for safety, if need be) and continues
Tests
Debatable Topics
Calling out specifically some changes which could very reasonably be otherwise:
Config values
The following config value changes were pulled out to simplify the PR. I'll submit an issue with a recommendation and summary of my comparison research for what better default values could be. Then a separate PR can consider that alone.
Size limits of oversized messages are determined by two config values (one deprecated), max-event-size and max-line-size, which have their own complex relationships and very small default values. This PR increases the default values to the smallest reasonable size for modern uses (estimated by looking at standards applied in browsers). I found none for SSE, but WebSocket limits seem to be within the same spirit and some data on browser limits was available.
The previous limits (4k and 8k) are remarkably small for modern use cases. As an example: Ethereum blockchain events (a common enough SSE use case) can be hundreds of kilobytes. While it is good practice to keep Pekko messages small, these limits affect what the client is allowed to consume from the server over HTTP. They aren't primarily about Pekko messages. Larger defaults seem like a good idea, but it's not core to the primary change proposed in this PR.
Unlimited message size
This PR adds the option to disable line and event size limits entirely by setting those config settings to zero.
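As a rough illustration of how the four handling options and the zero-means-unlimited setting could fit together, the following self-contained Java sketch models line-level handling only. It is not the code in this PR: OversizedStrategy, OversizedLineHandler, and the StringBuilder stand-ins for the logger and the dead letter queue are hypothetical, and event-level handling (which truncates by line) is left out.

```java
// Hypothetical model of the four options; names mirror the config values,
// but this is not the pekko-http implementation.
enum OversizedStrategy { FailStream, LogAndSkip, Truncate, DeadLetter }

final class OversizedLineHandler {
    private final int maxLineSize;            // 0 disables the limit (assumed semantics)
    private final OversizedStrategy strategy;
    final StringBuilder warnings = new StringBuilder();    // stand-in for a logger
    final StringBuilder deadLetters = new StringBuilder(); // stand-in for the dead letter queue

    OversizedLineHandler(int maxLineSize, OversizedStrategy strategy) {
        this.maxLineSize = maxLineSize;
        this.strategy = strategy;
    }

    /** Returns the line to emit downstream, or null when the line is dropped. */
    String handle(String line) {
        if (maxLineSize == 0 || line.length() <= maxLineSize) {
            return line; // within limits, or limits disabled
        }
        switch (strategy) {
            case LogAndSkip:
                warnings.append("Skipping oversized line of length ")
                        .append(line.length()).append('\n');
                return null;
            case Truncate:
                return line.substring(0, maxLineSize);
            case DeadLetter:
                // Only the truncated prefix is forwarded to the dead letter stand-in.
                deadLetters.append(line, 0, maxLineSize).append('\n');
                return null;
            case FailStream:
            default:
                throw new IllegalStateException(
                        "Oversized SSE line: " + line.length() + " > " + maxLineSize);
        }
    }
}
```

For example, a handler built with a limit of 4 and the Truncate strategy turns "abcdefgh" into "abcd", while a handler built with a limit of 0 passes the same line through unchanged regardless of strategy.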