[dogfood] Enable multi-command parsing for replicated clients (mirrors upstream #3597) #7
avifenesh wants to merge 2 commits
Multi-command parsing in parseMultibulkBuffer (introduced valkey-io#2092) was disabled for replicated clients because the per-command replication offset relied on c->qb_pos as the right boundary of the just-applied command. The replication stream is effectively one big pipeline, so supporting multi-command parsing for it can improve the replica's processing speed.

Decouple parsing position from application position by recording, for every parsed command, the qb_pos snapshot taken right after that command finished parsing:

- parsedCommand.qb_end_pos: snapshot stored in the queue entry, set by parseMultibulkBuffer when a command is pushed into cmd_queue.
- client.qb_applied: snapshot of the command currently being processed, set by the parsers (for c->argv) and by consumeCommandQueue (when popping a queued command).

commandProcessed() now uses c->qb_applied instead of c->qb_pos to advance reploff by exactly the bytes of the just-applied command.

beforeNextClient shifts qb_end_pos of pending queue entries when the replicated client's querybuf is trimmed, keeping subsequent reploff updates consistent.

Both fields are populated unconditionally so that a client transitioning to replicated mid-command (e.g. SYNCSLOTS ESTABLISH installs slot_migration_job inside its own handler) still has a valid value when commandProcessed() runs.

Signed-off-by: Binbin <binloveplay1314@qq.com>
Summary
Intent: Re-enables multi-command parsing for replicated clients (which was disabled in valkey-io#2092 because the per-command replication offset was derived from c->qb_pos, and pipelined parsing makes qb_pos run ahead of the command currently being applied). The PR decouples parse position from apply position by snapshotting qb_pos per parsed command (parsedCommand.qb_end_pos) and into the client (client.qb_applied), and making commandProcessed() use the latter when updating reploff. A shift loop in beforeNextClient keeps queued snapshots consistent when the querybuf is trimmed. This should measurably improve replica apply throughput under pipelined replication.
Public API / breaking-change impact: None; both new fields are internal to client/parsedCommand. No config, command, or wire-protocol changes. Memory cost on 64-bit is one size_t (8 bytes) in client plus one per queued parsedCommand; acceptable.
RDB/AOF safety: Not affected directly. Replication offset accounting is affected; the new tests cover chained replication (sub-replica) convergence, which is the most sensitive downstream of reploff.
Concerns raised inline:
- consumeCommandQueue unconditionally copies qb_end_pos into qb_applied; the "0 means not completed" sentinel documented in server.h is not defensively handled here and could silently rewind reploff if the invariant is ever broken.
- The qb_end_pos == 0 skip in beforeNextClient's shift loop may be dead code; worth an invariant comment or an assert.
- c->qb_applied itself is not shifted in beforeNextClient; relies on lifetime assumptions that should be documented.
- Tests only assert final convergence; they'd pass under the old (disabled) code too. Adding mid-pipeline reploff assertions or forcing a querybuf trim would more directly exercise the new bookkeeping.
- Sentinel value 0 for qb_end_pos is fragile; document or use a clearer sentinel.
Verdict: Approach is sound and the change is localized. No blocking correctness issues found, but I'd like to see the consumeCommandQueue sentinel handling clarified (either asserted or handled) and the test coverage strengthened to actually regress-guard the specific bug being fixed. Otherwise LGTM pending those clarifications.
    c->net_input_bytes_curr_cmd = p->input_bytes;
    c->parsed_cmd = p->cmd;
    c->slot = p->slot;
    /* Restore qb_pos snapshot so commandProcessed() can update reploff precisely. */
consumeCommandQueue() unconditionally copies p->qb_end_pos into c->qb_applied, but the struct-field comment in server.h says qb_end_pos is "Set to 0 if parsing wasn't completed." If an entry with incomplete parsing is ever popped here (e.g. an error/short-read path that still pushes an entry), c->qb_applied would become 0 and commandProcessed() would then compute reploff = read_reploff - sdslen(querybuf) + 0, grossly rewinding the replication offset.
Looking at the current parser, entries are only pushed after parseMultibulk returns, and entries with a parse error still have a meaningful c->qb_pos snapshot — but then the invariant is really "qb_end_pos is always set to a valid snapshot for any entry that reaches consumeCommandQueue", which contradicts the header comment. Please either:
- Assert p->qb_end_pos != 0 here (and update the comment), or
- Handle the qb_end_pos == 0 case explicitly (e.g. skip the reploff update).
Otherwise the "Set to 0 if parsing wasn't completed" branch documented in server.h is a latent correctness trap for replicated clients.
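The "handle it explicitly" option above could look roughly like the following. This is a minimal sketch with hypothetical stand-in types (parsed_cmd_sketch, client_sketch) and a hypothetical helper name (restore_snapshot); it is not the PR's code, and it assumes 0 really is the "unset" marker described in server.h.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified stand-ins for parsedCommand and client. */
typedef struct {
    size_t qb_end_pos; /* 0 means "parsing was not completed" */
} parsed_cmd_sketch;

typedef struct {
    size_t qb_applied; /* end offset of the command being applied */
} client_sketch;

/* Returns 1 when the snapshot was restored. Returns 0 when the entry still
 * carries the unset sentinel; qb_applied is then left untouched and the
 * caller is expected to skip the reploff update for this entry. */
static int restore_snapshot(client_sketch *c, const parsed_cmd_sketch *p) {
    if (p->qb_end_pos == 0) return 0;
    c->qb_applied = p->qb_end_pos;
    return 1;
}
```

If the invariant is instead meant to be "every popped entry has a valid snapshot", replacing the early return with an assertion would make a violation fail loudly rather than be skipped.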
    for (uint16_t i = queue->off; i < queue->len; i++) {
        /* Skip entries with qb_end_pos == 0 (parsing not completed) */
        if (queue->cmds[i].qb_end_pos == 0) continue;
Two concerns about the shift loop:
- The queue->cmds[i].qb_end_pos == 0 skip is arguably unreachable: any successfully parsed command consumes at least a few bytes, so c->qb_pos > 0 when the snapshot is taken, and the queue is cleared when fully consumed. But if it is reachable (partially parsed trailing entry), then not shifting it means that when it later gets qb_end_pos filled in (by a subsequent parse pass using the already-trimmed querybuf), the value will be consistent. Please confirm this invariant, i.e. that trailing not-yet-completed entries never pre-exist in the queue with a non-zero qb_end_pos that would need shifting. A comment stating this invariant would help.
- c->qb_applied itself (for the command currently being executed in commandProcessed) is not shifted here. For replicated clients, beforeNextClient runs between commands, so qb_applied from the previously finished command is stale but unused by the time we trim. However, if any code path reads qb_applied after beforeNextClient trims and before the next consumeCommandQueue/parser runs, it will be off by repl_applied. Worth a brief comment confirming the lifetime, or explicitly resetting c->qb_applied here to avoid action-at-a-distance bugs later.
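To make the two concerns concrete, here is a small model of a trim followed by the shift loop. It is a sketch under assumptions: the types, the fixed-size buffer, and the trim_applied helper are hypothetical, and resetting qb_applied at the end is the reviewer's suggested option rather than something the PR is known to do.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical, simplified stand-ins for the structures in the PR. */
typedef struct {
    size_t qb_end_pos; /* querybuf offset right after this command */
} parsed_cmd_sketch;

typedef struct {
    parsed_cmd_sketch cmds[8];
    uint16_t off; /* index of the next entry to apply */
    uint16_t len; /* number of entries in the queue */
} cmd_queue_sketch;

typedef struct {
    char querybuf[64];
    size_t querybuf_len;
    size_t qb_pos;     /* parse position */
    size_t qb_applied; /* end of the command being applied */
    cmd_queue_sketch queue;
} client_sketch;

/* Drop the first `applied` bytes of the buffer and rebase every offset
 * that points into it. */
static void trim_applied(client_sketch *c, size_t applied) {
    assert(applied <= c->querybuf_len && applied <= c->qb_pos);
    memmove(c->querybuf, c->querybuf + applied, c->querybuf_len - applied);
    c->querybuf_len -= applied;
    c->qb_pos -= applied;
    for (uint16_t i = c->queue.off; i < c->queue.len; i++) {
        if (c->queue.cmds[i].qb_end_pos == 0) continue; /* unset sentinel */
        assert(c->queue.cmds[i].qb_end_pos >= applied);
        c->queue.cmds[i].qb_end_pos -= applied;
    }
    c->qb_applied = 0; /* stale after the trim: reset instead of shifting */
}
```

Only entries from queue.off onward are rebased; entries before it have already been applied, so their snapshots are never read again in this model.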
    &c->argv_len_sum, &c->net_input_bytes_curr_cmd);
    c->read_flags |= flag;

    /* Record qb_pos so commandProcessed() can update reploff precisely. */
Setting c->qb_applied = c->qb_pos only when READ_FLAGS_PARSING_COMPLETED is set is correct for the common case, but note: before this change, replicated clients returned early just below and never reached the pipelined loop. Now they fall through. If the first parseMultibulk at the top of the function did not complete (short read), c->qb_applied retains its previous value (from the last successfully parsed command). That's fine if commandProcessed() is never called in that iteration — which is enforced by READ_FLAGS_PARSING_COMPLETED gating in processInputBuffer. Please double-check and consider a short comment asserting that invariant, since the correctness of the reploff calculation for replicated clients now depends on it.
    }
    assert_primary_replica_consistent $primary $replica
    $rd close
    }
These tests only assert final convergence (digest/dbsize/reploff after everything is applied). They would also pass if multi-command parsing were still disabled, since the old single-command path was already correct at steady state. To specifically exercise the new qb_end_pos / qb_applied bookkeeping, consider adding an assertion that the replica's master_repl_offset advances monotonically per command (e.g. via DEBUG SLEEP interleaved in the pipeline and checking INFO replication master_repl_offset mid-pipeline), or a test that forces a querybuf trim (large pipeline > PROTO_INLINE_MAX_SIZE/PROTO_MBULK_BIG_ARG threshold) to exercise the new beforeNextClient shift loop. The PROTO_MBULK_BIG_ARG test does exercise the trim path indirectly, but an explicit assertion that the reploff at several intermediate points matches primary would strengthen coverage of the regression this PR targets.
    size_t qb_end_pos; /* querybuf position right after this command (snapshot
                        * of c->qb_pos taken when the parser finished parsing
                        * this command). When this command is popped from the
                        * queue, this value is copied into client->qb_applied
The "Set to 0 if parsing wasn't completed" sentence implies 0 is a valid "unset" marker, but 0 is technically also a valid qb_pos snapshot (empty buffer). In practice no command completes at qb_pos==0, so the sentinel works — but it's fragile. Consider either documenting that "no successfully parsed command ever ends at qb_pos==0, hence 0 is safe as an unset sentinel", or use an explicit boolean / SIZE_MAX sentinel. As-is, the value is only ever read (in consumeCommandQueue) for entries that reached READ_FLAGS_PARSING_COMPLETED, so correctness hinges on that invariant holding forever.
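The SIZE_MAX alternative mentioned above might be sketched as follows. The macro name QB_END_POS_UNSET and both helpers are hypothetical and exist only for illustration; the point is that 0 stops doubling as both a legal offset and the "unset" marker.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical sentinel: SIZE_MAX can never be a real querybuf offset. */
#define QB_END_POS_UNSET SIZE_MAX

/* Hypothetical, simplified stand-in for parsedCommand. */
typedef struct {
    size_t qb_end_pos;
} parsed_cmd_sketch;

/* Mark a freshly queued entry as "parsing not completed yet". */
static void parsed_cmd_init(parsed_cmd_sketch *p) {
    p->qb_end_pos = QB_END_POS_UNSET;
}

/* True once the parser has stored a real snapshot, including offset 0. */
static int parsed_cmd_has_end(const parsed_cmd_sketch *p) {
    return p->qb_end_pos != QB_END_POS_UNSET;
}
```

With this shape, a shift loop or consumer can test parsed_cmd_has_end() instead of comparing against 0, at the cost of having to initialize every entry explicitly.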
Code Review
This pull request enables multi-command parsing for replicated clients by introducing precise tracking of the query buffer position for each command. By adding qb_applied to the client structure and qb_end_pos to the parsed command structure, the replication offset is now updated correctly even when multiple commands are parsed in a single batch. The changes also include logic to maintain these positions during query buffer trimming and new integration tests to ensure replication consistency. I have no feedback to provide as there were no review comments.
Dogfood mirror of valkey-io/valkey#3597.
Tests cairn's PR-review pipeline against upstream Valkey review traffic.
Upstream PR body
Multi-command parsing in parseMultibulkBuffer (introduced valkey-io#2092) was
disabled for replicated clients because the per-command replication
offset relied on c->qb_pos as the right boundary of the just-applied
command. The replication stream is effectively one big pipeline, so
supporting multi-command parsing for it can improve the replica's processing speed.
Decouple parsing position from application position by recording, for
every parsed command, the qb_pos snapshot taken right after that
command finished parsing:
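- parsedCommand.qb_end_pos: snapshot stored in the queue entry, set
  by parseMultibulkBuffer when a command is pushed into cmd_queue.
- client.qb_applied: snapshot of the command currently being
  processed, set by the parsers (for c->argv) and by
  consumeCommandQueue (when popping a queued command).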
commandProcessed() now uses c->qb_applied instead of c->qb_pos to
advance reploff by exactly the bytes of the just-applied command.
beforeNextClient shifts qb_end_pos of pending queue entries when the
replicated client's querybuf is trimmed, keeping subsequent reploff
updates consistent.
Both fields are populated unconditionally so that a client transitioning
to replicated mid-command (e.g. SYNCSLOTS ESTABLISH installs slot_migration_job
inside its own handler) still has a valid value when commandProcessed() runs.
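
The bookkeeping described in this PR body can be modeled in a few lines. The sketch below uses hypothetical stand-in types and helper names (client_sketch, parsed_cmd_sketch, apply_queued) and borrows the offset formula quoted in the review (read_reploff minus the querybuf length plus the applied position); it is an illustration under those assumptions, not the actual Valkey implementation.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified stand-ins for the structures named in the PR. */
typedef struct {
    size_t qb_end_pos; /* querybuf offset right after this command */
} parsed_cmd_sketch;

typedef struct {
    long long read_reploff; /* replication offset of the last byte read */
    long long reploff;      /* replication offset of the last byte applied */
    size_t querybuf_len;    /* bytes currently held in the query buffer */
    size_t qb_pos;          /* parse position; may run ahead of apply */
    size_t qb_applied;      /* end of the command being applied */
} client_sketch;

/* Apply one queued command: restore its snapshot, then account for it. */
static void apply_queued(client_sketch *c, const parsed_cmd_sketch *p) {
    c->qb_applied = p->qb_end_pos;
    c->reploff = c->read_reploff - (long long)c->querybuf_len +
                 (long long)c->qb_applied;
}

/* Demo: three commands of 10, 20 and 30 bytes parsed in one batch.
 * Returns 1 if reploff advances by exactly one command per apply. */
static int demo_reploff_tracks_applied(void) {
    client_sketch c = {.read_reploff = 1060, .reploff = 1000, .querybuf_len = 60};
    parsed_cmd_sketch q[3] = {{10}, {30}, {60}};
    long long expected[3] = {1010, 1030, 1060};
    c.qb_pos = 60; /* the parser has already consumed the whole buffer */
    for (int i = 0; i < 3; i++) {
        apply_queued(&c, &q[i]);
        if (c.reploff != expected[i]) return 0;
    }
    return 1;
}
```

In this model, using qb_pos in place of qb_applied would report 1060 after the first command, which is the run-ahead the PR is meant to avoid.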