@@ -39,6 +39,7 @@ import org.scalajs.dom.WebSocket
39
39
import org .typelevel .ci ._
40
40
import scodec .bits .ByteVector
41
41
42
+ import java .io .IOException
42
43
import scala .scalajs .js
43
44
import scala .scalajs .js .JSConverters ._
44
45
@@ -50,7 +51,6 @@ object WebSocketClient {
50
51
dispatcher <- Dispatcher .sequential[F ]
51
52
messages <- Queue .unbounded[F , Option [MessageEvent ]].toResource
52
53
semaphore <- Semaphore [F ](1 ).toResource
53
- error <- F .deferred[Throwable ].toResource
54
54
close <- F .deferred[CloseEvent ].toResource
55
55
ws <- Resource .makeCase {
56
56
F .async_[WebSocket ] { cb =>
@@ -67,15 +67,23 @@ object WebSocketClient {
67
67
ws.binaryType = " arraybuffer" // the default is blob
68
68
69
69
ws.onopen = { _ =>
70
- ws.onerror = // replace the error handler
71
- e => dispatcher.unsafeRunAndForget(error.complete(js.JavaScriptException (e)))
70
+ ws.onmessage = // setup message handler
71
+ e => dispatcher.unsafeRunAndForget(messages.offer(Some (e)))
72
+
73
+ ws.onclose = // replace the close handler
74
+ e => dispatcher.unsafeRunAndForget(messages.offer(None ) *> close.complete(e))
75
+
76
+ // no explicit error handler. according to spec:
77
+ // 1. an error event is *always* followed by a close event and
78
+ // 2. an error event doesn't carry any useful information *by design*
79
+
72
80
cb(Right (ws))
73
81
}
74
82
75
- ws.onerror = e => cb( Left (js. JavaScriptException (e)))
76
- ws.onmessage = e => dispatcher.unsafeRunAndForget(messages.offer( Some (e)))
77
- ws.onclose =
78
- e => dispatcher.unsafeRunAndForget(messages.offer( None ) *> close.complete(e ))
83
+ // a close at this stage can only be an error
84
+ // following spec we cannot get any detail about the error
85
+ // https://websockets.spec.whatwg.org/#eventdef-websocket-error
86
+ ws.onclose = _ => cb( Left ( new IOException ( " Connection failed " ) ))
79
87
}
80
88
} {
81
89
case (ws, exitCase) =>
@@ -117,33 +125,27 @@ object WebSocketClient {
117
125
def closeFrame : DeferredSource [F , WSFrame .Close ] =
118
126
(close : DeferredSource [F , CloseEvent ]).map(e => WSFrame .Close (e.code, e.reason))
119
127
120
- def receive : F [Option [WSDataFrame ]] = semaphore
121
- .permit
122
- .surround(OptionT (messages.take).map(decodeMessage).value)
123
- .race(error.get.flatMap(F .raiseError[Option [WSDataFrame ]]))
124
- .map(_.merge)
128
+ def receive : F [Option [WSDataFrame ]] =
129
+ semaphore.permit.surround(OptionT (messages.take).map(decodeMessage).value)
125
130
126
131
override def receiveStream : Stream [F , WSDataFrame ] =
127
- Stream
128
- .resource(semaphore.permit)
129
- .flatMap(_ => Stream .fromQueueNoneTerminated(messages))
130
- .map(decodeMessage)
131
- .concurrently(Stream .exec(error.get.flatMap(F .raiseError)))
132
+ Stream .resource(semaphore.permit) >>
133
+ Stream .fromQueueNoneTerminated(messages).map(decodeMessage)
132
134
133
135
private def decodeMessage (e : MessageEvent ): WSDataFrame =
134
136
e.data match {
135
137
case s : String => WSFrame .Text (s)
136
138
case b : js.typedarray.ArrayBuffer =>
137
139
WSFrame .Binary (ByteVector .fromJSArrayBuffer(b))
138
140
case _ => // this should never happen
139
- throw new RuntimeException
141
+ throw new AssertionError
140
142
}
141
143
142
144
override def sendText (text : String ): F [Unit ] =
143
- errorOr( F .delay(ws.send(text) ))
145
+ F .delay(ws.send(text))
144
146
145
147
override def sendBinary (bytes : ByteVector ): F [Unit ] =
146
- errorOr( F .delay(ws.send(bytes.toJSArrayBuffer) ))
148
+ F .delay(ws.send(bytes.toJSArrayBuffer))
147
149
148
150
def send (wsf : WSDataFrame ): F [Unit ] =
149
151
wsf match {
@@ -153,11 +155,6 @@ object WebSocketClient {
153
155
F .raiseError(new IllegalArgumentException (" DataFrames cannot be fragmented" ))
154
156
}
155
157
156
- private def errorOr (fu : F [Unit ]): F [Unit ] = error.tryGet.flatMap {
157
- case Some (error) => F .raiseError(error)
158
- case None => fu
159
- }
160
-
161
158
def sendMany [G [_]: Foldable , A <: WSDataFrame ](wsfs : G [A ]): F [Unit ] =
162
159
wsfs.foldMapM(send(_))
163
160
0 commit comments