
Refactor Subscriber #984


Open · wants to merge 3 commits into base: series/1.x

Conversation

peterneyens

The main goal is to reduce the overhead we add when we have a lot of subscriptions.

  • Avoid locking for subscribe and unsubscribe. We should be able to subscribe to or unsubscribe from channels/patterns independently of concurrent subscriptions to other channels/patterns, without being blocked.
    In this PR we represent the subscription lifecycle as multiple states. We can move between those states without locking, but still make operations on the same channel/pattern wait on state changes by using a Deferred as a signal.
    We don't go back to a single Ref with a Map, but switch to a MapRef backed by a ConcurrentHashMap, allowing us to reduce contention when we have a lot of concurrent state changes.
  • Avoid using multiple RedisPubSubListeners.
    The listeners are only called one by one. Since we have a map with all subscriptions, we can look up the subscription and publish to its topic in one listener.
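As a rough sketch of the lifecycle idea, using only the standard library: Promise stands in for cats.effect.Deferred, a subscriber count for the fs2 Topic, and a plain ConcurrentHashMap for the MapRef. All names here (SubState, Subscribing, Active, Unsubscribing) are illustrative, not the PR's actual API:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.Promise

// Lifecycle states for a single channel/pattern. In the PR these would carry
// a Deferred (the signal waiters block on) and an fs2 Topic; here a Promise
// and a subscriber count stand in for them.
sealed trait SubState
final case class Subscribing(signal: Promise[Unit]) extends SubState
final case class Active(subscribers: Int) extends SubState
final case class Unsubscribing(signal: Promise[Unit]) extends SubState

// The MapRef-over-ConcurrentHashMap analogue: per-key atomic updates,
// no global lock shared between channels.
val states = new ConcurrentHashMap[String, SubState]()

// The first subscriber per channel installs Subscribing; concurrent
// subscribers to *other* channels never contend with it.
val prev1 = states.putIfAbsent("ch1", Subscribing(Promise[Unit]()))
val prev2 = states.putIfAbsent("ch2", Subscribing(Promise[Unit]()))

// Lock-free transition on one key: Subscribing -> Active(1), completing the
// signal so waiters on the same channel can proceed.
val afterSubscribe = states.computeIfPresent("ch1", (_, s) =>
  s match {
    case Subscribing(p) => p.trySuccess(()); Active(1)
    case other          => other
  }
)

// A second subscriber on an already-active channel just bumps the count.
val afterSecond = states.computeIfPresent("ch1", (_, s) =>
  s match {
    case Active(n) => Active(n + 1)
    case other     => other
  }
)
```

The point of the sketch is that all contention is per key: operations on "ch1" and "ch2" never block each other, while two operations on "ch1" coordinate through the state value and its signal.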

This is a more ambitious version of what I started in #972.
Unfortunately this made the diff a lot larger than I wanted it to be.

  • While avoiding the lock clearly makes things more complicated, it also forces us to think more about the different states and state changes (for example, a failing unsubscribe currently leaves us in a state where we can't retry, since we keep a Redis4CatsSubscription with a single subscription but without actual subscribers to the topic).
  • I moved everything that is only used by the Subscriber implementation into the Subscriber companion object, which I think makes things easier (but that is probably subjective).
    The Redis4CatsSubscription class is now the Active part of SubscriptionState.

I tried to keep the behavior as close to the existing implementation as I could. There are a few places that could use some additional work, but I didn't want to make any additional changes to the implementation.

  • I think we could unsubscribe in unsubscribe instead of waiting on the last subscription stream finalizer (since no new messages will be processed anyway).
  • There are some potential improvements with how we handle publishing messages.
    We should document that a single subscriber not keeping up with its channel will block not only all subscriptions for the same channel/pattern, but those for all other channels/patterns, since we can't publish to the topic.
    We could potentially publish to multiple channels/patterns in parallel (but we should benchmark first).

Scala 3 is still unhappy with value classes and generic methods
@arturaz
Collaborator

arturaz commented Mar 27, 2025

Sorry for delaying the request for so long. Was busy, then got sick.

I hope to review this this week.

sub: RedisChannel[String] => IO[Unit],
unsub: RedisChannel[String] => IO[Unit]
): IO[Subscriber.SubscriptionMap[IO, RedisChannel[String], String]] = {
// import effect.Log.Stdout._
Collaborator

remove?

Comment on lines +37 to +38
subscription <- map.subscribe(channel1).compile.toList.start
_ <- waitOnFiber
Collaborator

Instead of sleep, which makes for long tests and race conditions in CI, how about cats-effect's TestControl? Otherwise, if we want to keep the exact runtime:

Suggested change
subscription <- map.subscribe(channel1).compile.toList.start
_ <- waitOnFiber
waitForSubscribe <- Deferred[IO, Unit]
subscription <- map.subscribe(channel1).compile.toList.flatTap(_ => waitForSubscribe.complete(())).start
_ <- waitForSubscribe.get

Author

I used the sleep initially because that is what RedisPubSubSpec uses, but unlike those tests we aren't actually communicating with redis here.

The complicating factor is that subscribe is doing quite a lot: it is subscribing, but also returning the messages. In these tests we want to be subscribed (both to redis and to the Topic) before we use any of the other operations where we assume there is a subscription.

I am working on a change to the internal subscribe signature to make it easier to test (separating the subscription from the message consumption effect), but I still need to clean up these tests a bit more before I'll push that work. Thanks for this comment, I should have given this some more thought before.

On a side note, your Deferred suggestion unfortunately doesn't work. The subscribe method never terminates on its own. We need something to call unsubscribe or end the stream early.

// Lettuce calls the listeners one by one (for every subscribe,
// unsubscribe, message, ...), so using multiple listeners when we can
// find the right subscription easily, is inefficient.
dispatcher <- Dispatcher.sequential[F] // we have no parallelism in the listener
Collaborator

Not familiar with the pubsub code, but I wonder if this code would be simpler if, for each pubSub.subscribe(myChannel), we added a listener to Lettuce that creates a sequential dispatcher and puts messages on a queue which an FS2 stream consumes. This would delegate all the state handling to Lettuce, which in turn uses Netty.

Looking at the existing code, I think it's somewhat what I'd expect, where a listener and subscriber are created for each subscribe call. All the listener does is publish to an fs2.Topic (which I'm not familiar with, but I'm guessing it's some kind of queue). It also seems like it could use a sequential rather than a parallel dispatcher, as all it does is add to the Topic.

So if I understand correctly a single dispatcher could work, but not sure it's necessarily better than multiple.

Author

The existing code uses a listener for every subscription indeed. Lettuce calls every listener for every message though. So the more subscriptions we have, the more listeners will get called where all but one will ignore the message. That feels very inefficient. Since we have a map with all subscriptions, we can pass the message to the Topic from the right subscription directly with a single listener and thus avoid calling N listeners for every message (when we have N distinct subscriptions).

This would delegate all the state handling to Lettuce which in turn uses Netty.

Not sure what you mean here. Lettuce only has a list of listeners, it doesn't know what those listeners are interested in (message, subscribed, unsubscribed, ...).

So if I understand correctly a single dispatcher could work, but not sure it's necessarily better than multiple.

In the existing code a single sequential dispatcher would have worked too I think. Since the listeners get called one by one and we use unsafeRunSync, there is no way for us to ever have multiple dispatcher calls at the same time.
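A minimal stand-in for that single-listener routing, with plain callbacks in place of the Dispatcher and Topic (names are illustrative, not the PR's API):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ListBuffer

// One handler per channel in a concurrent map: a single listener does one
// O(1) lookup per message, instead of Lettuce invoking N listeners of which
// N - 1 ignore the message.
val subscriptions = new ConcurrentHashMap[String, String => Unit]()
val received = ListBuffer.empty[(String, String)]

val _ = subscriptions.put("news", (msg: String) => received += (("news", msg)))
val _ = subscriptions.put("logs", (msg: String) => received += (("logs", msg)))

// The single RedisPubSubListener equivalent: route by channel, ignore unknown.
def onMessage(channel: String, msg: String): Unit = {
  val handler = subscriptions.get(channel)
  if (handler ne null) handler(msg)
}

val _ = onMessage("news", "hello")
val _ = onMessage("other", "dropped") // no subscription for "other": no-op
```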

Collaborator

I see: Lettuce does a linear scan, whereas we can do a constant-time lookup to route messages to their respective topics. This makes sense from an efficiency perspective, which isn't to say that the Lettuce design is bad.

My initial gut reaction would be to allow supporting both, such that end users could set up multiple listeners if they want multiple consumers for the same key, but the default Subscriber goes for efficiency. Will refine these thoughts while reviewing.

Thanks for clarifying, will give PR another look.

Author

There is already support for multiple subscribers for the same key. That is why the code is using a Topic, which uses a Channel for every subscription.
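A tiny stand-in for that fan-out, with per-subscriber buffers playing the role of the Channel each Topic subscription gets (illustrative only, not fs2's actual Topic):

```scala
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

// Topic-like fan-out: each subscriber gets its own buffer ("channel"), and
// publish delivers the message to every subscriber of this key.
final class MiniTopic[A] {
  private val subs = ArrayBuffer.empty[ListBuffer[A]]
  def subscribe(): ListBuffer[A] = {
    val buf = ListBuffer.empty[A]
    subs += buf
    buf
  }
  def publish(a: A): Unit = subs.foreach(_ += a)
}

val topic = new MiniTopic[String]
val sub1 = topic.subscribe()
val sub2 = topic.subscribe() // a second subscriber on the same key
val _ = topic.publish("m1")  // both subscribers receive the message
```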

new RedisPubSubAdapter[K, V] {
  override def message(ch: K, msg: V): Unit =
    try
      dispatcher.unsafeRunSync(state.channelSubs.onMessage(RedisChannel(ch), msg))
Collaborator

You mention this above, but since we funnel all messages through a single sequential dispatcher, which is backed by a single queue for the IO effects, this could create "head-of-line" blocking. A single blocked subscriber on one topic would block subscribers on all other topics, because onMessage just routes to a topic and then publishes to it.

I think solutions would be to add queues or to do the routing at the Java layer.
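One way to sketch the per-topic-queue idea: a bounded stdlib queue per channel, so a slow consumer only backs up its own channel. Purely illustrative, not a proposed API:

```scala
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}

// One bounded queue per channel. The listener enqueues with `offer`, which
// never blocks, so a full queue for one channel can't stall the others.
val queues = new ConcurrentHashMap[String, ArrayBlockingQueue[String]]()

def queueFor(channel: String): ArrayBlockingQueue[String] =
  queues.computeIfAbsent(channel, _ => new ArrayBlockingQueue[String](2))

// Listener-side routing: returns false on overflow instead of blocking
// every other channel behind a single slow subscriber.
def onMessage(channel: String, msg: String): Boolean =
  queueFor(channel).offer(msg)

val a = onMessage("slow", "a")        // accepted
val b = onMessage("slow", "b")        // accepted, "slow" is now full
val dropped = !onMessage("slow", "c") // rejected, listener keeps going...
val ok = onMessage("fast", "x")       // ...and "fast" is unaffected
```

Since `offer` never blocks, overflow becomes an explicit policy decision (drop, count, or push backpressure upstream) instead of stalling the listener thread for all channels.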

@mmienko
Collaborator

mmienko commented Apr 17, 2025

The main goal is to reduce the overhead we add when we have a lot of subscriptions. (1) Avoid locking for subscribe and unsubscribe....(2) Avoid using multiple RedisPubSubListeners....

I agree with point (2), and I would expect better performance with a single listener that dispatches to a single dispatcher. However, we need to be careful about subscribers of one topic blocking subscribers of other topics.

Regarding (1), yeah, it's significantly more complicated, which makes it difficult to reason about correctness. I'm curious how bad the blocking would be with AtomicCell, and whether there is a Cats Effect read-write lock we could use. I like the use of MapRef, but why do we need the Subscribing and Unsubscribing states? Lastly, I presume onMessage will get called more often than subscribe and unsubscribe, so we want onMessage to be more efficient, but we need to access the map of Topics on each call. Does that sound correct?
