Skip to content

Commit c1e8728

Browse files
authored
Merge pull request #237 from http4s/pr/optimize-to-readable-stream
Implement `toReadableStream` without `Dispatcher`
2 parents b7eadd3 + 0609176 commit c1e8728

File tree

2 files changed

+87
-30
lines changed

2 files changed

+87
-30
lines changed

dom/src/main/scala/org/http4s/dom/package.scala

Lines changed: 61 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,8 @@ package org.http4s
1818

1919
import cats.effect.kernel.Async
2020
import cats.effect.kernel.Resource
21-
import cats.effect.std.Dispatcher
22-
import cats.effect.std.Queue
2321
import cats.effect.syntax.all._
2422
import cats.syntax.all._
25-
import fs2.Chunk
2623
import fs2.Stream
2724
import org.http4s.headers.`Transfer-Encoding`
2825
import org.scalajs.dom.Blob
@@ -122,29 +119,73 @@ package object dom {
122119
}
123120

124121
private[dom] def toReadableStream[F[_]](in: Stream[F, Byte])(
125-
implicit F: Async[F]): Resource[F, ReadableStream[Uint8Array]] =
126-
Dispatcher.sequential.flatMap { dispatcher =>
127-
Resource.eval(Queue.synchronous[F, Option[Chunk[Byte]]]).flatMap { chunks =>
128-
in.enqueueNoneTerminatedChunks(chunks).compile.drain.background.evalMap { _ =>
129-
F.delay {
130-
val source = new ReadableStreamUnderlyingSource[Uint8Array] {
131-
`type` = ReadableStreamType.bytes
132-
pull = js.defined { controller =>
133-
dispatcher.unsafeToPromise {
134-
chunks.take.flatMap {
135-
case Some(chunk) =>
136-
F.delay(controller.enqueue(chunk.toUint8Array))
137-
case None => F.delay(controller.close())
138-
}
122+
implicit F: Async[F]): Resource[F, ReadableStream[Uint8Array]] = {
123+
124+
final class Synchronizer[A] {
125+
126+
type TakeCallback = Either[Throwable, A] => Unit
127+
type OfferCallback = Either[Throwable, TakeCallback] => Unit
128+
129+
private[this] var callback: AnyRef = null
130+
@inline private[this] def offerCallback = callback.asInstanceOf[OfferCallback]
131+
@inline private[this] def takeCallback = callback.asInstanceOf[TakeCallback]
132+
133+
def offer(cb: OfferCallback): Unit =
134+
if (callback ne null) {
135+
cb(Right(takeCallback))
136+
callback = null
137+
} else {
138+
callback = cb
139+
}
140+
141+
def take(cb: TakeCallback): Unit =
142+
if (callback ne null) {
143+
offerCallback(Right(cb))
144+
callback = null
145+
} else {
146+
callback = cb
147+
}
148+
}
149+
150+
Resource.eval(F.delay(new Synchronizer[Option[Uint8Array]])).flatMap { synchronizer =>
151+
val offers = in
152+
.chunks
153+
.noneTerminate
154+
.foreach { chunk =>
155+
F.async[Either[Throwable, Option[Uint8Array]] => Unit] { cb =>
156+
F.delay(synchronizer.offer(cb)).as(Some(F.unit))
157+
}.flatMap(cb => F.delay(cb(Right(chunk.map(_.toUint8Array)))))
158+
}
159+
.compile
160+
.drain
161+
162+
offers.background.evalMap { _ =>
163+
F.delay {
164+
val source = new ReadableStreamUnderlyingSource[Uint8Array] {
165+
`type` = ReadableStreamType.bytes
166+
pull = js.defined { controller =>
167+
new js.Promise[Unit]({ (resolve, reject) =>
168+
synchronizer.take {
169+
case Right(Some(bytes)) =>
170+
controller.enqueue(bytes)
171+
resolve(())
172+
()
173+
case Right(None) =>
174+
controller.close()
175+
resolve(())
176+
()
177+
case Left(ex) =>
178+
reject(ex)
179+
()
139180
}
140-
}
181+
})
141182
}
142-
ReadableStream[Uint8Array](source)
143183
}
184+
ReadableStream[Uint8Array](source)
144185
}
145186
}
146187
}
147-
188+
}
148189
private[dom] lazy val supportsRequestStreams = {
149190
val request = new DomRequest(
150191
"data:a/a;charset=utf-8,",

tests/src/test/scala/org/http4s/ReadableStreamSuite.scala

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,37 @@ import fs2.Chunk
2121
import fs2.Stream
2222
import munit.CatsEffectSuite
2323
import munit.ScalaCheckEffectSuite
24+
import org.scalacheck.Test.Parameters
2425
import org.scalacheck.effect.PropF.forAllF
2526

27+
import scala.concurrent.duration._
28+
2629
class ReadableStreamSuite extends CatsEffectSuite with ScalaCheckEffectSuite {
2730

31+
override def scalaCheckTestParameters: Parameters =
32+
super.scalaCheckTestParameters.withMaxSize(20)
33+
2834
test("to/read ReadableStream") {
29-
forAllF { (chunks: Vector[Vector[Byte]]) =>
30-
Stream
31-
.emits(chunks)
32-
.map(Chunk.seq(_))
33-
.unchunks
34-
.through(in => Stream.resource(toReadableStream[IO](in)))
35-
.flatMap(readable => readReadableStream(IO(readable)))
36-
.compile
37-
.toVector
38-
.assertEquals(chunks.flatten)
35+
forAllF {
36+
(chunks: Vector[Vector[Byte]], offerSleeps: Vector[Int], takeSleeps: Vector[Int]) =>
37+
def snooze(sleeps: Vector[Int]): Stream[IO, Unit] =
38+
Stream
39+
.emits(sleeps)
40+
.ifEmpty(Stream.emit(0))
41+
.repeat
42+
.evalMap(d => IO.sleep((d & 3).millis))
43+
44+
Stream
45+
.emits(chunks)
46+
.map(Chunk.seq(_))
47+
.zipLeft(snooze(offerSleeps))
48+
.unchunks
49+
.through(in => Stream.resource(toReadableStream[IO](in)))
50+
.flatMap(readable => readReadableStream(IO(readable)))
51+
.zipLeft(snooze(takeSleeps))
52+
.compile
53+
.toVector
54+
.assertEquals(chunks.flatten)
3955
}
4056
}
4157

0 commit comments

Comments
 (0)