Skip to content

[fix][broker] remove lock contention in delayed delivery stats read paths#25990

Draft
nodece wants to merge 4 commits into
apache:masterfrom
nodece:fix/broker-delayed-stats-remove-synchronization
Draft

[fix][broker] remove lock contention in delayed delivery stats read paths#25990
nodece wants to merge 4 commits into
apache:masterfrom
nodece:fix/broker-delayed-stats-remove-synchronization

Conversation

@nodece

@nodece nodece commented Jun 10, 2026

Copy link
Copy Markdown
Member

Motivation

Under a large delayed-delivery workload (~500M delayed messages), jstack analysis
shows the dispatcher lock is held while sealing delayed-delivery buckets.

A broker worker thread spends significant CPU time in:

TripleLongPriorityQueue.siftDown()
  -> pop()
  -> createImmutableBucketAndAsyncPersistent()

while holding both:

PersistentDispatcherMultipleConsumers
BucketDelayedDeliveryTracker

As a result, stats collection threads (Prometheus, admin APIs, getStatsAsync)
are blocked waiting for the dispatcher monitor.

The same pattern affects InMemoryDelayedDeliveryTracker, where
getBufferMemoryUsage() iterates the entire TreeMap<Long, TreeMap<Long, Roaring64Bitmap>>
while holding the dispatcher lock.

Modifications

1. BucketDelayedDeliveryTracker — lock-free bucket stats via ImmutableBucketIndex

Introduce ImmutableBucketIndex, a wrapper around Guava's TreeRangeMap<Long, ImmutableBucket>
that maintains two AtomicLong counters: bucket count and total snapshot length.

  • put() recomputes counters after each insertion (required because TreeRangeMap.put()
    silently removes/splits overlapping entries)
  • remove() decrements counters only when removal succeeds
  • genTopicMetricMap() reads counters without holding any lock

2. InMemoryDelayedDeliveryTracker — lock-free memory usage via delta tracking

Add AtomicLong memoryUsage that is updated by delta at each mutation point
(addMessage, getScheduledMessages, clear). getBufferMemoryUsage() returns
the cached value directly instead of iterating the nested TreeMap.

3. Dispatcher classes — remove synchronized from stats read paths

In both PersistentDispatcherMultipleConsumers and PersistentDispatcherMultipleConsumersClassic:

  • getNumberOfDelayedMessages(), getDelayedTrackerMemoryUsage(),
    getBucketDelayedIndexStats(), shouldPauseDeliveryForDelayTracker()
    removed synchronized; now use Optional.map() with the volatile field
  • delayedDeliveryTracker field changed to volatile for visibility

@nodece nodece requested review from dao-jun and lhotari June 10, 2026 13:10

@lhotari lhotari left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Synchronization was explicitly added in #25681 to fix race conditions. Did you take that into account? I think that synchronization cannot be removed from shouldPauseAllDeliveries. For other methods it's most likely fine.

