Skip to content

Commit

Permalink
Make opening a ws connection cancelable
Browse files Browse the repository at this point in the history
  • Loading branch information
armanbilge committed Oct 5, 2023
1 parent d65f363 commit 0a78ea4
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 29 deletions.
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Global / fileServicePort := {
import org.http4s.ember.server.EmberServerBuilder
import org.http4s.server.staticcontent._
import java.net.InetSocketAddress
import scala.concurrent.duration._

(for {
deferredPort <- IO.deferred[Int]
Expand All @@ -66,6 +67,8 @@ Global / fileServicePort := {
.of[IO] {
case Method.GET -> Root / "ws" =>
wsb.build(identity)
case Method.GET -> Root / "slows" =>
IO.sleep(3.seconds) *> wsb.build(identity)
case req =>
fileService[IO](FileService.Config[IO](".")).orNotFound.run(req).map { res =>
// TODO find out why mime type is not auto-inferred
Expand Down
64 changes: 35 additions & 29 deletions dom/src/main/scala/org/http4s/dom/WebSocketClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,38 +52,44 @@ object WebSocketClient {
messages <- Queue.unbounded[F, Option[MessageEvent]].toResource
mutex <- Mutex[F].toResource
close <- F.deferred[CloseEvent].toResource
ws <- Resource.makeCase {
F.async_[WebSocket] { cb =>
if (request.method != Method.GET)
cb(Left(new IllegalArgumentException("Must be GET Request")))

val protocols = request
.headers
.get(ci"Sec-WebSocket-Protocol")
.toList
.flatMap(_.toList.map(_.value))

val ws = new WebSocket(request.uri.renderString, protocols.toJSArray)
ws.binaryType = "arraybuffer" // the default is blob

ws.onopen = { _ =>
ws.onmessage = // setup message handler
e => dispatcher.unsafeRunAndForget(messages.offer(Some(e)))

ws.onclose = // replace the close handler
e => dispatcher.unsafeRunAndForget(messages.offer(None) *> close.complete(e))
ws <- Resource.makeCaseFull[F, WebSocket] { poll =>
poll {
F.async[WebSocket] { cb =>
F.delay {
if (request.method != Method.GET)
cb(Left(new IllegalArgumentException("Must be GET Request")))

val protocols = request
.headers
.get(ci"Sec-WebSocket-Protocol")
.toList
.flatMap(_.toList.map(_.value))

val ws = new WebSocket(request.uri.renderString, protocols.toJSArray)
ws.binaryType = "arraybuffer" // the default is blob

ws.onopen = { _ =>
ws.onmessage = e => // setup message handler
dispatcher.unsafeRunAndForget(messages.offer(Some(e)))

ws.onclose = e => // replace the close handler
dispatcher.unsafeRunAndForget(messages.offer(None) *> close.complete(e))

// no explicit error handler. according to spec:
// 1. an error event is *always* followed by a close event and
// 2. an error event doesn't carry any useful information *by design*

cb(Right(ws))
}

// no explicit error handler. according to spec:
// 1. an error event is *always* followed by a close event and
// 2. an error event doesn't carry any useful information *by design*
// a close at this stage can only be an error
// following spec we cannot get any detail about the error
// https://websockets.spec.whatwg.org/#eventdef-websocket-error
ws.onclose = _ => cb(Left(new IOException("Connection failed")))

cb(Right(ws))
Some(F.delay(ws.close()))
}
}

// a close at this stage can only be an error
// following spec we cannot get any detail about the error
// https://websockets.spec.whatwg.org/#eventdef-websocket-error
ws.onclose = _ => cb(Left(new IOException("Connection failed")))
}
} {
case (ws, exitCase) =>
Expand Down
14 changes: 14 additions & 0 deletions testsBrowser/src/test/scala/org/http4s/dom/WebSocketSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.http4s.dom.BuildInfo.fileServicePort
import scodec.bits.ByteVector

import java.io.IOException
import scala.concurrent.duration._

class WebSocketSuite extends CatsEffectSuite {

Expand Down Expand Up @@ -56,4 +57,17 @@ class WebSocketSuite extends CatsEffectSuite {
.intercept[IOException]
}

test("Cancel a connection attempt") {
WebSocketClient[IO]
.connectHighLevel(
WSRequest(Uri.fromString(s"ws://localhost:${fileServicePort}/slows").toOption.get))
.use_
.timeoutTo(100.millis, IO.unit)
.timed
.flatMap {
case (duration, _) =>
IO(assert(clue(duration) < 500.millis))
}
}

}

0 comments on commit 0a78ea4

Please sign in to comment.