Skip to content

Commit 5830cb2

Browse files
committed
Minimize changes
1 parent 1612af4 commit 5830cb2

File tree

4 files changed

+19
-22
lines changed

4 files changed

+19
-22
lines changed

shared/src/main/scala/async/AsyncOperations.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package gears.async
22

33
import language.experimental.captureChecking
44

5+
import gears.async.AsyncOperations.sleep
6+
57
import java.util.concurrent.TimeoutException
68
import scala.concurrent.duration.FiniteDuration
79

@@ -19,14 +21,14 @@ object AsyncOperations:
1921
* @param millis
2022
* The duration to suspend, in milliseconds. Must be a positive integer.
2123
*/
22-
def sleep(millis: Long)(using AsyncOperations, Async): Unit =
24+
inline def sleep(millis: Long)(using AsyncOperations, Async): Unit =
2325
summon[AsyncOperations].sleep(millis)
2426

2527
/** Suspends the current [[Async]] context for `duration`.
2628
* @param duration
2729
* The duration to suspend. Must be positive.
2830
*/
29-
def sleep(duration: FiniteDuration)(using AsyncOperations, Async): Unit =
31+
inline def sleep(duration: FiniteDuration)(using AsyncOperations, Async): Unit =
3032
sleep(duration.toMillis)
3133

3234
/** Runs `op` with a timeout. When the timeout occurs, `op` is cancelled through the given [[Async]] context, and
@@ -36,7 +38,7 @@ def withTimeout[T](timeout: FiniteDuration)(op: Async ?=> T)(using AsyncOperatio
3638
Async.group:
3739
Async.select(
3840
Future(op).handle(_.get),
39-
Future(AsyncOperations.sleep(timeout)).handle: _ =>
41+
Future(sleep(timeout)).handle: _ =>
4042
throw TimeoutException()
4143
)
4244

@@ -47,5 +49,5 @@ def withTimeoutOption[T](timeout: FiniteDuration)(op: Async ?=> T)(using AsyncOp
4749
Async.group:
4850
Async.select(
4951
Future(op).handle(v => Some(v.get)),
50-
Future(AsyncOperations.sleep(timeout)).handle(_ => None)
52+
Future(sleep(timeout)).handle(_ => None)
5153
)

shared/src/main/scala/async/Cancellable.scala

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,8 @@ package gears.async
22

33
import language.experimental.captureChecking
44

5-
import java.util.concurrent.atomic.AtomicLong
6-
75
/** A trait for cancellable entities that can be grouped. */
86
trait Cancellable:
9-
val id = Cancellable.Id()
107
private var group: CompletionGroup = CompletionGroup.Unlinked
118

129
/** Issue a cancel request */
@@ -15,9 +12,9 @@ trait Cancellable:
1512
/** Add this cancellable to the given group after removing it from the previous group in which it was.
1613
*/
1714
def link(group: CompletionGroup): this.type = synchronized:
18-
this.group.drop(this)
15+
this.group.drop(this.unsafeAssumePure)
1916
this.group = group
20-
this.group.add(this)
17+
this.group.add(this.unsafeAssumePure)
2118
this
2219

2320
/** Link this cancellable to the cancellable group of the current async context.
@@ -29,14 +26,13 @@ trait Cancellable:
2926
def unlink(): this.type =
3027
link(CompletionGroup.Unlinked)
3128

29+
/** Assume that the [[Cancellable]] is pure, in the case that cancellation does *not* refer to captured resources.
30+
*/
31+
inline def unsafeAssumePure: Cancellable = caps.unsafe.unsafeAssumePure(this)
32+
3233
end Cancellable
3334

3435
object Cancellable:
35-
opaque type Id = Long
36-
private object Id:
37-
private val gen = AtomicLong(0)
38-
def apply(): Id = gen.incrementAndGet()
39-
4036
/** A special [[Cancellable]] object that just tracks whether its linked group was cancelled. */
4137
trait Tracking extends Cancellable:
4238
def isCancelled: Boolean

shared/src/main/scala/async/CompletionGroup.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,12 @@ import scala.collection.mutable
55
import scala.util.Success
66

77
import Future.Promise
8-
import scala.annotation.unchecked.uncheckedCaptures
98

109
/** A group of cancellable objects that are completed together. Cancelling the group means cancelling all its
1110
* uncompleted members.
1211
*/
1312
class CompletionGroup extends Cancellable.Tracking:
14-
private val members: mutable.Set[(Cancellable^) @uncheckedCaptures] = mutable.Set[(Cancellable^) @uncheckedCaptures]()
13+
private val members: mutable.Set[Cancellable] = mutable.Set()
1514
private var canceled: Boolean = false
1615
private var cancelWait: Option[Promise[Unit]] = None
1716

@@ -32,14 +31,14 @@ class CompletionGroup extends Cancellable.Tracking:
3231
unlink()
3332

3433
/** Add given member to the members set. If the group has already been cancelled, cancels that member immediately. */
35-
def add(member: Cancellable^): Unit =
34+
def add(member: Cancellable): Unit =
3635
val alreadyCancelled = synchronized:
3736
members += member // Add this member no matter what since we'll wait for it still
3837
canceled
3938
if alreadyCancelled then member.cancel()
4039

4140
/** Remove given member from the members set if it is an element */
42-
def drop(member: Cancellable^): Unit = synchronized:
41+
def drop(member: Cancellable): Unit = synchronized:
4342
members -= member
4443
if members.isEmpty && cancelWait.isDefined then cancelWait.get.complete(Success(()))
4544

@@ -53,8 +52,8 @@ object CompletionGroup:
5352
object Unlinked extends CompletionGroup:
5453
override def cancel(): Unit = ()
5554
override def waitCompletion()(using Async): Unit = ()
56-
override def add(member: Cancellable^): Unit = ()
57-
override def drop(member: Cancellable^): Unit = ()
55+
override def add(member: Cancellable): Unit = ()
56+
override def drop(member: Cancellable): Unit = ()
5857
end Unlinked
5958

6059
end CompletionGroup

shared/src/main/scala/async/Listener.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ trait Listener[-T]:
5252

5353
object Listener:
5454
/** A simple [[Listener]] that always accepts the item and sends it to the consumer. */
55-
def acceptingListener[T](consumer: (T, SourceSymbol[T]) => Unit): Listener[T]^{consumer} =
55+
/* inline bug */ def acceptingListener[T](consumer: (T, SourceSymbol[T]) => Unit): Listener[T]^{consumer} =
5656
new Listener[T]:
5757
val lock = null
5858
def complete(data: T, source: SourceSymbol[T]) = consumer(data, source)
@@ -64,7 +64,7 @@ object Listener:
6464
* [[Async.Source.dropListener]] these listeners are compared for equality by the hash of the source and the inner
6565
* listener.
6666
*/
67-
abstract case class ForwardingListener[T](src: Async.Source[?]^, inner: Listener[?]^) extends Listener[T]
67+
abstract case class ForwardingListener[-T](src: Async.Source[?]^, inner: Listener[?]^) extends Listener[T]
6868

6969
object ForwardingListener:
7070
/** Creates an empty [[ForwardingListener]] for equality comparison. */

0 commit comments

Comments
 (0)