Skip to content

Optimize pipelining by parsing and prefetching multiple commands#2092

Merged
zuiderkwast merged 25 commits into
valkey-io:unstablefrom
zuiderkwast:lookahead
Aug 22, 2025
Merged

Optimize pipelining by parsing and prefetching multiple commands#2092
zuiderkwast merged 25 commits into
valkey-io:unstablefrom
zuiderkwast:lookahead

Conversation

@zuiderkwast

@zuiderkwast zuiderkwast commented May 16, 2025

Copy link
Copy Markdown
Contributor

Instead of parsing only one command per client before executing it, parse multiple commands from the query buffer and batch-prefetch the keys accessed by the commands in the queue before executing them.

This is an optimization for pipelined commands, both with and without I/O threads. The optimization is currently disabled for the replication stream, due to failures (probably caused by how the replication offset is calculated based on the query buffer offset).

  • When parsing commands from the input buffer, multiple commands are parsed and stored in a command queue per client.
  • In single-threaded mode (I/O threads off) keys are batch-prefetched before the commands in the queue are executed. Multi-key commands like MGET, MSET and DEL benefit from this even if pipelining is not used.
  • Prefetching when I/O threads are used does prefetching for multiple clients in parallel. This code takes client command queues into account, improving prefetching when pipelining is used. The batch size is controlled by the existing config prefetch-batch-max-size (default 16), which so far only was used together with I/O threads. The config is moved to a different section in valkey.conf.
  • When I/O threads are used and the maximum number of keys are prefetched, a client's command is executed, then the next one in the queue, etc. If there are more commands in the queue for which the keys have not been prefetched (say the client sends 16 pipelined MGET with 16 keys in each) keys for the next few commands in the queue are prefetched before the commands is executed if prefetching has not been done for the next command. (This utilizes the code path used in single-threaded mode.)

Code improvements:

  • Decoupling of command parser state and command execution state:
    • The variables reqtype, multibulklen and bulklen refer to the current position in the query buffer. These are no longer reset in resetClient (which runs after each command being executed). Instead, they are reset in the parser code after each completely parsed command.
    • The command parser code is partially decoupled from the client struct. The query buffer is still one per client, but the resulting argument vector is stored in caller-defined variables.

Benchmarking with and without I/O threads, see below.

Fixes #2044

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
@zuiderkwast zuiderkwast requested review from madolson and uriyage May 16, 2025 12:04
Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Comment thread src/networking.c Fixed
@codecov

codecov Bot commented May 16, 2025

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 86.91099% with 25 lines in your changes missing coverage. Please review.
✅ Project coverage is 72.21%. Comparing base (ac51515) to head (a4ad52e).
⚠️ Report is 1 commits behind head on unstable.

Files with missing lines Patch % Lines
src/memory_prefetch.c 0.00% 18 Missing ⚠️
src/networking.c 96.73% 5 Missing ⚠️
src/server.c 90.00% 2 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##           unstable    #2092      +/-   ##
============================================
+ Coverage     72.02%   72.21%   +0.19%     
============================================
  Files           126      126              
  Lines         70493    70616     +123     
============================================
+ Hits          50771    50994     +223     
+ Misses        19722    19622     -100     
Files with missing lines Coverage Δ
src/kvstore.c 96.01% <ø> (ø)
src/module.c 9.52% <ø> (ø)
src/server.h 100.00% <ø> (ø)
src/server.c 88.44% <90.00%> (-0.03%) ⬇️
src/networking.c 88.28% <96.73%> (+0.36%) ⬆️
src/memory_prefetch.c 11.26% <0.00%> (-0.77%) ⬇️

... and 15 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
@zuiderkwast zuiderkwast changed the title Parse multiple commands Parse multiple commands and prefetch keys May 17, 2025
Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Comment thread src/memory_prefetch.c Outdated
Comment thread src/server.h Outdated

@dvkashapov dvkashapov left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did some benchmarks on AMD EPYC 7713 32 CPUs

  • config:
databases 1
save ""
appendonly no
rdbcompression no
activedefrag no
maxclients 1000
io-threads *
io-threads-do-reads yes
protected-mode no
maxmemory 2500mb
maxmemory-policy noeviction
hz 1
latency-monitor-threshold 0
  • cset shield
  • cpu bind
  • cache population
    valkey-benchmark -h <ip> -c 96 --threads 96 -d 256 -t set -n 3000000 -r 3000000 --sequential
  • benchmark itself
    valkey-benchmark -h <ip> -c 192 --threads 96 -d 256 -t get -n 10000000 -r 3000000 -P 10

