Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add rudimentary zio-stream support #113

Closed
wants to merge 5 commits into from
Closed

Conversation

busti
Copy link
Contributor

@busti busti commented Aug 31, 2021

This is still a work in progress and is missing tests and some implementations. Feedback welcome.
That being said, zio streams don't translate well to a reactive-streams style api. Especially ZSink is kinda dodgy.
Zio streams are completely functionally pure, while reactive streams rely on impure behavior here and there. Especially when subscribing, cancelling and in subjects.
Speaking of which, I have no Idea how I should implement subjects because ZStream and ZSink both are abstract classes and not traits, so they cannot be combined.

This approach currently includes the following type aliases:

type TSink[A]   = ZSink[Any, Throwable, A, A, Unit]
type TStream[A] = ZStream[Any, Throwable, A]

These are necessary to pass the type to the type parameter of Sink and Source respectively.
I thought this would look a little nicer than using type lambdas.
The T in TSink and TStream stands for Task since their static type parameters are identical to zio's type Task[A] = ZIO[Any, Throwable, A]

@cornerman
Copy link
Owner

That is awesome! ❤️

Thank you so much, I always wanted zio-stream to be supported.

What this PR also shows is that we should really return effect types for the function onNext, onError on Observer and subscribe on Observable. All the usages of unsafeRun show this problem.

@cornerman
Copy link
Owner

In the long-run, we could think about support the Environment type for streams. That would make a super nice integration for ZIO.

@cornerman
Copy link
Owner

cornerman commented Sep 8, 2021

Speaking of which, I have no Idea how I should implement subjects because ZStream and ZSink both are abstract classes and not traits, so they cannot be combined.

I think, it is okay to leave out Subjects. I am not even sure, whether the typeclasses around Subject like CreateSubject belong in this library. We can always create a colibri subject from anything with a Source and Sink instances. It just gets too complex and, as you say, do not translate well in all cases.

@busti
Copy link
Contributor Author

busti commented Sep 9, 2021

What this PR also shows is that we should really return effect types for the function onNext, onError on Observer and subscribe on Observable. All the usages of unsafeRun show this problem.

Yes, not returning F[_] on those is annoying to work with when implementing a library that tries to be as pure as zio does. I can imagine that the same problems will also arise when implementing fs2.
Right now the runtime also has to be passed implicitly which is not very zio-like.
Zio.runtime.map(runtime => ???): URIO[R, A] would be the way to go, if we need to use a runtime at all. That would also be compatible with cats if I am not mistaken.

Supporting environments is a different beast, but I am not sure if we should at all. Especially in outwatch it would require to pass the environment through the entire library using something like cats-mtl's Ask. Or maybe using something like izumi's BIO, but that looks insane to me. I am not sure if I'd like the idea of having generic trifunctor types allover the library, although it will probably be faster than mtl.
Right now I am providing the environment to the stream before giving it to outwatch which looks like this:

object Example extends zio.App {
  def run(args: List[String]): ZManaged[zio.ZEnv, Nothing, ExitCode] = for {
    stream = ZStream.fromSchedule(Schedule.spaced(1.second) >>> Schedule.forever)
    streamWithEnv <- ZIO.access.apply(env => stream.provide(env))
    _ <- OutWatch.renderReplace[Task]("#app", div(streamWithEnv)).ignore
  } yield ExitCode.success
}

I do not use any Subjects at all in my own code. I am experimenting with the actor pattern instead, especially zio-actors looks interesting.
I have this typed actor message passing project which I would like to release at some point, but currently it's not much of a library.

@cornerman
Copy link
Owner

Supporting environments is a different beast, but I am not sure if we should at all. Especially in outwatch it would require to pass the environment through the entire library using something like cats-mtl's Ask. Or maybe using something like izumi's BIO, but that looks insane to me. I am not sure if I'd like the idea of having generic trifunctor types allover the library, although it will probably be faster than mtl.

Actually, we are kind of doing this in a PR in outwatch. We make the VDomModifier[Environment] have a type parameter for the Environment. So normally, you work with VDomModifier[Any]. But you can actually render Kleisli or ZIO in your dom tree and provide the environment only at the end, when rendering :)

@busti
Copy link
Contributor Author

busti commented Sep 10, 2021

Maybe it would also be possible to wrap the whole VDomModifier file in a trait since the environment doesn't change down the tree, right?

But maybe this would be a good point to continue the discussion in gitter since I am also working on a PR that would add 1 type parameter to VDomModifier and 3 to VNode...

@busti
Copy link
Contributor Author

busti commented Sep 23, 2021

What this PR also shows is that we should really return effect types for the function onNext, onError on Observer and subscribe on Observable. All the usages of unsafeRun show this problem.

I feel like this won't be as trivial as implementing some unsafe zio stream conversions.
Generally what you mean is to implement sink as follows, right?

trait Sink[F[_], -G[_]] {
  def onNext[A](sink: G[A])(value: A): F[Unit]
  def onError[A](sink: G[A])(error: Throwable): F[Unit]
}

Problem is that this results in the need to add polymorphic F[_] to everything, right?

@cornerman
Copy link
Owner

Generally what you mean is to implement sink as follows, right?

Yes, kind of. But I would pull the F[_] into the method, like this:

trait Sink[-G[_]] {
  def onNext[F[_], A](sink: G[A])(value: A): F[Unit]
  def onError[F[_], A](sink: G[A])(error: Throwable): F[Unit]
}

Thereby trying to reduce the number of times F[_] has to be mentioned. To me it makes sense, that the effect type can be selected for each method. What do you think?

@busti
Copy link
Contributor Author

busti commented Jan 1, 2022

This PR has a soft dependency on #126
While it definitely does work without, introducing polymorphic F[_] would allow us to get rid of the implicit runtime passed to the conversion methods.

@cornerman
Copy link
Owner

I took the liberty to rebase this PR on the current master in #170. Thank you so much :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants