From b2f177088006513d8b020f59ba4665a4e34a1852 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 30 Aug 2022 03:35:46 +0000 Subject: [PATCH 1/3] Prevent socket leaks due to post-open exceptions --- .../fs2/io/net/SocketGroupPlatform.scala | 46 ++++++++++--------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/io/jvm/src/main/scala/fs2/io/net/SocketGroupPlatform.scala b/io/jvm/src/main/scala/fs2/io/net/SocketGroupPlatform.scala index 25c5e5bb20..52cecf3442 100644 --- a/io/jvm/src/main/scala/fs2/io/net/SocketGroupPlatform.scala +++ b/io/jvm/src/main/scala/fs2/io/net/SocketGroupPlatform.scala @@ -48,12 +48,13 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type => options: List[SocketOption] ): Resource[F, Socket[F]] = { def setup: Resource[F, AsynchronousSocketChannel] = - Resource.make(Async[F].delay { - val ch = - AsynchronousChannelProvider.provider.openAsynchronousSocketChannel(channelGroup) - options.foreach(opt => ch.setOption(opt.key, opt.value)) - ch - })(ch => Async[F].delay(if (ch.isOpen) ch.close else ())) + Resource + .make( + Async[F].delay( + AsynchronousChannelProvider.provider.openAsynchronousSocketChannel(channelGroup) + ) + )(ch => Async[F].delay(if (ch.isOpen) ch.close else ())) + .evalTap(ch => Async[F].delay(options.foreach(opt => ch.setOption(opt.key, opt.value)))) def connect(ch: AsynchronousSocketChannel): F[AsynchronousSocketChannel] = to.resolve[F].flatMap { ip => @@ -80,24 +81,27 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type => options: List[SocketOption] ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = { - val setup: F[AsynchronousServerSocketChannel] = - address.traverse(_.resolve[F]).flatMap { addr => - Async[F].delay { - val ch = - AsynchronousChannelProvider.provider.openAsynchronousServerSocketChannel(channelGroup) - ch.bind( - new InetSocketAddress( - addr.map(_.toInetAddress).orNull, - port.map(_.value).getOrElse(0) + val setup: Resource[F, AsynchronousServerSocketChannel] = + Resource.eval(address.traverse(_.resolve[F])).flatMap { addr => + Resource + .make( + Async[F].delay( + AsynchronousChannelProvider.provider + .openAsynchronousServerSocketChannel(channelGroup) + ) + )(sch => Async[F].delay(if (sch.isOpen) sch.close())) + .evalTap(ch => + Async[F].delay( + ch.bind( + new InetSocketAddress( + addr.map(_.toInetAddress).orNull, + port.map(_.value).getOrElse(0) + ) + ) ) ) - ch - } } - def cleanup(sch: AsynchronousServerSocketChannel): F[Unit] = - Async[F].delay(if (sch.isOpen) sch.close()) - def acceptIncoming( sch: AsynchronousServerSocketChannel ): Stream[F, Socket[F]] = { @@ -137,7 +141,7 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type => } } - Resource.make(setup)(cleanup).map { sch => + setup.map { sch => val jLocalAddress = sch.getLocalAddress.asInstanceOf[java.net.InetSocketAddress] val localAddress = SocketAddress.fromInetSocketAddress(jLocalAddress) (localAddress, acceptIncoming(sch)) From 8c617c5d71e77cad50e3370f72e7606d44925e49 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 30 Aug 2022 03:54:08 +0000 Subject: [PATCH 2/3] ... and the same for JS --- .../scala/fs2/io/net/SocketGroupPlatform.scala | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala b/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala index 994107f4e9..e866c5e933 100644 --- a/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala +++ b/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala @@ -52,12 +52,18 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type => options: List[SocketOption] ): Resource[F, Socket[F]] = (for { - sock <- F - .delay( - new facade.net.Socket(new facade.net.SocketOptions { allowHalfOpen = true }) + sock <- Resource + .make( + F.delay( + new facade.net.Socket(new facade.net.SocketOptions { allowHalfOpen = true }) + ) + )(sock => + F.delay { + if (!sock.destroyed) + sock.destroy() + } ) - .flatTap(setSocketOptions(options)) - .toResource + .evalTap(setSocketOptions(options)) socket <- Socket.forAsync(sock) _ <- F .async[Unit] { cb => From 44738aa542982d4f9e3f6be0af1a4b4b0bae9238 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 30 Aug 2022 04:01:54 +0000 Subject: [PATCH 3/3] Fix another one --- .../io/net/unixsocket/UnixSocketsPlatform.scala | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/io/js/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala b/io/js/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala index be71523c9b..198852d76b 100644 --- a/io/js/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala +++ b/io/js/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala @@ -41,15 +41,22 @@ private[unixsocket] trait UnixSocketsCompanionPlatform { override def client(address: UnixSocketAddress): Resource[F, Socket[F]] = Resource - .eval(for { - socket <- F.delay( + .make( + F.delay( new facade.net.Socket(new facade.net.SocketOptions { allowHalfOpen = true }) ) - _ <- F.async_[Unit] { cb => + )(socket => + F.delay { + if (!socket.destroyed) + socket.destroy() + } + ) + .evalTap { socket => + F.async_[Unit] { cb => socket.connect(address.path, () => cb(Right(()))) () } - } yield socket) + } .flatMap(Socket.forAsync[F]) override def server(