Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Racing DSL #3411

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft

Racing DSL #3411

wants to merge 8 commits into from

Conversation

nomisRev
Copy link
Member

@nomisRev nomisRev commented Apr 25, 2024

This PR proposes a small racing DSL, it comes up several times in discussions and comparison with different ecosystems.

Racing and stopping at the first value is desired in many cases, often lower level or UI development. This kind of racing is often a 'forever' kind-of operation, versus an actual operation.

  • UI example: race(::showLoader, ::loadData) where you want to cancel showLoader when the data is loaded.
  • Low-level example: race(interruption, operation) where operation needs to be cancelled when interruption is completed.

This however doesn't represent what we often want to do in application code, which is get successful results. So another popular use-case for racing is fetching data from x sources in parallel, and finish on the first successful results. That also means that if a source cannot be reached, we want to ignore that exception.

Having to manually facilitate the second use-case on top of the first one is a bit annoying, and boilerplatey.

This DSL aims to solve that by serving both use-cases, and allowing more flexibility to combine two techniques into a single program. It uses a DSL, so allows for more efficient re-using of the underlying state machines from KotlinX and fits right into the rest of Arrow (and Kotlin, including context parameters).

I propose to not remove raceN, but to let it exist alongside the new racing DSL. We currently only support race2, and race3 so the combined amount of code is not problematic IMO.

TODO

  • Test suite

@nomisRev nomisRev requested review from serras, kyay10 and a team April 25, 2024 08:33
@nomisRev nomisRev added the 2.0.0 Tickets / PRs belonging to Arrow 2.0 label Apr 25, 2024
Copy link
Contributor

github-actions bot commented Apr 25, 2024

Kover Report

File Coverage [0.00%]
arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Racing.kt 0.00%
Total Project Coverage 45.84%

