Skip to content

Commit a236060

Browse files
authored
Merge pull request #93 from http4s/feature/websocket-client
Implement WebSocket client
2 parents 1900f95 + 08f44a4 commit a236060

File tree

5 files changed

+246
-20
lines changed

5 files changed

+246
-20
lines changed

build.sbt

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -53,29 +53,35 @@ Global / fileServicePort := {
5353
import cats.data.Kleisli
5454
import cats.effect.IO
5555
import cats.effect.unsafe.implicits.global
56-
import com.comcast.ip4s.Port
57-
import org.http4s.ember.server.EmberServerBuilder
56+
import org.http4s._
57+
import org.http4s.dsl.io._
58+
import org.http4s.blaze.server.BlazeServerBuilder
5859
import org.http4s.server.staticcontent._
60+
import java.net.InetSocketAddress
5961

6062
(for {
6163
deferredPort <- IO.deferred[Int]
62-
_ <- EmberServerBuilder
63-
.default[IO]
64-
.withPort(Port.fromInt(0).get)
65-
.withHttpApp {
66-
Kleisli { req =>
67-
fileService[IO](FileService.Config(".")).orNotFound.run(req).map { res =>
68-
// TODO find out why mime type is not auto-inferred
69-
if (req.uri.renderString.endsWith(".js"))
70-
res.withHeaders(
71-
"Service-Worker-Allowed" -> "/",
72-
"Content-Type" -> "text/javascript"
73-
)
74-
else res
64+
_ <- BlazeServerBuilder[IO]
65+
.bindSocketAddress(new InetSocketAddress("localhost", 0))
66+
.withHttpWebSocketApp { wsb =>
67+
HttpRoutes
68+
.of[IO] {
69+
case Method.GET -> Root / "ws" =>
70+
wsb.build(identity)
71+
case req =>
72+
fileService[IO](FileService.Config(".")).orNotFound.run(req).map { res =>
73+
// TODO find out why mime type is not auto-inferred
74+
if (req.uri.renderString.endsWith(".js"))
75+
res.withHeaders(
76+
"Service-Worker-Allowed" -> "/",
77+
"Content-Type" -> "text/javascript"
78+
)
79+
else res
80+
}
7581
}
76-
}
82+
.orNotFound
7783
}
78-
.build
84+
.resource
7985
.map(_.address.getPort)
8086
.evalTap(deferredPort.complete(_))
8187
.useForever
@@ -133,7 +139,7 @@ lazy val tests = project
133139
.settings(
134140
scalaJSUseMainModuleInitializer := true,
135141
(Test / test) := (Test / test).dependsOn(Compile / fastOptJS).value,
136-
buildInfoKeys := Seq[BuildInfoKey](scalaVersion),
142+
buildInfoKeys := Seq[BuildInfoKey](scalaVersion, fileServicePort),
137143
buildInfoPackage := "org.http4s.dom",
138144
libraryDependencies ++= Seq(
139145
"org.scalameta" %%% "munit" % munitVersion % Test,
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* Copyright 2021 http4s.org
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.http4s.dom
18+
19+
import cats.Foldable
20+
import cats.data.OptionT
21+
import cats.effect.kernel.Async
22+
import cats.effect.kernel.DeferredSource
23+
import cats.effect.kernel.Resource
24+
import cats.effect.std.Dispatcher
25+
import cats.effect.std.Queue
26+
import cats.effect.std.Semaphore
27+
import cats.effect.syntax.all._
28+
import cats.syntax.all._
29+
import fs2.INothing
30+
import fs2.Stream
31+
import org.http4s.Method
32+
import org.http4s.client.websocket.WSClientHighLevel
33+
import org.http4s.client.websocket.WSConnectionHighLevel
34+
import org.http4s.client.websocket.WSDataFrame
35+
import org.http4s.client.websocket.WSFrame
36+
import org.http4s.client.websocket.WSRequest
37+
import org.scalajs.dom.CloseEvent
38+
import org.scalajs.dom.MessageEvent
39+
import org.scalajs.dom.WebSocket
40+
import org.typelevel.ci._
41+
import scodec.bits.ByteVector
42+
43+
import scala.scalajs.js
44+
import scala.scalajs.js.JSConverters._
45+
46+
object WebSocketClient {
47+
48+
def apply[F[_]](implicit F: Async[F]): WSClientHighLevel[F] = new WSClientHighLevel[F] {
49+
def connectHighLevel(request: WSRequest): Resource[F, WSConnectionHighLevel[F]] =
50+
for {
51+
dispatcher <- Dispatcher[F]
52+
messages <- Queue.unbounded[F, Option[MessageEvent]].toResource
53+
semaphore <- Semaphore[F](1).toResource
54+
error <- F.deferred[Either[Throwable, INothing]].toResource
55+
close <- F.deferred[CloseEvent].toResource
56+
ws <- Resource.makeCase {
57+
F.async_[WebSocket] { cb =>
58+
if (request.method != Method.GET)
59+
cb(Left(new IllegalArgumentException("Must be GET Request")))
60+
61+
val protocols = request
62+
.headers
63+
.get(ci"Sec-WebSocket-Protocol")
64+
.toList
65+
.flatMap(_.toList.map(_.value))
66+
67+
val ws = new WebSocket(request.uri.renderString, protocols.toJSArray)
68+
ws.binaryType = "arraybuffer" // the default is blob
69+
70+
ws.onopen = { _ =>
71+
ws.onerror = // replace the error handler
72+
e =>
73+
dispatcher.unsafeRunAndForget(error.complete(Left(js.JavaScriptException(e))))
74+
cb(Right(ws))
75+
}
76+
77+
ws.onerror = e => cb(Left(js.JavaScriptException(e)))
78+
ws.onmessage = e => dispatcher.unsafeRunAndForget(messages.offer(Some(e)))
79+
ws.onclose =
80+
e => dispatcher.unsafeRunAndForget(messages.offer(None) *> close.complete(e))
81+
}
82+
} {
83+
case (ws, exitCase) =>
84+
val reason = exitCase match {
85+
case Resource.ExitCase.Succeeded =>
86+
None
87+
case Resource.ExitCase.Errored(ex) =>
88+
val reason = ex.toString
89+
// reason must be no longer than 123 bytes of UTF-8 text
90+
// UTF-8 character is max 4 bytes so we can fast-path
91+
if (reason.length <= 30 || reason.getBytes.length <= 123)
92+
Some(reason)
93+
else
94+
None
95+
case Resource.ExitCase.Canceled =>
96+
Some("canceled")
97+
}
98+
99+
val shutdown = F
100+
.async_[CloseEvent] { cb =>
101+
ws.onerror = e => cb(Left(js.JavaScriptException(e)))
102+
ws.onclose = e => cb(Right(e))
103+
reason match { // 1000 "normal closure" is only code supported in browser
104+
case Some(reason) => ws.close(1000, reason)
105+
case None => ws.close(1000)
106+
}
107+
}
108+
.flatMap(close.complete(_)) *> messages.offer(None)
109+
110+
F.delay(ws.readyState).flatMap {
111+
case 0 | 1 => shutdown // CONNECTING | OPEN
112+
case 2 => close.get.void // CLOSING
113+
case 3 => F.unit // CLOSED
114+
case s => F.raiseError(new IllegalStateException(s"WebSocket.readyState: $s"))
115+
}
116+
}
117+
} yield new WSConnectionHighLevel[F] {
118+
119+
def closeFrame: DeferredSource[F, WSFrame.Close] =
120+
(close: DeferredSource[F, CloseEvent]).map(e => WSFrame.Close(e.code, e.reason))
121+
122+
def receive: F[Option[WSDataFrame]] = semaphore
123+
.permit
124+
.surround(OptionT(messages.take).map(decodeMessage).value)
125+
.race(error.get.rethrow)
126+
.map(_.merge)
127+
128+
override def receiveStream: Stream[F, WSDataFrame] =
129+
Stream
130+
.resource(semaphore.permit)
131+
.flatMap(_ => Stream.fromQueueNoneTerminated(messages))
132+
.map(decodeMessage)
133+
.concurrently(Stream.exec(error.get.rethrow.widen))
134+
135+
private def decodeMessage(e: MessageEvent): WSDataFrame =
136+
e.data match {
137+
case s: String => WSFrame.Text(s)
138+
case b: js.typedarray.ArrayBuffer =>
139+
WSFrame.Binary(ByteVector.fromJSArrayBuffer(b))
140+
case _ => // this should never happen
141+
throw new RuntimeException
142+
}
143+
144+
override def sendText(text: String): F[Unit] =
145+
errorOr(F.delay(ws.send(text)))
146+
147+
override def sendBinary(bytes: ByteVector): F[Unit] =
148+
errorOr(F.delay(ws.send(bytes.toJSArrayBuffer)))
149+
150+
def send(wsf: WSDataFrame): F[Unit] =
151+
wsf match {
152+
case WSFrame.Text(data, true) => sendText(data)
153+
case WSFrame.Binary(data, true) => sendBinary(data)
154+
case _ =>
155+
F.raiseError(new IllegalArgumentException("DataFrames cannot be fragmented"))
156+
}
157+
158+
private def errorOr(fu: F[Unit]): F[Unit] = error.tryGet.flatMap {
159+
case Some(error) => F.fromEither[Unit](error)
160+
case None => fu
161+
}
162+
163+
def sendMany[G[_]: Foldable, A <: WSDataFrame](wsfs: G[A]): F[Unit] =
164+
wsfs.foldMapM(send(_))
165+
166+
def subprotocol: Option[String] = Option(ws.protocol).filter(_.nonEmpty)
167+
}
168+
}
169+
170+
}

project/plugins.sbt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ enablePlugins(BuildInfoPlugin)
44
buildInfoKeys += "http4sVersion" -> http4sVersion
55

66
libraryDependencies += "org.scala-js" %% "scalajs-env-selenium" % "1.1.1"
7-
libraryDependencies += "org.http4s" %% "http4s-ember-server" % http4sVersion
7+
libraryDependencies += "org.http4s" %% "http4s-dsl" % http4sVersion
8+
libraryDependencies += "org.http4s" %% "http4s-blaze-server" % http4sVersion
89

910
addSbtPlugin("org.http4s" % "sbt-http4s-org" % "0.13.0")
1011
addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.3.1")

tests/src/test/scala/org/http4s/dom/DomSuite.scala renamed to tests/src/test/scala/org/http4s/dom/FetchServiceWorkerSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import org.scalajs.dom.window
3232
import scala.concurrent.duration._
3333
import scala.scalajs.js
3434

35-
class DomSuite extends CatsEffectSuite {
35+
class FetchServiceWorkerSuite extends CatsEffectSuite {
3636

3737
def scalaVersion = if (BuildInfo.scalaVersion.startsWith("2"))
3838
BuildInfo.scalaVersion.split("\\.").init.mkString(".")
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2021 http4s.org
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.http4s.dom
18+
19+
import cats.effect.IO
20+
import munit.CatsEffectSuite
21+
import org.http4s.Uri
22+
import org.http4s.client.websocket.WSFrame
23+
import org.http4s.client.websocket.WSRequest
24+
import org.http4s.dom.BuildInfo.fileServicePort
25+
import scodec.bits.ByteVector
26+
27+
class WebSocketSuite extends CatsEffectSuite {
28+
29+
test("send and receive frames") {
30+
WebSocketClient[IO]
31+
.connectHighLevel(
32+
WSRequest(Uri.fromString(s"ws://localhost:${fileServicePort}/ws").toOption.get))
33+
.use { conn =>
34+
for {
35+
_ <- conn.send(WSFrame.Binary(ByteVector(15, 2, 3)))
36+
_ <- conn.sendMany(List(WSFrame.Text("foo"), WSFrame.Text("bar")))
37+
recv <- conn.receiveStream.take(3).compile.toList
38+
} yield recv
39+
}
40+
.assertEquals(
41+
List(
42+
WSFrame.Binary(ByteVector(15, 2, 3)),
43+
WSFrame.Text("foo"),
44+
WSFrame.Text("bar")
45+
)
46+
)
47+
}
48+
49+
}

0 commit comments

Comments
 (0)