Skip to content

Commit

Permalink
Merge pull request #236 from http4s/pr/i184
Browse files Browse the repository at this point in the history
Support streaming requests
  • Loading branch information
armanbilge authored Jan 18, 2023
2 parents 11ccbdd + 2dee843 commit c86ec9a
Show file tree
Hide file tree
Showing 8 changed files with 312 additions and 88 deletions.
18 changes: 16 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,20 @@ lazy val dom = project
"co.fs2" %%% "fs2-core" % fs2Version,
"org.http4s" %%% "http4s-client" % http4sVersion,
"org.scala-js" %%% "scalajs-dom" % scalaJSDomVersion
)
),
mimaBinaryIssueFilters ++= {
import com.typesafe.tools.mima.core._
if (tlIsScala3.value)
Seq(
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.http4s.dom.package.closeReadableStream"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.http4s.dom.package.fromReadableStream"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.http4s.dom.package.toDomHeaders")
)
else Seq()
}
)
.enablePlugins(ScalaJSPlugin)

Expand All @@ -146,7 +159,8 @@ def configureTest(project: Project): Project =
libraryDependencies ++= Seq(
"org.http4s" %%% "http4s-client-testkit" % http4sVersion,
"org.scalameta" %%% "munit" % munitVersion % Test,
"org.typelevel" %%% "munit-cats-effect" % munitCEVersion % Test
"org.typelevel" %%% "munit-cats-effect" % munitCEVersion % Test,
"org.typelevel" %%% "scalacheck-effect-munit" % "2.0.0-M2" % Test
),
Compile / unmanagedSourceDirectories +=
(LocalRootProject / baseDirectory).value / "tests" / "src" / "main" / "scala",
Expand Down
124 changes: 73 additions & 51 deletions dom/src/main/scala/org/http4s/dom/FetchClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ import cats.syntax.all._
import org.http4s.client.Client
import org.http4s.headers.Referer
import org.scalajs.dom.AbortController
import org.scalajs.dom.BodyInit
import org.scalajs.dom.Fetch
import org.scalajs.dom.Headers
import org.scalajs.dom.HttpMethod
import org.scalajs.dom.RequestDuplex
import org.scalajs.dom.RequestInit
import org.scalajs.dom.{Response => FetchResponse}

