Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/cluster_migrateslots.c
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,9 @@ slotMigrationJob *createSlotImportJob(client *c,
if (!job->client->querybuf) {
job->client->querybuf = generateSyncSlotsEstablishCommand(job);
job->client->qb_pos = sdslen(job->client->querybuf);
/* The backfilled ESTABLISH command is already applied, so qb_applied
* must match qb_pos for commandProcessed() to advance reploff. */
job->client->qb_applied = job->client->qb_pos;
}
job->client->repl_data->read_reploff = sdslen(job->client->querybuf);

Expand Down
36 changes: 27 additions & 9 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ client *createClient(connection *conn) {
c->buf_peak = c->buf_usable_size;
c->buf_peak_last_reset_time = server.unixtime;
c->qb_pos = 0;
c->qb_applied = 0;
c->querybuf = NULL;
c->querybuf_peak = 0;
c->reqtype = 0;
Expand Down Expand Up @@ -2278,6 +2279,7 @@ void resetSharedQueryBuf(client *c) {
c->querybuf = NULL;
sdsclear(thread_shared_qb);
c->qb_pos = 0;
c->qb_applied = 0;
}

/* Trims the client query buffer to the current position. */
Expand All @@ -2295,6 +2297,7 @@ void trimClientQueryBuffer(client *c) {
if (c->qb_pos > 0) {
sdsrange(c->querybuf, c->qb_pos, -1);
c->qb_pos = 0;
c->qb_applied = 0;
}
}

Expand Down Expand Up @@ -2325,6 +2328,11 @@ void beforeNextClient(client *c) {
if (c->repl_data->repl_applied) {
sdsrange(c->querybuf, c->repl_data->repl_applied, -1);
c->qb_pos -= c->repl_data->repl_applied;
/* qb_applied is an absolute offset, so shift it together with the trimmed
* querybuf. commandProcessed() advances repl_applied to qb_applied after
* each command, so qb_applied >= repl_applied should always be true here. */
serverAssert(c->qb_applied >= (size_t)c->repl_data->repl_applied);
c->qb_applied -= c->repl_data->repl_applied;
c->repl_data->repl_applied = 0;
}
} else {
Expand Down Expand Up @@ -3506,6 +3514,8 @@ void parseInlineBuffer(client *c) {
* */
c->net_input_bytes_curr_cmd = (c->argv_len_sum + (c->argc - 1) + 2);
c->read_flags |= READ_FLAGS_PARSING_COMPLETED;
/* Record qb_pos so commandProcessed() can update reploff precisely. */
c->qb_applied = c->qb_pos;
c->reqtype = 0;
}

Expand Down Expand Up @@ -3559,19 +3569,17 @@ void parseMultibulkBuffer(client *c) {
&c->argv_len_sum, &c->net_input_bytes_curr_cmd);
c->read_flags |= flag;

/* Record qb_pos for commandProcessed(). Written unconditionally because a
* client may become replicated mid-command (e.g. CLUSTER SYNCSLOTS ESTABLISH
* installs slot_migration_job inside its own handler). */
if (c->read_flags & READ_FLAGS_PARSING_COMPLETED) c->qb_applied = c->qb_pos;

if (c->read_flags & READ_FLAGS_AUTH_REQUIRED) {
/* Execute client's AUTH command before parsing more, because it affects
* parser limits for max allowed bulk and multibulk lengths. */
return;
}

if (isReplicatedClient(c)) {
/* TODO: some change is required for replication offset which is
* computed from c->qb_pos, assuming we only parse one command at a
* time. Disable multi-command parsing for replication for now. */
return;
}

/* Try parsing pipelined commands. */
cmdQueue *queue = &c->cmd_queue;
serverAssert(queue->len == 0);
Expand Down Expand Up @@ -3760,6 +3768,7 @@ static int parseMultibulk(client *c,
}
sdsrange(c->querybuf, c->qb_pos, -1);
c->qb_pos = 0;
c->qb_applied = 0;
/* Hint the sds library about the amount of bytes this string is
* going to contain. */
c->querybuf = sdsMakeRoomForNonGreedy(c->querybuf, ll + 2 - sdslen(c->querybuf));
Expand Down Expand Up @@ -3846,8 +3855,16 @@ void commandProcessed(client *c) {

long long prev_offset = c->repl_data->reploff;
if (isReplicatedClient(c) && !c->flag.multi) {
/* Update the applied replication offset of our primary. */
c->repl_data->reploff = c->repl_data->read_reploff - sdslen(c->querybuf) + c->qb_pos;
/* Advance reploff by exactly this command's bytes. Use qb_applied
* (current command's right boundary) instead of qb_pos, which may
* have run ahead due to multi-command parsing. */
c->repl_data->reploff = c->repl_data->read_reploff - sdslen(c->querybuf) + c->qb_applied;
/* Every applied non-MULTI command should consume >0 bytes from the
* replication stream, so reploff must strictly advance. A no-op
* advance means qb_applied was not maintained for the command we
* just processed (e.g. a command was backfilled into querybuf without
* updating qb_applied). */
serverAssert(c->repl_data->reploff > prev_offset);
}

/* If the client is replicated we need to compute the difference
Expand Down Expand Up @@ -4023,6 +4040,7 @@ static bool consumeCommandQueue(client *c) {
c->net_input_bytes_curr_cmd = p->input_bytes;
c->parsed_cmd = p->cmd;
c->slot = p->slot;
c->qb_applied += p->input_bytes;
if (queue->off == queue->len) {
/* The queue is empty. Don't free it here, because if parsing is done in
* I/O threads, we want to free it in I/O threads too, to avoid
Expand Down
1 change: 1 addition & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -4764,6 +4764,7 @@ void replicationCachePrimary(client *c) {
* pending outputs to the primary. */
sdsclear(server.primary->querybuf);
server.primary->qb_pos = 0;
server.primary->qb_applied = 0;
Comment thread
enjoy-binbin marked this conversation as resolved.
server.primary->repl_data->repl_applied = 0;
server.primary->repl_data->read_reploff = server.primary->repl_data->reploff;
if (c->flag.multi) discardTransaction(c);
Expand Down
4 changes: 4 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1312,6 +1312,10 @@ typedef struct client {
/* Input buffer and command parsing fields */
sds querybuf; /* Buffer we use to accumulate client queries. */
size_t qb_pos; /* The position we have read in querybuf. */
size_t qb_applied; /* Right boundary of the *current* command in querybuf.
* qb_pos may run ahead due to multi-command parsing, so
* we use qb_applied (replicated clients only) to advance
* reploff by exactly this command's bytes. */
robj **argv; /* Arguments of current command. */
int argc; /* Num of arguments of current command. */
int argv_len; /* Size of argv array (may be more than argc) */
Expand Down
215 changes: 215 additions & 0 deletions tests/integration/replication-multi-cmd.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
# Tests for multi-command parsing on the replication stream.
# Verifies that pipelined commands replicated to a replica are parsed in
# batches into c->cmd_queue while keeping per-command qb_applied tracking
# (advanced by parsedCommand.input_bytes), so the replication offset stays
# exact and chained replication converges.

# Assert that a primary and a replica have converged.
#
# First waits, via wait_for_ofs_sync, for the two nodes' replication offsets
# to match, then asserts that both report the same dataset digest
# (DEBUG DIGEST) and the same key count (DBSIZE).
#
# Arguments:
#   primary - client handle of the upstream node
#   replica - client handle of the downstream node
proc assert_primary_replica_consistent {primary replica} {
    wait_for_ofs_sync $primary $replica
    assert_equal [$primary debug digest] [$replica debug digest]
    assert_equal [$primary dbsize] [$replica dbsize]
}

# Pipeline correctness on a single primary/replica pair.
# Each test queues a batch of writes on a deferring client connected to the
# primary, drains exactly one reply per queued command, and then checks that
# primary and replica ended up identical.
start_server {tags {"repl external:skip"}} {
    start_server {} {
        # srv -1 is the outer server (primary), srv 0 the inner one (replica).
        set primary_host [srv -1 host]
        set primary_port [srv -1 port]
        set primary [srv -1 client]
        set replica [srv 0 client]

        # Attach the replica and wait until it has fully caught up.
        $replica replicaof $primary_host $primary_port
        wait_for_sync $replica
        wait_for_ofs_sync $primary $replica

        test {Multi-command parsing: large SET pipeline keeps reploff exact} {
            set pipe [valkey_deferring_client -1]
            # 500 rounds x 2 SETs are queued before any reply is read.
            for {set round 0} {$round < 500} {incr round} {
                $pipe set key1 value1
                $pipe set key2 value2
            }
            # Drain the 1000 pending replies.
            set pending 1000
            while {$pending > 0} {
                $pipe read
                incr pending -1
            }
            assert_primary_replica_consistent $primary $replica
            $pipe close
        }

        test {Multi-command parsing: MULTI/EXEC across pipelined batch} {
            set pipe [valkey_deferring_client -1]
            # 200 transactions, each made of 5 commands (MULTI, 3 SETs, EXEC).
            for {set round 0} {$round < 200} {incr round} {
                $pipe multi
                $pipe set key1 value1
                $pipe set key2 value2
                $pipe set key3 value3
                $pipe exec
            }
            # 200 x 5 = 1000 replies to drain.
            set pending 1000
            while {$pending > 0} {
                $pipe read
                incr pending -1
            }
            assert_primary_replica_consistent $primary $replica
            $pipe close
        }

        test {Multi-command parsing: PROTO_MBULK_BIG_ARG in pipeline} {
            set pipe [valkey_deferring_client -1]
            # 100000-byte values; per the test name these are meant to
            # exercise the PROTO_MBULK_BIG_ARG handling in the parser.
            set big_string [string repeat X 100000]
            for {set round 0} {$round < 50} {incr round} {
                $pipe set key1 $big_string
                $pipe set key2 $big_string
            }
            # 50 x 2 = 100 replies to drain.
            set pending 100
            while {$pending > 0} {
                $pipe read
                incr pending -1
            }
            assert_primary_replica_consistent $primary $replica
            $pipe close
        }

        test {Multi-command parsing: pipeline survives PSYNC reconnect} {
            set pipe [valkey_deferring_client -1]
            # First batch: 200 distinct keys written before the link is cut.
            for {set idx 0} {$idx < 200} {incr idx} {
                $pipe set kx$idx vx$idx
            }
            set pending 200
            while {$pending > 0} {
                $pipe read
                incr pending -1
            }

            # Make the replica drop its link to the primary, then wait for
            # it to be in sync again.
            $replica client kill type primary
            wait_for_sync $replica

            # Second batch: another 200 pipelined writes after the reconnect.
            for {set idx 0} {$idx < 200} {incr idx} {
                $pipe set ky$idx vy$idx
            }
            set pending 200
            while {$pending > 0} {
                $pipe read
                incr pending -1
            }

            assert_primary_replica_consistent $primary $replica
            $pipe close
        }
    }
}

# Chained replication (subreplica -> replica -> primary).
# Verifies the replica forwards the exact byte stream to its own replicas.
# Every test writes through a deferring client connected to the primary,
# drains one reply per queued command, and then checks both hops of the
# chain (primary/replica and replica/subreplica) for convergence.
start_server {tags {"repl external:skip"}} {
    start_server {} {
        start_server {} {
            # srv -2 is the outermost server (primary), srv -1 the middle
            # one (replica), srv 0 the innermost one (subreplica).
            set primary [srv -2 client]
            set primary_host [srv -2 host]
            set primary_port [srv -2 port]
            set replica [srv -1 client]
            set replica_host [srv -1 host]
            set replica_port [srv -1 port]
            set subreplica [srv 0 client]

            # Build the chain and wait until both hops have caught up.
            $replica replicaof $primary_host $primary_port
            $subreplica replicaof $replica_host $replica_port
            wait_for_sync $replica
            wait_for_sync $subreplica
            wait_for_ofs_sync $primary $replica
            wait_for_ofs_sync $replica $subreplica

            test {Chained replication: pipelined SETs converge on all 3 nodes} {
                set rd [valkey_deferring_client -2]
                for {set i 0} {$i < 500} {incr i} {
                    $rd set key1 value1
                    $rd set key2 value2
                }
                # 500 x 2 = 1000 replies to drain.
                for {set i 0} {$i < 1000} {incr i} {
                    $rd read
                }
                assert_primary_replica_consistent $primary $replica
                assert_primary_replica_consistent $replica $subreplica
                $rd close
            }

            test {Chained replication: MULTI/EXEC propagates correctly} {
                set rd [valkey_deferring_client -2]
                for {set i 0} {$i < 200} {incr i} {
                    $rd multi
                    $rd set key1 value1
                    $rd set key2 value2
                    $rd set key3 value3
                    $rd exec
                }
                # 200 transactions x 5 commands = 1000 replies to drain.
                for {set i 0} {$i < 1000} {incr i} {
                    $rd read
                }
                assert_primary_replica_consistent $primary $replica
                assert_primary_replica_consistent $replica $subreplica
                $rd close
            }

            test {Chained replication: PROTO_MBULK_BIG_ARG in pipeline} {
                set rd [valkey_deferring_client -2]
                # 100000-byte values; per the test name these are meant to
                # exercise the PROTO_MBULK_BIG_ARG handling in the parser.
                set big_string [string repeat X 100000]
                for {set i 0} {$i < 50} {incr i} {
                    $rd set key1 $big_string
                    $rd set key2 $big_string
                }
                # 50 x 2 = 100 replies to drain.
                for {set i 0} {$i < 100} {incr i} {
                    $rd read
                }
                assert_primary_replica_consistent $primary $replica
                assert_primary_replica_consistent $replica $subreplica
                $rd close
            }

            test {Chained replication: pipeline survives PSYNC reconnect} {
                set rd [valkey_deferring_client -2]
                for {set i 0} {$i < 200} {incr i} {
                    $rd set kx$i vx$i
                }
                for {set i 0} {$i < 200} {incr i} {
                    $rd read
                }

                # Force the replica to drop its connection to the primary.
                $replica client kill type primary
                wait_for_sync $replica

                # Push another batch of pipelined writes after the reconnect.
                for {set i 0} {$i < 200} {incr i} {
                    $rd set ky$i vy$i
                }
                for {set i 0} {$i < 200} {incr i} {
                    $rd read
                }

                assert_primary_replica_consistent $primary $replica
                assert_primary_replica_consistent $replica $subreplica
                $rd close
            }

            test {Chained replication: pipeline survives subreplica<-replica reconnect} {
                set rd [valkey_deferring_client -2]
                for {set i 0} {$i < 200} {incr i} {
                    $rd set kp$i vp$i
                }
                for {set i 0} {$i < 200} {incr i} {
                    $rd read
                }

                # Force the subreplica to drop its connection to the replica
                # (the node it replicates from).
                $subreplica client kill type primary
                wait_for_sync $subreplica

                # Push another batch of pipelined writes after the reconnect.
                for {set i 0} {$i < 200} {incr i} {
                    $rd set kq$i vq$i
                }
                for {set i 0} {$i < 200} {incr i} {
                    $rd read
                }

                assert_primary_replica_consistent $primary $replica
                assert_primary_replica_consistent $replica $subreplica
                $rd close
            }
        }
    }
}
Loading