Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.http
package scaladsl
package unmarshalling
package sse

import org.apache.pekko
import pekko.http.scaladsl.model.sse.ServerSentEvent
import pekko.stream.scaladsl.{ Sink, Source }
import pekko.util.ByteString

import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec

/**
 * Exercises line-terminator handling in the public [[EventStreamParser]] API: CRLF (`\r\n`),
 * CR-only (`\r`), mixtures of LF, CR and CRLF within one stream, and CRLF input that is
 * delivered in chunks which split the stream at arbitrary positions.
 *
 * Covers issue https://github.com/apache/pekko-http/issues/797.
 */
final class EventStreamParserCrlfSpec extends AsyncWordSpec with Matchers with BaseUnmarshallingSpec {

  private val maxLineSize = 1048576
  private val maxEventSize = 1048576

  /**
   * Runs `chunks` through `EventStreamParser(maxLineSize, maxEventSize)` and collects every
   * event the parser emits.
   *
   * @param chunks the raw bytes, in the order and chunking in which the parser should see them
   * @return the materialized value of `Sink.seq`, i.e. a future of all parsed events in
   *         emission order
   */
  private def parseChunks(chunks: Source[ByteString, Any]) =
    chunks
      .via(EventStreamParser(maxLineSize, maxEventSize))
      .runWith(Sink.seq)

  /** Parses `text` delivered to the parser as one single chunk; see [[parseChunks]]. */
  private def parseText(text: String) =
    parseChunks(Source.single(ByteString(text)))

