[fix][broker] remove lock contention in delayed delivery stats read paths#25990
[fix][broker] remove lock contention in delayed delivery stats read paths#25990nodece wants to merge 4 commits into
Conversation
| public Map<String, TopicMetricBean> getBucketDelayedIndexStats() { | ||
| return delayedDeliveryTracker | ||
| .filter(BucketDelayedDeliveryTracker.class::isInstance) | ||
| .map(tracker -> ((BucketDelayedDeliveryTracker) tracker).genTopicMetricMap()) |
There was a problem hiding this comment.
BucketDelayedDeliveryTracker.genTopicMetricMap is not thread safe. It should be made thread safe.
There was a problem hiding this comment.
Thanks for the update. I think this still needs serialization for correctness. This method is now reachable from an unsynchronized stats path, but it is not a pure read: it updates shared stats, calls stats.genTopicMetricMap() which drains counters with sumThenReset() / StatsBuckets.refresh(), and also reads sharedBucketPriorityQueue.size() / lastMutableBucket.size() from non-thread-safe queues. Concurrent stats scrapes can therefore race with each other and export incorrect samples. The minimal fix would be to keep this collection serialized, e.g. make genTopicMetricMap() synchronized; if we want it lock-free, the method needs to become a pure snapshot read from cached/atomic values.
There was a problem hiding this comment.
@void-ptr974 Add synchronized for org.apache.pulsar.broker.delayed.bucket.BucketDelayedMessageIndexStats#genTopicMetricMap.
Is this in a single topic and single subscription? FYI, there's a limit of tracking up to 30M backlogs (with default BK |
Yes.
I will check this later. |
7795eec to
c8f77d9
Compare
a86ce4a to
bdfc915
Compare
| public Map<String, TopicMetricBean> genTopicMetricMap() { | ||
| stats.recordNumOfBuckets(immutableBuckets.asMapOfRanges().size() + 1); | ||
| stats.recordNumOfBuckets((int) (immutableBuckets.count() + 1)); | ||
| stats.recordDelayedMessageIndexLoaded(this.sharedBucketPriorityQueue.size() + this.lastMutableBucket.size()); |
There was a problem hiding this comment.
This method updates shared stats and reads sharedBucketPriorityQueue.size() / lastMutableBucket.size() without serialization. Since BucketDelayedMessageIndexStats#genTopicMetricMap() performs destructive reads (sumThenReset() / StatsBuckets.refresh()), concurrent stats collection can interleave and produce inconsistent or dropped metric samples.
| immutableBucket.setSnapshotSegments(null); | ||
| immutableBucket.asyncUpdateSnapshotLength(); | ||
| immutableBucket.asyncUpdateSnapshotLength() | ||
| .thenRun(() -> immutableBuckets.recomputeCounters()); |
There was a problem hiding this comment.
This calls immutableBuckets.recomputeCounters() from an async callback without the tracker lock, while other paths mutate the same TreeRangeMap under that lock. TreeRangeMap is not thread-safe, so this can race with put/remove/clear and produce incorrect counters or fail during concurrent modification.
| ImmutableBucket bucket = iterator.next(); | ||
| futures.add(bucket.clear(stats)); | ||
| numberDelayedMessages.addAndGet(-bucket.getNumberBucketDelayedMessages()); | ||
| iterator.remove(); |
There was a problem hiding this comment.
This removes entries through the mutable asMapOfRanges() view, bypassing ImmutableBucketIndex.remove(). The cached count and totalSnapshotLength can stay stale after clearDelayedMessages(), so later metrics and decisions based on immutableBuckets.count() can observe an inconsistent index state.
|
At a higher level, moving stats reads away from the dispatcher lock looks like the right direction for this PR. The remaining concern is making sure the newly unsynchronized stats paths do not read or mutate tracker internals without a clear concurrency boundary. One possible short-term approach is to use the tracker lock for the small fixed-size stats sections and unsafe |
Motivation
Under a large delayed-delivery workload (~500M delayed messages), jstack analysis
shows the dispatcher lock is held while sealing delayed-delivery buckets.
A broker worker thread spends significant CPU time in:
while holding both:
As a result, stats collection threads (Prometheus, admin APIs, getStatsAsync)
are blocked waiting for the dispatcher monitor.
The same pattern affects
InMemoryDelayedDeliveryTracker, wheregetBufferMemoryUsage()iterates the entireTreeMap<Long, TreeMap<Long, Roaring64Bitmap>>while holding the dispatcher lock.
Modifications
1.
BucketDelayedDeliveryTracker— lock-free bucket stats viaImmutableBucketIndexIntroduce
ImmutableBucketIndex, a wrapper around Guava'sTreeRangeMap<Long, ImmutableBucket>that maintains two
AtomicLongcounters: bucket count and total snapshot length.put()recomputes counters after each insertion (required becauseTreeRangeMap.put()silently removes/splits overlapping entries)
remove()decrements counters only when removal succeedsgenTopicMetricMap()reads counters without holding any lock2.
InMemoryDelayedDeliveryTracker— lock-free memory usage via delta trackingAdd
AtomicLong memoryUsagethat is updated by delta at each mutation point(
addMessage,getScheduledMessages,clear).getBufferMemoryUsage()returnsthe cached value directly instead of iterating the nested
TreeMap.3. Dispatcher classes — remove
synchronizedfrom stats read pathsIn both
PersistentDispatcherMultipleConsumersandPersistentDispatcherMultipleConsumersClassic:getNumberOfDelayedMessages(),getDelayedTrackerMemoryUsage(),getBucketDelayedIndexStats(),shouldPauseDeliveryForDelayTracker()—removed
synchronized; now useOptional.map()with thevolatilefielddelayedDeliveryTrackerfield changed tovolatilefor visibility