Expand All @@ -41,64 +43,84 @@ private[dom] object FetchClient {
requestTimeout: Duration,
options: FetchOptions
)(implicit F: Async[F]): Client[F] = Client[F] { (req: Request[F]) =>
Resource
.eval(
(if (req.isChunked) req.toStrict(None) else req.pure)
.mproduct(_.body.chunkAll.filter(_.nonEmpty).compile.last)
)
.flatMap {
case (req, body) =>
Resource
.makeCaseFull { (poll: Poll[F]) =>
F.delay(new AbortController()).flatMap { abortController =>
val requestOptions = req.attributes.lookup(FetchOptions.Key)
val mergedOptions = requestOptions.fold(options)(options.merge)
Resource.eval(F.fromPromise(F.delay(supportsRequestStreams))).flatMap {
supportsRequestStreams =>
val reqBody =
if (req.body eq EmptyBody)
Resource.pure[F, (Request[F], Option[BodyInit])]((req, None))
else if (supportsRequestStreams)
toReadableStream(req.body).map(Some[BodyInit](_)).tupleLeft(req)
else
Resource.eval {
(if (req.isChunked) req.toStrict(None) else req.pure).mproduct { req =>
req
.body
.chunkAll
.filter(_.nonEmpty)
.map(c => c.toUint8Array: BodyInit)
.compile
.last
}
}

val init = new RequestInit {}
reqBody.flatMap {
case (req, body) =>
Resource
.makeCaseFull { (poll: Poll[F]) =>
F.delay(new AbortController()).flatMap { abortController =>
val requestOptions = req.attributes.lookup(FetchOptions.Key)
val mergedOptions = requestOptions.fold(options)(options.merge)

init.method = req.method.name.asInstanceOf[HttpMethod]
init.headers = new Headers(toDomHeaders(req.headers))
body.foreach { body => init.body = body.toJSArrayBuffer }
init.signal = abortController.signal
mergedOptions.cache.foreach(init.cache = _)
mergedOptions.credentials.foreach(init.credentials = _)
mergedOptions.integrity.foreach(init.integrity = _)
mergedOptions.keepAlive.foreach(init.keepalive = _)
mergedOptions.mode.foreach(init.mode = _)
mergedOptions.redirect.foreach(init.redirect = _)
// Referer headers are forbidden in Fetch, but we make a best effort to preserve behavior across clients.
// See https://developer.mozilla.org/en-US/docs/Glossary/Forbidden_header_name
// If there's a Referer header, it will have more priority than the client's `referrer` (if present)
// but less priority than the request's `referrer` (if present).
requestOptions
.flatMap(_.referrer)
.orElse(req.headers.get[Referer].map(_.uri))
.orElse(options.referrer)
.foreach(referrer => init.referrer = referrer.renderString)
mergedOptions.referrerPolicy.foreach(init.referrerPolicy = _)
val init = new RequestInit {}

val fetch =
poll(F.fromPromise(F.delay(Fetch.fetch(req.uri.renderString, init))))
.onCancel(F.delay(abortController.abort()))
init.method = req.method.name.asInstanceOf[HttpMethod]
init.headers = new Headers(toDomHeaders(req.headers, request = true))
body.foreach { body =>
init.body = body
if (supportsRequestStreams)
init.duplex = RequestDuplex.half
}
init.signal = abortController.signal
mergedOptions.cache.foreach(init.cache = _)
mergedOptions.credentials.foreach(init.credentials = _)
mergedOptions.integrity.foreach(init.integrity = _)
mergedOptions.keepAlive.foreach(init.keepalive = _)
mergedOptions.mode.foreach(init.mode = _)
mergedOptions.redirect.foreach(init.redirect = _)
// Referer headers are forbidden in Fetch, but we make a best effort to preserve behavior across clients.
// See https://developer.mozilla.org/en-US/docs/Glossary/Forbidden_header_name
// If there's a Referer header, it will have more priority than the client's `referrer` (if present)
// but less priority than the request's `referrer` (if present).
requestOptions
.flatMap(_.referrer)
.orElse(req.headers.get[Referer].map(_.uri))
.orElse(options.referrer)
.foreach(referrer => init.referrer = referrer.renderString)
mergedOptions.referrerPolicy.foreach(init.referrerPolicy = _)

requestTimeout match {
case d: FiniteDuration =>
fetch.timeoutTo(
d,
F.raiseError[FetchResponse](new TimeoutException(
s"Request to ${req.uri.renderString} timed out after ${d.toMillis} ms"))
)
case _ =>
fetch
val fetch =
poll(F.fromPromise(F.delay(Fetch.fetch(req.uri.renderString, init))))
.onCancel(F.delay(abortController.abort()))

requestTimeout match {
case d: FiniteDuration =>
fetch.timeoutTo(
d,
F.raiseError[FetchResponse](new TimeoutException(
s"Request to ${req.uri.renderString} timed out after ${d.toMillis} ms"))
)
case _ =>
fetch
}
}
} {
case (r, exitCase) =>
OptionT.fromOption(Option(r.body)).foreachF(cancelReadableStream(_, exitCase))
}
} {
case (r, exitCase) =>
OptionT.fromOption(Option(r.body)).foreachF(closeReadableStream(_, exitCase))
}
.evalMap(fromDomResponse[F])
.evalMap(fromDomResponse[F])

}
}
}
}

}
2 changes: 1 addition & 1 deletion dom/src/main/scala/org/http4s/dom/ServiceWorker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ object ServiceWorker {
new ResponseInit {
this.status = response.status.code
this.statusText = response.status.reason
this.headers = toDomHeaders(response.headers)
this.headers = toDomHeaders(response.headers, request = false)
}
)
}
Expand Down
130 changes: 96 additions & 34 deletions dom/src/main/scala/org/http4s/dom/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,27 @@ package org.http4s

import cats.effect.kernel.Async
import cats.effect.kernel.Resource
import cats.effect.std.Dispatcher
import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._
import fs2.Chunk
import fs2.Stream
import org.http4s.headers.`Transfer-Encoding`
import org.scalajs.dom.Blob
import org.scalajs.dom.Fetch
import org.scalajs.dom.File
import org.scalajs.dom.HttpMethod
import org.scalajs.dom.ReadableStream
import org.scalajs.dom.ReadableStreamType
import org.scalajs.dom.ReadableStreamUnderlyingSource
import org.scalajs.dom.RequestDuplex
import org.scalajs.dom.RequestInit
import org.scalajs.dom.{Headers => DomHeaders}
import org.scalajs.dom.{Request => DomRequest}
import org.scalajs.dom.{Response => DomResponse}

import scala.scalajs.js
import scala.scalajs.js.JSConverters._
import scala.scalajs.js.typedarray.Uint8Array

package object dom {
Expand All @@ -37,73 +48,124 @@ package object dom {

implicit def blobEncoder[F[_]](implicit F: Async[F]): EntityEncoder[F, Blob] =
EntityEncoder.entityBodyEncoder.contramap { blob =>
Stream
.bracketCase {
F.delay(blob.stream())
} { case (rs, exitCase) => closeReadableStream(rs, exitCase) }
.flatMap(fromReadableStream[F])
readReadableStream[F](F.delay(blob.stream()))
}

implicit def readableStreamEncoder[F[_]: Async]
: EntityEncoder[F, ReadableStream[Uint8Array]] =
EntityEncoder.entityBodyEncoder.contramap { rs => fromReadableStream(rs) }
EntityEncoder.entityBodyEncoder.contramap { rs => readReadableStream(rs.pure) }

private[dom] def fromDomResponse[F[_]](response: DomResponse)(
implicit F: Async[F]): F[Response[F]] =
F.fromEither(Status.fromInt(response.status)).map { status =>
Response[F](
status = status,
headers = fromDomHeaders(response.headers),
body = Stream.fromOption(Option(response.body)).flatMap(fromReadableStream[F])
body = Stream.fromOption(Option(response.body)).flatMap { rs =>
readReadableStream[F](rs.pure)
}
)
}

private[dom] def toDomHeaders(headers: Headers): DomHeaders =
new DomHeaders(
headers
.headers
.view
.map {
case Header.Raw(name, value) =>
name.toString -> value
}
.toMap
.toJSDictionary)
private[dom] def toDomHeaders(headers: Headers, request: Boolean): DomHeaders = {
val domHeaders = new DomHeaders()
headers.foreach {
case Header.Raw(name, value) =>
val skip = request && name == `Transfer-Encoding`.name
if (!skip) domHeaders.append(name.toString, value)
}
domHeaders
}

private[dom] def fromDomHeaders(headers: DomHeaders): Headers =
Headers(
headers.map { header => header(0) -> header(1) }.toList
)

private[dom] def fromReadableStream[F[_]](rs: ReadableStream[Uint8Array])(
implicit F: Async[F]): Stream[F, Byte] =
Stream.bracket(F.delay(rs.getReader()))(r => F.delay(r.releaseLock())).flatMap { reader =>
Stream.unfoldChunkEval(reader) { reader =>
F.fromPromise(F.delay(reader.read())).map { chunk =>
if (chunk.done)
None
else
Some((fs2.Chunk.uint8Array(chunk.value), reader))
private[dom] def readReadableStream[F[_]](
readableStream: F[ReadableStream[Uint8Array]]
)(implicit F: Async[F]): Stream[F, Byte] = {
def read(readableStream: ReadableStream[Uint8Array]) =
Stream
.bracket(F.delay(readableStream.getReader()))(r => F.delay(r.releaseLock()))
.flatMap { reader =>
Stream.unfoldChunkEval(reader) { reader =>
F.fromPromise(F.delay(reader.read())).map { chunk =>
if (chunk.done)
None
else
Some((fs2.Chunk.uint8Array(chunk.value), reader))
}
}
}
}
}

private[dom] def closeReadableStream[F[_], A](
Stream.bracketCase(readableStream)(cancelReadableStream(_, _)).flatMap(read(_))
}

private[dom] def cancelReadableStream[F[_], A](
rs: ReadableStream[A],
exitCase: Resource.ExitCase)(implicit F: Async[F]): F[Unit] = F.fromPromise {
exitCase: Resource.ExitCase
)(implicit F: Async[F]): F[Unit] = F.fromPromise {
F.delay {
// Best guess: Firefox internally locks a ReadableStream after it is "drained"
// This checks if the stream is locked before canceling it to avoid an error
if (!rs.locked) exitCase match {
case Resource.ExitCase.Succeeded =>
rs.cancel(js.undefined)
case Resource.ExitCase.Errored(ex) =>
rs.cancel(ex.getLocalizedMessage())
rs.cancel(ex.toString())
case Resource.ExitCase.Canceled =>
rs.cancel(js.undefined)
}
else js.Promise.resolve[Unit](())
}
}.void
}

private[dom] def toReadableStream[F[_]](in: Stream[F, Byte])(
implicit F: Async[F]): Resource[F, ReadableStream[Uint8Array]] =
Dispatcher.sequential.flatMap { dispatcher =>
Resource.eval(Queue.synchronous[F, Option[Chunk[Byte]]]).flatMap { chunks =>
in.enqueueNoneTerminatedChunks(chunks).compile.drain.background.evalMap { _ =>
F.delay {
val source = new ReadableStreamUnderlyingSource[Uint8Array] {
`type` = ReadableStreamType.bytes
pull = js.defined { controller =>
dispatcher.unsafeToPromise {
chunks.take.flatMap {
case Some(chunk) =>
F.delay(controller.enqueue(chunk.toUint8Array))
case None => F.delay(controller.close())
}
}
}
}
ReadableStream[Uint8Array](source)
}
}
}
}

private[dom] lazy val supportsRequestStreams = {
val request = new DomRequest(
"data:a/a;charset=utf-8,",
new RequestInit {
body = ReadableStream()
method = HttpMethod.POST
duplex = RequestDuplex.half
}
)

val supportsStreamsInRequestObjects = !request.headers.has("Content-Type")

if (!supportsStreamsInRequestObjects)
js.Promise.resolve[Boolean](false)
else
Fetch
.fetch(request)
.`then`[Boolean](
_ => true,
(_ => false): js.Function1[Any, Boolean]
)
}

}
Loading

0 comments on commit c86ec9a

Please sign in to comment.