Skip to content

Refactor Subscriber #984

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: series/1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ lazy val `redis4cats-streams` = project
prev.filter(artifact => VersionNumber(artifact.revision).matchesSemVer(SemanticSelector(">=1.8.0")))
}
)
.settings(libraryDependencies += Libraries.fs2Core)
.settings(libraryDependencies ++= List(Libraries.fs2Core, Libraries.collectionCompat))
.settings(Test / parallelExecution := false)
.enablePlugins(AutomateHeaderPlugin)
.dependsOn(`redis4cats-core`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import cats.syntax.all._
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.data._
import dev.profunktor.redis4cats.effect._
import dev.profunktor.redis4cats.pubsub.internals.{ LivePubSubCommands, PubSubState, Publisher, Subscriber }
import dev.profunktor.redis4cats.pubsub.internals.{ LivePubSubCommands, Publisher, Subscriber }
import fs2.Stream
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection

Expand Down Expand Up @@ -56,10 +56,9 @@ object PubSub {
val (acquire, release) = acquireAndRelease[F, K, V](client, codec)
// One exclusive connection for subscriptions and another connection for publishing / stats
for {
state <- Resource.eval(PubSubState.make[F, K, V])
sConn <- Resource.make(acquire)(release)
subCommands <- Resource.make(acquire)(release).flatMap(Subscriber.make[F, K, V])
pConn <- Resource.make(acquire)(release)
} yield new LivePubSubCommands[F, K, V](state, sConn, pConn)
} yield new LivePubSubCommands[F, K, V](subCommands, pConn)
}

/** Creates a PubSub connection.
Expand All @@ -83,10 +82,7 @@ object PubSub {
codec: RedisCodec[K, V]
): Resource[F, SubscribeCommands[F, Stream[F, *], K, V]] = {
val (acquire, release) = acquireAndRelease[F, K, V](client, codec)
for {
state <- Resource.eval(PubSubState.make[F, K, V])
conn <- Resource.make(acquire)(release)
} yield new Subscriber(state, conn)
Resource.make(acquire)(release).flatMap(Subscriber.make[F, K, V])
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,15 @@ import dev.profunktor.redis4cats.data.RedisChannel
import dev.profunktor.redis4cats.data.RedisPattern
import dev.profunktor.redis4cats.data.RedisPatternEvent
import dev.profunktor.redis4cats.pubsub.data.Subscription
import dev.profunktor.redis4cats.effect.{ FutureLift, Log }
import dev.profunktor.redis4cats.effect.FutureLift
import fs2.Stream
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection

private[pubsub] class LivePubSubCommands[F[_]: Async: Log, K, V](
state: PubSubState[F, K, V],
subConnection: StatefulRedisPubSubConnection[K, V],
private[pubsub] class LivePubSubCommands[F[_]: Async, K, V](
subCommands: SubscribeCommands[F, Stream[F, *], K, V],
pubConnection: StatefulRedisPubSubConnection[K, V]
) extends PubSubCommands[F, Stream[F, *], K, V] {

private[redis4cats] val subCommands: SubscribeCommands[F, Stream[F, *], K, V] =
new Subscriber[F, K, V](state, subConnection)
private[redis4cats] val pubSubStats: PubSubStats[F, K] = new LivePubSubStats(pubConnection)

override def subscribe(channel: RedisChannel[K]): Stream[F, V] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,7 @@
package dev.profunktor.redis4cats.pubsub.internals

import scala.util.control.NoStackTrace
import cats.effect.std.Dispatcher
import dev.profunktor.redis4cats.data.RedisChannel
import dev.profunktor.redis4cats.data.RedisPattern
import dev.profunktor.redis4cats.data.RedisPatternEvent
import io.lettuce.core.pubsub.RedisPubSubListener
import io.lettuce.core.pubsub.RedisPubSubAdapter

object PubSubInternals {
case class DispatcherAlreadyShutdown() extends NoStackTrace

private[redis4cats] def channelListener[F[_], K, V](
channel: RedisChannel[K],
publish: V => F[Unit],
dispatcher: Dispatcher[F]
): RedisPubSubListener[K, V] =
new RedisPubSubAdapter[K, V] {
override def message(ch: K, msg: V): Unit =
if (ch == channel.underlying) {
try
dispatcher.unsafeRunSync(publish(msg))
catch {
case _: IllegalStateException => throw DispatcherAlreadyShutdown()
}
}

// Do not uncomment this, as if you will do this the channel listener will get a message twice
// override def message(pattern: K, channel: K, message: V): Unit = {}
}
private[redis4cats] def patternListener[F[_], K, V](
redisPattern: RedisPattern[K],
publish: RedisPatternEvent[K, V] => F[Unit],
dispatcher: Dispatcher[F]
): RedisPubSubListener[K, V] =
new RedisPubSubAdapter[K, V] {
override def message(pattern: K, channel: K, message: V): Unit =
if (pattern == redisPattern.underlying) {
try
dispatcher.unsafeRunSync(publish(RedisPatternEvent(pattern, channel, message)))
catch {
case _: IllegalStateException => throw DispatcherAlreadyShutdown()
}
}
}
}

This file was deleted.

This file was deleted.

Loading