io-threads 1 unstable result:

Summary:
  throughput summary: 352795.91 requests per second
  latency summary (msec):
          avg       min       p50       p95       p99       max
        5.396     0.048     5.279     5.727    10.455   124.287

io-threads 1 this pr result:

Summary:
  throughput summary: 604375.69 requests per second
  latency summary (msec):
          avg       min       p50       p95       p99       max
        3.100     0.040     2.935     3.431     5.791   132.991

io-threads 9 unstable result:

Summary:
  throughput summary: 383435.59 requests per second
  latency summary (msec):
          avg       min       p50       p95       p99       max
        4.981     0.056     4.959     5.255     5.503    79.039

io-threads 9 this pr result:

Summary:
  throughput summary: 738334.31 requests per second
  latency summary (msec):
          avg       min       p50       p95       p99       max
        2.553     0.032     2.567     2.751     2.919    36.703

Seems like a solid 2x improvement in pipelining workload! Also did not find any memory leaks or instabilities during testing, LGTM!

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>

@PingXie PingXie left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zuiderkwast, I think our changes are indeed very close, especially around the command queue concept. that being said (and I know I am super biased here :)), I think my PR is closer to the finishing line. Do you want to hop onto my branch and collaborate on mine next?

https://github.com/PingXie/valkey/tree/pipeline

Comment thread src/server.h
Comment thread src/server.h Outdated
Comment thread src/networking.c
Comment thread src/networking.c
Comment thread src/networking.c

@madolson madolson left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The overall approach makes sense to me, I have some minor thoughts but I haven't thought to much how to think about this vs Ping's PR yet.

Looking back at the original IO threading, we batch N keys, maybe we should change it to be batch N commands instead, since that will make sure we are fully parsing a command. That way we can probably more tightly unify the code.

Comment thread src/server.h
Comment thread src/networking.c Outdated
Comment thread src/server.h Outdated
Comment thread src/networking.c
@zuiderkwast

Copy link
Copy Markdown
Contributor Author

The overall approach makes sense to me, I have some minor thoughts but I haven't thought to much how to think about this vs Ping's PR yet.

Thanks @madolson for taking the time to look at this.

Looking back at the original IO threading, we batch N keys, maybe we should change it to be batch N commands instead, since that will make sure we are fully parsing a command. That way we can probably more tightly unify the code.

I think we shall prefetch full commands, but we can still configure it as the number of keys. If we go over the number N when adding a command to the batch, then we can allow prefetching a few more, let's say up to N*2. That way we prefetch by default 16 keys, but allow prefetching up to 32 keys to stick to full commands.

We should definitely unify the IO-threaded and single-threaded code paths for this code and more of the parse-lookup-prefetch-execute. I avoided refactoring to keep the diff small but I think it's necessary to at least rename some functions to keep this comprehensive.

@PingXie

PingXie commented May 29, 2025

Copy link
Copy Markdown
Member

Looking back at the original IO threading, we batch N keys, maybe we should change it to be batch N commands instead, since that will make sure we are fully parsing a command. That way we can probably more tightly unify the code.

I think we shall prefetch full commands, but we can still configure it as the number of keys. If we go over the number N when adding a command to the batch, then we can allow prefetching a few more, let's say up to N*2. That way we prefetch by default 16 keys, but allow prefetching up to 32 keys to stick to full commands.

my preference, for the sake of simplicity, is that we continue to count keys. as soon as we use up max_pretech_size we should just execute the command, irrespective of the number of the keys referenced by the command (e.g., mget) or the number of commands batached.

ultimately max_prefetch_size is determined by the L1 cache size. There is only so much we can prefetch. over-prefetching will lead to L1 cache thrashing, nullifying the effect.

the L1 data cache on Intel cpus is 32 KiB or 48 KiB, typically, with a cache line size of 64 bytes, which gives us 512 or 768 cache lines, respectively. I haven't counted carefully the number of memory accesses needed to prefetch a key-value pair of strings in the new hashtable but I guess it would be 5 (including both keys and hash table)? Assuming 5, prefetching 32 keys would consume 160 cache lines, which is not a trivial number compared to 512 already.

@secwall

secwall commented Jul 8, 2025

Copy link
Copy Markdown
Contributor

Hello, it seems that this pull request has some conflicts with the unstable branch. Do you need any assistance working on it? Merging it into 9.0 would be beneficial for many users (a lot of clients use pipelines automatically).

