Skip to content

[fix][broker] Correct multiple race conditions in PersistentDispatcherMultipleConsumers#25681

Draft
chamons wants to merge 1 commit intoapache:masterfrom
chamons:broker_race_condition_fix
Draft

[fix][broker] Correct multiple race conditions in PersistentDispatcherMultipleConsumers#25681
chamons wants to merge 1 commit intoapache:masterfrom
chamons:broker_race_condition_fix

Conversation

@chamons
Copy link
Copy Markdown

@chamons chamons commented May 5, 2026

Fixes #25617

Motivation

DelayedDeliveryTracker is not thread safe, and any access to it must be done holding the object lock.

This is a significant issue, as exceptions here were uncaught and put the broker into an invalid state that prevented some delayed messages from being delivered until restart.

Modifications

There were five cases I found, three of them I could correct just by adding synchronized to the method declaration. Two of them were overrides and were corrected using a manual locking scope.

Verifying this change

  • Make sure that the change passes the CI checks.

The included unit test failed 100% of the time when run locally without the fix, and we were able to run https://github.com/chamons/pulsar-scheduled-exception-repro without issues with the fix included.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

…rMultipleConsumers

- apache#25617

DelayedDeliveryTracker is not thread safe, and any access to it must be done
holding the object lock. There were five cases I found, three of them I could
correct just by adding synchronized to the method declaration. Two of them were
overrides and were corrected using a manual locking scope.

This is a significant issue, as exceptions here were uncaught and put
the broker into an invalid state that prevented some delayed messages
from being delivered until restart.

The included unit test failed 100% of the time when run locally
without the fix, and we were able to run https://github.com/chamons/pulsar-scheduled-exception-repro
without issues with the fix included.
}

@Test
public void testRaceConditionInTrackDelayedDelivery() throws Exception {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is based on pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java

@Override
public long getNumberOfDelayedMessages() {
return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
synchronized (this) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this could be synchronized on the method itself

.setPublishTime(System.currentTimeMillis())
.setDeliverAtTime(deliverAt);

ExecutorService executorService = Executors.newFixedThreadPool(32);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are many threads for a unit test :)

The executor needs to get shutdown. Use @Cleanup("shutdown")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] NoSuchElementException in InMemoryDelayedDeliveryTracker.nextDeliveryTime broke message dispatching until broker was restarted

2 participants