1
1
package chrome .interop
2
2
3
3
import _root_ .fs2 ._
4
- import _root_ .fs2 .util ._
5
- import _root_ .fs2 .util .syntax ._
6
4
import _root_ .fs2 .async .mutable .Queue
5
+ import cats .effect .{Async , Effect , IO }
6
+ import cats .effect .implicits ._
7
+ import cats .implicits ._
7
8
import chrome .events .bindings .Event
8
9
10
+ import scala .concurrent .ExecutionContext
9
11
import scala .language .higherKinds
10
-
11
12
import scala .scalajs .js
12
13
13
14
package object fs2 {
14
15
15
16
implicit class Event1FS2Ops [T1 ](val event : Event [js.Function1 [T1 , _]])
16
17
extends AnyVal {
17
18
18
- def single [F [_]: Async : Suspendable ]: F [T1 ] =
19
- Async [F ].async[T1 ] { callback =>
20
- Suspendable [F ].delay {
19
+ def single [F [_]: Effect ]: F [T1 ] =
20
+ Effect [F ].async[T1 ] { callback =>
21
+ Effect [F ].delay {
21
22
var fn = (t : T1 ) => ()
22
23
fn = t => {
23
24
event.removeListener(fn)
@@ -27,15 +28,15 @@ package object fs2 {
27
28
}
28
29
}
29
30
30
- def toStream [F [_]: Async ] : Stream [F , T1 ] =
31
+ def toStream [F [_]: Effect ]( implicit EC : ExecutionContext ) : Stream [F , T1 ] =
31
32
toStream(async.unboundedQueue[F , T1 ])
32
33
33
- def toStream [F [_]: Async ](queue : F [Queue [F , T1 ]]): Stream [F , T1 ] = {
34
- Stream .bracket{
34
+ def toStream [F [_]: Effect ](queue : F [Queue [F , T1 ]]): Stream [F , T1 ] = {
35
+ Stream .bracket {
35
36
queue.map { q =>
36
- val callback = (t : T1 ) => q.offer1(t).unsafeRunAsync(_ => ())
37
+ val callback = (t : T1 ) => q.offer1(t).runAsync(_ => IO .unit). unsafeRunAsync(_ => ())
37
38
event.addListener(callback)
38
- val release = Suspendable [F ].delay(event.removeListener(callback))
39
+ val release = Async [F ].delay(event.removeListener(callback))
39
40
(q, release)
40
41
}
41
42
}(_._1.dequeue, _._2)
@@ -46,9 +47,9 @@ package object fs2 {
46
47
implicit class Event2FS2Ops [T1 , T2 ](val event : Event [js.Function2 [T1 , T2 , _]])
47
48
extends AnyVal {
48
49
49
- def single [F [_]: Async : Suspendable ]: F [(T1 , T2 )] =
50
- Async [F ].async[(T1 , T2 )] { callback =>
51
- Suspendable [F ].delay {
50
+ def single [F [_]: Effect ]: F [(T1 , T2 )] =
51
+ Effect [F ].async[(T1 , T2 )] { callback =>
52
+ Effect [F ].delay {
52
53
var fn = (t1 : T1 , t2 : T2 ) => ()
53
54
fn = (t1, t2) => {
54
55
event.removeListener(fn)
@@ -58,15 +59,15 @@ package object fs2 {
58
59
}
59
60
}
60
61
61
- def toStream [F [_]: Async ] : Stream [F , (T1 , T2 )] =
62
+ def toStream [F [_]: Effect ]( implicit EC : ExecutionContext ) : Stream [F , (T1 , T2 )] =
62
63
toStream(async.unboundedQueue[F , (T1 , T2 )])
63
64
64
- def toStream [F [_]: Async ](queue : F [Queue [F , (T1 , T2 )]]): Stream [F , (T1 , T2 )] = {
65
- Stream .bracket{
65
+ def toStream [F [_]: Effect ](queue : F [Queue [F , (T1 , T2 )]]): Stream [F , (T1 , T2 )] = {
66
+ Stream .bracket {
66
67
queue.map { q =>
67
- val callback = (t1 : T1 , t2 : T2 ) => q.offer1((t1, t2)).unsafeRunAsync(_ => ())
68
+ val callback = (t1 : T1 , t2 : T2 ) => q.offer1((t1, t2)).runAsync(_ => IO .unit). unsafeRunAsync(_ => ())
68
69
event.addListener(callback)
69
- val release = Suspendable [F ].delay(event.removeListener(callback))
70
+ val release = Async [F ].delay(event.removeListener(callback))
70
71
(q, release)
71
72
}
72
73
}(_._1.dequeue, _._2)
@@ -77,9 +78,9 @@ package object fs2 {
77
78
implicit class Event3FS2Ops [T1 , T2 , T3 ](val event : Event [js.Function3 [T1 , T2 , T3 , _]])
78
79
extends AnyVal {
79
80
80
- def single [F [_]: Async : Suspendable ]: F [(T1 , T2 , T3 )] =
81
- Async [F ].async[(T1 , T2 , T3 )] { callback =>
82
- Suspendable [F ].delay {
81
+ def single [F [_]: Effect ]: F [(T1 , T2 , T3 )] =
82
+ Effect [F ].async[(T1 , T2 , T3 )] { callback =>
83
+ Effect [F ].delay {
83
84
var fn = (t1 : T1 , t2 : T2 , t3 : T3 ) => ()
84
85
fn = (t1, t2, t3) => {
85
86
event.removeListener(fn)
@@ -89,15 +90,15 @@ package object fs2 {
89
90
}
90
91
}
91
92
92
- def toStream [F [_]: Async ] : Stream [F , (T1 , T2 , T3 )] =
93
+ def toStream [F [_]: Effect ]( implicit EC : ExecutionContext ) : Stream [F , (T1 , T2 , T3 )] =
93
94
toStream(async.unboundedQueue[F , (T1 , T2 , T3 )])
94
95
95
- def toStream [F [_]: Async ](queue : F [Queue [F , (T1 , T2 , T3 )]]): Stream [F , (T1 , T2 , T3 )] = {
96
- Stream .bracket{
96
+ def toStream [F [_]: Effect ](queue : F [Queue [F , (T1 , T2 , T3 )]]): Stream [F , (T1 , T2 , T3 )] = {
97
+ Stream .bracket {
97
98
queue.map { q =>
98
- val callback = (t1 : T1 , t2 : T2 , t3 : T3 ) => q.offer1((t1, t2, t3)).unsafeRunAsync(_ => ())
99
+ val callback = (t1 : T1 , t2 : T2 , t3 : T3 ) => q.offer1((t1, t2, t3)).runAsync(_ => IO .unit). unsafeRunAsync(_ => ())
99
100
event.addListener(callback)
100
- val release = Suspendable [F ].delay(event.removeListener(callback))
101
+ val release = Async [F ].delay(event.removeListener(callback))
101
102
(q, release)
102
103
}
103
104
}(_._1.dequeue, _._2)
0 commit comments