@@ -42,7 +42,6 @@ import scodec.bits.ByteVector
42
42
43
43
import scala .scalajs .js
44
44
import scala .scalajs .js .JSConverters ._
45
- import fs2 .Pipe
46
45
47
46
final class WebSocketException private [dom] (
48
47
private [dom] val reason : String
@@ -55,8 +54,7 @@ object WebSocketClient {
55
54
for {
56
55
dispatcher <- Dispatcher [F ]
57
56
messages <- Queue .unbounded[F , Option [MessageEvent ]].toResource
58
- receiveSemaphore <- Semaphore [F ](1 ).toResource
59
- sendSemaphore <- Semaphore [F ](1 ).toResource
57
+ semaphore <- Semaphore [F ](1 ).toResource
60
58
error <- F .deferred[Either [Throwable , INothing ]].toResource
61
59
close <- F .deferred[CloseEvent ].toResource
62
60
ws <- Resource .makeCase {
@@ -127,15 +125,15 @@ object WebSocketClient {
127
125
def closeFrame : DeferredSource [F , WSFrame .Close ] =
128
126
(close : DeferredSource [F , CloseEvent ]).map(e => WSFrame .Close (e.code, e.reason))
129
127
130
- def receive : F [Option [WSDataFrame ]] = receiveSemaphore
128
+ def receive : F [Option [WSDataFrame ]] = semaphore
131
129
.permit
132
- .surround( OptionT (messages.take).semiflatMap(decodeMessage).value)
130
+ .use(_ => OptionT (messages.take).semiflatMap(decodeMessage).value)
133
131
.race(error.get.rethrow)
134
132
.map(_.merge)
135
133
136
134
override def receiveStream : Stream [F , WSDataFrame ] =
137
135
Stream
138
- .resource(receiveSemaphore .permit)
136
+ .resource(semaphore .permit)
139
137
.flatMap(_ => Stream .fromQueueNoneTerminated(messages))
140
138
.evalMap(decodeMessage)
141
139
.concurrently(Stream .exec(error.get.rethrow.widen))
@@ -152,10 +150,10 @@ object WebSocketClient {
152
150
}
153
151
154
152
override def sendText (text : String ): F [Unit ] =
155
- errorOr(sendSemaphore.permit.surround( F .delay(ws.send(text) )))
153
+ errorOr(F .delay(ws.send(text)))
156
154
157
155
override def sendBinary (bytes : ByteVector ): F [Unit ] =
158
- errorOr(sendSemaphore.permit.surround( F .delay(ws.send(bytes.toJSArrayBuffer) )))
156
+ errorOr(F .delay(ws.send(bytes.toJSArrayBuffer)))
159
157
160
158
def send (wsf : WSDataFrame ): F [Unit ] =
161
159
wsf match {
@@ -171,10 +169,7 @@ object WebSocketClient {
171
169
}
172
170
173
171
def sendMany [G [_]: Foldable , A <: WSDataFrame ](wsfs : G [A ]): F [Unit ] =
174
- sendSemaphore.permit.surround(wsfs.foldMapM(send(_)))
175
-
176
- override def sendPipe : Pipe [F , WSDataFrame , Unit ] = in =>
177
- Stream .resource(sendSemaphore.permit) >> in.evalMap(send(_))
172
+ wsfs.foldMapM(send(_))
178
173
179
174
def subprotocol : Option [String ] = Option (ws.protocol).filter(_.nonEmpty)
180
175
}
0 commit comments