@zuiderkwast

Copy link
Copy Markdown
Contributor Author

After discussion with @PingXie and others, I wanted to rewrite it a bit and unify the prefetching code paths for io-threaded and single-threaded modes. I probably can't make it to 9.0 though.

How would folks (@PingXie, @madolson) feel about merging this more or less as is for 9.0 and improving it later?

@PingXie

PingXie commented Jul 9, 2025

Copy link
Copy Markdown
Member

How would folks (@PingXie, @madolson) feel about merging this more or less as is for 9.0 and improving it later?

works for me. Will review this pr next.

    parseCommand -> parseInputBuffer
    processMultibulkBuffer -> parseMultibulkBuffer
    processInlineBuffer -> parseInlineBuffer

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
@madolson

madolson commented Aug 19, 2025

Copy link
Copy Markdown
Member

We're getting de-sync errors in the test:

24786:S 19 Aug 2025 09:52:35.066 # Protocol error (Master using the inline protocol. Desync?) from client: id=32 addr=127.0.0.1:21116 laddr=127.0.0.1:61112 fd=25 name= age=11 idle=0 flags=M capa= db=0 sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=16010 qbuf-free=49520 argv-mem=0 multi-mem=0 rbs=1024 rbp=0 obl=0 oll=0 omem=0 tot-mem=67136 events=r cmd=set user=(superuser) redir=-1 resp=2 lib-name= lib-ver= tot-net-in=12800433 tot-net-out=451 tot-cmds=20579. Query buffer: '6ZJ}:226..*2..$6..UNLINK..$8..{6ZJ}:22..*2..$6..UNLINK..$8..{6ZJ' (... more 14880 bytes ...) '..*2..$6..UNLINK..$9..{6ZJ}:207..*2..$6..UNLINK..$9..{6ZJ}:280..'

Later in the day update:
The target eventually stops processing incoming requests, but it is not consistent. There doesn't seem to be anything non-deterministic about it that I can see outside of the speed of processing. @murphyjacob4 Since your back, if you have any thoughts as well for the test failures.

Just to comment since Viktor messaged me on slack. Apparently there is something getting incorrectly updated for replication offset. This will have impact for other deeply pipelined replication streams, but it looks like only the slot migration tests are triggering that.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
@zuiderkwast zuiderkwast merged commit 09fb436 into valkey-io:unstable Aug 22, 2025
56 of 57 checks passed
@github-project-automation github-project-automation Bot moved this from In Progress to Done in Valkey 9.0 Aug 22, 2025
@zuiderkwast zuiderkwast added the release-notes This issue should get a line item in the release notes label Aug 22, 2025
@murphyjacob4

Copy link
Copy Markdown
Contributor

Apparently there is something getting incorrectly updated for replication offset. This will have impact for other deeply pipelined replication streams, but it looks like only the slot migration tests are triggering that.

Since slot migration uses chaining replication, if the replication offset is wrong, what is sent to the replicas of the target would be wrong:

https://github.com/valkey-io/valkey/blob/unstable/src/networking.c#L3677-L3689

So that would make sense why we see the desync errors

@zuiderkwast zuiderkwast deleted the lookahead branch August 25, 2025 08:52
zuiderkwast added a commit that referenced this pull request Aug 27, 2025
Try to fix the failures seen for `test "PSYNC2 #3899 regression: verify
consistency"`.

This change resets the query buffer parser state in
`replicationCachePrimary()` which is called when the connection to the
primary is lost. Before #2092, this was done by `resetClient()`.

The solution was inspired by the discussion about the regression
mentioned (discussion from 2017) and the related commits from that time:
6bc6bd4,
469d6e2,
c180bc7.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
@enjoy-binbin enjoy-binbin moved this from Done to RC2 in Valkey 9.0 Aug 29, 2025
@madolson madolson moved this from RC2 to Done in Valkey 9.0 Sep 3, 2025
rjd15372 pushed a commit to rjd15372/valkey that referenced this pull request Sep 19, 2025
…key-io#2092)

Instead of parsing only one command per client before executing it,
parse multiple commands from the query buffer and batch-prefetch the
keys accessed by the commands in the queue before executing them.

This is an optimization for pipelined commands, both with and without
I/O threads. The optimization is currently disabled for the replication
stream, due to failures (probably caused by how the replication offset
is calculated based on the query buffer offset).

