diff --git a/redis/client.py b/redis/client.py index d3d8fd9ccd..815fdc63ac 100755 --- a/redis/client.py +++ b/redis/client.py @@ -4,6 +4,7 @@ import threading import time import warnings +from collections import defaultdict from itertools import chain from typing import Optional @@ -14,6 +15,7 @@ list_or_args, ) from redis.connection import ConnectionPool, SSLConnection, UnixDomainSocketConnection +from redis.crc import key_slot from redis.credentials import CredentialProvider from redis.exceptions import ( ConnectionError, @@ -1447,11 +1449,14 @@ def on_connect(self, connection): } self.psubscribe(**patterns) if self.shard_channels: - shard_channels = { - self.encoder.decode(k, force=True): v - for k, v in self.shard_channels.items() - } - self.ssubscribe(**shard_channels) + channels_by_slot = defaultdict(dict) + for k, v in self.shard_channels.items(): + key = self.encoder.decode(k, force=True) + slot = key_slot(self.encoder.encode(key)) + channels_by_slot[slot][key] = v + + for slot, channels in channels_by_slot.items(): + self.ssubscribe(**channels) @property def subscribed(self): @@ -1672,8 +1677,8 @@ def ssubscribe(self, *args, target_node=None, **kwargs): args = list_or_args(args[0], args[1:]) new_s_channels = dict.fromkeys(args) new_s_channels.update(kwargs) - for channel in new_s_channels: # We should send ssubscribe one by one on redis cluster to prevent CROSSSLOT error - self.execute_command("SSUBSCRIBE", channel) + ret_val = self.execute_command("SSUBSCRIBE", *new_s_channels.keys()) + # update the s_channels dict AFTER we send the command. we don't want to # subscribe twice to these channels, once for the command and again # for the reconnection. @@ -1685,6 +1690,7 @@ def ssubscribe(self, *args, target_node=None, **kwargs): # Clear the health check counter self.health_check_response_counter = 0 self.pending_unsubscribe_shard_channels.difference_update(new_s_channels) + return ret_val def sunsubscribe(self, *args, target_node=None): """