Fix GH-3080: recover from RedisInvalidSubscriptionException on concurrent pub/sub#3337
Open
suuuuuuminnnnnn wants to merge 1 commit intospring-projects:mainfrom
Conversation
…ption on concurrent pub/sub When addMessageListener and removeMessageListener are called concurrently, AbstractSubscription.closeIfUnsubscribed() can set alive=false on the connection-level subscription while addListener is about to call subscribeChannel/subscribePattern. This causes RedisInvalidSubscriptionException to propagate and leaves the listener in listenerTopics without an active subscription (memory leak). Fix: catch RedisInvalidSubscriptionException in the wasListening branch of addListener(), call stopListening() to dispose the dead connection, then lazyListen() to open a fresh connection and re-subscribe all channels/patterns from channelMapping/patternMapping, which already contain the newly added topics. Closes spring-projects#3080 Signed-off-by: suuuuuuminnnnnn <sumin45402214@gmail.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
Concurrent
addMessageListener+removeMessageListenercalls can cause a racecondition in
RedisMessageListenerContainer:addMessageListener(listener2, "b")→ updateschannelMappinglistenerTopics, then prepares to callsubscribeChannel(["b"])removeMessageListener(listener1, "a")→unsubscribeChannel(["a"])→AbstractSubscription.channelsbecomes empty→
closeIfUnsubscribed()→close()→alive = falsesubscribeChannel(["b"])→checkPulse()→alive == false→RedisInvalidSubscriptionExceptionpropagateslistener2remains inlistenerTopicswithout an active subscription→ memory leak
This race only manifests with async drivers (Lettuce). Jedis uses
BlockingSubscriberwhich setsconnection = nullafterdoSubscribe,making incremental
subscribeChannela no-op.Fix
In the
wasListeningbranch ofaddListener(), catchRedisInvalidSubscriptionExceptionand recover:stopListening()does not change thestartedflag, so the container remainsrunning.
lazyListen()re-subscribes all topics including the newly added one(already in
channelMappingbefore the exception).Testing
Added unit test
shouldRecoverWhenSubscriptionDiesOnConcurrentRemovethat:subscriptionMock.subscribe()throwRedisInvalidSubscriptionExceptionconnection.subscribe()calllistenerTopicscontains the new listener (no memory leak)isListening() == trueafter recoveryCloses #3080