* When parsing commands from the input buffer, multiple commands are
parsed and stored in a command queue per client.
* In single-threaded mode (I/O threads off) keys are batch-prefetched
before the commands in the queue are executed. Multi-key commands like
MGET, MSET and DEL benefit from this even if pipelining is not used.
* Prefetching when I/O threads are used does prefetching for multiple
clients in parallel. This code takes client command queues into account,
improving prefetching when pipelining is used. The batch size is
controlled by the existing config `prefetch-batch-max-size` (default
16), which so far only was used together with I/O threads. The config is
moved to a different section in `valkey.conf`.
* When I/O threads are used and the maximum number of keys are
prefetched, a client's command is executed, then the next one in the
queue, etc. If there are more commands in the queue for which the keys
have not been prefetched (say the client sends 16 pipelined MGET with 16
keys in each) keys for the next few commands in the queue are prefetched
before the commands is executed if prefetching has not been done for the
next command. (This utilizes the code path used in single-threaded
mode.)

Code improvements:

* Decoupling of command parser state and command execution state:
  * The variables reqtype, multibulklen and bulklen refer to the current
    position in the query buffer. These are no longer reset in resetClient
    (which runs after each command being executed). Instead, they are
    reset in the parser code after each completely parsed command.
  * The command parser code is partially decoupled from the client struct.
    The query buffer is still one per client, but the resulting argument
    vector is stored in caller-defined variables.

Fixes valkey-io#2044

---------

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Signed-off-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
rjd15372 pushed a commit to rjd15372/valkey that referenced this pull request Sep 19, 2025
Try to fix the failures seen for `test "PSYNC2 valkey-io#3899 regression: verify
consistency"`.

This change resets the query buffer parser state in
`replicationCachePrimary()` which is called when the connection to the
primary is lost. Before valkey-io#2092, this was done by `resetClient()`.

The solution was inspired by the discussion about the regression
mentioned (discussion from 2017) and the related commits from that time:
6bc6bd4,
469d6e2,
c180bc7.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
rjd15372 pushed a commit that referenced this pull request Sep 23, 2025
Instead of parsing only one command per client before executing it,
parse multiple commands from the query buffer and batch-prefetch the
keys accessed by the commands in the queue before executing them.

This is an optimization for pipelined commands, both with and without
I/O threads. The optimization is currently disabled for the replication
stream, due to failures (probably caused by how the replication offset
is calculated based on the query buffer offset).

* When parsing commands from the input buffer, multiple commands are
parsed and stored in a command queue per client.
* In single-threaded mode (I/O threads off) keys are batch-prefetched
before the commands in the queue are executed. Multi-key commands like
MGET, MSET and DEL benefit from this even if pipelining is not used.
* Prefetching when I/O threads are used does prefetching for multiple
clients in parallel. This code takes client command queues into account,
improving prefetching when pipelining is used. The batch size is
controlled by the existing config `prefetch-batch-max-size` (default
16), which so far only was used together with I/O threads. The config is
moved to a different section in `valkey.conf`.
* When I/O threads are used and the maximum number of keys are
prefetched, a client's command is executed, then the next one in the
queue, etc. If there are more commands in the queue for which the keys
have not been prefetched (say the client sends 16 pipelined MGET with 16
keys in each) keys for the next few commands in the queue are prefetched
before the commands is executed if prefetching has not been done for the
next command. (This utilizes the code path used in single-threaded
mode.)

Code improvements:

* Decoupling of command parser state and command execution state:
  * The variables reqtype, multibulklen and bulklen refer to the current
    position in the query buffer. These are no longer reset in resetClient
    (which runs after each command being executed). Instead, they are
    reset in the parser code after each completely parsed command.
  * The command parser code is partially decoupled from the client struct.
    The query buffer is still one per client, but the resulting argument
    vector is stored in caller-defined variables.

Fixes #2044

---------

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Signed-off-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
rjd15372 pushed a commit that referenced this pull request Sep 23, 2025
Try to fix the failures seen for `test "PSYNC2 #3899 regression: verify
consistency"`.

This change resets the query buffer parser state in
`replicationCachePrimary()` which is called when the connection to the
primary is lost. Before #2092, this was done by `resetClient()`.

The solution was inspired by the discussion about the regression
mentioned (discussion from 2017) and the related commits from that time:
6bc6bd4,
469d6e2,
c180bc7.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
hpatro pushed a commit to hpatro/valkey that referenced this pull request Oct 3, 2025
…key-io#2092)

