Skip to content

Commit 4ce0c75

Browse files
poorbarcodelhotari
andauthored
[fix] Key_Shared mode consumption latency when low traffic (#23340)
Co-authored-by: Lari Hotari <[email protected]>
1 parent 501dfde commit 4ce0c75

File tree

4 files changed

+10
-8
lines changed

4 files changed

+10
-8
lines changed

conf/broker.conf

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -489,12 +489,12 @@ dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
489489
# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
490490
# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
491491
# delay. This parameter sets the initial backoff delay in milliseconds.
492-
dispatcherRetryBackoffInitialTimeInMs=100
492+
dispatcherRetryBackoffInitialTimeInMs=1
493493

494494
# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
495495
# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
496496
# delay. This parameter sets the maximum backoff delay in milliseconds.
497-
dispatcherRetryBackoffMaxTimeInMs=1000
497+
dispatcherRetryBackoffMaxTimeInMs=10
498498

499499
# Precise dispatcher flow control according to history message number of each entry
500500
preciseDispatcherFlowControl=false

conf/standalone.conf

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -305,12 +305,12 @@ dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
305305
# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
306306
# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
307307
# delay. This parameter sets the initial backoff delay in milliseconds.
308-
dispatcherRetryBackoffInitialTimeInMs=100
308+
dispatcherRetryBackoffInitialTimeInMs=1
309309

310310
# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
311311
# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
312312
# delay. This parameter sets the maximum backoff delay in milliseconds.
313-
dispatcherRetryBackoffMaxTimeInMs=1000
313+
dispatcherRetryBackoffMaxTimeInMs=10
314314

315315
# Precise dispatcher flow control according to history message number of each entry
316316
preciseDispatcherFlowControl=false

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1231,14 +1231,14 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
12311231
doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered "
12321232
+ "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff "
12331233
+ "delay. This parameter sets the initial backoff delay in milliseconds.")
1234-
private int dispatcherRetryBackoffInitialTimeInMs = 100;
1234+
private int dispatcherRetryBackoffInitialTimeInMs = 1;
12351235

12361236
@FieldContext(
12371237
category = CATEGORY_POLICIES,
12381238
doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered "
12391239
+ "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff "
12401240
+ "delay. This parameter sets the maximum backoff delay in milliseconds.")
1241-
private int dispatcherRetryBackoffMaxTimeInMs = 1000;
1241+
private int dispatcherRetryBackoffMaxTimeInMs = 10;
12421242

12431243
@FieldContext(
12441244
dynamic = true,

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -729,11 +729,13 @@ private synchronized void handleSendingMessagesAndReadingMore(ReadType readType,
729729
boolean triggerReadingMore = sendMessagesToConsumers(readType, entries, needAcquireSendInProgress);
730730
int entriesDispatched = lastNumberOfEntriesDispatched;
731731
updatePendingBytesToDispatch(-totalBytesSize);
732+
if (entriesDispatched > 0) {
733+
// Reset the backoff when we successfully dispatched messages
734+
retryBackoff.reset();
735+
}
732736
if (triggerReadingMore) {
733737
if (entriesDispatched > 0 || skipNextBackoff) {
734738
skipNextBackoff = false;
735-
// Reset the backoff when we successfully dispatched messages
736-
retryBackoff.reset();
737739
// Call readMoreEntries in the same thread to trigger the next read
738740
readMoreEntries();
739741
} else if (entriesDispatched == 0) {

0 commit comments

Comments
 (0)