Skip to content

Commit

Permalink
Merge pull request #327 from http4s/pr/mutex
Browse files Browse the repository at this point in the history
Use `Mutex` instead of `Semaphore(1)`
  • Loading branch information
armanbilge authored Oct 5, 2023
2 parents 8dffd92 + 07cc9d8 commit d65f363
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions dom/src/main/scala/org/http4s/dom/WebSocketClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import cats.effect.kernel.Async
import cats.effect.kernel.DeferredSource
import cats.effect.kernel.Resource
import cats.effect.std.Dispatcher
import cats.effect.std.Mutex
import cats.effect.std.Queue
import cats.effect.std.Semaphore
import cats.effect.syntax.all._
import cats.syntax.all._
import fs2.Stream
Expand All @@ -50,7 +50,7 @@ object WebSocketClient {
for {
dispatcher <- Dispatcher.sequential[F]
messages <- Queue.unbounded[F, Option[MessageEvent]].toResource
semaphore <- Semaphore[F](1).toResource
mutex <- Mutex[F].toResource
close <- F.deferred[CloseEvent].toResource
ws <- Resource.makeCase {
F.async_[WebSocket] { cb =>
Expand Down Expand Up @@ -126,10 +126,10 @@ object WebSocketClient {
(close: DeferredSource[F, CloseEvent]).map(e => WSFrame.Close(e.code, e.reason))

def receive: F[Option[WSDataFrame]] =
semaphore.permit.surround(OptionT(messages.take).map(decodeMessage).value)
mutex.lock.surround(OptionT(messages.take).map(decodeMessage).value)

override def receiveStream: Stream[F, WSDataFrame] =
Stream.resource(semaphore.permit) >>
Stream.resource(mutex.lock) >>
Stream.fromQueueNoneTerminated(messages).map(decodeMessage)

private def decodeMessage(e: MessageEvent): WSDataFrame =
Expand Down

0 comments on commit d65f363

Please sign in to comment.