  "EventStreamParser" when {

    "receiving a stream with CRLF line endings" should {

      "parse a single event with CRLF-terminated data line" in {
        parseText("data: hello\r\n\r\n")
          .map(_ shouldBe Vector(ServerSentEvent("hello")))
      }

      "parse multiple events all using CRLF line endings" in {
        parseText("data: event1\r\n\r\ndata: event2\r\n\r\ndata: event3\r\n\r\n")
          .map(_ shouldBe Vector(
            ServerSentEvent("event1"),
            ServerSentEvent("event2"),
            ServerSentEvent("event3")))
      }

      "parse all SSE field types with CRLF line endings" in {
        val text =
          "data: the data\r\n" +
          "event: my-event\r\n" +
          "id: 99\r\n" +
          "retry: 3000\r\n" +
          "\r\n"
        parseText(text)
          .map(_ shouldBe Vector(
            ServerSentEvent("the data", Some("my-event"), Some("99"), Some(3000))))
      }

      "parse multi-line data fields with CRLF line endings" in {
        val text =
          "data: line1\r\n" +
          "data: line2\r\n" +
          "data: line3\r\n" +
          "\r\n"
        // Consecutive data lines are expected to be joined with a single LF in the event payload.
        parseText(text)
          .map(_ shouldBe Vector(ServerSentEvent("line1\nline2\nline3")))
      }

      "ignore comment lines with CRLF endings" in {
        val text =
          "data: event1\r\n" +
          ":this is a comment\r\n" +
          "\r\n"
        parseText(text)
          .map(_ shouldBe Vector(ServerSentEvent("event1")))
      }

      "not emit events with no data field when emitEmptyEvents is false" in {
        // The second blank line directly follows an event boundary, so it delimits an event
        // that has no fields at all; with emitEmptyEvents = false it must not be emitted.
        val input = ByteString(
          "data: real\r\n" +
          "\r\n" +
          "\r\n" +
          "data: also real\r\n" +
          "\r\n")
        // Kept inline on purpose: this test exercises the three-argument parser overload.
        Source.single(input)
          .via(EventStreamParser(maxLineSize, maxEventSize, emitEmptyEvents = false))
          .runWith(Sink.seq)
          .map(_ shouldBe Vector(
            ServerSentEvent("real"),
            ServerSentEvent("also real")))
      }

      "emit empty events (heartbeats) when emitEmptyEvents is true with CRLF" in {
        // A heartbeat is a data field with an empty value (i.e. "data: " or "data:"),
        // not merely a blank separator line.
        val input = ByteString(
          "data: before\r\n" +
          "\r\n" +
          "data: \r\n" +
          "\r\n" +
          "data: after\r\n" +
          "\r\n")
        // Kept inline on purpose: this test exercises the three-argument parser overload.
        Source.single(input)
          .via(EventStreamParser(maxLineSize, maxEventSize, emitEmptyEvents = true))
          .runWith(Sink.seq)
          .map(_ shouldBe Vector(
            ServerSentEvent("before"),
            ServerSentEvent.heartbeat,
            ServerSentEvent("after")))
      }
    }

    "receiving a stream with CR-only (\\r) line endings" should {

      "parse a single event with CR-only line endings" in {
        parseText("data: hello\r\r")
          .map(_ shouldBe Vector(ServerSentEvent("hello")))
      }

      "parse multiple events with CR-only line endings" in {
        parseText("data: event1\r\rdata: event2\r\r")
          .map(_ shouldBe Vector(
            ServerSentEvent("event1"),
            ServerSentEvent("event2")))
      }

      "parse all SSE field types with CR-only line endings" in {
        parseText("data: the data\revent: my-event\rid: 42\rretry: 1000\r\r")
          .map(_ shouldBe Vector(
            ServerSentEvent("the data", Some("my-event"), Some("42"), Some(1000))))
      }
    }

    "receiving a stream with mixed line endings" should {

      "parse events correctly when line endings vary within the stream" in {
        // Mix of LF-only, CR-only, and CRLF
        val text =
          "data: lf-event\n" +
          "\n" +
          "data: cr-event\r" +
          "\r" +
          "data: crlf-event\r\n" +
          "\r\n"
        parseText(text)
          .map(_ shouldBe Vector(
            ServerSentEvent("lf-event"),
            ServerSentEvent("cr-event"),
            ServerSentEvent("crlf-event")))
      }

      "parse a single event whose fields use different line endings" in {
        // Field lines alternate between CRLF and LF terminators (CRLF, LF, CRLF);
        // the event itself is terminated by a lone LF.
        val text =
          "data: the data\r\n" +
          "event: my-event\n" +
          "id: 7\r\n" +
          "\n"
        parseText(text)
          .map(_ shouldBe Vector(
            ServerSentEvent("the data", Some("my-event"), Some("7"))))
      }
    }

    "receiving a CRLF stream delivered in multiple small chunks" should {

      "parse events correctly when CRLF is split across chunk boundaries" in {
        // The \r and \n of the CRLF pair for event1 arrive in separate ByteString chunks
        val chunks = Vector(
          ByteString("data: event1\r"), // ends with \r
          ByteString("\n\r\n"), // \n completes the CRLF; \r\n is the event separator
          ByteString("data: event2\r\n\r\n"))
        parseChunks(Source(chunks))
          .map(_ shouldBe Vector(
            ServerSentEvent("event1"),
            ServerSentEvent("event2")))
      }

      "parse events correctly when data arrives byte by byte with CRLF" in {
        val bytes = ByteString("data: hello\r\n\r\n")
        // One single-byte chunk per input byte, so every CRLF pair is split across chunks.
        parseChunks(Source(bytes.map(ByteString(_))))
          .map(_ shouldBe Vector(ServerSentEvent("hello")))
      }

      "reassemble a multi-event CRLF stream delivered in arbitrary chunks" in {
        val fullStream =
          "data: first\r\nevent: alpha\r\nid: 1\r\n\r\n" +
          "data: second\r\nid: 2\r\n\r\n"
        // Split into 5-byte chunks. `grouped` on a ByteString already yields ByteStrings,
        // so the pieces are used as-is rather than being copied into new ByteStrings.
        val chunks = ByteString(fullStream).grouped(5).toVector
        parseChunks(Source(chunks))
          .map(_ shouldBe Vector(
            ServerSentEvent("first", Some("alpha"), Some("1")),
            ServerSentEvent("second", None, Some("2"))))
      }
    }
  }
}
Loading