Skip to content

Conversation

codelipenghui
Copy link
Contributor

@codelipenghui codelipenghui commented Jun 19, 2025

Motivation

Reason for Revert:

This PR has introduced a significant performance regression in the Pulsar broker.
The attached flame graph visually demonstrates increased CPU utilization and time
spent in the code paths related to DelayedDeliveryTracker and stream operations.

While the intention was to optimize memory usage, the current implementation
appears to have an adverse effect on CPU performance, leading to overall degraded
broker throughput and increased latency.

Impact:

This regression is impacting the stability and performance of our Pulsar clusters,
especially when you have large-scale delayed messages. Reverting the change will
allow us to restore the previous performance characteristics while we investigate
a more robust and performant solution for DelayedDeliveryTracker memory optimization.

Root Cause:

https://github.com/apache/pulsar/pull/23611/files#diff-f159b4e262ff6213bba19d20e6dc01d07ea8c19f4675524e4ceb1470f456e8fcR229-R231
Here is the change that caused this issue. Each message added to the
DelayedMessageTracker will call getNumberOfDelayedMessages() method
which will iterate the map defined in InMemoryDelayedDeliveryTracker

protected final Long2ObjectSortedMap<Long2ObjectSortedMap<Roaring64Bitmap>>
delayedMessageMap = new Long2ObjectAVLTreeMap<>();

More contexts:

Flamegraph:
delayed-tracker-flamegraph

Jstack:

"broker-topic-workers-OrderedExecutor-1-0" #51 [95] prio=5 os_prio=0 cpu=1678706.57ms elapsed=15712.43s tid=0x00007fa0da32b820 nid=95 runnable  [0x00007fa0d6c00000]
   java.lang.Thread.State: RUNNABLE
        at java.util.stream.AbstractPipeline.wrapSink([email protected]/Unknown Source)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto([email protected]/Unknown Source)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential([email protected]/Unknown Source)
        at java.util.stream.AbstractPipeline.evaluate([email protected]/Unknown Source)
        at java.util.stream.LongPipeline.reduce([email protected]/Unknown Source)
        at java.util.stream.LongPipeline.sum([email protected]/Unknown Source)
        at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.lambda$getNumberOfDelayedMessages$3(InMemoryDelayedDeliveryTracker.java:231)
        at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker$$Lambda/0x0000000100e73078.applyAsLong(Unknown Source)
        at java.util.stream.ReferencePipeline$5$1.accept([email protected]/Unknown Source)
        at java.util.Iterator.forEachRemaining([email protected]/Unknown Source)
        at it.unimi.dsi.fastutil.objects.ObjectSpliterators$SpliteratorFromIterator.forEachRemaining(ObjectSpliterators.java:1158)
        at java.util.stream.AbstractPipeline.copyInto([email protected]/Unknown Source)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto([email protected]/Unknown Source)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential([email protected]/Unknown Source)
        at java.util.stream.AbstractPipeline.evaluate([email protected]/Unknown Source)
        at java.util.stream.LongPipeline.reduce([email protected]/Unknown Source)
        at java.util.stream.LongPipeline.sum([email protected]/Unknown Source)
        at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.getNumberOfDelayedMessages(InMemoryDelayedDeliveryTracker.java:231)
        at org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker.updateTimer(AbstractDelayedDeliveryTracker.java:91)
        at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.addMessage(InMemoryDelayedDeliveryTracker.java:128)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trackDelayedDelivery(PersistentDispatcherMultipleConsumers.java:1314)
        - locked <0x00000000c21d88b8> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
        at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:228)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:864)
        - locked <0x00000000c21d88b8> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:769)
        - eliminated <0x00000000c21d88b8> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.handleSendingMessagesAndReadingMore(PersistentDispatcherMultipleConsumers.java:729)
        - locked <0x00000000c21d88b8> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$9(PersistentDispatcherMultipleConsumers.java:719)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers$$Lambda/0x0000000100c0f440.run(Unknown Source)
        at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:128)
        at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:99)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.runWith([email protected]/Unknown Source)
        at java.lang.Thread.run([email protected]/Unknown Source)

