From 07cc9d85d5bd7d6042ece9149a2bfdb61f01f95b Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 5 Oct 2023 01:27:52 +0000 Subject: [PATCH] Use `Mutex` instead of `Semaphore(1)` --- dom/src/main/scala/org/http4s/dom/WebSocketClient.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dom/src/main/scala/org/http4s/dom/WebSocketClient.scala b/dom/src/main/scala/org/http4s/dom/WebSocketClient.scala index a674ebbf..a9050ab0 100644 --- a/dom/src/main/scala/org/http4s/dom/WebSocketClient.scala +++ b/dom/src/main/scala/org/http4s/dom/WebSocketClient.scala @@ -22,8 +22,8 @@ import cats.effect.kernel.Async import cats.effect.kernel.DeferredSource import cats.effect.kernel.Resource import cats.effect.std.Dispatcher +import cats.effect.std.Mutex import cats.effect.std.Queue -import cats.effect.std.Semaphore import cats.effect.syntax.all._ import cats.syntax.all._ import fs2.Stream @@ -50,7 +50,7 @@ object WebSocketClient { for { dispatcher <- Dispatcher.sequential[F] messages <- Queue.unbounded[F, Option[MessageEvent]].toResource - semaphore <- Semaphore[F](1).toResource + mutex <- Mutex[F].toResource close <- F.deferred[CloseEvent].toResource ws <- Resource.makeCase { F.async_[WebSocket] { cb => @@ -126,10 +126,10 @@ object WebSocketClient { (close: DeferredSource[F, CloseEvent]).map(e => WSFrame.Close(e.code, e.reason)) def receive: F[Option[WSDataFrame]] = - semaphore.permit.surround(OptionT(messages.take).map(decodeMessage).value) + mutex.lock.surround(OptionT(messages.take).map(decodeMessage).value) override def receiveStream: Stream[F, WSDataFrame] = - Stream.resource(semaphore.permit) >> + Stream.resource(mutex.lock) >> Stream.fromQueueNoneTerminated(messages).map(decodeMessage) private def decodeMessage(e: MessageEvent): WSDataFrame =