Refactor Subscriber
#984
base: series/1.x
Conversation
The main goal is to reduce the overhead when we have a lot of subscriptions.

- Avoid locking for `subscribe` and `unsubscribe`. We should be able to subscribe to or unsubscribe from channels/patterns independent of concurrent subscriptions to other channels/patterns without being blocked. We represent the subscription lifecycle as multiple states. We can move between those states without locking, but still make operations on the same channel/key wait on state changes by using a `Deferred` as a signal. We don't go back to a single `Ref` with a `Map`, but switch to using a `MapRef` backed by a `ConcurrentHashMap`, allowing us to reduce contention if we have a lot of concurrent state changes.
- Avoid using multiple `RedisPubSubListener`s. The listeners are only called one by one. Since we have a map with all subscriptions, we can look up the subscription and publish to its topic in one listener.
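A minimal sketch of the lifecycle-state idea, not the PR's actual code: `SubState`, `Subscribing`, `Active` and the method bodies are illustrative assumptions. It shows how one `MapRef` entry per channel plus a `Deferred` lets concurrent operations on the same key wait on each other without a global lock.

```scala
// Sketch only. Assumes cats-effect 3.5+ (MapRef in cats.effect.std) and fs2 3.x.
import cats.effect.{Deferred, IO}
import cats.effect.std.MapRef
import fs2.concurrent.Topic

sealed trait SubState
final case class Subscribing(done: Deferred[IO, Unit])                extends SubState
final case class Active(topic: Topic[IO, String], subscribers: Long)  extends SubState

// one Ref-like handle per key, backed by a ConcurrentHashMap
val mkStates: IO[MapRef[IO, String, Option[SubState]]] =
  MapRef.ofConcurrentHashMap[IO, String, SubState]()

def subscribe(states: MapRef[IO, String, Option[SubState]], channel: String): IO[Unit] =
  for {
    signal <- Deferred[IO, Unit]
    // only this channel's entry is touched, so subscriptions to other
    // channels never contend on a shared lock
    wait <- states(channel).modify {
              case None                 => (Some(Subscribing(signal)), IO.unit) // start the real SUBSCRIBE (omitted here)
              case Some(Subscribing(d)) => (Some(Subscribing(d)), d.get)        // another fiber is subscribing: wait on its signal
              case Some(a: Active)      => (Some(a.copy(subscribers = a.subscribers + 1)), IO.unit)
            }
    _ <- wait // run the (possible) wait outside of modify
  } yield ()
```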
Scala 3 is still unhappy with value classes and generic methods
Sorry for delaying the request for so long. Was busy, then got sick. I hope to review this this week.
```scala
    sub: RedisChannel[String] => IO[Unit],
    unsub: RedisChannel[String] => IO[Unit]
): IO[Subscriber.SubscriptionMap[IO, RedisChannel[String], String]] = {
  // import effect.Log.Stdout._
```
remove?
```scala
subscription <- map.subscribe(channel1).compile.toList.start
_            <- waitOnFiber
```
Instead of `sleep`, which makes for long tests and race conditions in CI, how about cats-effect's `TestControl` (sketched after the suggestion below)? Otherwise, if we want to keep the exact runtime:
```diff
- subscription <- map.subscribe(channel1).compile.toList.start
- _ <- waitOnFiber
+ waitForSubscribe <- Deferred[IO, Unit]
+ subscription <- map.subscribe(channel1).compile.toList.flatTap(_ => waitForSubscribe.complete(())).start
+ _ <- waitForSubscribe.get
```
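For reference, a small sketch of the `TestControl` idea mentioned above (not the actual test; the program below is a stand-in). The code still "sleeps", but on `TestControl`'s mock clock, so the test runs instantly and deterministically. Requires the `cats-effect-testkit` module.

```scala
import scala.concurrent.duration._
import cats.effect.IO
import cats.effect.testkit.TestControl

// stand-in for the part of the test that currently waits with a real sleep
val waitThenCheck: IO[String] =
  IO.sleep(50.millis) *> IO.pure("subscribed")

// runs the program on a mock clock, advancing past the sleep without real waiting
val deterministic: IO[String] =
  TestControl.executeEmbed(waitThenCheck)
```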
I used the `sleep` initially because that is what `RedisPubSubSpec` uses, but unlike those tests we aren't actually communicating with Redis here.

The complicating factor is that `subscribe` is doing quite a lot: it is subscribing, but also returning the messages. In these tests we want to be subscribed (both to Redis and to the `Topic`) before we use any of the other operations where we assume there is a subscription.

I am working on a change to the internal `subscribe` signature to make it easier to test (separating the subscription from the message-consumption effect), but I still need to clean up these tests a bit more before I'll push that work. Thanks for this comment, I should have given this some more thought before.

On a side note, your `Deferred` suggestion unfortunately doesn't work. The `subscribe` method never terminates on its own. We need something to call `unsubscribe` or end the stream early.
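A hypothetical sketch of the kind of split described above (not the PR's actual API; `SubscriptionMap` and `subscribeAwait` are illustrative names): acquiring the subscription is separated from consuming the messages, so a test can act "once subscribed" without sleeping.

```scala
import cats.effect.{IO, Resource}
import fs2.Stream

trait SubscriptionMap[K, V] {
  // acquisition completes only once the subscription is established,
  // release performs the unsubscribe
  def subscribeAwait(channel: K): Resource[IO, Stream[IO, V]]
}

// a test can run its assertions knowing the subscription exists,
// and consume the message stream separately if it needs to
def subscribedThen[A](map: SubscriptionMap[String, String])(assertions: IO[A]): IO[A] =
  map.subscribeAwait("channel-1").use(_ => assertions)
```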
```scala
// Lettuce calls the listeners one by one (for every subscribe,
// unsubscribe, message, ...), so using multiple listeners when we can
// find the right subscription easily, is inefficient.
dispatcher <- Dispatcher.sequential[F] // we have no parallelism in the listener
```
Not familiar with the pubsub code, but I wonder if this code would be simpler if, for each `pubSub.subscribe(myChannel)`, we add a listener to Lettuce that creates a sequential dispatcher and puts messages on a queue which an FS2 stream consumes. This would delegate all the state handling to Lettuce, which in turn uses Netty.

Looking at the existing code, I think it's somewhat what I'd expect, where a listener and subscriber are created for each `subscribe` call. All the listener does is publish to a `fs2.Topic` (which I'm not familiar with, but guessing it's some queue). It also seems like it could use a sequential rather than a parallel dispatcher, as all it does is add to the `Topic`.

So if I understand correctly a single dispatcher could work, but I'm not sure it's necessarily better than multiple.
The existing code uses a listener for every subscription indeed. Lettuce calls every listener for every message though. So the more subscriptions we have, the more listeners will get called, where all but one will ignore the message. That feels very inefficient. Since we have a map with all subscriptions, we can pass the message directly to the `Topic` of the right subscription with a single listener and thus avoid calling `N` listeners for every message (when we have `N` distinct subscriptions).

> This would delegate all the state handling to Lettuce which in turn uses Netty.

Not sure what you mean here. Lettuce only has a list of listeners, it doesn't know what those listeners are interested in (`message`, `subscribed`, `unsubscribed`, ...).

- https://github.com/redis/lettuce/blob/main/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java
- https://github.com/redis/lettuce/blob/8321f5d5b4877cde5a3274bcbbcddcb81a1c4589/src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java#L251-L286

> So if I understand correctly a single dispatcher could work, but not sure it's necessarily better than multiple.

In the existing code a single sequential dispatcher would have worked too, I think. Since the listeners get called one by one and we use `unsafeRunSync`, there is no way for us to ever have multiple dispatcher calls at the same time.
I see, Lettuce does a linear scan, whereas we can do a constant-time lookup to route a message to its respective topic. This makes sense from an efficiency perspective, which isn't to say that the Lettuce design is bad.

My initial gut reaction would be to allow supporting both, such that end users could set up multiple listeners if they want multiple consumers for the same key, but the default `Subscriber` goes for efficiency. Will refine these thoughts while reviewing.

Thanks for clarifying, will give the PR another look.
There is already support for multiple subscribers for the same key. That is why the code is using a `Topic`, which uses a `Channel` for every subscription.
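A small illustration with plain fs2 (not the PR code) of what `Topic` gives us: each subscriber gets its own `Channel`-backed subscription, and every subscriber receives every published message.

```scala
import cats.effect.IO
import cats.syntax.all._
import fs2.concurrent.Topic

val twoSubscribers: IO[(List[String], List[String])] =
  Topic[IO, String].flatMap { topic =>
    // subscribeAwait registers each subscriber's channel before we publish
    (topic.subscribeAwait(maxQueued = 16), topic.subscribeAwait(maxQueued = 16)).tupled.use {
      case (subA, subB) =>
        for {
          a  <- subA.take(1).compile.toList.start
          b  <- subB.take(1).compile.toList.start
          _  <- topic.publish1("hello")
          ra <- a.joinWithNever
          rb <- b.joinWithNever
        } yield (ra, rb) // both subscribers see List("hello")
    }
  }
```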
```scala
new RedisPubSubAdapter[K, V] {
  override def message(ch: K, msg: V): Unit =
    try
      dispatcher.unsafeRunSync(state.channelSubs.onMessage(RedisChannel(ch), msg))
```
You mention this above, but since we funnel all messages through a single sequential dispatcher, which is backed by a single queue for the IO effects, this could create "head-of-line" blocking. There could exist a topic with a single blocking subscriber, and that would block subscribers on all other topics. And this is because `onMessage` just routes to a topic, then publishes to it.

I think solutions would be to add queues or do the routing at the Java layer.
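A hedged sketch of the "add queues" idea, not the PR's implementation (names are illustrative): every channel gets its own bounded queue, drained into its `Topic` in the background, so the Lettuce listener only enqueues and a slow subscriber can only stall its own channel.

```scala
import cats.effect.{IO, Resource}
import cats.effect.std.Queue
import fs2.Stream
import fs2.concurrent.Topic

def channelBuffer(topic: Topic[IO, String]): Resource[IO, Queue[IO, String]] =
  for {
    queue <- Resource.eval(Queue.bounded[IO, String](1024))
    // drain the queue into the topic in the background; only subscribers of
    // this channel are affected if they fall behind
    _ <- Stream
           .fromQueueUnterminated(queue)
           .evalMap(msg => topic.publish1(msg).void)
           .compile
           .drain
           .background
  } yield queue

// The listener would then only enqueue (pseudocode, queueFor is hypothetical):
//   dispatcher.unsafeRunSync(queueFor(channel).offer(msg))
// which never waits on downstream Topic subscribers (until that channel's queue fills up).
```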
I agree with point (2) and I would expect better performance with a single listener that dispatches to a single dispatcher. However, we need to be careful of subscribers of one topic blocking subscribers of other topics. Regarding (1), yeah it's significantly more complicated, which makes it difficult to reason about correctness. Curious how bad the blocking could be with `AtomicCell` and if there is a cats read-write lock we could use. I like the use of `MapRef`, but how come we the …
The main goal is to reduce the overhead we add when we have a lot of subscriptions.
- Avoid locking for `subscribe` and `unsubscribe`. We should be able to subscribe to or unsubscribe from channels/patterns independent of concurrent subscriptions to other channels/patterns without being blocked.

  In this PR we represent the subscription lifecycle as multiple states. We can move between those states without locking, but still make operations on the same channel/pattern wait on state changes by using a `Deferred` as a signal.

  We don't go back to a single `Ref` with a `Map`, but switch to using a `MapRef` backed by a `ConcurrentHashMap`, allowing us to reduce contention if we have a lot of concurrent state changes.

- Avoid using multiple `RedisPubSubListener`s. The listeners are only called one by one. Since we have a map with all subscriptions, we can look up the subscription and publish to its topic in one listener.
This is a more ambitious version of what I started in #972.
Unfortunately this made the diff a lot larger than I wanted it to be.
- `unsubscribe` failing currently leaves us in a state where we can't retry (since we keep a `Redis4CatsSubscription` with a single subscription, without actual subscribers to the topic).
- The `Subscriber` implementation moved into the `Subscriber` companion object, which I think makes things easier (but that is probably subjective). The `Redis4CatsSubscription` class is now the `Active` part of `SubscriptionState`.

I tried to keep the behavior as close to the existing implementation as I could. There are a few places that could use some additional work, but I didn't want to make any additional changes to the implementation:
- We could `unsubscribe` instead of waiting on the last subscription stream finalizer (since no new messages will be processed anyway).
- We should document that a single subscriber not keeping up with its channel will block not only all subscriptions for the same channel/pattern but all channels/patterns, since we can't publish to the topic.
- We could potentially publish to multiple channels/patterns in parallel (but we should benchmark first); a rough sketch follows below.
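A rough sketch of the parallel-publish idea (illustrative names, not the PR code): when a message matches one channel topic and several pattern topics, publish to all of them concurrently instead of one after the other.

```scala
import cats.effect.IO
import cats.syntax.all._
import fs2.concurrent.Topic

// publish the message to every matching topic in parallel
def publishAll(matching: List[Topic[IO, String]], msg: String): IO[Unit] =
  matching.parTraverse_(topic => topic.publish1(msg).void)
```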