From 39f2bcd22c95256a45866e4c8ec5dc14bd272dc0 Mon Sep 17 00:00:00 2001 From: Raas Ahsan Date: Thu, 25 Feb 2021 01:03:27 -0600 Subject: [PATCH 01/10] Implement Shared --- core/shared/src/main/scala/fs2/Shared.scala | 69 +++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 core/shared/src/main/scala/fs2/Shared.scala diff --git a/core/shared/src/main/scala/fs2/Shared.scala b/core/shared/src/main/scala/fs2/Shared.scala new file mode 100644 index 0000000000..2e9dd6c4fb --- /dev/null +++ b/core/shared/src/main/scala/fs2/Shared.scala @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 + +import cats.effect._ +import cats.effect.kernel.Resource +import cats.syntax.all._ + +sealed trait Shared[F[_], A] { + def resource: Resource[F, A] +} + +object Shared { + def allocate[F[_], A]( + resource: Resource[F, A] + )(implicit F: Concurrent[F]): Resource[F, (Shared[F, A], A)] = { + final case class State(value: A, finalizer: F[Unit], permits: Int) { + def addPermit: State = copy(permits = permits + 1) + def releasePermit: State = copy(permits = permits - 1) + } + + MonadCancel[Resource[F, *]].uncancelable { _ => + for { + shared <- Resource.eval(resource.allocated.flatMap { case (a, fin) => + F.ref[Option[State]](Some(State(a, fin, 0))).map { state => + def acquire: F[A] = + state.modify { + case Some(st) => (Some(st.addPermit), F.pure(st.value)) + case None => + (None, F.raiseError[A](new Throwable("finalization has already occurred"))) + }.flatten + + def release: F[Unit] = + state.modify { + case Some(st) if st.permits > 1 => (Some(st.releasePermit), F.unit) + case Some(st) => (None, st.finalizer) + case None => (None, F.raiseError[Unit](new Throwable("can't finalize"))) + }.flatten + + new Shared[F, A] { + override def resource: Resource[F, A] = + Resource.make(acquire)(_ => release) + } + } + }) + a <- shared.resource + } yield (shared, a) + } + } +} From f3751c776c395a944a2180b7d5c0e5f4dabd6b10 Mon Sep 17 00:00:00 2001 From: Raas Ahsan Date: Thu, 25 Feb 2021 01:20:59 -0600 Subject: [PATCH 02/10] Revise SocketGroup server operations to yield a stream of shared sockets --- core/shared/src/main/scala/fs2/Shared.scala | 1 - io/src/main/scala/fs2/io/net/Network.scala | 4 +- io/src/main/scala/fs2/io/net/Socket.scala | 12 ++- .../main/scala/fs2/io/net/SocketGroup.scala | 77 ++++++++++--------- 4 files changed, 49 insertions(+), 45 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Shared.scala b/core/shared/src/main/scala/fs2/Shared.scala index 2e9dd6c4fb..b80cb8093d 100644 --- a/core/shared/src/main/scala/fs2/Shared.scala +++ b/core/shared/src/main/scala/fs2/Shared.scala @@ -22,7 +22,6 @@ package fs2 import cats.effect._ -import cats.effect.kernel.Resource import cats.syntax.all._ sealed trait Shared[F[_], A] { diff --git a/io/src/main/scala/fs2/io/net/Network.scala b/io/src/main/scala/fs2/io/net/Network.scala index 208df73282..755a2b888a 100644 --- a/io/src/main/scala/fs2/io/net/Network.scala +++ b/io/src/main/scala/fs2/io/net/Network.scala @@ -132,13 +132,13 @@ object Network { address: Option[Host], port: Option[Port], options: List[SocketOption] - ): Stream[F, Socket[F]] = globalSocketGroup.server(address, port, options) + ): Stream[F, Shared[F, Socket[F]]] = globalSocketGroup.server(address, port, options) def serverResource( address: Option[Host], port: Option[Port], options: List[SocketOption] - ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = + ): Resource[F, (SocketAddress[IpAddress], Stream[F, Shared[F, Socket[F]]])] = globalSocketGroup.serverResource(address, port, options) def openDatagramSocket( diff --git a/io/src/main/scala/fs2/io/net/Socket.scala b/io/src/main/scala/fs2/io/net/Socket.scala index 0b0a191679..20ac671433 100644 --- a/io/src/main/scala/fs2/io/net/Socket.scala +++ b/io/src/main/scala/fs2/io/net/Socket.scala @@ -24,7 +24,7 @@ package io package net import com.comcast.ip4s.{IpAddress, SocketAddress} -import cats.effect.{Async, Resource} +import cats.effect.Async import cats.effect.std.Semaphore import cats.syntax.all._ @@ -79,12 +79,10 @@ trait Socket[F[_]] { object Socket { private[net] def forAsync[F[_]: Async]( ch: AsynchronousSocketChannel - ): Resource[F, Socket[F]] = - Resource.make { - (Semaphore[F](1), Semaphore[F](1)).mapN { (readSemaphore, writeSemaphore) => - new AsyncSocket[F](ch, readSemaphore, writeSemaphore) - } - }(_ => Async[F].delay(if (ch.isOpen) ch.close else ())) + ): F[Socket[F]] = + (Semaphore[F](1), Semaphore[F](1)).mapN { (readSemaphore, writeSemaphore) => + new AsyncSocket[F](ch, readSemaphore, writeSemaphore) + } private final class AsyncSocket[F[_]]( ch: AsynchronousSocketChannel, diff --git a/io/src/main/scala/fs2/io/net/SocketGroup.scala b/io/src/main/scala/fs2/io/net/SocketGroup.scala index cb92e71f74..f266371384 100644 --- a/io/src/main/scala/fs2/io/net/SocketGroup.scala +++ b/io/src/main/scala/fs2/io/net/SocketGroup.scala @@ -68,7 +68,7 @@ trait SocketGroup[F[_]] { address: Option[Host] = None, port: Option[Port] = None, options: List[SocketOption] = List.empty - ): Stream[F, Socket[F]] + ): Stream[F, Shared[F, Socket[F]]] /** Like [[server]] but provides the `SocketAddress` of the bound server socket before providing accepted sockets. */ @@ -76,7 +76,7 @@ trait SocketGroup[F[_]] { address: Option[Host] = None, port: Option[Port] = None, options: List[SocketOption] = List.empty - ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] + ): Resource[F, (SocketAddress[IpAddress], Stream[F, Shared[F, Socket[F]]])] } private[net] object SocketGroup { @@ -98,30 +98,32 @@ private[net] object SocketGroup { ch } - def connect(ch: AsynchronousSocketChannel): F[AsynchronousSocketChannel] = - to.resolve[F].flatMap { ip => - Async[F].async_[AsynchronousSocketChannel] { cb => - ch.connect( - ip.toInetSocketAddress, - null, - new CompletionHandler[Void, Void] { - def completed(result: Void, attachment: Void): Unit = - cb(Right(ch)) - def failed(rsn: Throwable, attachment: Void): Unit = - cb(Left(rsn)) - } - ) - } + def connect(ch: AsynchronousSocketChannel): Resource[F, AsynchronousSocketChannel] = + Resource.eval(to.resolve[F]).flatMap { ip => + Resource.make { + Async[F].async_[AsynchronousSocketChannel] { cb => + ch.connect( + ip.toInetSocketAddress, + null, + new CompletionHandler[Void, Void] { + def completed(result: Void, attachment: Void): Unit = + cb(Right(ch)) + def failed(rsn: Throwable, attachment: Void): Unit = + cb(Left(rsn)) + } + ) + } + }(ch => Async[F].delay(if (ch.isOpen()) ch.close() else ())) } - Resource.eval(setup.flatMap(connect)).flatMap(Socket.forAsync(_)) + Resource.eval(setup).flatMap(connect).evalMap(Socket.forAsync(_)) } def server( address: Option[Host], port: Option[Port], options: List[SocketOption] - ): Stream[F, Socket[F]] = + ): Stream[F, Shared[F, Socket[F]]] = Stream .resource( serverResource( @@ -136,7 +138,7 @@ private[net] object SocketGroup { address: Option[Host], port: Option[Port], options: List[SocketOption] - ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = { + ): Resource[F, (SocketAddress[IpAddress], Stream[F, Shared[F, Socket[F]]])] = { val setup: F[AsynchronousServerSocketChannel] = address.traverse(_.resolve[F]).flatMap { addr => @@ -161,24 +163,29 @@ private[net] object SocketGroup { def acceptIncoming( sch: AsynchronousServerSocketChannel - ): Stream[F, Socket[F]] = { - def go: Stream[F, Socket[F]] = { - def acceptChannel: F[AsynchronousSocketChannel] = - Async[F].async_[AsynchronousSocketChannel] { cb => - sch.accept( - null, - new CompletionHandler[AsynchronousSocketChannel, Void] { - def completed(ch: AsynchronousSocketChannel, attachment: Void): Unit = - cb(Right(ch)) - def failed(rsn: Throwable, attachment: Void): Unit = - cb(Left(rsn)) - } - ) - } + ): Stream[F, Shared[F, Socket[F]]] = { + def go: Stream[F, Shared[F, Socket[F]]] = { + def acceptChannel: Resource[F, Shared[F, Socket[F]]] = { + val acceptResource = Resource.make { + Async[F].async_[AsynchronousSocketChannel] { cb => + sch.accept( + null, + new CompletionHandler[AsynchronousSocketChannel, Void] { + def completed(ch: AsynchronousSocketChannel, attachment: Void): Unit = + cb(Right(ch)) + def failed(rsn: Throwable, attachment: Void): Unit = + cb(Left(rsn)) + } + ) + } + }(ch => Async[F].delay(if (ch.isOpen()) ch.close() else ())) + + Shared.allocate(acceptResource.evalMap(Socket.forAsync(_))).map(_._1) + } - Stream.eval(acceptChannel.attempt).flatMap { + Stream.resource(acceptChannel.attempt).flatMap { case Left(_) => Stream.empty[F] - case Right(accepted) => Stream.resource(Socket.forAsync(accepted)) + case Right(accepted) => Stream(accepted) } ++ go } From 8739f4da088b0e827ec54830f5290030c53645ff Mon Sep 17 00:00:00 2001 From: Raas Ahsan Date: Thu, 25 Feb 2021 02:01:31 -0600 Subject: [PATCH 03/10] Reorganize Shared --- core/shared/src/main/scala/fs2/Shared.scala | 48 +++++++++++---------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Shared.scala b/core/shared/src/main/scala/fs2/Shared.scala index b80cb8093d..720a7e80d5 100644 --- a/core/shared/src/main/scala/fs2/Shared.scala +++ b/core/shared/src/main/scala/fs2/Shared.scala @@ -39,30 +39,34 @@ object Shared { MonadCancel[Resource[F, *]].uncancelable { _ => for { - shared <- Resource.eval(resource.allocated.flatMap { case (a, fin) => - F.ref[Option[State]](Some(State(a, fin, 0))).map { state => - def acquire: F[A] = - state.modify { - case Some(st) => (Some(st.addPermit), F.pure(st.value)) - case None => - (None, F.raiseError[A](new Throwable("finalization has already occurred"))) - }.flatten + underlying <- Resource.eval(resource.allocated) + state <- Resource.eval(F.ref[Option[State]](Some(State(underlying._1, underlying._2, 0)))) + shared = new Shared[F, A] { + def acquire: F[A] = + state.modify { + case Some(st) => + println("acquired") + (Some(st.addPermit), F.pure(st.value)) + case None => + (None, F.raiseError[A](new Throwable("finalization has already occurred"))) + }.flatten - def release: F[Unit] = - state.modify { - case Some(st) if st.permits > 1 => (Some(st.releasePermit), F.unit) - case Some(st) => (None, st.finalizer) - case None => (None, F.raiseError[Unit](new Throwable("can't finalize"))) - }.flatten + def release: F[Unit] = + state.modify { + case Some(st) if st.permits > 1 => + println("released with many") + (Some(st.releasePermit), F.unit) + case Some(st) => + println("released with one") + (None, st.finalizer) + case None => (None, F.raiseError[Unit](new Throwable("can't finalize"))) + }.flatten - new Shared[F, A] { - override def resource: Resource[F, A] = - Resource.make(acquire)(_ => release) - } - } - }) - a <- shared.resource - } yield (shared, a) + override def resource: Resource[F, A] = + Resource.make(acquire)(_ => release) + } + _ <- shared.resource + } yield shared -> underlying._1 } } } From 112b322c437fe830dbb1194ef5846b954b34e14f Mon Sep 17 00:00:00 2001 From: Raas Ahsan Date: Thu, 25 Feb 2021 02:01:48 -0600 Subject: [PATCH 04/10] Tests are compiling again --- .../scala/fs2/io/net/tcp/SocketSuite.scala | 26 ++++++++++++------- .../scala/fs2/io/net/tls/TLSSocketSuite.scala | 7 ++++- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/io/src/test/scala/fs2/io/net/tcp/SocketSuite.scala b/io/src/test/scala/fs2/io/net/tcp/SocketSuite.scala index f9a2b849b4..9afa35f4d4 100644 --- a/io/src/test/scala/fs2/io/net/tcp/SocketSuite.scala +++ b/io/src/test/scala/fs2/io/net/tcp/SocketSuite.scala @@ -50,10 +50,12 @@ class SocketSuite extends Fs2Suite { Stream .resource(setup) .flatMap { case (server, clients) => - val echoServer = server.map { socket => - socket.reads - .through(socket.writes) - .onFinalize(socket.endOfOutput) + val echoServer = server.map { shared => + Stream.resource(shared.resource).flatMap { socket => + socket.reads + .through(socket.writes) + .onFinalize(socket.endOfOutput) + } }.parJoinUnbounded val msgClients = clients @@ -86,11 +88,13 @@ class SocketSuite extends Fs2Suite { Stream .resource(setup) .flatMap { case (server, clients) => - val junkServer = server.map { socket => - Stream - .chunk(message) - .through(socket.writes) - .onFinalize(socket.endOfOutput) + val junkServer = server.map { shared => + Stream.resource(shared.resource).flatMap { socket => + Stream + .chunk(message) + .through(socket.writes) + .onFinalize(socket.endOfOutput) + } }.parJoinUnbounded val client = @@ -117,7 +121,9 @@ class SocketSuite extends Fs2Suite { Stream .resource(setup) .flatMap { case (server, clients) => - val readOnlyServer = server.map(_.reads).parJoinUnbounded + val readOnlyServer = server.map { shared => + Stream.resource(shared.resource).flatMap(_.reads) + }.parJoinUnbounded val client = clients.take(1).flatMap { socket => // concurrent writes diff --git a/io/src/test/scala/fs2/io/net/tls/TLSSocketSuite.scala b/io/src/test/scala/fs2/io/net/tls/TLSSocketSuite.scala index 5f8a58a802..1cdb6ce8b8 100644 --- a/io/src/test/scala/fs2/io/net/tls/TLSSocketSuite.scala +++ b/io/src/test/scala/fs2/io/net/tls/TLSSocketSuite.scala @@ -117,7 +117,12 @@ class TLSSocketSuite extends TLSSuite { addressAndConnections <- Network[IO].serverResource(Some(ip"127.0.0.1")) (serverAddress, server) = addressAndConnections client <- Network[IO].client(serverAddress).flatMap(tlsContext.client(_)) - } yield server.flatMap(s => Stream.resource(tlsContext.server(s))) -> client + } yield { + val tlsServer = server.flatMap { shared => + Stream.resource(shared.resource.flatMap(tlsContext.server(_))) + } + tlsServer -> client + } Stream .resource(setup) From 8b72d90280f5d91309b08c6f981852815420ce42 Mon Sep 17 00:00:00 2001 From: Raas Ahsan Date: Thu, 25 Feb 2021 02:08:31 -0600 Subject: [PATCH 05/10] Try makeFull to surface interruption --- core/shared/src/main/scala/fs2/Shared.scala | 12 ++--- .../main/scala/fs2/io/net/SocketGroup.scala | 50 ++++++++++--------- 2 files changed, 30 insertions(+), 32 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Shared.scala b/core/shared/src/main/scala/fs2/Shared.scala index 720a7e80d5..8a83ac3277 100644 --- a/core/shared/src/main/scala/fs2/Shared.scala +++ b/core/shared/src/main/scala/fs2/Shared.scala @@ -44,21 +44,15 @@ object Shared { shared = new Shared[F, A] { def acquire: F[A] = state.modify { - case Some(st) => - println("acquired") - (Some(st.addPermit), F.pure(st.value)) + case Some(st) => (Some(st.addPermit), F.pure(st.value)) case None => (None, F.raiseError[A](new Throwable("finalization has already occurred"))) }.flatten def release: F[Unit] = state.modify { - case Some(st) if st.permits > 1 => - println("released with many") - (Some(st.releasePermit), F.unit) - case Some(st) => - println("released with one") - (None, st.finalizer) + case Some(st) if st.permits > 1 => (Some(st.releasePermit), F.unit) + case Some(st) => (None, st.finalizer) case None => (None, F.raiseError[Unit](new Throwable("can't finalize"))) }.flatten diff --git a/io/src/main/scala/fs2/io/net/SocketGroup.scala b/io/src/main/scala/fs2/io/net/SocketGroup.scala index f266371384..359b374a5c 100644 --- a/io/src/main/scala/fs2/io/net/SocketGroup.scala +++ b/io/src/main/scala/fs2/io/net/SocketGroup.scala @@ -100,18 +100,20 @@ private[net] object SocketGroup { def connect(ch: AsynchronousSocketChannel): Resource[F, AsynchronousSocketChannel] = Resource.eval(to.resolve[F]).flatMap { ip => - Resource.make { - Async[F].async_[AsynchronousSocketChannel] { cb => - ch.connect( - ip.toInetSocketAddress, - null, - new CompletionHandler[Void, Void] { - def completed(result: Void, attachment: Void): Unit = - cb(Right(ch)) - def failed(rsn: Throwable, attachment: Void): Unit = - cb(Left(rsn)) - } - ) + Resource.makeFull[F, AsynchronousSocketChannel] { poll => + poll { + Async[F].async_[AsynchronousSocketChannel] { cb => + ch.connect( + ip.toInetSocketAddress, + null, + new CompletionHandler[Void, Void] { + def completed(result: Void, attachment: Void): Unit = + cb(Right(ch)) + def failed(rsn: Throwable, attachment: Void): Unit = + cb(Left(rsn)) + } + ) + } } }(ch => Async[F].delay(if (ch.isOpen()) ch.close() else ())) } @@ -166,17 +168,19 @@ private[net] object SocketGroup { ): Stream[F, Shared[F, Socket[F]]] = { def go: Stream[F, Shared[F, Socket[F]]] = { def acceptChannel: Resource[F, Shared[F, Socket[F]]] = { - val acceptResource = Resource.make { - Async[F].async_[AsynchronousSocketChannel] { cb => - sch.accept( - null, - new CompletionHandler[AsynchronousSocketChannel, Void] { - def completed(ch: AsynchronousSocketChannel, attachment: Void): Unit = - cb(Right(ch)) - def failed(rsn: Throwable, attachment: Void): Unit = - cb(Left(rsn)) - } - ) + val acceptResource = Resource.makeFull[F, AsynchronousSocketChannel] { poll => + poll { + Async[F].async_[AsynchronousSocketChannel] { cb => + sch.accept( + null, + new CompletionHandler[AsynchronousSocketChannel, Void] { + def completed(ch: AsynchronousSocketChannel, attachment: Void): Unit = + cb(Right(ch)) + def failed(rsn: Throwable, attachment: Void): Unit = + cb(Left(rsn)) + } + ) + } } }(ch => Async[F].delay(if (ch.isOpen()) ch.close() else ())) From 199e370b7cb4d136fd513f5b0af9b6d5c08d807a Mon Sep 17 00:00:00 2001 From: Raas Ahsan Date: Thu, 25 Feb 2021 02:27:02 -0600 Subject: [PATCH 06/10] Allocate shared socket separately from accept --- .../main/scala/fs2/io/net/SocketGroup.scala | 78 +++++++++---------- .../scala/fs2/io/net/tls/TLSSocketSuite.scala | 2 +- 2 files changed, 40 insertions(+), 40 deletions(-) diff --git a/io/src/main/scala/fs2/io/net/SocketGroup.scala b/io/src/main/scala/fs2/io/net/SocketGroup.scala index 359b374a5c..77d6fadd87 100644 --- a/io/src/main/scala/fs2/io/net/SocketGroup.scala +++ b/io/src/main/scala/fs2/io/net/SocketGroup.scala @@ -98,27 +98,25 @@ private[net] object SocketGroup { ch } - def connect(ch: AsynchronousSocketChannel): Resource[F, AsynchronousSocketChannel] = - Resource.eval(to.resolve[F]).flatMap { ip => - Resource.makeFull[F, AsynchronousSocketChannel] { poll => - poll { - Async[F].async_[AsynchronousSocketChannel] { cb => - ch.connect( - ip.toInetSocketAddress, - null, - new CompletionHandler[Void, Void] { - def completed(result: Void, attachment: Void): Unit = - cb(Right(ch)) - def failed(rsn: Throwable, attachment: Void): Unit = - cb(Left(rsn)) - } - ) + def connect(ch: AsynchronousSocketChannel): F[AsynchronousSocketChannel] = + to.resolve[F].flatMap { ip => + Async[F].async_[AsynchronousSocketChannel] { cb => + ch.connect( + ip.toInetSocketAddress, + null, + new CompletionHandler[Void, Void] { + def completed(result: Void, attachment: Void): Unit = + cb(Right(ch)) + def failed(rsn: Throwable, attachment: Void): Unit = + cb(Left(rsn)) } - } - }(ch => Async[F].delay(if (ch.isOpen()) ch.close() else ())) + ) + } } - Resource.eval(setup).flatMap(connect).evalMap(Socket.forAsync(_)) + Resource + .make(setup.flatMap(connect))(ch => Async[F].delay(if (ch.isOpen()) ch.close() else ())) + .evalMap(Socket.forAsync(_)) } def server( @@ -167,29 +165,31 @@ private[net] object SocketGroup { sch: AsynchronousServerSocketChannel ): Stream[F, Shared[F, Socket[F]]] = { def go: Stream[F, Shared[F, Socket[F]]] = { - def acceptChannel: Resource[F, Shared[F, Socket[F]]] = { - val acceptResource = Resource.makeFull[F, AsynchronousSocketChannel] { poll => - poll { - Async[F].async_[AsynchronousSocketChannel] { cb => - sch.accept( - null, - new CompletionHandler[AsynchronousSocketChannel, Void] { - def completed(ch: AsynchronousSocketChannel, attachment: Void): Unit = - cb(Right(ch)) - def failed(rsn: Throwable, attachment: Void): Unit = - cb(Left(rsn)) - } - ) + def acceptChannel: F[AsynchronousSocketChannel] = + Async[F].async_[AsynchronousSocketChannel] { cb => + sch.accept( + null, + new CompletionHandler[AsynchronousSocketChannel, Void] { + def completed(ch: AsynchronousSocketChannel, attachment: Void): Unit = + cb(Right(ch)) + def failed(rsn: Throwable, attachment: Void): Unit = + cb(Left(rsn)) } - } - }(ch => Async[F].delay(if (ch.isOpen()) ch.close() else ())) - - Shared.allocate(acceptResource.evalMap(Socket.forAsync(_))).map(_._1) - } + ) + } - Stream.resource(acceptChannel.attempt).flatMap { - case Left(_) => Stream.empty[F] - case Right(accepted) => Stream(accepted) + Stream.eval(acceptChannel.attempt).flatMap { + case Left(_) => Stream.empty[F] + case Right(accepted) => + val socketResource = Resource + .make(Async[F].pure(accepted))(ch => + Async[F].delay( + if (ch.isOpen()) ch.close() + else () + ) + ) + .evalMap(Socket.forAsync(_)) + Stream.resource(Shared.allocate(socketResource)).map(_._1) } ++ go } diff --git a/io/src/test/scala/fs2/io/net/tls/TLSSocketSuite.scala b/io/src/test/scala/fs2/io/net/tls/TLSSocketSuite.scala index 1cdb6ce8b8..cd89289717 100644 --- a/io/src/test/scala/fs2/io/net/tls/TLSSocketSuite.scala +++ b/io/src/test/scala/fs2/io/net/tls/TLSSocketSuite.scala @@ -118,7 +118,7 @@ class TLSSocketSuite extends TLSSuite { (serverAddress, server) = addressAndConnections client <- Network[IO].client(serverAddress).flatMap(tlsContext.client(_)) } yield { - val tlsServer = server.flatMap { shared => + val tlsServer = server.flatMap { shared => Stream.resource(shared.resource.flatMap(tlsContext.server(_))) } tlsServer -> client From 3e1018bf345ab509d1c1e936dafbcd48fbbeaecb Mon Sep 17 00:00:00 2001 From: Raas Ahsan Date: Thu, 25 Feb 2021 21:41:33 -0600 Subject: [PATCH 07/10] Fix resource polling --- core/shared/src/main/scala/fs2/Shared.scala | 4 +- .../main/scala/fs2/io/net/SocketGroup.scala | 40 +++++++++---------- 2 files changed, 20 insertions(+), 24 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Shared.scala b/core/shared/src/main/scala/fs2/Shared.scala index 8a83ac3277..d0c05c0454 100644 --- a/core/shared/src/main/scala/fs2/Shared.scala +++ b/core/shared/src/main/scala/fs2/Shared.scala @@ -37,9 +37,9 @@ object Shared { def releasePermit: State = copy(permits = permits - 1) } - MonadCancel[Resource[F, *]].uncancelable { _ => + MonadCancel[Resource[F, *]].uncancelable { poll => for { - underlying <- Resource.eval(resource.allocated) + underlying <- poll(Resource.eval(resource.allocated)) state <- Resource.eval(F.ref[Option[State]](Some(State(underlying._1, underlying._2, 0)))) shared = new Shared[F, A] { def acquire: F[A] = diff --git a/io/src/main/scala/fs2/io/net/SocketGroup.scala b/io/src/main/scala/fs2/io/net/SocketGroup.scala index 77d6fadd87..af56f5a9b1 100644 --- a/io/src/main/scala/fs2/io/net/SocketGroup.scala +++ b/io/src/main/scala/fs2/io/net/SocketGroup.scala @@ -165,31 +165,27 @@ private[net] object SocketGroup { sch: AsynchronousServerSocketChannel ): Stream[F, Shared[F, Socket[F]]] = { def go: Stream[F, Shared[F, Socket[F]]] = { - def acceptChannel: F[AsynchronousSocketChannel] = - Async[F].async_[AsynchronousSocketChannel] { cb => - sch.accept( - null, - new CompletionHandler[AsynchronousSocketChannel, Void] { - def completed(ch: AsynchronousSocketChannel, attachment: Void): Unit = - cb(Right(ch)) - def failed(rsn: Throwable, attachment: Void): Unit = - cb(Left(rsn)) + def acceptChannel: Resource[F, AsynchronousSocketChannel] = + Resource.makeFull[F, AsynchronousSocketChannel] { poll => + poll { + Async[F].async_[AsynchronousSocketChannel] { cb => + sch.accept( + null, + new CompletionHandler[AsynchronousSocketChannel, Void] { + def completed(ch: AsynchronousSocketChannel, attachment: Void): Unit = + cb(Right(ch)) + def failed(rsn: Throwable, attachment: Void): Unit = + cb(Left(rsn)) + } + ) } - ) - } + } + }(ch => Async[F].delay(if (ch.isOpen()) ch.close() else ())) - Stream.eval(acceptChannel.attempt).flatMap { + val sharedSocket = Shared.allocate(acceptChannel.evalMap(Socket.forAsync(_))) + Stream.resource(sharedSocket.attempt).flatMap { case Left(_) => Stream.empty[F] - case Right(accepted) => - val socketResource = Resource - .make(Async[F].pure(accepted))(ch => - Async[F].delay( - if (ch.isOpen()) ch.close() - else () - ) - ) - .evalMap(Socket.forAsync(_)) - Stream.resource(Shared.allocate(socketResource)).map(_._1) + case Right((shared, _)) => Stream(shared) } ++ go } From a6db21cf23ed598ead8dbf6c185a77a5994ccfd7 Mon Sep 17 00:00:00 2001 From: Raas Ahsan Date: Thu, 25 Feb 2021 21:42:01 -0600 Subject: [PATCH 08/10] Scalafmt --- io/src/main/scala/fs2/io/net/SocketGroup.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/src/main/scala/fs2/io/net/SocketGroup.scala b/io/src/main/scala/fs2/io/net/SocketGroup.scala index af56f5a9b1..67eda7d686 100644 --- a/io/src/main/scala/fs2/io/net/SocketGroup.scala +++ b/io/src/main/scala/fs2/io/net/SocketGroup.scala @@ -184,7 +184,7 @@ private[net] object SocketGroup { val sharedSocket = Shared.allocate(acceptChannel.evalMap(Socket.forAsync(_))) Stream.resource(sharedSocket.attempt).flatMap { - case Left(_) => Stream.empty[F] + case Left(_) => Stream.empty[F] case Right((shared, _)) => Stream(shared) } ++ go } From 42eaf3e26e86add1a2e435ebd6f0a584fde723a7 Mon Sep 17 00:00:00 2001 From: Raas Ahsan Date: Thu, 25 Feb 2021 21:53:25 -0600 Subject: [PATCH 09/10] Recover old server signatures --- io/src/main/scala/fs2/io/net/Network.scala | 11 ++++++-- .../main/scala/fs2/io/net/SocketGroup.scala | 16 +++++++++--- .../scala/fs2/io/net/tcp/SocketSuite.scala | 26 +++++++------------ .../scala/fs2/io/net/tls/TLSSocketSuite.scala | 7 +---- 4 files changed, 32 insertions(+), 28 deletions(-) diff --git a/io/src/main/scala/fs2/io/net/Network.scala b/io/src/main/scala/fs2/io/net/Network.scala index 755a2b888a..41310f33f2 100644 --- a/io/src/main/scala/fs2/io/net/Network.scala +++ b/io/src/main/scala/fs2/io/net/Network.scala @@ -132,15 +132,22 @@ object Network { address: Option[Host], port: Option[Port], options: List[SocketOption] - ): Stream[F, Shared[F, Socket[F]]] = globalSocketGroup.server(address, port, options) + ): Stream[F, Socket[F]] = globalSocketGroup.server(address, port, options) def serverResource( address: Option[Host], port: Option[Port], options: List[SocketOption] - ): Resource[F, (SocketAddress[IpAddress], Stream[F, Shared[F, Socket[F]]])] = + ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = globalSocketGroup.serverResource(address, port, options) + // def serverResourceShared( + // address: Option[Host], + // port: Option[Port], + // options: List[SocketOption] + // ): Resource[F, (SocketAddress[IpAddress], Stream[F, Shared[F, Socket[F]]])] = + // globalSocketGroup.serverResourceShared(address, port, options) + def openDatagramSocket( address: Option[Host], port: Option[Port], diff --git a/io/src/main/scala/fs2/io/net/SocketGroup.scala b/io/src/main/scala/fs2/io/net/SocketGroup.scala index 67eda7d686..06dcd585ae 100644 --- a/io/src/main/scala/fs2/io/net/SocketGroup.scala +++ b/io/src/main/scala/fs2/io/net/SocketGroup.scala @@ -68,7 +68,7 @@ trait SocketGroup[F[_]] { address: Option[Host] = None, port: Option[Port] = None, options: List[SocketOption] = List.empty - ): Stream[F, Shared[F, Socket[F]]] + ): Stream[F, Socket[F]] /** Like [[server]] but provides the `SocketAddress` of the bound server socket before providing accepted sockets. */ @@ -76,7 +76,7 @@ trait SocketGroup[F[_]] { address: Option[Host] = None, port: Option[Port] = None, options: List[SocketOption] = List.empty - ): Resource[F, (SocketAddress[IpAddress], Stream[F, Shared[F, Socket[F]]])] + ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] } private[net] object SocketGroup { @@ -123,7 +123,7 @@ private[net] object SocketGroup { address: Option[Host], port: Option[Port], options: List[SocketOption] - ): Stream[F, Shared[F, Socket[F]]] = + ): Stream[F, Socket[F]] = Stream .resource( serverResource( @@ -138,8 +138,16 @@ private[net] object SocketGroup { address: Option[Host], port: Option[Port], options: List[SocketOption] - ): Resource[F, (SocketAddress[IpAddress], Stream[F, Shared[F, Socket[F]]])] = { + ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = + serverResourceShared(address, port, options).map { case (addr, clients) => + (addr, clients.flatMap(shared => Stream.resource(shared.resource))) + } + def serverResourceShared( + address: Option[Host], + port: Option[Port], + options: List[SocketOption] + ): Resource[F, (SocketAddress[IpAddress], Stream[F, Shared[F, Socket[F]]])] = { val setup: F[AsynchronousServerSocketChannel] = address.traverse(_.resolve[F]).flatMap { addr => Async[F].delay { diff --git a/io/src/test/scala/fs2/io/net/tcp/SocketSuite.scala b/io/src/test/scala/fs2/io/net/tcp/SocketSuite.scala index 9afa35f4d4..f9a2b849b4 100644 --- a/io/src/test/scala/fs2/io/net/tcp/SocketSuite.scala +++ b/io/src/test/scala/fs2/io/net/tcp/SocketSuite.scala @@ -50,12 +50,10 @@ class SocketSuite extends Fs2Suite { Stream .resource(setup) .flatMap { case (server, clients) => - val echoServer = server.map { shared => - Stream.resource(shared.resource).flatMap { socket => - socket.reads - .through(socket.writes) - .onFinalize(socket.endOfOutput) - } + val echoServer = server.map { socket => + socket.reads + .through(socket.writes) + .onFinalize(socket.endOfOutput) }.parJoinUnbounded val msgClients = clients @@ -88,13 +86,11 @@ class SocketSuite extends Fs2Suite { Stream .resource(setup) .flatMap { case (server, clients) => - val junkServer = server.map { shared => - Stream.resource(shared.resource).flatMap { socket => - Stream - .chunk(message) - .through(socket.writes) - .onFinalize(socket.endOfOutput) - } + val junkServer = server.map { socket => + Stream + .chunk(message) + .through(socket.writes) + .onFinalize(socket.endOfOutput) }.parJoinUnbounded val client = @@ -121,9 +117,7 @@ class SocketSuite extends Fs2Suite { Stream .resource(setup) .flatMap { case (server, clients) => - val readOnlyServer = server.map { shared => - Stream.resource(shared.resource).flatMap(_.reads) - }.parJoinUnbounded + val readOnlyServer = server.map(_.reads).parJoinUnbounded val client = clients.take(1).flatMap { socket => // concurrent writes diff --git a/io/src/test/scala/fs2/io/net/tls/TLSSocketSuite.scala b/io/src/test/scala/fs2/io/net/tls/TLSSocketSuite.scala index cd89289717..5f8a58a802 100644 --- a/io/src/test/scala/fs2/io/net/tls/TLSSocketSuite.scala +++ b/io/src/test/scala/fs2/io/net/tls/TLSSocketSuite.scala @@ -117,12 +117,7 @@ class TLSSocketSuite extends TLSSuite { addressAndConnections <- Network[IO].serverResource(Some(ip"127.0.0.1")) (serverAddress, server) = addressAndConnections client <- Network[IO].client(serverAddress).flatMap(tlsContext.client(_)) - } yield { - val tlsServer = server.flatMap { shared => - Stream.resource(shared.resource.flatMap(tlsContext.server(_))) - } - tlsServer -> client - } + } yield server.flatMap(s => Stream.resource(tlsContext.server(s))) -> client Stream .resource(setup) From f34f47a4a0748940090fe089dc685e3b96af2cf2 Mon Sep 17 00:00:00 2001 From: Raas Ahsan Date: Thu, 25 Feb 2021 21:54:57 -0600 Subject: [PATCH 10/10] Add serverResourceShared for streaming shared sockets --- io/src/main/scala/fs2/io/net/Network.scala | 12 ++++++------ io/src/main/scala/fs2/io/net/SocketGroup.scala | 6 ++++++ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/io/src/main/scala/fs2/io/net/Network.scala b/io/src/main/scala/fs2/io/net/Network.scala index 41310f33f2..ad3e9381c0 100644 --- a/io/src/main/scala/fs2/io/net/Network.scala +++ b/io/src/main/scala/fs2/io/net/Network.scala @@ -141,12 +141,12 @@ object Network { ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = globalSocketGroup.serverResource(address, port, options) - // def serverResourceShared( - // address: Option[Host], - // port: Option[Port], - // options: List[SocketOption] - // ): Resource[F, (SocketAddress[IpAddress], Stream[F, Shared[F, Socket[F]]])] = - // globalSocketGroup.serverResourceShared(address, port, options) + def serverResourceShared( + address: Option[Host], + port: Option[Port], + options: List[SocketOption] + ): Resource[F, (SocketAddress[IpAddress], Stream[F, Shared[F, Socket[F]]])] = + globalSocketGroup.serverResourceShared(address, port, options) def openDatagramSocket( address: Option[Host], diff --git a/io/src/main/scala/fs2/io/net/SocketGroup.scala b/io/src/main/scala/fs2/io/net/SocketGroup.scala index 06dcd585ae..4b2e0cfc8c 100644 --- a/io/src/main/scala/fs2/io/net/SocketGroup.scala +++ b/io/src/main/scala/fs2/io/net/SocketGroup.scala @@ -77,6 +77,12 @@ trait SocketGroup[F[_]] { port: Option[Port] = None, options: List[SocketOption] = List.empty ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] + + def serverResourceShared( + address: Option[Host], + port: Option[Port], + options: List[SocketOption] + ): Resource[F, (SocketAddress[IpAddress], Stream[F, Shared[F, Socket[F]]])] } private[net] object SocketGroup {