Skip to content

Fix GH-3080: recover from RedisInvalidSubscriptionException on concurrent pub/sub#3337

Open
suuuuuuminnnnnn wants to merge 1 commit intospring-projects:mainfrom
suuuuuuminnnnnn:fix/gh-3080-redis-invalid-subscription-recovery
Open

Fix GH-3080: recover from RedisInvalidSubscriptionException on concurrent pub/sub#3337
suuuuuuminnnnnn wants to merge 1 commit intospring-projects:mainfrom
suuuuuuminnnnnn:fix/gh-3080-redis-invalid-subscription-recovery

Conversation

@suuuuuuminnnnnn
Copy link
Copy Markdown

@suuuuuuminnnnnn suuuuuuminnnnnn commented Mar 25, 2026

Problem

Concurrent addMessageListener + removeMessageListener calls can cause a race
condition in RedisMessageListenerContainer:

  1. Thread A calls addMessageListener(listener2, "b") → updates channelMapping
    • listenerTopics, then prepares to call subscribeChannel(["b"])
  2. Thread B calls removeMessageListener(listener1, "a")
    unsubscribeChannel(["a"])AbstractSubscription.channels becomes empty
    closeIfUnsubscribed()close()alive = false
  3. Thread A resumes → subscribeChannel(["b"])checkPulse()
    alive == falseRedisInvalidSubscriptionException propagates
  4. listener2 remains in listenerTopics without an active subscription
    memory leak

This race only manifests with async drivers (Lettuce). Jedis uses
BlockingSubscriber which sets connection = null after doSubscribe,
making incremental subscribeChannel a no-op.

Fix

In the wasListening branch of addListener(), catch
RedisInvalidSubscriptionException and recover:

try {
    if (!channels.isEmpty()) {
        getRequiredSubscriber().subscribeChannel(...);
    }
    if (!patterns.isEmpty()) {
        getRequiredSubscriber().subscribePattern(...);
    }
} catch (RedisInvalidSubscriptionException ex) {
    stopListening();  // dispose dead connection
    lazyListen();     // fresh connection + re-subscribe all from channelMapping/patternMapping
    return;
}

stopListening() does not change the started flag, so the container remains
running. lazyListen() re-subscribes all topics including the newly added one
(already in channelMapping before the exception).

Testing

Added unit test shouldRecoverWhenSubscriptionDiesOnConcurrentRemove that:

  • Simulates the race by making subscriptionMock.subscribe() throw
    RedisInvalidSubscriptionException
  • Verifies no exception propagates to the caller
  • Verifies recovery triggers a second connection.subscribe() call
  • Verifies listenerTopics contains the new listener (no memory leak)
  • Verifies isListening() == true after recovery

Closes #3080


  • You have read the Spring Data contribution guidelines.
  • You use the code formatters provided here and have them applied to your changes. Don’t submit any formatting related changes.
  • You submit test cases (unit or integration tests) that back your changes.
  • You added yourself as author in the headers of the classes you touched. Amend the date range in the Apache license header if needed. For new types, add the license header (copy from another file and set the current year only).

…ption on concurrent pub/sub

When addMessageListener and removeMessageListener are called concurrently,
AbstractSubscription.closeIfUnsubscribed() can set alive=false on the
connection-level subscription while addListener is about to call
subscribeChannel/subscribePattern. This causes RedisInvalidSubscriptionException
to propagate and leaves the listener in listenerTopics without an active
subscription (memory leak).

Fix: catch RedisInvalidSubscriptionException in the wasListening branch of
addListener(), call stopListening() to dispose the dead connection, then
lazyListen() to open a fresh connection and re-subscribe all channels/patterns
from channelMapping/patternMapping, which already contain the newly added topics.

Closes spring-projects#3080

Signed-off-by: suuuuuuminnnnnn <sumin45402214@gmail.com>
@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Mar 25, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

status: waiting-for-triage An issue we've not yet triaged

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Pub/Sub in a concurrent scenario may result in the inability to use subscriptions anymore, which can ultimately lead to memory leaks

2 participants