Skip to content

Commit

Permalink
Use Dispatcher and tagless instead of direct IORuntime operations (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
igor-vovk authored Feb 15, 2025
1 parent e940573 commit 1985ae8
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 51 deletions.
18 changes: 18 additions & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
version = 3.8.6
runner.dialect = scala3
preset=defaultWithAlign

maxColumn = 110

indent.main = 2
indent.defnSite = 2
indent.ctrlSite = 2

docstrings.style = Asterisk

newlines.implicitParamListModifierPrefer = before
newlines.selectChains.style = keep

rewrite.rules = [Imports, RedundantBraces, PreferCurlyFors]
rewrite.imports.sort = scalastyle
rewrite.trailingCommas.style = "multiple"
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
package io.github.cats_effect_simple_di

import cats.effect.IO
import cats.Applicative
import cats.effect.{IO, Sync}

import scala.reflect.ClassTag

trait AllocationLifecycleListener {
def onInit[A: ClassTag](a: A): IO[Unit]
trait AllocationLifecycleListener[F[_]] {
def onInit[A: ClassTag](a: A): F[Unit]

def onShutdown[A: ClassTag](a: A): IO[Unit]
def onShutdown[A: ClassTag](a: A): F[Unit]
}

object NoOpListener extends AllocationLifecycleListener {
override def onInit[A: ClassTag](a: A): IO[Unit] = IO.unit
class NoOpListener[F[_]: Applicative] extends AllocationLifecycleListener[F] {
override def onInit[A: ClassTag](a: A): F[Unit] = Applicative[F].unit

override def onShutdown[A: ClassTag](a: A): IO[Unit] = IO.unit
override def onShutdown[A: ClassTag](a: A): F[Unit] = Applicative[F].unit
}

object LogbackAllocationListener extends AllocationLifecycleListener {
class LogbackAllocationListener[F[_]: Sync] extends AllocationLifecycleListener[F] {

import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

private val logger: Logger[IO] = Slf4jLogger.getLogger[IO]
private val logger: Logger[F] = Slf4jLogger.getLogger[F]

override def onInit[A: ClassTag](a: A): IO[Unit] =
override def onInit[A: ClassTag](a: A): F[Unit] =
logger.info(s"Allocated $a")

override def onShutdown[A: ClassTag](a: A): IO[Unit] =
override def onShutdown[A: ClassTag](a: A): F[Unit] =
logger.info(s"Shutting down $a")
}
}
76 changes: 42 additions & 34 deletions src/main/scala/io/github/cats_effect_simple_di/Allocator.scala
Original file line number Diff line number Diff line change
@@ -1,52 +1,60 @@
package io.github.cats_effect_simple_di

import cats.effect.implicits.*
import cats.effect.std.Dispatcher
import cats.effect.unsafe.IORuntime
import cats.effect.{IO, Ref, Resource}
import cats.effect.*
import cats.implicits.*

import scala.reflect.ClassTag

object Allocator {

def create(
runtime: IORuntime,
listener: AllocationLifecycleListener = NoOpListener,
): Resource[IO, Allocator] = {
val acquire = IO(unsafeCreate(runtime, listener))
val release = (a: Allocator) => a.shutdownAll
Resource.make(acquire)(release)
}
def create[F[_]: Async](): Resource[F, Allocator[F]] =
for
dispatcher <- Dispatcher.parallel[F]
shutdownRef <- Ref.of(Async[F].unit).toResource
allocator <- {
val acquire = Async[F].delay(Allocator(dispatcher, shutdownRef, NoOpListener[F]))
val release = (a: Allocator[F]) => a.shutdownAll

Resource.make(acquire)(release)
}
yield allocator

/**
* When using this method you must call [[shutdownAll]] manually after you finished working with dependencies.
* When using this method you must call [[shutdownAll]] manually after you finished working with
* dependencies.
*/
def unsafeCreate(
runtime: IORuntime,
listener: AllocationLifecycleListener = NoOpListener,
): Allocator =
new Allocator(runtime, listener)
def unsafeCreate(runtime: IORuntime): Allocator[IO] =
create[IO]().allocated.unsafeRunSync()(runtime)._1

}

class Allocator private(
runtime: IORuntime,
listener: AllocationLifecycleListener,
class Allocator[F[_]: Sync] private (
dispatcher: Dispatcher[F],
shutdown: Ref[F, F[Unit]],
listener: AllocationLifecycleListener[F],
) {

private val shutdown: Ref[IO, IO[Unit]] = Ref.unsafe(IO.unit)

def allocate[A: ClassTag](resource: Resource[IO, A]): A =
resource
.allocated
.flatMap { case (a, release) =>
listener.onInit(a) *>
shutdown.update(listener.onShutdown(a) *> release *> _) *> // Shutdown this resource, and after shutdown all previous
IO.pure(a)
}.unsafeRunSync()(runtime)

def allocate[A: ClassTag](io: IO[A]): A = allocate(io.toResource)

def shutdownAll: IO[Unit] = {
shutdown.getAndSet(IO.unit).flatten
}
def withListener(listener: AllocationLifecycleListener[F]): Allocator[F] =
new Allocator(dispatcher, shutdown, listener)

def allocate[A: ClassTag](resource: Resource[F, A]): A =
dispatcher.unsafeRunSync {
resource
.allocated
.flatMap { (acquired, release) =>
listener.onInit(acquired) *>
shutdown.update(listener.onShutdown(acquired) *> release *> _) *>
// Shutdown this resource, and after shutdown all previous
Sync[F].pure(acquired)
}
}

def allocate[A: ClassTag](fa: F[A]): A = allocate(fa.toResource)

def shutdownAll: F[Unit] =
shutdown.getAndSet(Sync[F].unit).flatten

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ class AllocatorTest extends AnyFlatSpec {

trait ctx {
object TestDependencies {
def apply(runtime: IORuntime): Resource[IO, TestDependencies] = {
Allocator.create(runtime, LogbackAllocationListener)
def apply(runtime: IORuntime): Resource[IO, TestDependencies] =
Allocator.create[IO]()
.map(_.withListener(new LogbackAllocationListener[IO]))
.map(new TestDependencies(_))
}

val shutdownOrderCapturer: Ref[IO, Seq[String]] = Ref.unsafe(Seq.empty)
}

class TestDependencies(allocator: Allocator) {
class TestDependencies(allocator: Allocator[IO]) {

import TestDependencies.*

Expand All @@ -39,7 +39,7 @@ class AllocatorTest extends AnyFlatSpec {

"Allocator" should "allocate a resource" in new ctx {
val testDependencies = TestDependencies(global)
val testResource = testDependencies.use { deps =>
val testResource = testDependencies.use { deps =>
IO.pure(deps.testResourceA)
}.unsafeRunSync()

Expand Down

0 comments on commit 1985ae8

Please sign in to comment.