From fd7eee4218d60c7f2783f93a8a5a06d30f06b069 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 18 Jan 2023 03:26:09 +0000 Subject: [PATCH 01/11] Update `ReadableStream` converters --- .../scala/org/http4s/dom/FetchClient.scala | 2 +- .../main/scala/org/http4s/dom/package.scala | 109 ++++++++++++------ 2 files changed, 75 insertions(+), 36 deletions(-) diff --git a/dom/src/main/scala/org/http4s/dom/FetchClient.scala b/dom/src/main/scala/org/http4s/dom/FetchClient.scala index c410e010..0c8e8638 100644 --- a/dom/src/main/scala/org/http4s/dom/FetchClient.scala +++ b/dom/src/main/scala/org/http4s/dom/FetchClient.scala @@ -94,7 +94,7 @@ private[dom] object FetchClient { } } { case (r, exitCase) => - OptionT.fromOption(Option(r.body)).foreachF(closeReadableStream(_, exitCase)) + OptionT.fromOption(Option(r.body)).foreachF(cancelReadableStream(_, exitCase)) } .evalMap(fromDomResponse[F]) diff --git a/dom/src/main/scala/org/http4s/dom/package.scala b/dom/src/main/scala/org/http4s/dom/package.scala index 832517f7..a0f5cf53 100644 --- a/dom/src/main/scala/org/http4s/dom/package.scala +++ b/dom/src/main/scala/org/http4s/dom/package.scala @@ -18,13 +18,13 @@ package org.http4s import cats.effect.kernel.Async import cats.effect.kernel.Resource +import cats.effect.std.Dispatcher +import cats.effect.std.Queue import cats.syntax.all._ +import fs2.Chunk +import fs2.Pipe import fs2.Stream -import org.scalajs.dom.Blob -import org.scalajs.dom.File -import org.scalajs.dom.ReadableStream -import org.scalajs.dom.{Headers => DomHeaders} -import org.scalajs.dom.{Response => DomResponse} +import org.scalajs.dom import scala.scalajs.js import scala.scalajs.js.JSConverters._ @@ -32,34 +32,34 @@ import scala.scalajs.js.typedarray.Uint8Array package object dom { - implicit def fileEncoder[F[_]](implicit F: Async[F]): EntityEncoder[F, File] = + implicit def fileEncoder[F[_]](implicit F: Async[F]): EntityEncoder[F, dom.File] = blobEncoder.narrow - implicit def blobEncoder[F[_]](implicit F: Async[F]): EntityEncoder[F, Blob] = + implicit def blobEncoder[F[_]](implicit F: Async[F]): EntityEncoder[F, dom.Blob] = EntityEncoder.entityBodyEncoder.contramap { blob => - Stream - .bracketCase { - F.delay(blob.stream()) - } { case (rs, exitCase) => closeReadableStream(rs, exitCase) } - .flatMap(fromReadableStream[F]) + readReadableStream[F](F.delay(blob.stream()), cancelAfterUse = true) } implicit def readableStreamEncoder[F[_]: Async] - : EntityEncoder[F, ReadableStream[Uint8Array]] = - EntityEncoder.entityBodyEncoder.contramap { rs => fromReadableStream(rs) } + : EntityEncoder[F, dom.ReadableStream[Uint8Array]] = + EntityEncoder.entityBodyEncoder.contramap { rs => + readReadableStream(rs.pure, cancelAfterUse = true) + } - private[dom] def fromDomResponse[F[_]](response: DomResponse)( + private[dom] def fromDomResponse[F[_]](response: dom.Response)( implicit F: Async[F]): F[Response[F]] = F.fromEither(Status.fromInt(response.status)).map { status => Response[F]( status = status, headers = fromDomHeaders(response.headers), - body = Stream.fromOption(Option(response.body)).flatMap(fromReadableStream[F]) + body = Stream.fromOption(Option(response.body)).flatMap { rs => + readReadableStream[F](rs.pure, cancelAfterUse = true) + } ) } - private[dom] def toDomHeaders(headers: Headers): DomHeaders = - new DomHeaders( + private[dom] def toDomHeaders(headers: Headers): dom.Headers = + new dom.Headers( headers .headers .view @@ -70,27 +70,39 @@ package object dom { .toMap .toJSDictionary) - private[dom] def fromDomHeaders(headers: DomHeaders): Headers = + private[dom] def fromDomHeaders(headers: dom.Headers): Headers = Headers( headers.map { header => header(0) -> header(1) }.toList ) - private[dom] def fromReadableStream[F[_]](rs: ReadableStream[Uint8Array])( - implicit F: Async[F]): Stream[F, Byte] = - Stream.bracket(F.delay(rs.getReader()))(r => F.delay(r.releaseLock())).flatMap { reader => - Stream.unfoldChunkEval(reader) { reader => - F.fromPromise(F.delay(reader.read())).map { chunk => - if (chunk.done) - None - else - Some((fs2.Chunk.uint8Array(chunk.value), reader)) + private def readReadableStream[F[_]]( + readableStream: F[dom.ReadableStream[Uint8Array]], + cancelAfterUse: Boolean + )(implicit F: Async[F]): Stream[F, Byte] = { + def read(readableStream: dom.ReadableStream[Uint8Array]) = + Stream + .bracket(F.delay(readableStream.getReader()))(r => F.delay(r.releaseLock())) + .flatMap { reader => + Stream.unfoldChunkEval(reader) { reader => + F.fromPromise(F.delay(reader.read())).map { chunk => + if (chunk.done) + None + else + Some((fs2.Chunk.uint8Array(chunk.value), reader)) + } + } } - } - } - private[dom] def closeReadableStream[F[_], A]( - rs: ReadableStream[A], - exitCase: Resource.ExitCase)(implicit F: Async[F]): F[Unit] = F.fromPromise { + if (cancelAfterUse) + Stream.bracketCase(readableStream)(cancelReadableStream(_, _)).flatMap(read(_)) + else + Stream.eval(readableStream).flatMap(read(_)) + } + + private[dom] def cancelReadableStream[F[_], A]( + rs: dom.ReadableStream[A], + exitCase: Resource.ExitCase + )(implicit F: Async[F]): F[Unit] = F.fromPromise { F.delay { // Best guess: Firefox internally locks a ReadableStream after it is "drained" // This checks if the stream is locked before canceling it to avoid an error @@ -98,12 +110,39 @@ package object dom { case Resource.ExitCase.Succeeded => rs.cancel(js.undefined) case Resource.ExitCase.Errored(ex) => - rs.cancel(ex.getLocalizedMessage()) + rs.cancel(ex.toString()) case Resource.ExitCase.Canceled => rs.cancel(js.undefined) } else js.Promise.resolve[Unit](()) } - }.void + } + + private def toReadableStream[F[_]]( + implicit F: Async[F]): Pipe[F, Byte, dom.ReadableStream[Uint8Array]] = + (in: Stream[F, Byte]) => + Stream.resource(Dispatcher.sequential).flatMap { dispatcher => + Stream.eval(Queue.synchronous[F, Option[Chunk[Byte]]]).flatMap { chunks => + Stream + .eval { + F.delay { + val source = new dom.ReadableStreamUnderlyingSource[Uint8Array] { + `type` = dom.ReadableStreamType.bytes + pull = js.defined { controller => + dispatcher.unsafeToPromise { + chunks.take.flatMap { + case Some(chunk) => + F.delay(controller.enqueue(chunk.toUint8Array)) + case None => F.delay(controller.close()) + } + } + } + } + dom.ReadableStream[Uint8Array](source) + } + } + .concurrently(in.enqueueNoneTerminatedChunks(chunks)) + } + } } From 47b173563469d71d6b437407c3f09807b34e8e92 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 18 Jan 2023 04:12:00 +0000 Subject: [PATCH 02/11] Add feature test for request streams --- .../main/scala/org/http4s/dom/package.scala | 21 +++++++++++++++ .../org/http4s/dom/RequestStreamsSuite.scala | 27 +++++++++++++++++++ .../org/http4s/dom/RequestStreamsSuite.scala | 27 +++++++++++++++++++ .../org/http4s/dom/RequestStreamsSuite.scala | 27 +++++++++++++++++++ 4 files changed, 102 insertions(+) create mode 100644 testsChrome/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala create mode 100644 testsFirefox/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala create mode 100644 testsNodeJS/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala diff --git a/dom/src/main/scala/org/http4s/dom/package.scala b/dom/src/main/scala/org/http4s/dom/package.scala index a0f5cf53..a19ea688 100644 --- a/dom/src/main/scala/org/http4s/dom/package.scala +++ b/dom/src/main/scala/org/http4s/dom/package.scala @@ -29,6 +29,7 @@ import org.scalajs.dom import scala.scalajs.js import scala.scalajs.js.JSConverters._ import scala.scalajs.js.typedarray.Uint8Array +import scala.scalajs.js.annotation.JSExport package object dom { @@ -145,4 +146,24 @@ package object dom { } } + private[dom] lazy val supportsRequestStreams = { + var duplexAccessed = false + val hasContentType = new dom.Request( + "http://http4s.org/", + new AnyRef { + @JSExport + val body = dom.ReadableStream() + @JSExport + val method = dom.HttpMethod.POST + @JSExport + def duplex = { + duplexAccessed = true + dom.RequestDuplex.half + } + }.asInstanceOf[dom.RequestInit] + ).headers.has("Content-Type") + + duplexAccessed && !hasContentType + } + } diff --git a/testsChrome/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala b/testsChrome/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala new file mode 100644 index 00000000..43450c53 --- /dev/null +++ b/testsChrome/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2021 http4s.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.http4s.dom + +import munit.FunSuite + +class RequestStreamsSuite extends FunSuite { + + test("chrome supports request streams") { + assert(supportsRequestStreams) + } + +} diff --git a/testsFirefox/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala b/testsFirefox/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala new file mode 100644 index 00000000..b6146251 --- /dev/null +++ b/testsFirefox/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2021 http4s.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.http4s.dom + +import munit.FunSuite + +class RequestStreamsSuite extends FunSuite { + + test("firefox does not support request streams") { + assert(!supportsRequestStreams) + } + +} diff --git a/testsNodeJS/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala b/testsNodeJS/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala new file mode 100644 index 00000000..f910e5e2 --- /dev/null +++ b/testsNodeJS/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2021 http4s.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.http4s.dom + +import munit.FunSuite + +class RequestStreamsSuite extends FunSuite { + + test("node.js supports request streams") { + assert(supportsRequestStreams) + } + +} From 90124d3841c020e145f4575eb70b482aa16dc00d Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 18 Jan 2023 04:51:03 +0000 Subject: [PATCH 03/11] Support streaming requests --- .../scala/org/http4s/dom/FetchClient.scala | 115 ++++++++++-------- .../main/scala/org/http4s/dom/package.scala | 39 +++--- 2 files changed, 82 insertions(+), 72 deletions(-) diff --git a/dom/src/main/scala/org/http4s/dom/FetchClient.scala b/dom/src/main/scala/org/http4s/dom/FetchClient.scala index 0c8e8638..f1d7d6b3 100644 --- a/dom/src/main/scala/org/http4s/dom/FetchClient.scala +++ b/dom/src/main/scala/org/http4s/dom/FetchClient.scala @@ -26,9 +26,11 @@ import cats.syntax.all._ import org.http4s.client.Client import org.http4s.headers.Referer import org.scalajs.dom.AbortController +import org.scalajs.dom.BodyInit import org.scalajs.dom.Fetch import org.scalajs.dom.Headers import org.scalajs.dom.HttpMethod +import org.scalajs.dom.RequestDuplex import org.scalajs.dom.RequestInit import org.scalajs.dom.{Response => FetchResponse} @@ -41,64 +43,75 @@ private[dom] object FetchClient { requestTimeout: Duration, options: FetchOptions )(implicit F: Async[F]): Client[F] = Client[F] { (req: Request[F]) => - Resource - .eval( - (if (req.isChunked) req.toStrict(None) else req.pure) - .mproduct(_.body.chunkAll.filter(_.nonEmpty).compile.last) - ) - .flatMap { - case (req, body) => - Resource - .makeCaseFull { (poll: Poll[F]) => - F.delay(new AbortController()).flatMap { abortController => - val requestOptions = req.attributes.lookup(FetchOptions.Key) - val mergedOptions = requestOptions.fold(options)(options.merge) + val reqBody = + if (req.body eq EmptyBody) + Resource.pure[F, (Request[F], Option[BodyInit])]((req, None)) + else if (supportsRequestStreams) + toReadableStream(req.body).map(Some[BodyInit](_)).tupleLeft(req) + else + Resource.eval { + (if (req.isChunked) req.toStrict(None) else req.pure).mproduct { req => + req.body.chunkAll.filter(_.nonEmpty).map(c => c.toUint8Array: BodyInit).compile.last + } + } - val init = new RequestInit {} + reqBody.flatMap { + case (req, body) => + Resource + .makeCaseFull { (poll: Poll[F]) => + F.delay(new AbortController()).flatMap { abortController => + val requestOptions = req.attributes.lookup(FetchOptions.Key) + val mergedOptions = requestOptions.fold(options)(options.merge) - init.method = req.method.name.asInstanceOf[HttpMethod] - init.headers = new Headers(toDomHeaders(req.headers)) - body.foreach { body => init.body = body.toJSArrayBuffer } - init.signal = abortController.signal - mergedOptions.cache.foreach(init.cache = _) - mergedOptions.credentials.foreach(init.credentials = _) - mergedOptions.integrity.foreach(init.integrity = _) - mergedOptions.keepAlive.foreach(init.keepalive = _) - mergedOptions.mode.foreach(init.mode = _) - mergedOptions.redirect.foreach(init.redirect = _) - // Referer headers are forbidden in Fetch, but we make a best effort to preserve behavior across clients. - // See https://developer.mozilla.org/en-US/docs/Glossary/Forbidden_header_name - // If there's a Referer header, it will have more priority than the client's `referrer` (if present) - // but less priority than the request's `referrer` (if present). - requestOptions - .flatMap(_.referrer) - .orElse(req.headers.get[Referer].map(_.uri)) - .orElse(options.referrer) - .foreach(referrer => init.referrer = referrer.renderString) - mergedOptions.referrerPolicy.foreach(init.referrerPolicy = _) + val init = new RequestInit {} - val fetch = - poll(F.fromPromise(F.delay(Fetch.fetch(req.uri.renderString, init)))) - .onCancel(F.delay(abortController.abort())) + init.method = req.method.name.asInstanceOf[HttpMethod] + init.headers = new Headers(toDomHeaders(req.headers)) + body.foreach { body => + init.body = body + if (supportsRequestStreams) + init.duplex = RequestDuplex.half + } + init.signal = abortController.signal + mergedOptions.cache.foreach(init.cache = _) + mergedOptions.credentials.foreach(init.credentials = _) + mergedOptions.integrity.foreach(init.integrity = _) + mergedOptions.keepAlive.foreach(init.keepalive = _) + mergedOptions.mode.foreach(init.mode = _) + mergedOptions.redirect.foreach(init.redirect = _) + // Referer headers are forbidden in Fetch, but we make a best effort to preserve behavior across clients. + // See https://developer.mozilla.org/en-US/docs/Glossary/Forbidden_header_name + // If there's a Referer header, it will have more priority than the client's `referrer` (if present) + // but less priority than the request's `referrer` (if present). + requestOptions + .flatMap(_.referrer) + .orElse(req.headers.get[Referer].map(_.uri)) + .orElse(options.referrer) + .foreach(referrer => init.referrer = referrer.renderString) + mergedOptions.referrerPolicy.foreach(init.referrerPolicy = _) + + val fetch = + poll(F.fromPromise(F.delay(Fetch.fetch(req.uri.renderString, init)))) + .onCancel(F.delay(abortController.abort())) - requestTimeout match { - case d: FiniteDuration => - fetch.timeoutTo( - d, - F.raiseError[FetchResponse](new TimeoutException( - s"Request to ${req.uri.renderString} timed out after ${d.toMillis} ms")) - ) - case _ => - fetch - } + requestTimeout match { + case d: FiniteDuration => + fetch.timeoutTo( + d, + F.raiseError[FetchResponse](new TimeoutException( + s"Request to ${req.uri.renderString} timed out after ${d.toMillis} ms")) + ) + case _ => + fetch } - } { - case (r, exitCase) => - OptionT.fromOption(Option(r.body)).foreachF(cancelReadableStream(_, exitCase)) } - .evalMap(fromDomResponse[F]) + } { + case (r, exitCase) => + OptionT.fromOption(Option(r.body)).foreachF(cancelReadableStream(_, exitCase)) + } + .evalMap(fromDomResponse[F]) - } + } } } diff --git a/dom/src/main/scala/org/http4s/dom/package.scala b/dom/src/main/scala/org/http4s/dom/package.scala index a19ea688..89f96a02 100644 --- a/dom/src/main/scala/org/http4s/dom/package.scala +++ b/dom/src/main/scala/org/http4s/dom/package.scala @@ -20,9 +20,9 @@ import cats.effect.kernel.Async import cats.effect.kernel.Resource import cats.effect.std.Dispatcher import cats.effect.std.Queue +import cats.effect.syntax.all._ import cats.syntax.all._ import fs2.Chunk -import fs2.Pipe import fs2.Stream import org.scalajs.dom @@ -119,32 +119,29 @@ package object dom { } } - private def toReadableStream[F[_]]( - implicit F: Async[F]): Pipe[F, Byte, dom.ReadableStream[Uint8Array]] = - (in: Stream[F, Byte]) => - Stream.resource(Dispatcher.sequential).flatMap { dispatcher => - Stream.eval(Queue.synchronous[F, Option[Chunk[Byte]]]).flatMap { chunks => - Stream - .eval { - F.delay { - val source = new dom.ReadableStreamUnderlyingSource[Uint8Array] { - `type` = dom.ReadableStreamType.bytes - pull = js.defined { controller => - dispatcher.unsafeToPromise { - chunks.take.flatMap { - case Some(chunk) => - F.delay(controller.enqueue(chunk.toUint8Array)) - case None => F.delay(controller.close()) - } - } + private[dom] def toReadableStream[F[_]](in: Stream[F, Byte])( + implicit F: Async[F]): Resource[F, dom.ReadableStream[Uint8Array]] = + Dispatcher.sequential.flatMap { dispatcher => + Resource.eval(Queue.synchronous[F, Option[Chunk[Byte]]]).flatMap { chunks => + in.enqueueNoneTerminatedChunks(chunks).compile.drain.background.evalMap { _ => + F.delay { + val source = new dom.ReadableStreamUnderlyingSource[Uint8Array] { + `type` = dom.ReadableStreamType.bytes + pull = js.defined { controller => + dispatcher.unsafeToPromise { + chunks.take.flatMap { + case Some(chunk) => + F.delay(controller.enqueue(chunk.toUint8Array)) + case None => F.delay(controller.close()) } } - dom.ReadableStream[Uint8Array](source) } } - .concurrently(in.enqueueNoneTerminatedChunks(chunks)) + dom.ReadableStream[Uint8Array](source) + } } } + } private[dom] lazy val supportsRequestStreams = { var duplexAccessed = false From 3589f468fc4ffcc628ce040a9ae24fd8cd71d4e3 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 18 Jan 2023 04:54:55 +0000 Subject: [PATCH 04/11] Formatting --- .../src/test/scala/org/http4s/dom/RequestStreamsSuite.scala | 2 +- .../src/test/scala/org/http4s/dom/RequestStreamsSuite.scala | 2 +- .../src/test/scala/org/http4s/dom/RequestStreamsSuite.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/testsChrome/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala b/testsChrome/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala index 43450c53..1bdc389c 100644 --- a/testsChrome/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala +++ b/testsChrome/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala @@ -19,7 +19,7 @@ package org.http4s.dom import munit.FunSuite class RequestStreamsSuite extends FunSuite { - + test("chrome supports request streams") { assert(supportsRequestStreams) } diff --git a/testsFirefox/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala b/testsFirefox/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala index b6146251..92b905e9 100644 --- a/testsFirefox/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala +++ b/testsFirefox/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala @@ -19,7 +19,7 @@ package org.http4s.dom import munit.FunSuite class RequestStreamsSuite extends FunSuite { - + test("firefox does not support request streams") { assert(!supportsRequestStreams) } diff --git a/testsNodeJS/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala b/testsNodeJS/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala index f910e5e2..5a4867ff 100644 --- a/testsNodeJS/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala +++ b/testsNodeJS/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala @@ -19,7 +19,7 @@ package org.http4s.dom import munit.FunSuite class RequestStreamsSuite extends FunSuite { - + test("node.js supports request streams") { assert(supportsRequestStreams) } From 89c9f42c9251373d5d4e8cf5e9bb02fc3769b231 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 18 Jan 2023 05:00:38 +0000 Subject: [PATCH 05/11] Organize imports --- dom/src/main/scala/org/http4s/dom/package.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dom/src/main/scala/org/http4s/dom/package.scala b/dom/src/main/scala/org/http4s/dom/package.scala index 89f96a02..995a0ab7 100644 --- a/dom/src/main/scala/org/http4s/dom/package.scala +++ b/dom/src/main/scala/org/http4s/dom/package.scala @@ -28,8 +28,8 @@ import org.scalajs.dom import scala.scalajs.js import scala.scalajs.js.JSConverters._ -import scala.scalajs.js.typedarray.Uint8Array import scala.scalajs.js.annotation.JSExport +import scala.scalajs.js.typedarray.Uint8Array package object dom { From 12fdd793ae20add6eb3e38d36cb6eafe0e0d6822 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 18 Jan 2023 07:21:09 +0000 Subject: [PATCH 06/11] Implement async feature test --- .../scala/org/http4s/dom/FetchClient.scala | 131 ++++++++++-------- .../main/scala/org/http4s/dom/package.scala | 38 ++--- .../org/http4s/dom/RequestStreamsSuite.scala | 7 +- .../org/http4s/dom/RequestStreamsSuite.scala | 7 +- .../org/http4s/dom/RequestStreamsSuite.scala | 7 +- 5 files changed, 103 insertions(+), 87 deletions(-) diff --git a/dom/src/main/scala/org/http4s/dom/FetchClient.scala b/dom/src/main/scala/org/http4s/dom/FetchClient.scala index f1d7d6b3..80481207 100644 --- a/dom/src/main/scala/org/http4s/dom/FetchClient.scala +++ b/dom/src/main/scala/org/http4s/dom/FetchClient.scala @@ -43,74 +43,83 @@ private[dom] object FetchClient { requestTimeout: Duration, options: FetchOptions )(implicit F: Async[F]): Client[F] = Client[F] { (req: Request[F]) => - val reqBody = - if (req.body eq EmptyBody) - Resource.pure[F, (Request[F], Option[BodyInit])]((req, None)) - else if (supportsRequestStreams) - toReadableStream(req.body).map(Some[BodyInit](_)).tupleLeft(req) - else - Resource.eval { - (if (req.isChunked) req.toStrict(None) else req.pure).mproduct { req => - req.body.chunkAll.filter(_.nonEmpty).map(c => c.toUint8Array: BodyInit).compile.last - } - } + Resource.eval(F.fromPromise(F.delay(supportsRequestStreams))).flatMap { + supportsRequestStreams => + val reqBody = + if (req.body eq EmptyBody) + Resource.pure[F, (Request[F], Option[BodyInit])]((req, None)) + else if (supportsRequestStreams) + toReadableStream(req.body).map(Some[BodyInit](_)).tupleLeft(req) + else + Resource.eval { + (if (req.isChunked) req.toStrict(None) else req.pure).mproduct { req => + req + .body + .chunkAll + .filter(_.nonEmpty) + .map(c => c.toUint8Array: BodyInit) + .compile + .last + } + } - reqBody.flatMap { - case (req, body) => - Resource - .makeCaseFull { (poll: Poll[F]) => - F.delay(new AbortController()).flatMap { abortController => - val requestOptions = req.attributes.lookup(FetchOptions.Key) - val mergedOptions = requestOptions.fold(options)(options.merge) + reqBody.flatMap { + case (req, body) => + Resource + .makeCaseFull { (poll: Poll[F]) => + F.delay(new AbortController()).flatMap { abortController => + val requestOptions = req.attributes.lookup(FetchOptions.Key) + val mergedOptions = requestOptions.fold(options)(options.merge) - val init = new RequestInit {} + val init = new RequestInit {} - init.method = req.method.name.asInstanceOf[HttpMethod] - init.headers = new Headers(toDomHeaders(req.headers)) - body.foreach { body => - init.body = body - if (supportsRequestStreams) - init.duplex = RequestDuplex.half - } - init.signal = abortController.signal - mergedOptions.cache.foreach(init.cache = _) - mergedOptions.credentials.foreach(init.credentials = _) - mergedOptions.integrity.foreach(init.integrity = _) - mergedOptions.keepAlive.foreach(init.keepalive = _) - mergedOptions.mode.foreach(init.mode = _) - mergedOptions.redirect.foreach(init.redirect = _) - // Referer headers are forbidden in Fetch, but we make a best effort to preserve behavior across clients. - // See https://developer.mozilla.org/en-US/docs/Glossary/Forbidden_header_name - // If there's a Referer header, it will have more priority than the client's `referrer` (if present) - // but less priority than the request's `referrer` (if present). - requestOptions - .flatMap(_.referrer) - .orElse(req.headers.get[Referer].map(_.uri)) - .orElse(options.referrer) - .foreach(referrer => init.referrer = referrer.renderString) - mergedOptions.referrerPolicy.foreach(init.referrerPolicy = _) + init.method = req.method.name.asInstanceOf[HttpMethod] + init.headers = new Headers(toDomHeaders(req.headers)) + body.foreach { body => + init.body = body + if (supportsRequestStreams) + init.duplex = RequestDuplex.half + } + init.signal = abortController.signal + mergedOptions.cache.foreach(init.cache = _) + mergedOptions.credentials.foreach(init.credentials = _) + mergedOptions.integrity.foreach(init.integrity = _) + mergedOptions.keepAlive.foreach(init.keepalive = _) + mergedOptions.mode.foreach(init.mode = _) + mergedOptions.redirect.foreach(init.redirect = _) + // Referer headers are forbidden in Fetch, but we make a best effort to preserve behavior across clients. + // See https://developer.mozilla.org/en-US/docs/Glossary/Forbidden_header_name + // If there's a Referer header, it will have more priority than the client's `referrer` (if present) + // but less priority than the request's `referrer` (if present). + requestOptions + .flatMap(_.referrer) + .orElse(req.headers.get[Referer].map(_.uri)) + .orElse(options.referrer) + .foreach(referrer => init.referrer = referrer.renderString) + mergedOptions.referrerPolicy.foreach(init.referrerPolicy = _) - val fetch = - poll(F.fromPromise(F.delay(Fetch.fetch(req.uri.renderString, init)))) - .onCancel(F.delay(abortController.abort())) + val fetch = + poll(F.fromPromise(F.delay(Fetch.fetch(req.uri.renderString, init)))) + .onCancel(F.delay(abortController.abort())) - requestTimeout match { - case d: FiniteDuration => - fetch.timeoutTo( - d, - F.raiseError[FetchResponse](new TimeoutException( - s"Request to ${req.uri.renderString} timed out after ${d.toMillis} ms")) - ) - case _ => - fetch + requestTimeout match { + case d: FiniteDuration => + fetch.timeoutTo( + d, + F.raiseError[FetchResponse](new TimeoutException( + s"Request to ${req.uri.renderString} timed out after ${d.toMillis} ms")) + ) + case _ => + fetch + } + } + } { + case (r, exitCase) => + OptionT.fromOption(Option(r.body)).foreachF(cancelReadableStream(_, exitCase)) } - } - } { - case (r, exitCase) => - OptionT.fromOption(Option(r.body)).foreachF(cancelReadableStream(_, exitCase)) - } - .evalMap(fromDomResponse[F]) + .evalMap(fromDomResponse[F]) + } } } diff --git a/dom/src/main/scala/org/http4s/dom/package.scala b/dom/src/main/scala/org/http4s/dom/package.scala index 995a0ab7..0d0f6fd5 100644 --- a/dom/src/main/scala/org/http4s/dom/package.scala +++ b/dom/src/main/scala/org/http4s/dom/package.scala @@ -28,7 +28,6 @@ import org.scalajs.dom import scala.scalajs.js import scala.scalajs.js.JSConverters._ -import scala.scalajs.js.annotation.JSExport import scala.scalajs.js.typedarray.Uint8Array package object dom { @@ -144,23 +143,28 @@ package object dom { } private[dom] lazy val supportsRequestStreams = { - var duplexAccessed = false - val hasContentType = new dom.Request( - "http://http4s.org/", - new AnyRef { - @JSExport - val body = dom.ReadableStream() - @JSExport - val method = dom.HttpMethod.POST - @JSExport - def duplex = { - duplexAccessed = true - dom.RequestDuplex.half - } - }.asInstanceOf[dom.RequestInit] - ).headers.has("Content-Type") - duplexAccessed && !hasContentType + val request = new dom.Request( + "data:a/a;charset=utf-8,", + new dom.RequestInit { + body = dom.ReadableStream() + method = dom.HttpMethod.POST + duplex = dom.RequestDuplex.half + } + ) + + val supportsStreamsInRequestObjects = !request.headers.has("Content-Type") + + if (!supportsStreamsInRequestObjects) + js.Promise.resolve[Boolean](false) + else + dom + .Fetch + .fetch(request) + .`then`[Boolean]( + _ => true, + (_ => false): js.Function1[Any, Boolean] + ) } } diff --git a/testsChrome/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala b/testsChrome/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala index 1bdc389c..c344fe3d 100644 --- a/testsChrome/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala +++ b/testsChrome/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala @@ -16,12 +16,13 @@ package org.http4s.dom -import munit.FunSuite +import cats.effect.IO +import munit.CatsEffectSuite -class RequestStreamsSuite extends FunSuite { +class RequestStreamsSuite extends CatsEffectSuite { test("chrome supports request streams") { - assert(supportsRequestStreams) + IO.fromPromise(IO(supportsRequestStreams)).assert } } diff --git a/testsFirefox/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala b/testsFirefox/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala index 92b905e9..907a78f4 100644 --- a/testsFirefox/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala +++ b/testsFirefox/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala @@ -16,12 +16,13 @@ package org.http4s.dom -import munit.FunSuite +import cats.effect.IO +import munit.CatsEffectSuite -class RequestStreamsSuite extends FunSuite { +class RequestStreamsSuite extends CatsEffectSuite { test("firefox does not support request streams") { - assert(!supportsRequestStreams) + IO.fromPromise(IO(supportsRequestStreams)).map(!_).assert } } diff --git a/testsNodeJS/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala b/testsNodeJS/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala index 5a4867ff..0896631d 100644 --- a/testsNodeJS/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala +++ b/testsNodeJS/src/test/scala/org/http4s/dom/RequestStreamsSuite.scala @@ -16,12 +16,13 @@ package org.http4s.dom -import munit.FunSuite +import cats.effect.IO +import munit.CatsEffectSuite -class RequestStreamsSuite extends FunSuite { +class RequestStreamsSuite extends CatsEffectSuite { test("node.js supports request streams") { - assert(supportsRequestStreams) + IO.fromPromise(IO(supportsRequestStreams)).assert } } From e15eb86a76aec1efac53f4af7bb75ef0cf0f3dbf Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 18 Jan 2023 08:28:04 +0000 Subject: [PATCH 07/11] Drop `Transfer-Encoding` header --- .../main/scala/org/http4s/dom/package.scala | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/dom/src/main/scala/org/http4s/dom/package.scala b/dom/src/main/scala/org/http4s/dom/package.scala index 0d0f6fd5..faa7e196 100644 --- a/dom/src/main/scala/org/http4s/dom/package.scala +++ b/dom/src/main/scala/org/http4s/dom/package.scala @@ -24,10 +24,10 @@ import cats.effect.syntax.all._ import cats.syntax.all._ import fs2.Chunk import fs2.Stream +import org.http4s.headers.`Transfer-Encoding` import org.scalajs.dom import scala.scalajs.js -import scala.scalajs.js.JSConverters._ import scala.scalajs.js.typedarray.Uint8Array package object dom { @@ -58,17 +58,15 @@ package object dom { ) } - private[dom] def toDomHeaders(headers: Headers): dom.Headers = - new dom.Headers( - headers - .headers - .view - .map { - case Header.Raw(name, value) => - name.toString -> value - } - .toMap - .toJSDictionary) + private[dom] def toDomHeaders(headers: Headers): dom.Headers = { + val domHeaders = new dom.Headers() + headers.foreach { + case Header.Raw(name, value) => + if (name != `Transfer-Encoding`.name) + domHeaders.append(name.toString, value) + } + domHeaders + } private[dom] def fromDomHeaders(headers: dom.Headers): Headers = Headers( From a5c0795f77a058fe6db81281b2050d44100e393d Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 18 Jan 2023 08:33:23 +0000 Subject: [PATCH 08/11] Fix imports --- .../main/scala/org/http4s/dom/package.scala | 55 +++++++++++-------- 1 file changed, 32 insertions(+), 23 deletions(-) diff --git a/dom/src/main/scala/org/http4s/dom/package.scala b/dom/src/main/scala/org/http4s/dom/package.scala index faa7e196..545ece30 100644 --- a/dom/src/main/scala/org/http4s/dom/package.scala +++ b/dom/src/main/scala/org/http4s/dom/package.scala @@ -25,28 +25,39 @@ import cats.syntax.all._ import fs2.Chunk import fs2.Stream import org.http4s.headers.`Transfer-Encoding` -import org.scalajs.dom +import org.scalajs.dom.Blob +import org.scalajs.dom.Fetch +import org.scalajs.dom.File +import org.scalajs.dom.HttpMethod +import org.scalajs.dom.ReadableStream +import org.scalajs.dom.ReadableStreamType +import org.scalajs.dom.ReadableStreamUnderlyingSource +import org.scalajs.dom.RequestDuplex +import org.scalajs.dom.RequestInit +import org.scalajs.dom.{Headers => DomHeaders} +import org.scalajs.dom.{Request => DomRequest} +import org.scalajs.dom.{Response => DomResponse} import scala.scalajs.js import scala.scalajs.js.typedarray.Uint8Array package object dom { - implicit def fileEncoder[F[_]](implicit F: Async[F]): EntityEncoder[F, dom.File] = + implicit def fileEncoder[F[_]](implicit F: Async[F]): EntityEncoder[F, File] = blobEncoder.narrow - implicit def blobEncoder[F[_]](implicit F: Async[F]): EntityEncoder[F, dom.Blob] = + implicit def blobEncoder[F[_]](implicit F: Async[F]): EntityEncoder[F, Blob] = EntityEncoder.entityBodyEncoder.contramap { blob => readReadableStream[F](F.delay(blob.stream()), cancelAfterUse = true) } implicit def readableStreamEncoder[F[_]: Async] - : EntityEncoder[F, dom.ReadableStream[Uint8Array]] = + : EntityEncoder[F, ReadableStream[Uint8Array]] = EntityEncoder.entityBodyEncoder.contramap { rs => readReadableStream(rs.pure, cancelAfterUse = true) } - private[dom] def fromDomResponse[F[_]](response: dom.Response)( + private[dom] def fromDomResponse[F[_]](response: DomResponse)( implicit F: Async[F]): F[Response[F]] = F.fromEither(Status.fromInt(response.status)).map { status => Response[F]( @@ -58,8 +69,8 @@ package object dom { ) } - private[dom] def toDomHeaders(headers: Headers): dom.Headers = { - val domHeaders = new dom.Headers() + private[dom] def toDomHeaders(headers: Headers): DomHeaders = { + val domHeaders = new DomHeaders() headers.foreach { case Header.Raw(name, value) => if (name != `Transfer-Encoding`.name) @@ -68,16 +79,16 @@ package object dom { domHeaders } - private[dom] def fromDomHeaders(headers: dom.Headers): Headers = + private[dom] def fromDomHeaders(headers: DomHeaders): Headers = Headers( headers.map { header => header(0) -> header(1) }.toList ) private def readReadableStream[F[_]]( - readableStream: F[dom.ReadableStream[Uint8Array]], + readableStream: F[ReadableStream[Uint8Array]], cancelAfterUse: Boolean )(implicit F: Async[F]): Stream[F, Byte] = { - def read(readableStream: dom.ReadableStream[Uint8Array]) = + def read(readableStream: ReadableStream[Uint8Array]) = Stream .bracket(F.delay(readableStream.getReader()))(r => F.delay(r.releaseLock())) .flatMap { reader => @@ -98,7 +109,7 @@ package object dom { } private[dom] def cancelReadableStream[F[_], A]( - rs: dom.ReadableStream[A], + rs: ReadableStream[A], exitCase: Resource.ExitCase )(implicit F: Async[F]): F[Unit] = F.fromPromise { F.delay { @@ -117,13 +128,13 @@ package object dom { } private[dom] def toReadableStream[F[_]](in: Stream[F, Byte])( - implicit F: Async[F]): Resource[F, dom.ReadableStream[Uint8Array]] = + implicit F: Async[F]): Resource[F, ReadableStream[Uint8Array]] = Dispatcher.sequential.flatMap { dispatcher => Resource.eval(Queue.synchronous[F, Option[Chunk[Byte]]]).flatMap { chunks => in.enqueueNoneTerminatedChunks(chunks).compile.drain.background.evalMap { _ => F.delay { - val source = new dom.ReadableStreamUnderlyingSource[Uint8Array] { - `type` = dom.ReadableStreamType.bytes + val source = new ReadableStreamUnderlyingSource[Uint8Array] { + `type` = ReadableStreamType.bytes pull = js.defined { controller => dispatcher.unsafeToPromise { chunks.take.flatMap { @@ -134,20 +145,19 @@ package object dom { } } } - dom.ReadableStream[Uint8Array](source) + ReadableStream[Uint8Array](source) } } } } private[dom] lazy val supportsRequestStreams = { - - val request = new dom.Request( + val request = new DomRequest( "data:a/a;charset=utf-8,", - new dom.RequestInit { - body = dom.ReadableStream() - method = dom.HttpMethod.POST - duplex = dom.RequestDuplex.half + new RequestInit { + body = ReadableStream() + method = HttpMethod.POST + duplex = RequestDuplex.half } ) @@ -156,8 +166,7 @@ package object dom { if (!supportsStreamsInRequestObjects) js.Promise.resolve[Boolean](false) else - dom - .Fetch + Fetch .fetch(request) .`then`[Boolean]( _ => true, From 9718659e30f8252decadbf2d412465330b8dbf92 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 18 Jan 2023 08:49:01 +0000 Subject: [PATCH 09/11] Treat request/response headers differently --- dom/src/main/scala/org/http4s/dom/FetchClient.scala | 2 +- dom/src/main/scala/org/http4s/dom/ServiceWorker.scala | 2 +- dom/src/main/scala/org/http4s/dom/package.scala | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/dom/src/main/scala/org/http4s/dom/FetchClient.scala b/dom/src/main/scala/org/http4s/dom/FetchClient.scala index 80481207..30695cd3 100644 --- a/dom/src/main/scala/org/http4s/dom/FetchClient.scala +++ b/dom/src/main/scala/org/http4s/dom/FetchClient.scala @@ -74,7 +74,7 @@ private[dom] object FetchClient { val init = new RequestInit {} init.method = req.method.name.asInstanceOf[HttpMethod] - init.headers = new Headers(toDomHeaders(req.headers)) + init.headers = new Headers(toDomHeaders(req.headers, request = true)) body.foreach { body => init.body = body if (supportsRequestStreams) diff --git a/dom/src/main/scala/org/http4s/dom/ServiceWorker.scala b/dom/src/main/scala/org/http4s/dom/ServiceWorker.scala index 1720e427..89d8a839 100644 --- a/dom/src/main/scala/org/http4s/dom/ServiceWorker.scala +++ b/dom/src/main/scala/org/http4s/dom/ServiceWorker.scala @@ -92,7 +92,7 @@ object ServiceWorker { new ResponseInit { this.status = response.status.code this.statusText = response.status.reason - this.headers = toDomHeaders(response.headers) + this.headers = toDomHeaders(response.headers, request = false) } ) } diff --git a/dom/src/main/scala/org/http4s/dom/package.scala b/dom/src/main/scala/org/http4s/dom/package.scala index 545ece30..e7de31a5 100644 --- a/dom/src/main/scala/org/http4s/dom/package.scala +++ b/dom/src/main/scala/org/http4s/dom/package.scala @@ -69,12 +69,12 @@ package object dom { ) } - private[dom] def toDomHeaders(headers: Headers): DomHeaders = { + private[dom] def toDomHeaders(headers: Headers, request: Boolean): DomHeaders = { val domHeaders = new DomHeaders() headers.foreach { case Header.Raw(name, value) => - if (name != `Transfer-Encoding`.name) - domHeaders.append(name.toString, value) + val skip = request && name == `Transfer-Encoding`.name + if (!skip) domHeaders.append(name.toString, value) } domHeaders } From 07febcceb620ff1eafd2988c7e5249b8c32c70dc Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 18 Jan 2023 08:55:43 +0000 Subject: [PATCH 10/11] Hush mima on scala 3 --- build.sbt | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index ae9255c1..72ee94cb 100644 --- a/build.sbt +++ b/build.sbt @@ -120,7 +120,20 @@ lazy val dom = project "co.fs2" %%% "fs2-core" % fs2Version, "org.http4s" %%% "http4s-client" % http4sVersion, "org.scala-js" %%% "scalajs-dom" % scalaJSDomVersion - ) + ), + mimaBinaryIssueFilters ++= { + import com.typesafe.tools.mima.core._ + if (tlIsScala3.value) + Seq( + ProblemFilters.exclude[DirectMissingMethodProblem]( + "org.http4s.dom.package.closeReadableStream"), + ProblemFilters.exclude[DirectMissingMethodProblem]( + "org.http4s.dom.package.fromReadableStream"), + ProblemFilters.exclude[DirectMissingMethodProblem]( + "org.http4s.dom.package.toDomHeaders") + ) + else Seq() + } ) .enablePlugins(ScalaJSPlugin) From 2dee843c2d065adc2671096b52a89e3b0bf2c00d Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 18 Jan 2023 09:15:23 +0000 Subject: [PATCH 11/11] Add test for `ReadableStream` interop --- build.sbt | 3 +- .../main/scala/org/http4s/dom/package.scala | 18 +++----- .../org/http4s/ReadableStreamSuite.scala | 42 +++++++++++++++++++ 3 files changed, 50 insertions(+), 13 deletions(-) create mode 100644 tests/src/test/scala/org/http4s/ReadableStreamSuite.scala diff --git a/build.sbt b/build.sbt index 72ee94cb..dde4cb9e 100644 --- a/build.sbt +++ b/build.sbt @@ -159,7 +159,8 @@ def configureTest(project: Project): Project = libraryDependencies ++= Seq( "org.http4s" %%% "http4s-client-testkit" % http4sVersion, "org.scalameta" %%% "munit" % munitVersion % Test, - "org.typelevel" %%% "munit-cats-effect" % munitCEVersion % Test + "org.typelevel" %%% "munit-cats-effect" % munitCEVersion % Test, + "org.typelevel" %%% "scalacheck-effect-munit" % "2.0.0-M2" % Test ), Compile / unmanagedSourceDirectories += (LocalRootProject / baseDirectory).value / "tests" / "src" / "main" / "scala", diff --git a/dom/src/main/scala/org/http4s/dom/package.scala b/dom/src/main/scala/org/http4s/dom/package.scala index e7de31a5..48ef58ca 100644 --- a/dom/src/main/scala/org/http4s/dom/package.scala +++ b/dom/src/main/scala/org/http4s/dom/package.scala @@ -48,14 +48,12 @@ package object dom { implicit def blobEncoder[F[_]](implicit F: Async[F]): EntityEncoder[F, Blob] = EntityEncoder.entityBodyEncoder.contramap { blob => - readReadableStream[F](F.delay(blob.stream()), cancelAfterUse = true) + readReadableStream[F](F.delay(blob.stream())) } implicit def readableStreamEncoder[F[_]: Async] : EntityEncoder[F, ReadableStream[Uint8Array]] = - EntityEncoder.entityBodyEncoder.contramap { rs => - readReadableStream(rs.pure, cancelAfterUse = true) - } + EntityEncoder.entityBodyEncoder.contramap { rs => readReadableStream(rs.pure) } private[dom] def fromDomResponse[F[_]](response: DomResponse)( implicit F: Async[F]): F[Response[F]] = @@ -64,7 +62,7 @@ package object dom { status = status, headers = fromDomHeaders(response.headers), body = Stream.fromOption(Option(response.body)).flatMap { rs => - readReadableStream[F](rs.pure, cancelAfterUse = true) + readReadableStream[F](rs.pure) } ) } @@ -84,9 +82,8 @@ package object dom { headers.map { header => header(0) -> header(1) }.toList ) - private def readReadableStream[F[_]]( - readableStream: F[ReadableStream[Uint8Array]], - cancelAfterUse: Boolean + private[dom] def readReadableStream[F[_]]( + readableStream: F[ReadableStream[Uint8Array]] )(implicit F: Async[F]): Stream[F, Byte] = { def read(readableStream: ReadableStream[Uint8Array]) = Stream @@ -102,10 +99,7 @@ package object dom { } } - if (cancelAfterUse) - Stream.bracketCase(readableStream)(cancelReadableStream(_, _)).flatMap(read(_)) - else - Stream.eval(readableStream).flatMap(read(_)) + Stream.bracketCase(readableStream)(cancelReadableStream(_, _)).flatMap(read(_)) } private[dom] def cancelReadableStream[F[_], A]( diff --git a/tests/src/test/scala/org/http4s/ReadableStreamSuite.scala b/tests/src/test/scala/org/http4s/ReadableStreamSuite.scala new file mode 100644 index 00000000..70a1946d --- /dev/null +++ b/tests/src/test/scala/org/http4s/ReadableStreamSuite.scala @@ -0,0 +1,42 @@ +/* + * Copyright 2021 http4s.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.http4s.dom + +import cats.effect.IO +import fs2.Chunk +import fs2.Stream +import munit.CatsEffectSuite +import munit.ScalaCheckEffectSuite +import org.scalacheck.effect.PropF.forAllF + +class ReadableStreamSuite extends CatsEffectSuite with ScalaCheckEffectSuite { + + test("to/read ReadableStream") { + forAllF { (chunks: Vector[Vector[Byte]]) => + Stream + .emits(chunks) + .map(Chunk.seq(_)) + .unchunks + .through(in => Stream.resource(toReadableStream[IO](in))) + .flatMap(readable => readReadableStream(IO(readable))) + .compile + .toVector + .assertEquals(chunks.flatten) + } + } + +}