try {
block()
} catch (e: RaiseCancellationException) {
// `Raise<E>` error is ignored... Can we do better here?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe raise shouldn't be considered a failure? Perhaps raising in such a way is conceptually the same as returning a value. Alternatively, we can have a raceOrRaise

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe raise shouldn't be considered a failure? Perhaps raising in such a way is conceptually the same as returning a value. Alternatively, we can have a raceOrRaise

Hmm. That kind-of makes sense, but I'd still expect user to want to 'select' E and not A but supporting both would be great.

So how do you imagine the API:

  • race (first A)
  • raceOrRaise ( first A or E)
  • raceOrThrow (first A, E or Throwable)

?? Do we need something that 'selects' first A or Throwable?

Potentially we can do a different extension when you reside within Raise, so an extension Raise<E>.racing(...) but then it's probably still possible to easily select the non-Raise variant 🤔.

suspend fun Raise<E>.racing(...): A
suspend fun racing(..): A

This probably doesn't yield the benefit we'd want since the non-Raise racing would get suggested when Raise doesn't match or Raise.racing has an additional param.

@nomisRev nomisRev changed the title Small racing DSL Racing DSL Apr 30, 2024
@nomisRev nomisRev added 2.x.x and removed 2.0.0 Tickets / PRs belonging to Arrow 2.0 labels Oct 2, 2024
@nomisRev
Copy link
Member Author

nomisRev commented Oct 2, 2024

@serras I added a 2.x.x label, and this can come after 2.0.0. Since it's a new API/DSL.

@kyay10
Copy link
Collaborator

kyay10 commented Nov 3, 2024

Can we somehow integrate this with SelectBuilder directly? We can get the exception handler from coroutine context. Maybe we'd need contexts to access CoroutineScope? Food for thought

@kyay10
Copy link
Collaborator

kyay10 commented Nov 3, 2024

Not sure whether to love or hate what I've created:

/**
 * A DSL that allows racing many `suspend` functions in parallel against each-other,
 * it yields a final result of [A] based on the first function that yields a result.
 * A racer can yield a result based on [race], or [raceOrFail].
 *
 * [race] will call the current [CoroutineExceptionHandler] in case of an exception,
 * and then await **another successful result** but not cancel the race. Whilst [raceOrFail] will cancel the race,
 * and rethrow the exception that occurred and thus cancel the race and all participating racers.
 *
 * <!--- INCLUDE
 * import arrow.fx.coroutines.race
 * import arrow.fx.coroutines.raceOrFail
 * import kotlinx.coroutines.coroutineScope
 * import kotlinx.coroutines.delay
 * import kotlinx.coroutines.selects.select
 * -->
 * ```kotlin
 * suspend fun winner(): String = coroutineScope {
 *   select {
 *     race { delay(1000); "Winner" }
 *     race { throw RuntimeException("Loser") }
 *   }
 * } // Winner (logged RuntimeException)
 *
 * suspend fun winner2(): String = coroutineScope {
 *   select {
 *     race { delay(1000); "Winner" }
 *     raceOrFail { throw RuntimeException("Loser") }
 *   }
 * } // RuntimeException
 * ```
 *
 * **Important:** a racing program with no racers will hang forever.
 * ```kotlin
 * suspend fun never(): Nothing = select { }
 * ```
 * <!--- KNIT example-racing-01.kt -->
 *
 * @param block the body of the DSL that describes the racing logic
 * @return the winning value of [A].
 */
public val <A> SelectBuilder<A>.raceOrFail: CoroutineScope.(suspend CoroutineScope.() -> A) -> Unit get () = { block ->
  /* First we create a lazy racer,
   * and we add it in front of the existing racers such that we maintain correct order.
   * After we've successfully registered the racer, we check for race conditions,
   * and 'start' racing.
   */
  val racer = async(
    start = CoroutineStart.LAZY,
    block = block
  )
  if (isActive) {
    require(racer.start()) { "Racer not started" }
    racer.onAwait(::identity)
  }
}

public val <A> SelectBuilder<A>.race: CoroutineScope.(suspend CoroutineScope.() -> A) -> Unit get() = {  block ->
  raceOrFail {
    try {
      block()
    } catch (e: Throwable) {
      currentExceptionHandler().handleException(currentCoroutineContext(), e.nonFatalOrThrow())
      awaitCancellation()
    }
  }
}

The idea is simple: no custom receiver type. Instead, we want to have in context a CoroutineScope and a SelectBuilder, and then the race and raceOrFail simply need them in context. We can thus have racing simply bring in those contexts. To have this work without contexts, the user has to call select and coroutineScope on their own (see examples), but then we take both contexts through some weird currying.

@kyay10
Copy link
Collaborator

kyay10 commented Nov 4, 2024

That previous comment forgot to cancel the other deferreds when the selection is over, which sounds suspiciously similar to releasing a resource:

public suspend fun <A> racing(block: suspend RacingScope<A>.() -> A): A =
  coroutineScope {
    resourceScope {
      val scope = DefaultRacingScope<A>(this@coroutineScope, this)
      block(scope)
    }
  }

/**
 * A DSL that allows racing many `suspend` functions in parallel against each-other,
 * it yields a final result of [A] based on the first function that yields a result.
 * A racer can yield a result based on [race], or [raceOrFail].
 *
 * [race] will call the current [CoroutineExceptionHandler] in case of an exception,
 * and then await **another successful result** but not cancel the race. Whilst [raceOrFail] will cancel the race,
 * and rethrow the exception that occurred and thus cancel the race and all participating racers.
 *
 * <!--- INCLUDE
 * import arrow.fx.coroutines.racing
 * import kotlinx.coroutines.delay
 * import kotlinx.coroutines.selects.select
 * -->
 * ```kotlin
 * suspend fun winner(): String = racing {
 *   select {
 *     race { delay(1000); "Winner" }
 *     race { throw RuntimeException("Loser") }
 *   }
 * } // Winner (logged RuntimeException)
 *
 * suspend fun winner2(): String = racing {
 *   select {
 *     race { delay(1000); "Winner" }
 *     raceOrFail { throw RuntimeException("Loser") }
 *   }
 * } // RuntimeException
 * ```
 *
 * **Important:** a racing program with no racers will hang forever.
 * ```kotlin
 * suspend fun never(): Nothing = racing { select { } }
 * ```
 * <!--- KNIT example-racing-01.kt -->
 *
 * @param block the body of the DSL that describes the racing logic
 * @return the winning value of [A].
 */
public interface RacingScope<A> {
  public fun SelectBuilder<A>.race(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> A
  ): Unit = raceOrFail {
    try {
      block()
    } catch (e: Throwable) {
      currentExceptionHandler().handleException(currentCoroutineContext(), e.nonFatalOrThrow())
      awaitCancellation()
    }
  }

  public fun SelectBuilder<A>.raceOrFail(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> A
  )
}

public class DefaultRacingScope<A>(private val coroutineScope: CoroutineScope, private val resourceScope: ResourceScope) : RacingScope<A> {
  override fun SelectBuilder<A>.raceOrFail(context: CoroutineContext, block: suspend CoroutineScope.() -> A) {
    /* First we create a lazy racer,
    * and we add it in front of the existing racers such that we maintain correct order.
    * After we've successfully registered the racer, we check for race conditions,
    * and 'start' racing.
    */
    val racer = coroutineScope.async(
      start = CoroutineStart.LAZY,
      block = block
    )
    resourceScope.onRelease { racer.cancelAndJoin() }
    if (coroutineScope.isActive) {
      require(racer.start()) { "Racer not started" }
      racer.onAwait(::identity)
    }
  }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants