Pub/Sub in a concurrent scenario may result in the inability to use subscriptions anymore, which can ultimately lead to memory leaks #3080

Open
caiuswang opened this issue Dec 19, 2024 · 0 comments
Labels
status: waiting-for-triage An issue we've not yet triaged

Problem Description

  • In certain concurrent scenarios, the pub/sub mechanism can leave the subscription unusable, so that later subscribe calls fail. The listeners registered by those failed calls are never released, which leads to memory leaks. See #2425.

Code Execution Process

addListener Method

  1. Initially, the listener is added to the listenerTopics collection.
  2. Subsequently, the subscribeChannel method is invoked to establish the subscription.

org.springframework.data.redis.connection.util.AbstractSubscription#subscribe Method

  1. The method checks if the alive flag is set to true. If it is not, a RedisInvalidSubscriptionException is thrown.

removeMessageListener Method

  1. The listener is first removed from the listenerTopics collection.
  2. Next, it is verified whether listenerTopics is empty. If so, the stopListening method is called.
  3. Following this, the channel is removed from the subscription's channels, and the closeIfUnsubscribed method is invoked.

org.springframework.data.redis.connection.util.AbstractSubscription#closeIfUnsubscribed Method

  1. The method checks if the channels collection is empty. If it is, the alive flag is set to false using alive.compareAndSet(true, false).
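The alive-flag behaviour described in the two AbstractSubscription sections can be modelled roughly as follows. This is a simplified sketch, not the Spring Data Redis source: SubscriptionModel, its method names, and the use of IllegalStateException in place of RedisInvalidSubscriptionException are assumptions made for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical, simplified stand-in for AbstractSubscription (illustration only). */
class SubscriptionModel {

    private final AtomicBoolean alive = new AtomicBoolean(true);
    private final Set<String> channels = ConcurrentHashMap.newKeySet();

    /** Mirrors the subscribe check: a subscription that is no longer alive rejects new channels. */
    void subscribe(String channel) {
        if (!alive.get()) {
            // The real code throws RedisInvalidSubscriptionException; a JDK exception stands in here.
            throw new IllegalStateException("Subscription is no longer alive");
        }
        channels.add(channel);
    }

    /** Removes a channel, then closes the subscription if nothing is left. */
    void unsubscribe(String channel) {
        channels.remove(channel);
        closeIfUnsubscribed();
    }

    /** Once the channel set is empty, alive flips to false and never flips back. */
    private void closeIfUnsubscribed() {
        if (channels.isEmpty()) {
            alive.compareAndSet(true, false);
        }
    }

    boolean isAlive() {
        return alive.get();
    }
}
```

In this model nothing ever sets alive back to true, so every subscribe call after the last channel has been removed is rejected.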

Bug Analysis

  • Steps 2 and 3 of the removeMessageListener method are not atomic with respect to a concurrent addListener call, which creates a race condition. If another thread has completed step 1 of addListener (its listener is already in listenerTopics) but has not yet reached subscribeChannel, step 2 finds listenerTopics non-empty and skips stopListening, while step 3 removes the last channel from the subscription, so closeIfUnsubscribed sets alive to false.
  • Consequently, the pending addListener call and any subsequent ones fail with RedisInvalidSubscriptionException, because the subscription is no longer alive. The listener was already added to listenerTopics before the failure, and applications typically do not call removeMessageListener after a failed add, so it remains in listenerTopics indefinitely.
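The interleaving can be replayed on a single thread to make the ordering explicit. The sketch below is a toy model under stated assumptions: RaceReplay and all of its members are hypothetical names, the listener-to-channel map is a simplification of listenerTopics, and IllegalStateException stands in for RedisInvalidSubscriptionException. It is not the container's actual code.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical single-threaded replay of the race; every name here is illustrative. */
class RaceReplay {

    final Map<String, String> listenerTopics = new ConcurrentHashMap<>(); // listener -> channel
    final Set<String> channels = ConcurrentHashMap.newKeySet();          // subscription channels
    final AtomicBoolean alive = new AtomicBoolean(true);
    boolean stopListeningCalled = false;
    boolean subscribeFailed = false;

    /** addListener, step 1: register the listener. */
    void register(String listener, String channel) {
        listenerTopics.put(listener, channel);
    }

    /** addListener, step 2: subscribe, which requires the subscription to be alive. */
    void subscribe(String channel) {
        if (!alive.get()) {
            throw new IllegalStateException("Subscription is no longer alive");
        }
        channels.add(channel);
    }

    /** removeMessageListener, steps 1 to 3. */
    void remove(String listener) {
        String channel = listenerTopics.remove(listener);   // step 1
        if (listenerTopics.isEmpty()) {                      // step 2
            stopListeningCalled = true;
        }
        if (channel != null) {                               // step 3
            channels.remove(channel);
        }
        if (channels.isEmpty()) {                            // closeIfUnsubscribed
            alive.compareAndSet(true, false);
        }
    }

    /** Replays: add(listener1) completes, add(listener2) pauses after step 1, remove(listener1) runs. */
    static RaceReplay run() {
        RaceReplay c = new RaceReplay();
        c.register("listener1", "a");
        c.subscribe("a");
        c.register("listener2", "b");   // second caller stops here, before subscribing
        c.remove("listener1");          // sees listenerTopics non-empty, but empties channels
        try {
            c.subscribe("b");           // second caller resumes
        } catch (IllegalStateException e) {
            c.subscribeFailed = true;
        }
        return c;
    }
}
```

After RaceReplay.run(), the model ends with alive set to false, stopListening never invoked, and listener2 still present in listenerTopics, which matches the failure and the leak described above.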

Reproduction

  • I made some changes to the code to make the issue easier to reproduce.

Modify Code (RedisMessageListenerContainer)

  • The first invocation of addListener executes successfully and promptly.
  • The second invocation of addListener experiences a delay between Step 1 and Step 2. Execution resumes after the first call to removeMessageListener.
public class RedisMessageListenerContainer implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle {

    // ============== add code start
    // Counts addListener invocations so that only the second call is delayed.
    private static final AtomicInteger counter = new AtomicInteger(0);
    // Set to true once the first listener has been removed (the code that sets it is not shown here).
    private static final AtomicBoolean firstListenerHasRemoved = new AtomicBoolean(false);
    // ============== add code end

    private void addListener(MessageListener listener, Collection<? extends Topic> topics) {
        // ....
        for (Topic topic : topics) {
            // ...
        }
        // ============== add code start
        boolean wasListening = isListening();
        counter.incrementAndGet();
        // Busy-wait: hold the second call between Step 1 and Step 2 until the first listener is removed.
        while (counter.get() == 2 && !firstListenerHasRemoved.get()) { }
        // ============== add code end
        if (isRunning()) {
            lazyListen();
            // .....
        }
    }
}
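The busy-wait in the modified addListener spins a CPU core while it waits. If that is a concern, a latch can provide a similar pause. The following is only a sketch: SecondCallerGate and its methods are hypothetical, and it differs slightly from the loop above in that the second caller stays blocked until release() or the timeout, even if a third call arrives in the meantime.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: blocks the second caller until release() is invoked or the timeout expires. */
class SecondCallerGate {

    private final AtomicInteger counter = new AtomicInteger(0);
    private final CountDownLatch firstListenerRemoved = new CountDownLatch(1);

    /**
     * Intended to sit where the busy-wait is. Returns true if the caller may proceed,
     * false if the wait timed out or the thread was interrupted.
     */
    boolean awaitIfSecondCaller(long timeout, TimeUnit unit) {
        if (counter.incrementAndGet() != 2) {
            return true; // only the second invocation is delayed
        }
        try {
            return firstListenerRemoved.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    /** Intended to be called once the first listener has been removed. */
    void release() {
        firstListenerRemoved.countDown();
    }
}
```

The timeout keeps a test from hanging forever if the removal path is never reached.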

Add test case to RedisMessageListenerContainerIntegrationTests

	@ParameterizedRedisTest
	void subscribeErrorAfterStart() throws Exception {

		CompositeListener listener1 = (message, pattern) -> { };
		CompositeListener listener2 = (message, pattern) -> { };

		container.start();
		// the first subscribe will succeed
		container.addMessageListener(listener1, new ChannelTopic("a"));
		CompletableFuture<Exception> subscribeExceptionFuture = new CompletableFuture<>();
		new Thread(() -> {
			try {
				container.addMessageListener(listener2, new ChannelTopic("b"));
				subscribeExceptionFuture.complete(null);
			} catch (Exception e) {
				subscribeExceptionFuture.complete(e);
			}
		}).start();
		new Thread(() -> {
			container.removeMessageListener(listener1);
		}).start();

		// the second subscribe will fail (not expected)
		subscribeExceptionFuture.get();
		CompositeListener listener3 = (message, pattern) -> { };

		// after the exception, we should still be able to subscribe to channels
		CompletableFuture<Exception> otherSubscribeExceptionFuture = new CompletableFuture<>();
		new Thread(() -> {
			try {
				container.addMessageListener(listener3, new ChannelTopic("c"));
				otherSubscribeExceptionFuture.complete(null);
			} catch (Exception e) {
				otherSubscribeExceptionFuture.complete(e);
			}
		}).start();
		assertThat(otherSubscribeExceptionFuture.get()).isNull();

		container.destroy();
	}
@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Dec 19, 2024