Skip to content

Commit

Permalink
Merge branch 'master' into update/zio-2.0.19
Browse files Browse the repository at this point in the history
  • Loading branch information
cornerman authored Nov 12, 2023
2 parents b29a1dd + 357652d commit d26824e
Show file tree
Hide file tree
Showing 14 changed files with 45 additions and 279 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
build:
strategy:
matrix:
scalaVersion: ["2.13.11", "3.3.1"]
scalaVersion: ["2.13.12", "3.3.1"]
runs-on: ubuntu-22.04

steps:
Expand Down Expand Up @@ -44,7 +44,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-22.04]
scala: [2.13.11]
scala: [2.13.12]
java: [[email protected]]
runs-on: ${{ matrix.os }}
steps:
Expand Down
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
runner.dialect = scala213
version = "3.7.15"
version = "3.7.16"
maxColumn = 140
trailingCommas = always
align.preset = most
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.raquo.airstream.ownership.internalcolibri

import com.raquo.airstream.ownership._
import scala.scalajs.js
import com.raquo.ew.JsArray

object NoopOwner extends Owner {
override protected[this] val subscriptions: js.Array[Subscription] = null
override protected[this] val subscriptions: JsArray[Subscription] = null
override protected[this] def killSubscriptions(): Unit = ()
override protected[this] def onOwned(subscription: Subscription): Unit = ()
override private[ownership] def onKilledExternally(subscription: Subscription): Unit = ()
Expand Down
17 changes: 7 additions & 10 deletions airstream/src/main/scala/colibri/ext/airstream/package.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package colibri.ext

import com.raquo.airstream.core.{Observable, Observer}
import com.raquo.airstream.custom.{CustomSource, CustomStreamSource}
import com.raquo.airstream.core.{Observable, Observer, EventStream}
import com.raquo.airstream.ownership.Subscription
import com.raquo.airstream.ownership.internalcolibri.NoopOwner

Expand Down Expand Up @@ -30,14 +29,12 @@ package object airstream {

implicit object liftSource extends colibri.LiftSource[Observable] {
def lift[H[_]: colibri.Source, A](source: H[A]): Observable[A] = {
val stream = CustomStreamSource[A] { (fireValue, fireError, _, _) =>
var cancelable = colibri.Cancelable.empty
CustomSource.Config(
onStart = () => cancelable = colibri.Source[H].unsafeSubscribe(source)(colibri.Observer.create(fireValue, fireError)),
onStop = () => cancelable.unsafeCancel(),
)
}
stream
var cancelable = colibri.Cancelable.empty
EventStream.fromCustomSource[A](
start = (fireValue, fireError, _, _) =>
cancelable = colibri.Source[H].unsafeSubscribe(source)(colibri.Observer.create(fireValue, fireError)),
stop = _ => cancelable.unsafeCancel(),
)
}
}

Expand Down
8 changes: 4 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ inThisBuild(
)

lazy val commonSettings = Seq(
crossScalaVersions := Seq("2.13.11", "3.3.1"),
scalaVersion := "2.13.11",
crossScalaVersions := Seq("2.13.12", "3.3.1"),
scalaVersion := "2.13.12",
libraryDependencies ++= (CrossVersion.partialVersion(scalaVersion.value) match {
case Some((3, _)) => Seq.empty
case _ =>
Expand Down Expand Up @@ -110,7 +110,7 @@ lazy val rx = project
.settings(commonSettings)
.settings(
name := "colibri-rx",
crossScalaVersions := Seq("2.13.11"), // no scala3, because scala.rx uses scala2 macros
crossScalaVersions := Seq("2.13.12"), // no scala3, because scala.rx uses scala2 macros
libraryDependencies ++= Seq(
"com.lihaoyi" %% "scalarx" % "0.4.3",
),
Expand All @@ -124,7 +124,7 @@ lazy val airstream = project
.settings(
name := "colibri-airstream",
libraryDependencies ++= Seq(
"com.raquo" %%% "airstream" % "0.14.5",
"com.raquo" %%% "airstream" % "16.0.0",
),
)

Expand Down
3 changes: 0 additions & 3 deletions colibri/src/main/scala/colibri/CanCancel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package colibri

trait CanCancel[-T] {
def unsafeCancel(cancelable: T): Unit

@deprecated("Use unsafeCancel instead", "0.2.7")
@inline final def cancel(cancelable: T): Unit = unsafeCancel(cancelable)
}
object CanCancel {
@inline def apply[T](implicit unsafeCancel: CanCancel[T]): CanCancel[T] = unsafeCancel
Expand Down
3 changes: 0 additions & 3 deletions colibri/src/main/scala/colibri/Cancelable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ trait Cancelable {
def isEmpty(): Boolean
def unsafeCancel(): Unit

@deprecated("Use unsafeCancel() instead", "0.2.7")
@inline final def cancel(): Unit = unsafeCancel()

final def cancelF[F[_]: Sync]: F[Unit] = Sync[F].delay(unsafeCancel())
final def cancelIO: IO[Unit] = cancelF[IO]
final def cancelSyncIO: SyncIO[Unit] = cancelF[SyncIO]
Expand Down
48 changes: 9 additions & 39 deletions colibri/src/main/scala/colibri/Connectable.scala
Original file line number Diff line number Diff line change
@@ -1,55 +1,25 @@
package colibri

final class Connectable[+T] private (val value: T, val connect: () => Cancelable) {
def map[A](f: T => A): Connectable[A] = new Connectable(f(value), connect)
final class Connectable[+T] private (val value: T, val unsafeConnect: () => Cancelable) {
def map[A](f: T => A): Connectable[A] = new Connectable(f(value), unsafeConnect)
def flatMap[A](f: T => Connectable[A]): Connectable[A] = {
val connectable = f(value)
new Connectable(connectable.value, () => Cancelable.composite(connect(), connectable.connect()))
new Connectable(connectable.value, () => Cancelable.composite(unsafeConnect(), connectable.unsafeConnect()))
}
}
object Connectable {
def apply[T](value: T, connect: () => Cancelable) = {
val cancelable = Cancelable.refCount(connect)
def apply[T](value: T, unsafeConnect: () => Cancelable) = {
val cancelable = Cancelable.refCount(unsafeConnect)
new Connectable(value, cancelable.ref)
}

@inline implicit class ConnectableObservableOperations[A](val source: Connectable[Observable[A]]) extends AnyVal {
def refCount: Observable[A] = new Observable[A] {
def unsafeSubscribe(sink: Observer[A]): Cancelable = Cancelable.composite(source.value.unsafeSubscribe(sink), source.connect())
def unsafeSubscribe(sink: Observer[A]): Cancelable = Cancelable.composite(source.value.unsafeSubscribe(sink), source.unsafeConnect())
}
@deprecated("Use unsafeHot instead", "0.5.0")
def hot: Observable.Hot[A] = unsafeHot()
def unsafeHot(): Observable.Hot[A] = new Observable.Hot[A] {
val cancelable = source.connect()
def unsafeSubscribe(sink: Observer[A]): Cancelable = source.value.unsafeSubscribe(sink)
}
}

@inline implicit class ConnectableObservableValueOperations[A](val source: Connectable[Observable.Value[A]]) extends AnyVal {
def refCount: Observable.Value[A] = new Observable.Value[A] {
def now() = source.value.now()
def unsafeSubscribe(sink: Observer[A]): Cancelable = Cancelable.composite(source.value.unsafeSubscribe(sink), source.connect())
}
@deprecated("Use unsafeHot instead", "0.7.8")
def hot: Observable.HotValue[A] = unsafeHot()
def unsafeHot(): Observable.HotValue[A] = new Observable.HotValue[A] {
val cancelable = source.connect()
def now() = source.value.now()
def unsafeSubscribe(sink: Observer[A]): Cancelable = source.value.unsafeSubscribe(sink)
}
}

@inline implicit class ConnectableObservableMaybeValueOperations[A](val source: Connectable[Observable.MaybeValue[A]]) extends AnyVal {
def refCount: Observable.MaybeValue[A] = new Observable.MaybeValue[A] {
def now() = source.value.now()
def unsafeSubscribe(sink: Observer[A]): Cancelable = Cancelable.composite(source.value.unsafeSubscribe(sink), source.connect())
}
@deprecated("Use unsafeHot instead", "0.7.8")
def hot: Observable.HotMaybeValue[A] = unsafeHot()
def unsafeHot(): Observable.HotMaybeValue[A] = new Observable.HotMaybeValue[A] {
val cancelable = source.connect()
def now() = source.value.now()
def unsafeSubscribe(sink: Observer[A]): Cancelable = source.value.unsafeSubscribe(sink)
def unsafeHot(): Observable[A] = {
val _ = source.unsafeConnect()
source.value
}
}
}
Loading

0 comments on commit d26824e

Please sign in to comment.