public Map<String, TopicMetricBean> getBucketDelayedIndexStats() {
return delayedDeliveryTracker
.filter(BucketDelayedDeliveryTracker.class::isInstance)
.map(tracker -> ((BucketDelayedDeliveryTracker) tracker).genTopicMetricMap())

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BucketDelayedDeliveryTracker.genTopicMetricMap is not thread safe. It should be made thread safe.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lhotari Fixed

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. I think this still needs serialization for correctness. This method is now reachable from an unsynchronized stats path, but it is not a pure read: it updates shared stats, calls stats.genTopicMetricMap() which drains counters with sumThenReset() / StatsBuckets.refresh(), and also reads sharedBucketPriorityQueue.size() / lastMutableBucket.size() from non-thread-safe queues. Concurrent stats scrapes can therefore race with each other and export incorrect samples. The minimal fix would be to keep this collection serialized, e.g. make genTopicMetricMap() synchronized; if we want it lock-free, the method needs to become a pure snapshot read from cached/atomic values.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@void-ptr974 Add synchronized for org.apache.pulsar.broker.delayed.bucket.BucketDelayedMessageIndexStats#genTopicMetricMap.

@lhotari

lhotari commented Jun 10, 2026

Copy link
Copy Markdown
Member

Under a large delayed-delivery workload (~500M delayed messages)

Is this in a single topic and single subscription?

FYI, there's a limit of tracking up to 30M backlogs (with default BK nettyMaxFrameSizeBytes) when using managedLedgerPersistIndividualAckAsLongArray=true. More can be stored to metadata when using LZ4 compression, but there will be failures each time cursor state is attempted to be saved. Issue is #25985.

@nodece nodece requested a review from lhotari June 11, 2026 11:24
@nodece

nodece commented Jun 11, 2026

Copy link
Copy Markdown
Member Author

Is this in a single topic and single subscription?

Yes.

FYI, there's a limit of tracking up to 30M backlogs (with default BK nettyMaxFrameSizeBytes) when using managedLedgerPersistIndividualAckAsLongArray=true. More can be stored to metadata when using LZ4 compression, but there will be failures each time cursor state is attempted to be saved. Issue is #25985.

I will check this later.

@nodece nodece force-pushed the fix/broker-delayed-stats-remove-synchronization branch from 7795eec to c8f77d9 Compare June 11, 2026 13:12
@nodece nodece marked this pull request as draft June 17, 2026 10:36
@nodece nodece force-pushed the fix/broker-delayed-stats-remove-synchronization branch from a86ce4a to bdfc915 Compare June 22, 2026 10:04
@nodece nodece marked this pull request as ready for review June 22, 2026 10:04
@nodece nodece requested a review from void-ptr974 June 22, 2026 13:10
public Map<String, TopicMetricBean> genTopicMetricMap() {
stats.recordNumOfBuckets(immutableBuckets.asMapOfRanges().size() + 1);
stats.recordNumOfBuckets((int) (immutableBuckets.count() + 1));
stats.recordDelayedMessageIndexLoaded(this.sharedBucketPriorityQueue.size() + this.lastMutableBucket.size());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method updates shared stats and reads sharedBucketPriorityQueue.size() / lastMutableBucket.size() without serialization. Since BucketDelayedMessageIndexStats#genTopicMetricMap() performs destructive reads (sumThenReset() / StatsBuckets.refresh()), concurrent stats collection can interleave and produce inconsistent or dropped metric samples.

immutableBucket.setSnapshotSegments(null);
immutableBucket.asyncUpdateSnapshotLength();
immutableBucket.asyncUpdateSnapshotLength()
.thenRun(() -> immutableBuckets.recomputeCounters());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This calls immutableBuckets.recomputeCounters() from an async callback without the tracker lock, while other paths mutate the same TreeRangeMap under that lock. TreeRangeMap is not thread-safe, so this can race with put/remove/clear and produce incorrect counters or fail during concurrent modification.

ImmutableBucket bucket = iterator.next();
futures.add(bucket.clear(stats));
numberDelayedMessages.addAndGet(-bucket.getNumberBucketDelayedMessages());
iterator.remove();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This removes entries through the mutable asMapOfRanges() view, bypassing ImmutableBucketIndex.remove(). The cached count and totalSnapshotLength can stay stale after clearDelayedMessages(), so later metrics and decisions based on immutableBuckets.count() can observe an inconsistent index state.

@void-ptr974

Copy link
Copy Markdown
Contributor

At a higher level, moving stats reads away from the dispatcher lock looks like the right direction for this PR. The remaining concern is making sure the newly unsynchronized stats paths do not read or mutate tracker internals without a clear concurrency boundary.

One possible short-term approach is to use the tracker lock for the small fixed-size stats sections and unsafe TreeRangeMap access, while keeping the dispatcher lock out of these stats paths. Longer term, it may be worth moving more mutable TreeRangeMap access and cached-counter maintenance behind ImmutableBucketIndex APIs, so callers do not need to touch live map views directly and stats paths can read cached/atomic values.

@nodece nodece marked this pull request as draft June 23, 2026 10:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants