Skip to content

Commit

Permalink
Merge pull request #240 from http4s/pr/opt-in-streaming-requests
Browse files Browse the repository at this point in the history
Make streaming requests opt-in
  • Loading branch information
armanbilge authored Jan 22, 2023
2 parents a02f35e + d72491a commit a37e9d7
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 34 deletions.
28 changes: 18 additions & 10 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,24 @@ lazy val dom = project
),
mimaBinaryIssueFilters ++= {
import com.typesafe.tools.mima.core._
if (tlIsScala3.value)
Seq(
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.http4s.dom.package.closeReadableStream"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.http4s.dom.package.fromReadableStream"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.http4s.dom.package.toDomHeaders")
)
else Seq()
Seq(
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.http4s.dom.FetchClientBuilder.this"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.http4s.dom.FetchOptions.*"),
ProblemFilters.exclude[Problem]("org.http4s.dom.FetchOptions#FetchOptionsImpl*"),
ProblemFilters.exclude[Problem]("org.http4s.dom.FetchOptions$FetchOptionsImpl*")
) ++ {
if (tlIsScala3.value)
Seq(
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.http4s.dom.package.closeReadableStream"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.http4s.dom.package.fromReadableStream"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.http4s.dom.package.toDomHeaders")
)
else Seq()
}
}
)
.enablePlugins(ScalaJSPlugin)
Expand Down
8 changes: 4 additions & 4 deletions dom/src/main/scala/org/http4s/dom/FetchClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,15 @@ private[dom] object FetchClient {
requestTimeout: Duration,
options: FetchOptions
)(implicit F: Async[F]): Client[F] = Client[F] { (req: Request[F]) =>
val requestOptions = req.attributes.lookup(FetchOptions.Key)
val mergedOptions = requestOptions.fold(options)(options.merge)

Resource.eval(F.fromPromise(F.delay(supportsRequestStreams))).flatMap {
supportsRequestStreams =>
val reqBody =
if (req.body eq EmptyBody)
Resource.pure[F, (Request[F], Option[BodyInit])]((req, None))
else if (supportsRequestStreams)
else if (supportsRequestStreams && mergedOptions.streamingRequests)
toReadableStream(req.body).map(Some[BodyInit](_)).tupleLeft(req)
else
Resource.eval {
Expand All @@ -68,9 +71,6 @@ private[dom] object FetchClient {
Resource
.makeCaseFull { (poll: Poll[F]) =>
F.delay(new AbortController()).flatMap { abortController =>
val requestOptions = req.attributes.lookup(FetchOptions.Key)
val mergedOptions = requestOptions.fold(options)(options.merge)

val init = new RequestInit {}

init.method = req.method.name.asInstanceOf[HttpMethod]
Expand Down
27 changes: 20 additions & 7 deletions dom/src/main/scala/org/http4s/dom/FetchClientBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ sealed abstract class FetchClientBuilder[F[_]] private (
val mode: Option[RequestMode],
val redirect: Option[RequestRedirect],
val referrer: Option[FetchReferrer],
val referrerPolicy: Option[ReferrerPolicy]
val referrerPolicy: Option[ReferrerPolicy],
val streamingRequests: Boolean
)(override implicit protected val F: Async[F])
extends BackendBuilder[F, Client[F]] {

Expand All @@ -67,7 +68,8 @@ sealed abstract class FetchClientBuilder[F[_]] private (
mode: Option[RequestMode] = mode,
redirect: Option[RequestRedirect] = redirect,
referrer: Option[FetchReferrer] = referrer,
referrerPolicy: Option[ReferrerPolicy] = referrerPolicy
referrerPolicy: Option[ReferrerPolicy] = referrerPolicy,
streamingRequests: Boolean = streamingRequests
): FetchClientBuilder[F] =
new FetchClientBuilder[F](
requestTimeout,
Expand All @@ -76,7 +78,8 @@ sealed abstract class FetchClientBuilder[F[_]] private (
mode,
redirect,
referrer,
referrerPolicy
referrerPolicy,
streamingRequests
) {}

def withRequestTimeout(requestTimeout: Duration): FetchClientBuilder[F] =
Expand Down Expand Up @@ -124,6 +127,11 @@ sealed abstract class FetchClientBuilder[F[_]] private (
def withDefaultReferrerPolicy: FetchClientBuilder[F] =
withReferrerPolicyOption(None)

def withStreamingRequests: FetchClientBuilder[F] =
copy(streamingRequests = true)
def withoutStreamingRequests: FetchClientBuilder[F] =
copy(streamingRequests = false)

/**
* Creates a `Client`.
*/
Expand All @@ -132,14 +140,18 @@ sealed abstract class FetchClientBuilder[F[_]] private (
FetchOptions(
cache = cache,
credentials = credentials,
integrity = None,
keepAlive = None,
mode = mode,
redirect = redirect,
referrer = referrer,
referrerPolicy = referrerPolicy
))
referrerPolicy = referrerPolicy,
streamingRequests = streamingRequests
)
)

override def resource: Resource[F, Client[F]] =
Resource.eval(F.delay(create))
Resource.pure(create)
}

object FetchClientBuilder {
Expand All @@ -155,6 +167,7 @@ object FetchClientBuilder {
mode = None,
redirect = None,
referrer = None,
referrerPolicy = None
referrerPolicy = None,
streamingRequests = false
) {}
}
60 changes: 47 additions & 13 deletions dom/src/main/scala/org/http4s/dom/FetchOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ sealed abstract class FetchOptions extends Product with Serializable {
def redirect: Option[RequestRedirect]
def referrer: Option[FetchReferrer]
def referrerPolicy: Option[ReferrerPolicy]
def streamingRequests: Boolean

def withCacheOption(cache: Option[RequestCache]): FetchOptions
final def withCache(cache: RequestCache): FetchOptions =
Expand Down Expand Up @@ -106,6 +107,9 @@ sealed abstract class FetchOptions extends Product with Serializable {
final def withoutReferrerPolicy: FetchOptions =
withReferrerPolicyOption(None)

def withStreamingRequests: FetchOptions
def withoutStreamingRequests: FetchOptions

/**
* Merge Two FetchOptions. `other` is prioritized.
*/
Expand All @@ -118,20 +122,22 @@ sealed abstract class FetchOptions extends Product with Serializable {
mode = other.mode.orElse(mode),
redirect = other.redirect.orElse(redirect),
referrer = other.referrer.orElse(referrer),
referrerPolicy = other.referrerPolicy.orElse(referrerPolicy)
referrerPolicy = other.referrerPolicy.orElse(referrerPolicy),
streamingRequests = other.streamingRequests || streamingRequests
)
}

object FetchOptions {
private[this] final case class FetchOptionsImpl(
override final val cache: Option[RequestCache],
override final val cache: Option[RequestCache] = None,
override final val credentials: Option[RequestCredentials] = None,
override final val integrity: Option[String] = None,
override final val keepAlive: Option[Boolean] = None,
override final val mode: Option[RequestMode] = None,
override final val redirect: Option[RequestRedirect] = None,
override final val referrer: Option[FetchReferrer] = None,
override final val referrerPolicy: Option[ReferrerPolicy] = None
override final val referrerPolicy: Option[ReferrerPolicy] = None,
override final val streamingRequests: Boolean = false
) extends FetchOptions {

override def withCacheOption(cache: Option[RequestCache]): FetchOptions =
Expand All @@ -157,8 +163,14 @@ object FetchOptions {
override def withReferrerPolicyOption(
referrerPolicy: Option[ReferrerPolicy]): FetchOptions =
copy(referrerPolicy = referrerPolicy)

override def withStreamingRequests: FetchOptions = copy(streamingRequests = true)
override def withoutStreamingRequests: FetchOptions = copy(streamingRequests = false)
}

def default: FetchOptions = FetchOptionsImpl()

@deprecated("0.2.6", "Use `default` with builder methods")
def apply(
cache: Option[RequestCache] = None,
credentials: Option[RequestCredentials] = None,
Expand All @@ -168,16 +180,38 @@ object FetchOptions {
redirect: Option[RequestRedirect] = None,
referrer: Option[FetchReferrer] = None,
referrerPolicy: Option[ReferrerPolicy] = None
): FetchOptions =
FetchOptionsImpl(
cache,
credentials,
integrity,
keepAlive,
mode,
redirect,
referrer,
referrerPolicy)
): FetchOptions = FetchOptionsImpl(
cache,
credentials,
integrity,
keepAlive,
mode,
redirect,
referrer,
referrerPolicy
)

private[dom] def apply(
cache: Option[RequestCache],
credentials: Option[RequestCredentials],
integrity: Option[String],
keepAlive: Option[Boolean],
mode: Option[RequestMode],
redirect: Option[RequestRedirect],
referrer: Option[FetchReferrer],
referrerPolicy: Option[ReferrerPolicy],
streamingRequests: Boolean
): FetchOptions = FetchOptionsImpl(
cache,
credentials,
integrity,
keepAlive,
mode,
redirect,
referrer,
referrerPolicy,
streamingRequests
)

val Key: vault.Key[FetchOptions] = vault.Key.newKey[SyncIO, FetchOptions].unsafeRunSync()
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ class FetchServiceWorkerSuite extends CatsEffectSuite {
.assertEquals("This is chunked.")
}

test("POST a chunked body with streaming requests") {
FetchClientBuilder[IO]
.withStreamingRequests
.create
.expect[String](POST(Stream("This is chunked.").covary[IO], baseUrl / "echo"))
.assertEquals("This is chunked.")
}

test("POST a multipart body") {
Multiparts.forSync[IO].flatMap { multiparts =>
multiparts
Expand Down

0 comments on commit a37e9d7

Please sign in to comment.