diff --git a/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/unmarshalling/sse/ServerSentEventParserSpec.scala b/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/unmarshalling/sse/ServerSentEventParserSpec.scala index 5228a9c7ae..caa399324c 100644 --- a/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/unmarshalling/sse/ServerSentEventParserSpec.scala +++ b/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/unmarshalling/sse/ServerSentEventParserSpec.scala @@ -19,6 +19,7 @@ package sse import org.apache.pekko import pekko.http.scaladsl.model.sse.ServerSentEvent import pekko.stream.scaladsl.{ Sink, Source } +import pekko.util.ByteString import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AsyncWordSpec @@ -132,5 +133,34 @@ final class ServerSentEventParserSpec extends AsyncWordSpec with Matchers with B ServerSentEvent("event 3", None, Some("")), ServerSentEvent("event 4"))) } + "parse ServerSentEvents with CRLF line endings" in { + Source( + Vector( + "data: event 1\r", + "data: event 1 line 2\r", + "\r", + "data: event 2\r", + "event: my-event\r", + "id: 42\r", + "\r", + "\r")) + .via(new ServerSentEventParser(1048576, emitEmptyEvents = false)) + .runWith(Sink.seq) + .map( + _ shouldBe Vector( + ServerSentEvent("event 1\nevent 1 line 2"), + ServerSentEvent("event 2", Some("my-event"), Some("42")))) + } + "parse ServerSentEvents from a CRLF byte stream" in { + val input = ByteString("data: event 1\r\ndata: event 1 line 2\r\n\r\ndata: event 2\r\nevent: my-event\r\nid: 42\r\n\r\n") + Source + .single(input) + .via(EventStreamParser(1048576, 1048576, emitEmptyEvents = false)) + .runWith(Sink.seq) + .map( + _ shouldBe Vector( + ServerSentEvent("event 1\nevent 1 line 2"), + ServerSentEvent("event 2", Some("my-event"), Some("42")))) + } } } diff --git a/http/src/main/scala/org/apache/pekko/http/scaladsl/unmarshalling/sse/ServerSentEventParser.scala b/http/src/main/scala/org/apache/pekko/http/scaladsl/unmarshalling/sse/ServerSentEventParser.scala index c40edf7a5e..3875d3d841 100644 --- a/http/src/main/scala/org/apache/pekko/http/scaladsl/unmarshalling/sse/ServerSentEventParser.scala +++ b/http/src/main/scala/org/apache/pekko/http/scaladsl/unmarshalling/sse/ServerSentEventParser.scala @@ -126,7 +126,7 @@ private final class ServerSentEventParser( setHandlers(in, out, this) override def onPush(): Unit = { - val line = grab(in) + val line = grab(in).stripSuffix("\r") if (shouldSkipUntilEventEnd) { // Max event size was previously reached. Skip successive lines until event ends if (line.isEmpty) shouldSkipUntilEventEnd = false // Stop skipping when end of event (empty line) is reached pull(in) // Already reported oversized event (below). Drop and continue to next line.