Instead of parsing only one command per client before executing it,
parse multiple commands from the query buffer and batch-prefetch the
keys accessed by the commands in the queue before executing them.

This is an optimization for pipelined commands, both with and without
I/O threads. The optimization is currently disabled for the replication
stream, due to failures (probably caused by how the replication offset
is calculated based on the query buffer offset).

* When parsing commands from the input buffer, multiple commands are
parsed and stored in a command queue per client.
* In single-threaded mode (I/O threads off) keys are batch-prefetched
before the commands in the queue are executed. Multi-key commands like
MGET, MSET and DEL benefit from this even if pipelining is not used.
* Prefetching when I/O threads are used does prefetching for multiple
clients in parallel. This code takes client command queues into account,
improving prefetching when pipelining is used. The batch size is
controlled by the existing config `prefetch-batch-max-size` (default
16), which so far only was used together with I/O threads. The config is
moved to a different section in `valkey.conf`.
* When I/O threads are used and the maximum number of keys are
prefetched, a client's command is executed, then the next one in the
queue, etc. If there are more commands in the queue for which the keys
have not been prefetched (say the client sends 16 pipelined MGET with 16
keys in each) keys for the next few commands in the queue are prefetched
before the commands is executed if prefetching has not been done for the
next command. (This utilizes the code path used in single-threaded
mode.)

Code improvements:

* Decoupling of command parser state and command execution state:
  * The variables reqtype, multibulklen and bulklen refer to the current
    position in the query buffer. These are no longer reset in resetClient
    (which runs after each command being executed). Instead, they are
    reset in the parser code after each completely parsed command.
  * The command parser code is partially decoupled from the client struct.
    The query buffer is still one per client, but the resulting argument
    vector is stored in caller-defined variables.

Fixes valkey-io#2044

---------

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Signed-off-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
Signed-off-by: Harkrishn Patro <harkrisp@amazon.com>
hpatro pushed a commit to hpatro/valkey that referenced this pull request Oct 3, 2025
Try to fix the failures seen for `test "PSYNC2 valkey-io#3899 regression: verify
consistency"`.

This change resets the query buffer parser state in
`replicationCachePrimary()` which is called when the connection to the
primary is lost. Before valkey-io#2092, this was done by `resetClient()`.

The solution was inspired by the discussion about the regression
mentioned (discussion from 2017) and the related commits from that time:
6bc6bd4,
469d6e2,
c180bc7.

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Signed-off-by: Harkrishn Patro <harkrisp@amazon.com>
enjoy-binbin added a commit that referenced this pull request Jun 13, 2026
Multi-command parsing in parseMultibulkBuffer (introduced #2092) was
disabled for replicated clients because the per-command replication
offset relied on c->qb_pos as the right boundary of the just-applied
command. Replication stream is actually a big pipeline, so if it can
be supported, the processing speed of the replica can be improved.

Decouple parsing position from application position by introducing a
single per-client field that tracks the currently processed command's
right boundary in querybuf:

client.qb_applied: right boundary of the current command in
querybuf. Set by the parsers (for c->argv, = c->qb_pos) and
advanced incrementally by consumeCommandQueue() using each queued
command's input_bytes (qb_applied += p->input_bytes).
parsedCommand.input_bytes already records the number of querybuf bytes
consumed to parse one command (multibulk header + each bulk-length
line + arg bytes + per-arg CRLFs), see parseMultibulk for mode details.
It is a relative quantity, independent of where querybuf has been trimmed
since parsing, so it is exactly what's needed to advance qb_applied across
multi-command parsing, so no extra per-command snapshot field is required.

commandProcessed() now uses c->qb_applied instead of c->qb_pos to
advance reploff by exactly the bytes of the just-applied command.
beforeNextClient shifts only c->qb_applied (a single variable) when
the replicated client's querybuf is trimmed; pending queue entries
carry only relative input_bytes and therefore need no fix-up.

qb_applied is populated unconditionally on the first-command path so
that a client transitioning to replicated mid-command (e.g.
SYNCSLOTS ESTABLISH installs slot_migration_job inside its own handler)
still has a valid value when commandProcessed() runs.

Signed-off-by: Binbin <binloveplay1314@qq.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

release-notes This issue should get a line item in the release notes run-extra-tests Run extra tests on this PR (Runs all tests from daily except valgrind and RESP)

Projects

Status: Done

Development

Successfully merging this pull request may close these issues.

Parse multiple commands (look ahead)

9 participants