diff --git a/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala b/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala index 994107f4e9..e866c5e933 100644 --- a/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala +++ b/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala @@ -52,12 +52,18 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type => options: List[SocketOption] ): Resource[F, Socket[F]] = (for { - sock <- F - .delay( - new facade.net.Socket(new facade.net.SocketOptions { allowHalfOpen = true }) + sock <- Resource + .make( + F.delay( + new facade.net.Socket(new facade.net.SocketOptions { allowHalfOpen = true }) + ) + )(sock => + F.delay { + if (!sock.destroyed) + sock.destroy() + } ) - .flatTap(setSocketOptions(options)) - .toResource + .evalTap(setSocketOptions(options)) socket <- Socket.forAsync(sock) _ <- F .async[Unit] { cb => diff --git a/io/js/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala b/io/js/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala index be71523c9b..198852d76b 100644 --- a/io/js/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala +++ b/io/js/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala @@ -41,15 +41,22 @@ private[unixsocket] trait UnixSocketsCompanionPlatform { override def client(address: UnixSocketAddress): Resource[F, Socket[F]] = Resource - .eval(for { - socket <- F.delay( + .make( + F.delay( new facade.net.Socket(new facade.net.SocketOptions { allowHalfOpen = true }) ) - _ <- F.async_[Unit] { cb => + )(socket => + F.delay { + if (!socket.destroyed) + socket.destroy() + } + ) + .evalTap { socket => + F.async_[Unit] { cb => socket.connect(address.path, () => cb(Right(()))) () } - } yield socket) + } .flatMap(Socket.forAsync[F]) override def server( diff --git a/io/jvm/src/main/scala/fs2/io/net/SocketGroupPlatform.scala b/io/jvm/src/main/scala/fs2/io/net/SocketGroupPlatform.scala index 25c5e5bb20..52cecf3442 100644 --- a/io/jvm/src/main/scala/fs2/io/net/SocketGroupPlatform.scala +++ b/io/jvm/src/main/scala/fs2/io/net/SocketGroupPlatform.scala @@ -48,12 +48,13 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type => options: List[SocketOption] ): Resource[F, Socket[F]] = { def setup: Resource[F, AsynchronousSocketChannel] = - Resource.make(Async[F].delay { - val ch = - AsynchronousChannelProvider.provider.openAsynchronousSocketChannel(channelGroup) - options.foreach(opt => ch.setOption(opt.key, opt.value)) - ch - })(ch => Async[F].delay(if (ch.isOpen) ch.close else ())) + Resource + .make( + Async[F].delay( + AsynchronousChannelProvider.provider.openAsynchronousSocketChannel(channelGroup) + ) + )(ch => Async[F].delay(if (ch.isOpen) ch.close else ())) + .evalTap(ch => Async[F].delay(options.foreach(opt => ch.setOption(opt.key, opt.value)))) def connect(ch: AsynchronousSocketChannel): F[AsynchronousSocketChannel] = to.resolve[F].flatMap { ip => @@ -80,24 +81,27 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type => options: List[SocketOption] ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = { - val setup: F[AsynchronousServerSocketChannel] = - address.traverse(_.resolve[F]).flatMap { addr => - Async[F].delay { - val ch = - AsynchronousChannelProvider.provider.openAsynchronousServerSocketChannel(channelGroup) - ch.bind( - new InetSocketAddress( - addr.map(_.toInetAddress).orNull, - port.map(_.value).getOrElse(0) + val setup: Resource[F, AsynchronousServerSocketChannel] = + Resource.eval(address.traverse(_.resolve[F])).flatMap { addr => + Resource + .make( + Async[F].delay( + AsynchronousChannelProvider.provider + .openAsynchronousServerSocketChannel(channelGroup) + ) + )(sch => Async[F].delay(if (sch.isOpen) sch.close())) + .evalTap(ch => + Async[F].delay( + ch.bind( + new InetSocketAddress( + addr.map(_.toInetAddress).orNull, + port.map(_.value).getOrElse(0) + ) + ) ) ) - ch - } } - def cleanup(sch: AsynchronousServerSocketChannel): F[Unit] = - Async[F].delay(if (sch.isOpen) sch.close()) - def acceptIncoming( sch: AsynchronousServerSocketChannel ): Stream[F, Socket[F]] = { @@ -137,7 +141,7 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type => } } - Resource.make(setup)(cleanup).map { sch => + setup.map { sch => val jLocalAddress = sch.getLocalAddress.asInstanceOf[java.net.InetSocketAddress] val localAddress = SocketAddress.fromInetSocketAddress(jLocalAddress) (localAddress, acceptIncoming(sch))