"broker-topic-workers-OrderedExecutor-2-0" #52 [96] prio=5 os_prio=0 cpu=1797445.47ms elapsed=15712.43s tid=0x00007fa0d9419fc0 nid=96 runnable  [0x00007fa0d6aff000]
   java.lang.Thread.State: RUNNABLE
        at java.util.stream.AbstractPipeline.copyInto([email protected]/Unknown Source)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto([email protected]/Unknown Source)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential([email protected]/Unknown Source)
        at java.util.stream.AbstractPipeline.evaluate([email protected]/Unknown Source)
        at java.util.stream.LongPipeline.reduce([email protected]/Unknown Source)
        at java.util.stream.LongPipeline.sum([email protected]/Unknown Source)
        at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.lambda$getNumberOfDelayedMessages$3(InMemoryDelayedDeliveryTracker.java:231)
        at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker$$Lambda/0x0000000100e73078.applyAsLong(Unknown Source)
        at java.util.stream.ReferencePipeline$5$1.accept([email protected]/Unknown Source)
        at java.util.Iterator.forEachRemaining([email protected]/Unknown Source)
        at it.unimi.dsi.fastutil.objects.ObjectSpliterators$SpliteratorFromIterator.forEachRemaining(ObjectSpliterators.java:1158)
        at java.util.stream.AbstractPipeline.copyInto([email protected]/Unknown Source)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto([email protected]/Unknown Source)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential([email protected]/Unknown Source)
        at java.util.stream.AbstractPipeline.evaluate([email protected]/Unknown Source)
        at java.util.stream.LongPipeline.reduce([email protected]/Unknown Source)
        at java.util.stream.LongPipeline.sum([email protected]/Unknown Source)
        at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.getNumberOfDelayedMessages(InMemoryDelayedDeliveryTracker.java:231)
        at org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker.updateTimer(AbstractDelayedDeliveryTracker.java:91)
        at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.getScheduledMessages(InMemoryDelayedDeliveryTracker.java:217)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.getMessagesToReplayNow(PersistentDispatcherMultipleConsumers.java:1327)
        - locked <0x00000000b8cd7968> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:386)
        - locked <0x00000000b8cd7968> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.handleSendingMessagesAndReadingMore(PersistentDispatcherMultipleConsumers.java:743)
        - locked <0x00000000b8cd7968> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$9(PersistentDispatcherMultipleConsumers.java:719)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers$$Lambda/0x0000000100c0f440.run(Unknown Source)
        at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:128)
        at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:99)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.runWith([email protected]/Unknown Source)
        at java.lang.Thread.run([email protected]/Unknown Source)

Modifications

Revert PR #23611 and the related PR #24035

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@codelipenghui codelipenghui self-assigned this Jun 19, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jun 19, 2025
@codelipenghui codelipenghui added cherry-picked/branch-4.0 ready-to-test and removed doc-not-needed Your PR changes do not impact docs labels Jun 19, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jun 19, 2025
@codelipenghui codelipenghui changed the title [branch-4.0] Revert "[improve][broker] Reduce memory occupation of the delayed message queue (#23611)" [fix][broker][branch-4.0] Revert "[improve][broker] Reduce memory occupation of the delayed message queue (#23611)" Jun 19, 2025
@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@BewareMyPower
Copy link
Contributor

The attached flame graph visually demonstrates increased CPU utilization and time
spent in the code paths related to DelayedDeliveryTracker

Where is the flame graph?

@codelipenghui
Copy link
Contributor Author

@BewareMyPower Uploaded. I have been attached to the mailing list before. Forgot to add them here.

@codelipenghui codelipenghui merged commit f962d22 into apache:branch-4.0 Jun 19, 2025
78 of 85 checks passed
@codelipenghui codelipenghui deleted the penghui/revert-pr-23611 branch June 19, 2025 03:25
ganesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Jun 19, 2025
…upation of the delayed message queue (apache#23611)" (apache#24429)

(cherry picked from commit f962d22)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jun 30, 2025
…upation of the delayed message queue (apache#23611)" (apache#24429)

(cherry picked from commit f962d22)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants