diff --git a/docs/clustering.rst b/docs/clustering.rst
index cf257d8ad5..3c28b9ee16 100644
--- a/docs/clustering.rst
+++ b/docs/clustering.rst
@@ -187,8 +187,8 @@ When a ClusterPubSub instance is created without specifying a node, a single
 node will be transparently chosen for the pubsub connection on the
 first command execution. The node will be determined by: 1. Hashing the
 channel name in the request to find its keyslot 2. Selecting a node
-that handles the keyslot: If read_from_replicas is set to true, a
-replica can be selected.
+that handles the keyslot: If read_from_replicas is set to true or
+load_balancing_strategy is provided, a replica can be selected.

 Known PubSub Limitations
 ------------------------
@@ -216,9 +216,12 @@ By default, Redis Cluster always returns a MOVED redirection response on
 accessing a replica node. You can overcome this limitation and scale read
 commands by triggering READONLY mode.

-To enable READONLY mode pass read_from_replicas=True to RedisCluster
-constructor. When set to true, read commands will be assigned between
+To enable READONLY mode, pass read_from_replicas=True or define
+a load_balancing_strategy in the RedisCluster constructor.
+When read_from_replicas is set to true, read commands will be assigned between
 the primary and its replicas in a Round-Robin manner.
+With load_balancing_strategy you can define a custom strategy for
+assigning read commands to the primary and replica nodes.

 READONLY mode can be set at runtime by calling the readonly()
 method with target_nodes='replicas', and read-write access can be restored by
diff --git a/docs/retry.rst b/docs/retry.rst
index acf198ec94..0f2e318022 100644
--- a/docs/retry.rst
+++ b/docs/retry.rst
@@ -13,25 +13,25 @@ Retry in Redis Standalone
 >>> from redis.client import Redis
 >>> from redis.exceptions import (
 >>>     BusyLoadingError,
->>>     ConnectionError,
->>>     TimeoutError
+>>>     RedisError,
 >>> )
 >>>
 >>> # Run 3 retries with exponential backoff strategy
 >>> retry = Retry(ExponentialBackoff(), 3)
->>> # Redis client with retries on custom errors
->>> r = Redis(host='localhost', port=6379, retry=retry, retry_on_error=[BusyLoadingError, ConnectionError, TimeoutError])
->>> # Redis client with retries on TimeoutError only
->>> r_only_timeout = Redis(host='localhost', port=6379, retry=retry, retry_on_timeout=True)
+>>> # Redis client with retries on custom errors in addition to the errors
+>>> # that are already retried by default
+>>> r = Redis(host='localhost', port=6379, retry=retry, retry_on_error=[BusyLoadingError, RedisError])

-As you can see from the example above, Redis client supports 3 parameters to configure the retry behaviour:
+As you can see from the example above, the Redis client supports 2 parameters to configure the retry behaviour:

 * ``retry``: :class:`~.Retry` instance with a :ref:`backoff-label` strategy and the max number of retries
-* ``retry_on_error``: list of :ref:`exceptions-label` to retry on
-* ``retry_on_timeout``: if ``True``, retry on :class:`~.TimeoutError` only
+    * The :class:`~.Retry` instance has a default set of :ref:`exceptions-label` to retry on,
+      which can be overridden by passing a tuple with :ref:`exceptions-label` to the ``supported_errors`` parameter.
+* ``retry_on_error``: list of additional :ref:`exceptions-label` to retry on

-If either ``retry_on_error`` or ``retry_on_timeout`` are passed and no ``retry`` is given,
-by default it uses a ``Retry(NoBackoff(), 1)`` (meaning 1 retry right after the first failure).
+
+If no ``retry`` is provided, a default one is created with :class:`~.ExponentialWithJitterBackoff` as the backoff strategy
+and 3 retries.

 Retry in Redis Cluster
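The ``supported_errors`` override mentioned above can be illustrated with a minimal sketch (host, port and the retried exception are assumed here, mirroring the examples in this document):

>>> from redis.backoff import ExponentialBackoff
>>> from redis.retry import Retry
>>> from redis.client import Redis
>>> from redis.exceptions import TimeoutError
>>>
>>> # Replace the default set of retried exceptions entirely:
>>> # this Retry will only retry on TimeoutError
>>> retry_only_timeout = Retry(ExponentialBackoff(), 3, supported_errors=(TimeoutError,))
>>> r = Redis(host='localhost', port=6379, retry=retry_only_timeout)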
@@ -44,12 +44,18 @@ Retry in Redis Cluster
 >>> # Run 3 retries with exponential backoff strategy
 >>> retry = Retry(ExponentialBackoff(), 3)
 >>> # Redis Cluster client with retries
->>> rc = RedisCluster(host='localhost', port=6379, retry=retry, cluster_error_retry_attempts=2)
+>>> rc = RedisCluster(host='localhost', port=6379, retry=retry)

 Retry behaviour in Redis Cluster is a little bit different from Standalone:

-* ``retry``: :class:`~.Retry` instance with a :ref:`backoff-label` strategy and the max number of retries, default value is ``Retry(NoBackoff(), 0)``
-* ``cluster_error_retry_attempts``: number of times to retry before raising an error when :class:`~.TimeoutError` or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered, default value is ``3``
+* ``retry``: :class:`~.Retry` instance with a :ref:`backoff-label` strategy and the max number of retries, default value is ``Retry(ExponentialWithJitterBackoff(base=1, cap=10), cluster_error_retry_attempts)``
+* ``cluster_error_retry_attempts``: number of times to retry before raising an error when :class:`~.TimeoutError`, :class:`~.ConnectionError`, :class:`~.ClusterDownError` or :class:`~.SlotNotCoveredError` are encountered, default value is ``3``
+    * This argument is deprecated - it is only used to initialize the number of retries of the default retry object,
+      and only when no ``retry`` object is provided.
+      When the ``retry`` argument is provided, the ``cluster_error_retry_attempts`` argument is ignored!
+
+* The retry object is not yet fully utilized in the cluster client; it is currently
+  used only to determine the number of retries for cluster-level calls.

 Let's consider the following example:

 >>> from redis.retry import Retry
 >>> from redis.cluster import RedisCluster
 >>>
->>> rc = RedisCluster(host='localhost', port=6379, retry=Retry(ExponentialBackoff(), 6), cluster_error_retry_attempts=1)
+>>> rc = RedisCluster(host='localhost', port=6379, retry=Retry(ExponentialBackoff(), 6))
 >>> rc.set('foo', 'bar')

 #. the client library calculates the hash slot for key 'foo'.
 #. given the hash slot, it then determines which node to connect to, in order to execute the command.
 #. during the connection, a :class:`~.ConnectionError` is raised.
-#. because we set ``retry=Retry(ExponentialBackoff(), 6)``, the client tries to reconnect to the node up to 6 times, with an exponential backoff between each attempt.
-#. even after 6 retries, the client is still unable to connect.
-#. because we set ``cluster_error_retry_attempts=1``, before giving up, the client starts a cluster update, removes the failed node from the startup nodes, and re-initializes the cluster.
-#. after the cluster has been re-initialized, it starts a new cycle of retries, up to 6 retries, with an exponential backoff.
-#. if the client can connect, we're good. Otherwise, the exception is finally raised to the caller, because we've run out of attempts.
\ No newline at end of file
+#. because we set ``retry=Retry(ExponentialBackoff(), 6)``, the cluster client starts a cluster update, removes the failed node from the startup nodes, and re-initializes the cluster.
+#. the cluster client retries the command until it either succeeds or the max number of retries is reached.
\ No newline at end of file
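To see the documented defaults from the caller's side, here is a small sketch (a cluster reachable at localhost:6379 is assumed; ``get_retries()`` is the accessor introduced in this change):

>>> from redis.cluster import RedisCluster
>>>
>>> rc = RedisCluster(host='localhost', port=6379)
>>> # No 'retry' was passed, so the client built
>>> # Retry(ExponentialWithJitterBackoff(base=1, cap=10), 3)
>>> rc.retry.get_retries()
3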
diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py
index 457642b792..cc5e15bb63 100644
--- a/redis/asyncio/cluster.py
+++ b/redis/asyncio/cluster.py
@@ -29,7 +29,7 @@
 from redis.asyncio.lock import Lock
 from redis.asyncio.retry import Retry
 from redis.auth.token import TokenInterface
-from redis.backoff import default_backoff
+from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
 from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis
 from redis.cluster import (
     PIPELINE_BLOCKED_COMMANDS,
@@ -143,19 +143,23 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
         To avoid reinitializing the cluster on moved errors, set reinitialize_steps to 0.
     :param cluster_error_retry_attempts:
-        | Number of times to retry before raising an error when :class:`~.TimeoutError`
-          or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered
-    :param connection_error_retry_attempts:
-        | Number of times to retry before reinitializing when :class:`~.TimeoutError`
-          or :class:`~.ConnectionError` are encountered.
-          The default backoff strategy will be set if Retry object is not passed (see
-          default_backoff in backoff.py). To change it, pass a custom Retry object
-          using the "retry" keyword.
+        | @deprecated - Please configure the 'retry' object instead.
+          In case the 'retry' object is set, this argument is ignored!
+
+          Number of times to retry before raising an error when :class:`~.TimeoutError`,
+          :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError`
+          or :class:`~.ClusterDownError` are encountered
+    :param retry:
+        | A retry object that defines the retry strategy and the number of
+          retries for the cluster client.
+          In the current implementation of the cluster client (starting from redis-py version 6.0.0)
+          the retry object is not yet fully utilized; instead, it is used just to determine
+          the number of retries for the cluster client.
+          In future releases, the retry object will be used to handle the cluster client retries!
     :param max_connections:
         | Maximum number of connections per node. If there are no free
          connections & the maximum number of connections are already created, a
-          :class:`~.MaxConnectionsError` is raised. This error may be retried as defined
-          by :attr:`connection_error_retry_attempts`
+          :class:`~.MaxConnectionsError` is raised.
     :param address_remap:
         | An optional callable which, when provided with an internal network
          address of a node, e.g.
          a `(host, port)` tuple, will return the address
@@ -211,10 +215,9 @@ def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
     __slots__ = (
         "_initialize",
         "_lock",
-        "cluster_error_retry_attempts",
+        "retry",
         "command_flags",
         "commands_parser",
-        "connection_error_retry_attempts",
         "connection_kwargs",
         "encoder",
         "node_flags",
@@ -231,6 +234,13 @@ def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
         reason="Please configure the 'load_balancing_strategy' instead",
         version="5.3.0",
     )
+    @deprecated_args(
+        args_to_warn=[
+            "cluster_error_retry_attempts",
+        ],
+        reason="Please configure the 'retry' object instead",
+        version="6.0.0",
+    )
     def __init__(
         self,
         host: Optional[str] = None,
@@ -242,8 +252,9 @@ def __init__(
         load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
         reinitialize_steps: int = 5,
         cluster_error_retry_attempts: int = 3,
-        connection_error_retry_attempts: int = 3,
         max_connections: int = 2**31,
+        retry: Optional["Retry"] = None,
+        retry_on_error: Optional[List[Type[Exception]]] = None,
         # Client related kwargs
         db: Union[str, int] = 0,
         path: Optional[str] = None,
@@ -263,8 +274,6 @@ def __init__(
         socket_keepalive: bool = False,
         socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
         socket_timeout: Optional[float] = None,
-        retry: Optional["Retry"] = None,
-        retry_on_error: Optional[List[Type[Exception]]] = None,
         # SSL related kwargs
         ssl: bool = False,
         ssl_ca_certs: Optional[str] = None,
@@ -318,7 +327,6 @@ def __init__(
             "socket_keepalive": socket_keepalive,
             "socket_keepalive_options": socket_keepalive_options,
             "socket_timeout": socket_timeout,
-            "retry": retry,
             "protocol": protocol,
         }
@@ -342,17 +350,15 @@ def __init__(
             # Call our on_connect function to configure READONLY mode
             kwargs["redis_connect_func"] = self.on_connect

-        self.retry = retry
-        if retry or retry_on_error or connection_error_retry_attempts > 0:
-            # Set a retry object for all cluster nodes
-            self.retry = retry or Retry(
-                default_backoff(), connection_error_retry_attempts
+        if retry:
+            self.retry = retry
+        else:
+            self.retry = Retry(
+                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
+                retries=cluster_error_retry_attempts,
             )
-            if not retry_on_error:
-                # Default errors for retrying
-                retry_on_error = [ConnectionError, TimeoutError]
+        if retry_on_error:
             self.retry.update_supported_errors(retry_on_error)
-            kwargs.update({"retry": self.retry})

         kwargs["response_callbacks"] = _RedisCallbacks.copy()
         if kwargs.get("protocol") in ["3", 3]:
@@ -389,8 +395,6 @@ def __init__(
         self.read_from_replicas = read_from_replicas
         self.load_balancing_strategy = load_balancing_strategy
         self.reinitialize_steps = reinitialize_steps
-        self.cluster_error_retry_attempts = cluster_error_retry_attempts
-        self.connection_error_retry_attempts = connection_error_retry_attempts
         self.reinitialize_counter = 0
         self.commands_parser = AsyncCommandsParser()
         self.node_flags = self.__class__.NODE_FLAGS.copy()
@@ -561,15 +565,8 @@ def get_connection_kwargs(self) -> Dict[str, Optional[Any]]:
         """Get the kwargs passed to :class:`~redis.asyncio.connection.Connection`."""
         return self.connection_kwargs

-    def get_retry(self) -> Optional["Retry"]:
-        return self.retry
-
-    def set_retry(self, retry: "Retry") -> None:
+    def set_retry(self, retry: Retry) -> None:
         self.retry = retry
-        for node in self.get_nodes():
-            node.connection_kwargs.update({"retry": retry})
-            for conn in node._connections:
-                conn.retry = retry

     def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None:
         """Set a custom
response callback.""" @@ -688,8 +685,8 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any: """ Execute a raw command on the appropriate cluster node or target_nodes. - It will retry the command as specified by :attr:`cluster_error_retry_attempts` & - then raise an exception. + It will retry the command as specified by the retries property of + the :attr:`retry` & then raise an exception. :param args: | Raw command args @@ -705,7 +702,7 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any: command = args[0] target_nodes = [] target_nodes_specified = False - retry_attempts = self.cluster_error_retry_attempts + retry_attempts = self.retry.get_retries() passed_targets = kwargs.pop("target_nodes", None) if passed_targets and not self._is_node_flag(passed_targets): @@ -1048,7 +1045,23 @@ def acquire_connection(self) -> Connection: return self._free.popleft() except IndexError: if len(self._connections) < self.max_connections: - connection = self.connection_class(**self.connection_kwargs) + # We are configuring the connection pool not to retry + # connections on lower level clients to avoid retrying + # connections to nodes that are not reachable + # and to avoid blocking the connection pool. + # The only error that will have some handling in the lower + # level clients is ConnectionError which will trigger disconnection + # of the socket. + # The retries will be handled on cluster client level + # where we will have proper handling of the cluster topology + retry = Retry( + backoff=NoBackoff(), + retries=0, + supported_errors=(ConnectionError,), + ) + connection_kwargs = self.connection_kwargs.copy() + connection_kwargs["retry"] = retry + connection = self.connection_class(**connection_kwargs) self._connections.append(connection) return connection @@ -1544,7 +1557,7 @@ async def execute( """ Execute the pipeline. - It will retry the commands as specified by :attr:`cluster_error_retry_attempts` + It will retry the commands as specified by retries specified in :attr:`retry` & then raise an exception. :param raise_on_error: @@ -1560,7 +1573,7 @@ async def execute( return [] try: - retry_attempts = self._client.cluster_error_retry_attempts + retry_attempts = self._client.retry.get_retries() while True: try: if self._client._initialize: diff --git a/redis/asyncio/retry.py b/redis/asyncio/retry.py index 7c5e3b0e7d..a20f8b4849 100644 --- a/redis/asyncio/retry.py +++ b/redis/asyncio/retry.py @@ -43,6 +43,18 @@ def update_supported_errors(self, specified_errors: list): set(self._supported_errors + tuple(specified_errors)) ) + def get_retries(self) -> int: + """ + Get the number of retries. + """ + return self._retries + + def update_retries(self, value: int) -> None: + """ + Set the number of retries. 
+ """ + self._retries = value + async def call_with_retry( self, do: Callable[[], Awaitable[T]], fail: Callable[[RedisError], Any] ) -> T: diff --git a/redis/cluster.py b/redis/cluster.py index ae9720652a..c79f8e429d 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -9,7 +9,7 @@ from redis._parsers import CommandsParser, Encoder from redis._parsers.helpers import parse_scan -from redis.backoff import default_backoff +from redis.backoff import ExponentialWithJitterBackoff, NoBackoff from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface from redis.client import CaseInsensitiveDict, PubSub, Redis from redis.commands import READ_COMMANDS, RedisClusterCommands @@ -179,7 +179,7 @@ def parse_cluster_myshardid(resp, **options): "cache", "cache_config", ) -KWARGS_DISABLED_KEYS = ("host", "port") +KWARGS_DISABLED_KEYS = ("host", "port", "retry") def cleanup_kwargs(**kwargs): @@ -436,7 +436,7 @@ def replace_default_node(self, target_node: "ClusterNode" = None) -> None: # Choose a primary if the cluster contains different primaries self.nodes_manager.default_node = random.choice(primaries) else: - # Otherwise, hoose a primary if the cluster contains different primaries + # Otherwise, choose a primary if the cluster contains different primaries replicas = [node for node in self.get_replicas() if node != curr_node] if replicas: self.nodes_manager.default_node = random.choice(replicas) @@ -492,6 +492,13 @@ class initializer. In the case of conflicting arguments, querystring reason="Please configure the 'load_balancing_strategy' instead", version="5.3.0", ) + @deprecated_args( + args_to_warn=[ + "cluster_error_retry_attempts", + ], + reason="Please configure the 'retry' object instead", + version="6.0.0", + ) def __init__( self, host: Optional[str] = None, @@ -549,9 +556,19 @@ def __init__( If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists specific IP addresses, it is best to set it to false. :param cluster_error_retry_attempts: + @deprecated - Please configure the 'retry' object instead + In case 'retry' object is set - this argument is ignored! + Number of times to retry before raising an error when - :class:`~.TimeoutError` or :class:`~.ConnectionError` or + :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or :class:`~.ClusterDownError` are encountered + :param retry: + A retry object that defines the retry strategy and the number of + retries for the cluster client. + In current implementation for the cluster client (starting form redis-py version 6.0.0) + the retry object is not yet fully utilized, instead it is used just to determine + the number of retries for the cluster client. + In the future releases the retry object will be used to handle the cluster client retries! :param reinitialize_steps: Specifies the number of MOVED errors that need to occur before reinitializing the whole cluster topology. If a MOVED error occurs @@ -571,7 +588,8 @@ def __init__( :**kwargs: Extra arguments that will be sent into Redis instance when created - (See Official redis-py doc for supported kwargs + (See Official redis-py doc for supported kwargs - the only limitation + is that you can't provide 'retry' object as part of kwargs. 
             [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])

         Some kwargs are not supported and will raise a
         RedisClusterException:
@@ -586,6 +604,15 @@ def __init__(
                 "Argument 'db' is not possible to use in cluster mode"
             )

+        if "retry" in kwargs:
+            # The 'retry' argument cannot be used in kwargs when in cluster mode:
+            # the kwargs are passed to the lower-level connections to the cluster
+            # nodes, and there we provide a retry configuration with no retries allowed.
+            # The retries should be handled at the cluster client level.
+            raise RedisClusterException(
+                "The 'retry' argument cannot be used in kwargs when running in cluster mode."
+            )
+
         # Get the startup node/s
         from_url = False
         if url is not None:
@@ -628,9 +655,11 @@ def __init__(
         kwargs = cleanup_kwargs(**kwargs)
         if retry:
             self.retry = retry
-            kwargs.update({"retry": self.retry})
         else:
-            kwargs.update({"retry": Retry(default_backoff(), 0)})
+            self.retry = Retry(
+                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
+                retries=cluster_error_retry_attempts,
+            )

         self.encoder = Encoder(
             kwargs.get("encoding", "utf-8"),
@@ -641,7 +670,6 @@ def __init__(
         if (cache_config or cache) and protocol not in [3, "3"]:
             raise RedisError("Client caching is only supported with RESP version 3")

-        self.cluster_error_retry_attempts = cluster_error_retry_attempts
         self.command_flags = self.__class__.COMMAND_FLAGS.copy()
         self.node_flags = self.__class__.NODE_FLAGS.copy()
         self.read_from_replicas = read_from_replicas
@@ -772,13 +800,8 @@ def set_default_node(self, node):
         self.nodes_manager.default_node = node
         return True

-    def get_retry(self) -> Optional["Retry"]:
-        return self.retry
-
-    def set_retry(self, retry: "Retry") -> None:
+    def set_retry(self, retry: Retry) -> None:
         self.retry = retry
-        for node in self.get_nodes():
-            node.redis_connection.set_retry(retry)

     def monitor(self, target_node=None):
         """
@@ -825,10 +848,11 @@ def pipeline(self, transaction=None, shard_hint=None):
                 startup_nodes=self.nodes_manager.startup_nodes,
                 result_callbacks=self.result_callbacks,
                 cluster_response_callbacks=self.cluster_response_callbacks,
-                cluster_error_retry_attempts=self.cluster_error_retry_attempts,
+                cluster_error_retry_attempts=self.retry.get_retries(),
                 read_from_replicas=self.read_from_replicas,
                 load_balancing_strategy=self.load_balancing_strategy,
                 reinitialize_steps=self.reinitialize_steps,
+                retry=self.retry,
                 lock=self._lock,
             )
@@ -1090,8 +1114,8 @@ def _internal_execute_command(self, *args, **kwargs):
         """
         Wrapper for ERRORS_ALLOW_RETRY error handling.

-        It will try the number of times specified by the config option
-        "self.cluster_error_retry_attempts" which defaults to 3 unless manually
+        It will try the number of times specified by the retries property of
+        the "self.retry" object, which defaults to 3 unless manually
         configured.

         If it reaches the number of times, the command will raise the exception
@@ -1117,9 +1141,7 @@ def _internal_execute_command(self, *args, **kwargs):
         # execution since the nodes may not be valid anymore after the tables
         # were reinitialized. So in case of passed target nodes,
         # retry_attempts will be set to 0.
-        retry_attempts = (
-            0 if target_nodes_specified else self.cluster_error_retry_attempts
-        )
+        retry_attempts = 0 if target_nodes_specified else self.retry.get_retries()
         # Add one for the first execution
         execute_attempts = 1 + retry_attempts
         for _ in range(execute_attempts):
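With these changes, ``set_retry`` only swaps the cluster-level retry object, while node-level connections keep their fixed no-retry configuration. A minimal sketch (a cluster endpoint at localhost:6379 is assumed):

>>> from redis.backoff import ExponentialBackoff
>>> from redis.retry import Retry
>>> from redis.cluster import RedisCluster
>>>
>>> rc = RedisCluster(host='localhost', port=6379)
>>> rc.set_retry(Retry(ExponentialBackoff(), 5))
>>> rc.retry.get_retries()  # cluster-level retries updated
5
>>> # lower-level node connections still use NoBackoff with 0 retries,
>>> # as configured in create_redis_node() below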
@@ -1333,8 +1355,12 @@ def __eq__(self, obj):
         return isinstance(obj, ClusterNode) and obj.name == self.name

     def __del__(self):
-        if self.redis_connection is not None:
-            self.redis_connection.close()
+        try:
+            if self.redis_connection is not None:
+                self.redis_connection.close()
+        except Exception:
+            # Ignore errors when closing the connection
+            pass

 class LoadBalancingStrategy(Enum):
@@ -1585,17 +1611,32 @@ def create_redis_connections(self, nodes):
             )

     def create_redis_node(self, host, port, **kwargs):
+        # We configure the connection pool not to retry
+        # connections in the lower-level clients, to avoid retrying
+        # connections to nodes that are not reachable
+        # and to avoid blocking the connection pool.
+        # The only error that gets some handling in the lower-level
+        # clients is ConnectionError, which triggers disconnection
+        # of the socket.
+        # The retries are handled at the cluster client level,
+        # where we have proper handling of the cluster topology.
+        node_retry_config = Retry(
+            backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,)
+        )
+
         if self.from_url:
             # Create a redis node with a custom connection pool
             kwargs.update({"host": host})
             kwargs.update({"port": port})
             kwargs.update({"cache": self._cache})
+            kwargs.update({"retry": node_retry_config})
             r = Redis(connection_pool=self.connection_pool_class(**kwargs))
         else:
             r = Redis(
                 host=host,
                 port=port,
                 cache=self._cache,
+                retry=node_retry_config,
                 **kwargs,
             )
         return r
@@ -2039,6 +2080,13 @@ class ClusterPipeline(RedisCluster):
         TryAgainError,
     )

+    @deprecated_args(
+        args_to_warn=[
+            "cluster_error_retry_attempts",
+        ],
+        reason="Please configure the 'retry' object instead",
+        version="6.0.0",
+    )
     def __init__(
         self,
         nodes_manager: "NodesManager",
@@ -2050,6 +2098,7 @@ def __init__(
         load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
         cluster_error_retry_attempts: int = 3,
         reinitialize_steps: int = 5,
+        retry: Optional[Retry] = None,
         lock=None,
         **kwargs,
     ):
@@ -2066,9 +2115,16 @@ def __init__(
         self.load_balancing_strategy = load_balancing_strategy
         self.command_flags = self.__class__.COMMAND_FLAGS.copy()
         self.cluster_response_callbacks = cluster_response_callbacks
-        self.cluster_error_retry_attempts = cluster_error_retry_attempts
         self.reinitialize_counter = 0
         self.reinitialize_steps = reinitialize_steps
+        if retry is not None:
+            self.retry = retry
+        else:
+            self.retry = Retry(
+                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
+                retries=cluster_error_retry_attempts,
+            )
+
         self.encoder = Encoder(
             kwargs.get("encoding", "utf-8"),
             kwargs.get("encoding_errors", "strict"),
@@ -2194,7 +2250,7 @@ def send_cluster_commands(
         - refresh_table_asap set to True

         It will try the number of times specified by
-        the config option "self.cluster_error_retry_attempts"
+        the retries configured in "self.retry"
         which defaults to 3 unless manually configured.
         If it reaches the number of times, the command will
@@ -2202,7 +2258,7 @@ def send_cluster_commands(
         """
         if not stack:
             return []
-        retry_attempts = self.cluster_error_retry_attempts
+        retry_attempts = self.retry.get_retries()
         while True:
             try:
                 return self._send_cluster_commands(
diff --git a/redis/commands/json/__init__.py b/redis/commands/json/__init__.py
index 01077e6b88..0e717b31d6 100644
--- a/redis/commands/json/__init__.py
+++ b/redis/commands/json/__init__.py
@@ -120,7 +120,7 @@ def pipeline(self, transaction=True, shard_hint=None):
             startup_nodes=self.client.nodes_manager.startup_nodes,
             result_callbacks=self.client.result_callbacks,
             cluster_response_callbacks=self.client.cluster_response_callbacks,
-            cluster_error_retry_attempts=self.client.cluster_error_retry_attempts,
+            cluster_error_retry_attempts=self.client.retry.get_retries(),
             read_from_replicas=self.client.read_from_replicas,
             reinitialize_steps=self.client.reinitialize_steps,
             lock=self.client._lock,
diff --git a/redis/commands/timeseries/__init__.py b/redis/commands/timeseries/__init__.py
index 4188b93d70..3fbf821172 100644
--- a/redis/commands/timeseries/__init__.py
+++ b/redis/commands/timeseries/__init__.py
@@ -84,7 +84,7 @@ def pipeline(self, transaction=True, shard_hint=None):
             startup_nodes=self.client.nodes_manager.startup_nodes,
             result_callbacks=self.client.result_callbacks,
             cluster_response_callbacks=self.client.cluster_response_callbacks,
-            cluster_error_retry_attempts=self.client.cluster_error_retry_attempts,
+            cluster_error_retry_attempts=self.client.retry.get_retries(),
             read_from_replicas=self.client.read_from_replicas,
             reinitialize_steps=self.client.reinitialize_steps,
             lock=self.client._lock,
diff --git a/redis/retry.py b/redis/retry.py
index 03fd973c4c..ca9ea76f24 100644
--- a/redis/retry.py
+++ b/redis/retry.py
@@ -44,6 +44,18 @@ def update_supported_errors(
             set(self._supported_errors + tuple(specified_errors))
         )

+    def get_retries(self) -> int:
+        """
+        Get the number of retries.
+        """
+        return self._retries
+
+    def update_retries(self, value: int) -> None:
+        """
+        Set the number of retries.
+ """ + self._retries = value + def call_with_retry( self, do: Callable[[], T], diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index a0429152ec..7d411b578b 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -13,7 +13,11 @@ from redis.asyncio.cluster import ClusterNode, NodesManager, RedisCluster from redis.asyncio.connection import Connection, SSLConnection, async_timeout from redis.asyncio.retry import Retry -from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff +from redis.backoff import ( + ExponentialBackoff, + ExponentialWithJitterBackoff, + NoBackoff, +) from redis.cluster import ( PIPELINE_BLOCKED_COMMANDS, PRIMARY, @@ -367,71 +371,79 @@ async def test_cluster_set_get_retry_object(self, request: FixtureRequest): retry = Retry(NoBackoff(), 2) url = request.config.getoption("--redis-url") async with RedisCluster.from_url(url, retry=retry) as r: - assert r.get_retry()._retries == retry._retries - assert isinstance(r.get_retry()._backoff, NoBackoff) + assert r.retry.get_retries() == retry.get_retries() + assert isinstance(r.retry._backoff, NoBackoff) for node in r.get_nodes(): - n_retry = node.connection_kwargs.get("retry") + # validate nodes lower level connections default + # retry policy is applied + n_retry = node.acquire_connection().retry assert n_retry is not None - assert n_retry._retries == retry._retries + assert n_retry._retries == 0 assert isinstance(n_retry._backoff, NoBackoff) rand_cluster_node = r.get_random_node() existing_conn = rand_cluster_node.acquire_connection() # Change retry policy new_retry = Retry(ExponentialBackoff(), 3) r.set_retry(new_retry) - assert r.get_retry()._retries == new_retry._retries - assert isinstance(r.get_retry()._backoff, ExponentialBackoff) + assert r.retry.get_retries() == new_retry.get_retries() + assert isinstance(r.retry._backoff, ExponentialBackoff) for node in r.get_nodes(): - n_retry = node.connection_kwargs.get("retry") + # validate nodes lower level connections are not affected + n_retry = node.acquire_connection().retry assert n_retry is not None - assert n_retry._retries == new_retry._retries - assert isinstance(n_retry._backoff, ExponentialBackoff) - assert existing_conn.retry._retries == new_retry._retries + assert n_retry._retries == 0 + assert isinstance(n_retry._backoff, NoBackoff) + assert existing_conn.retry.get_retries() == 0 new_conn = rand_cluster_node.acquire_connection() - assert new_conn.retry._retries == new_retry._retries + assert new_conn.retry._retries == 0 async def test_cluster_retry_object(self, request: FixtureRequest) -> None: url = request.config.getoption("--redis-url") async with RedisCluster.from_url(url) as rc_default: # Test default retry - retry = rc_default.connection_kwargs.get("retry") + retry = rc_default.retry # FIXME: Workaround for https://github.com/redis/redis-py/issues/3030 host = rc_default.get_default_node().host assert isinstance(retry, Retry) assert retry._retries == 3 - assert isinstance(retry._backoff, type(default_backoff())) - assert rc_default.get_node(host, 16379).connection_kwargs.get( - "retry" - ) == rc_default.get_node(host, 16380).connection_kwargs.get("retry") + assert isinstance(retry._backoff, type(ExponentialWithJitterBackoff())) + + # validate nodes connections are using the default retry for + # lower level connections when client is created through 'from_url' method + # without specified retry object + node1_retry = rc_default.get_node(host, 
16379).acquire_connection().retry + node2_retry = rc_default.get_node(host, 16380).acquire_connection().retry + for node_retry in (node1_retry, node2_retry): + assert node_retry.get_retries() == 0 + assert isinstance(node_retry._backoff, NoBackoff) + assert node_retry._supported_errors == (ConnectionError,) retry = Retry(ExponentialBackoff(10, 5), 5) async with RedisCluster.from_url(url, retry=retry) as rc_custom_retry: # Test custom retry - assert ( - rc_custom_retry.get_node(host, 16379).connection_kwargs.get("retry") - == retry - ) + assert rc_custom_retry.retry == retry + # validate nodes connections are using the default retry for + # lower level connections when client is created through 'from_url' method + # with specified retry object + node1_retry = rc_default.get_node(host, 16379).acquire_connection().retry + node2_retry = rc_default.get_node(host, 16380).acquire_connection().retry + for node_retry in (node1_retry, node2_retry): + assert node_retry.get_retries() == 0 + assert isinstance(node_retry._backoff, NoBackoff) + assert node_retry._supported_errors == (ConnectionError,) async with RedisCluster.from_url( - url, connection_error_retry_attempts=0 + url, cluster_error_retry_attempts=0 ) as rc_no_retries: - # Test no connection retries - assert ( - rc_no_retries.get_node(host, 16379).connection_kwargs.get("retry") - is None - ) + # Test no cluster retries + assert rc_no_retries.retry.get_retries() == 0 async with RedisCluster.from_url( url, retry=Retry(NoBackoff(), 0) ) as rc_no_retries: - assert ( - rc_no_retries.get_node(host, 16379) - .connection_kwargs.get("retry") - ._retries - == 0 - ) + assert rc_no_retries.retry.get_retries() == 0 async def test_empty_startup_nodes(self) -> None: """ @@ -2809,7 +2821,7 @@ async def test_multi_key_operation_with_multi_slots(self, r: RedisCluster) -> No async def test_cluster_down_error(self, r: RedisCluster) -> None: """ - Test that the pipeline retries cluster_error_retry_attempts times before raising + Test that the pipeline retries the specified in retry object times before raising an error. 
""" key = "foo" @@ -2834,10 +2846,7 @@ async def parse_response( async with r.pipeline() as pipe: with pytest.raises(ClusterDownError): await pipe.get(key).execute() - assert ( - node.parse_response.await_count - == 3 * r.cluster_error_retry_attempts + 1 - ) + assert node.parse_response.await_count == 3 * r.retry.get_retries() + 1 async def test_connection_error_not_raised(self, r: RedisCluster) -> None: """Test ConnectionError handling with raise_on_error=False.""" diff --git a/tests/test_cluster.py b/tests/test_cluster.py index d96342f87a..d4e48e199b 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -14,7 +14,11 @@ import redis from redis import Redis from redis._parsers import CommandsParser -from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff +from redis.backoff import ( + ExponentialBackoff, + ExponentialWithJitterBackoff, + NoBackoff, +) from redis.cluster import ( PRIMARY, REDIS_CLUSTER_HASH_SLOTS, @@ -884,46 +888,48 @@ def moved_redirect_effect(connection, *args, **options): def test_cluster_get_set_retry_object(self, request): retry = Retry(NoBackoff(), 2) r = _get_client(RedisCluster, request, retry=retry) - assert r.get_retry()._retries == retry._retries - assert isinstance(r.get_retry()._backoff, NoBackoff) + assert r.retry.get_retries() == retry.get_retries() + assert isinstance(r.retry._backoff, NoBackoff) for node in r.get_nodes(): - assert node.redis_connection.get_retry()._retries == retry._retries + assert node.redis_connection.get_retry().get_retries() == 0 assert isinstance(node.redis_connection.get_retry()._backoff, NoBackoff) rand_node = r.get_random_node() existing_conn = rand_node.redis_connection.connection_pool.get_connection() # Change retry policy new_retry = Retry(ExponentialBackoff(), 3) r.set_retry(new_retry) - assert r.get_retry()._retries == new_retry._retries - assert isinstance(r.get_retry()._backoff, ExponentialBackoff) + assert r.retry.get_retries() == new_retry.get_retries() + assert isinstance(r.retry._backoff, ExponentialBackoff) for node in r.get_nodes(): - assert node.redis_connection.get_retry()._retries == new_retry._retries - assert isinstance( - node.redis_connection.get_retry()._backoff, ExponentialBackoff - ) - assert existing_conn.retry._retries == new_retry._retries + assert node.redis_connection.get_retry()._retries == 0 + assert isinstance(node.redis_connection.get_retry()._backoff, NoBackoff) + assert existing_conn.retry._retries == 0 new_conn = rand_node.redis_connection.connection_pool.get_connection() - assert new_conn.retry._retries == new_retry._retries + assert new_conn.retry._retries == 0 def test_cluster_retry_object(self, r) -> None: # Test default retry # FIXME: Workaround for https://github.com/redis/redis-py/issues/3030 host = r.get_default_node().host - retry = r.get_connection_kwargs().get("retry") + # test default retry config + retry = r.retry assert isinstance(retry, Retry) - assert retry._retries == 0 - assert isinstance(retry._backoff, type(default_backoff())) - node1 = r.get_node(host, 16379).redis_connection - node2 = r.get_node(host, 16380).redis_connection - assert node1.get_retry()._retries == node2.get_retry()._retries - - # Test custom retry + assert retry.get_retries() == 3 + assert isinstance(retry._backoff, type(ExponentialWithJitterBackoff())) + node1_connection = r.get_node(host, 16379).redis_connection + node2_connection = r.get_node(host, 16380).redis_connection + assert node1_connection.get_retry()._retries == 0 + assert 
node2_connection.get_retry()._retries == 0 + + # Test custom retry is not applied to nodes retry = Retry(ExponentialBackoff(10, 5), 5) rc_custom_retry = RedisCluster(host, 16379, retry=retry) assert ( - rc_custom_retry.get_node(host, 16379).redis_connection.get_retry()._retries - == retry._retries + rc_custom_retry.get_node(host, 16379) + .redis_connection.get_retry() + .get_retries() + == 0 ) def test_replace_cluster_node(self, r) -> None: diff --git a/tests/test_retry.py b/tests/test_retry.py index cb001fbbd5..926fe28313 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -203,6 +203,22 @@ def test_client_retry_on_timeout(self, request): finally: assert parse_response.call_count == retries + 1 + @pytest.mark.onlycluster + def test_get_set_retry_object_for_cluster_client(self, request): + retry = Retry(NoBackoff(), 2) + r = _get_client(Redis, request, retry_on_timeout=True, retry=retry) + exist_conn = r.connection_pool.get_connection() + assert r.retry._retries == retry._retries + assert isinstance(r.retry._backoff, NoBackoff) + new_retry_policy = Retry(ExponentialBackoff(), 3) + r.set_retry(new_retry_policy) + assert r.retry._retries == new_retry_policy._retries + assert isinstance(r.retry._backoff, ExponentialBackoff) + assert exist_conn.retry._retries == new_retry_policy._retries + new_conn = r.connection_pool.get_connection() + assert new_conn.retry._retries == new_retry_policy._retries + + @pytest.mark.onlynoncluster def test_get_set_retry_object(self, request): retry = Retry(NoBackoff(), 2) r = _get_client(Redis, request, retry_on_timeout=True, retry=retry)
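Finally, a sketch of how a pipeline created from the cluster client shares the client-level retry object (a cluster endpoint at localhost:6379 is assumed; per the changes above, ``RedisCluster.pipeline()`` passes ``retry=self.retry`` to ``ClusterPipeline``, which stores it as ``self.retry``):

>>> from redis.cluster import RedisCluster
>>>
>>> rc = RedisCluster(host='localhost', port=6379)
>>> pipe = rc.pipeline()
>>> # the pipeline and the client share one retry budget
>>> pipe.retry.get_retries() == rc.retry.get_retries()
True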