Skip to content

Commit 072d8d8

Browse files
committed
chore(txPipeline): refactor slottedCommands impl
1 parent 50d1484 commit 072d8d8

File tree

1 file changed

+52
-53
lines changed

1 file changed

+52
-53
lines changed

osscluster.go

Lines changed: 52 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1526,79 +1526,78 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err
15261526
return err
15271527
}
15281528

1529-
cmdsMap := map[int][]Cmder{}
1529+
slottedCmds := c.slottedCommands(cmds)
15301530
slot := -1
1531-
// get only the keyed commands
1532-
keyedCmds := c.keyedCmds(cmds)
1533-
if len(keyedCmds) == 0 {
1534-
// no keyed commands try random slot
1531+
switch len(slottedCmds) {
1532+
case 0:
15351533
slot = hashtag.RandomSlot()
1536-
} else {
1537-
// keyed commands, get slot from them
1538-
// if more than one slot, return cross slot error
1539-
cmdsBySlot := c.mapCmdsBySlot(keyedCmds)
1540-
if len(cmdsBySlot) > 1 {
1541-
// cross slot error, we have more than one slot for keyed commands
1542-
setCmdsErr(cmds, ErrCrossSlot)
1543-
return ErrCrossSlot
1544-
}
1545-
// get the slot, should be only one
1546-
for sl := range cmdsBySlot {
1534+
case 1:
1535+
for sl := range slottedCmds {
15471536
slot = sl
15481537
break
15491538
}
1550-
}
1551-
// slot was not determined, try random one
1552-
if slot == -1 {
1553-
slot = hashtag.RandomSlot()
1554-
}
1555-
cmdsMap[slot] = cmds
1556-
1557-
// TxPipeline does not support cross slot transaction.
1558-
// double check the commands are in the same slot
1559-
if len(cmdsMap) > 1 {
1539+
default:
1540+
// TxPipeline does not support cross slot transaction.
15601541
setCmdsErr(cmds, ErrCrossSlot)
15611542
return ErrCrossSlot
15621543
}
15631544

1564-
for slot, cmds := range cmdsMap {
1565-
node, err := state.slotMasterNode(slot)
1566-
if err != nil {
1567-
setCmdsErr(cmds, err)
1568-
continue
1569-
}
1545+
node, err := state.slotMasterNode(slot)
1546+
if err != nil {
1547+
setCmdsErr(cmds, err)
1548+
return err
1549+
}
15701550

1571-
cmdsMap := map[*clusterNode][]Cmder{node: cmds}
1572-
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
1573-
if attempt > 0 {
1574-
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
1575-
setCmdsErr(cmds, err)
1576-
return err
1577-
}
1551+
cmdsMap := map[*clusterNode][]Cmder{node: cmds}
1552+
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
1553+
if attempt > 0 {
1554+
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
1555+
setCmdsErr(cmds, err)
1556+
return err
15781557
}
1558+
}
15791559

1580-
failedCmds := newCmdsMap()
1581-
var wg sync.WaitGroup
1560+
failedCmds := newCmdsMap()
1561+
var wg sync.WaitGroup
15821562

1583-
for node, cmds := range cmdsMap {
1584-
wg.Add(1)
1585-
go func(node *clusterNode, cmds []Cmder) {
1586-
defer wg.Done()
1587-
c.processTxPipelineNode(ctx, node, cmds, failedCmds)
1588-
}(node, cmds)
1589-
}
1563+
for node, cmds := range cmdsMap {
1564+
wg.Add(1)
1565+
go func(node *clusterNode, cmds []Cmder) {
1566+
defer wg.Done()
1567+
c.processTxPipelineNode(ctx, node, cmds, failedCmds)
1568+
}(node, cmds)
1569+
}
15901570

1591-
wg.Wait()
1592-
if len(failedCmds.m) == 0 {
1593-
break
1594-
}
1595-
cmdsMap = failedCmds.m
1571+
wg.Wait()
1572+
if len(failedCmds.m) == 0 {
1573+
break
15961574
}
1575+
cmdsMap = failedCmds.m
15971576
}
15981577

15991578
return cmdsFirstErr(cmds)
16001579
}
16011580

1581+
func (c *ClusterClient) slottedCommands(cmds []Cmder) map[int][]Cmder {
1582+
cmdsSlots := map[int][]Cmder{}
1583+
1584+
prefferedRandomSlot := -1
1585+
for _, cmd := range cmds {
1586+
if cmdFirstKeyPos(cmd) == 0 {
1587+
continue
1588+
}
1589+
1590+
slot := c.cmdSlot(cmd, prefferedRandomSlot)
1591+
if prefferedRandomSlot == -1 {
1592+
prefferedRandomSlot = slot
1593+
}
1594+
1595+
cmdsSlots[slot] = append(cmdsSlots[slot], cmd)
1596+
}
1597+
1598+
return cmdsSlots
1599+
}
1600+
16021601
func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
16031602
cmdsMap := make(map[int][]Cmder)
16041603
preferredRandomSlot := -1

0 commit comments

Comments
 (0)