Skip to content

Commit

Permalink
Merge pull request #3701 from samspills/dispatcher-remove-alive
Browse files Browse the repository at this point in the history
Dispatcher: remove `alive` in favour of nullifying queues
  • Loading branch information
djspiewak authored Jun 24, 2023
2 parents 849c6f5 + b28a843 commit 9c00cf1
Showing 1 changed file with 52 additions and 58 deletions.
110 changes: 52 additions & 58 deletions std/shared/src/main/scala/cats/effect/std/Dispatcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -247,16 +247,22 @@ object Dispatcher {
supervisor <- Supervisor[F](await, Some((_: Outcome[F, Throwable, _]) => true))

_ <- {
def step(state: Array[AtomicReference[List[Registration]]], await: F[Unit]): F[Unit] =
def step(
state: Array[AtomicReference[List[Registration]]],
await: F[Unit],
doneR: AtomicBoolean): F[Unit] =
for {
done <- F.delay(doneR.get())
regs <- F delay {
val buffer = mutable.ListBuffer.empty[Registration]
var i = 0
while (i < workers) {
val st = state(i)
if (st.get() ne Nil) {
val list = st.getAndSet(Nil)
buffer ++= list.reverse // FIFO order here is a form of fairness
if (st.get() ne null) {
val list = if (done) st.getAndSet(null) else st.getAndSet(Nil)
if ((list ne null) && (list ne Nil)) {
buffer ++= list.reverse // FIFO order here is a form of fairness
}
}
i += 1
}
Expand Down Expand Up @@ -294,7 +300,7 @@ object Dispatcher {
F.delay(latch.set(Noop)) *> // reset latch
// if we're marked as done, yield immediately to give other fibers a chance to shut us down
// we might loop on this a few times since we're marked as done before the supervisor is canceled
F.delay(doneR.get()).ifM(F.cede, step(state, await))
F.delay(doneR.get()).ifM(F.cede, step(state, await, doneR))
}

0.until(workers).toList traverse_ { n =>
Expand All @@ -303,15 +309,11 @@ object Dispatcher {
val worker = dispatcher(doneR, latch, states(n))
val release = F.delay(latch.getAndSet(Open)())
Resource.make(supervisor.supervise(worker)) { _ =>
F.delay(doneR.set(true)) *> step(states(n), F.unit) *> release
F.delay(doneR.set(true)) *> step(states(n), F.unit, doneR) *> release
}
}
}
}

// Alive is the innermost resource so that when releasing
// the very first thing we do is set dispatcher to un-alive
alive <- Resource.make(F.delay(new AtomicBoolean(true)))(ref => F.delay(ref.set(false)))
} yield {
new Dispatcher[F] {
override def unsafeRunAndForget[A](fa: F[A]): Unit = {
Expand Down Expand Up @@ -361,65 +363,57 @@ object Dispatcher {
@tailrec
def enqueue(state: AtomicReference[List[Registration]], reg: Registration): Unit = {
val curr = state.get()
val next = reg :: curr

if (!state.compareAndSet(curr, next)) enqueue(state, reg)
if (curr eq null) {
throw new IllegalStateException("dispatcher already shutdown")
} else {
val next = reg :: curr
if (!state.compareAndSet(curr, next)) enqueue(state, reg)
}
}

if (alive.get()) {
val (state, lt) = if (workers > 1) {
val rand = ThreadLocalRandom.current()
val dispatcher = rand.nextInt(workers)
val inner = rand.nextInt(workers)
val (state, lt) = if (workers > 1) {
val rand = ThreadLocalRandom.current()
val dispatcher = rand.nextInt(workers)
val inner = rand.nextInt(workers)

(states(dispatcher)(inner), latches(dispatcher))
} else {
(states(0)(0), latches(0))
}
(states(dispatcher)(inner), latches(dispatcher))
} else {
(states(0)(0), latches(0))
}

val reg = Registration(action, registerCancel _)
enqueue(state, reg)
val reg = Registration(action, registerCancel _)
enqueue(state, reg)

if (lt.get() ne Open) {
val f = lt.getAndSet(Open)
f()
}
if (lt.get() ne Open) {
val f = lt.getAndSet(Open)
f()
}

val cancel = { () =>
reg.lazySet(false)

@tailrec
def loop(): Future[Unit] = {
val state = cancelState.get()
state match {
case CancelInit =>
val promise = Promise[Unit]()
if (!cancelState.compareAndSet(state, CanceledNoToken(promise))) {
loop()
} else {
promise.future
}
case CanceledNoToken(promise) =>
val cancel = { () =>
reg.lazySet(false)

@tailrec
def loop(): Future[Unit] = {
val state = cancelState.get()
state match {
case CancelInit =>
val promise = Promise[Unit]()
if (!cancelState.compareAndSet(state, CanceledNoToken(promise))) {
loop()
} else {
promise.future
case CancelToken(cancelToken) =>
cancelToken()
}
}
case CanceledNoToken(promise) =>
promise.future
case CancelToken(cancelToken) =>
cancelToken()
}

loop()
}

// double-check after we already put things in the structure
if (alive.get()) {
(promise.future, cancel)
} else {
// we were shutdown *during* the enqueue
cancel()
throw new IllegalStateException("dispatcher already shutdown")
}
} else {
throw new IllegalStateException("dispatcher already shutdown")
loop()
}

(promise.future, cancel)
}
}
}
Expand Down

0 comments on commit 9c00cf1

Please sign in to comment.