diff --git a/rediscluster/pipeline.py b/rediscluster/pipeline.py
index 29ec8793..29162e2c 100644
--- a/rediscluster/pipeline.py
+++ b/rediscluster/pipeline.py
@@ -6,13 +6,13 @@
 # rediscluster imports
 from .client import RedisCluster
 from .exceptions import (
-    RedisClusterException, AskError, MovedError, TryAgainError,
+    RedisClusterException, AskError, MovedError, TryAgainError, ResponseError,
 )
 from .utils import clusterdown_wrapper, dict_merge
 
 # 3rd party imports
 from redis import Redis
-from redis.exceptions import ConnectionError, RedisError, TimeoutError
+from redis.exceptions import ConnectionError, RedisError, TimeoutError, ExecAbortError
 from redis._compat import imap, unicode
 
@@ -90,7 +90,7 @@ def annotate_exception(self, exception, number, command):
             number, cmd, unicode(exception.args[0]))
         exception.args = (msg,) + exception.args[1:]
 
-    def execute(self, raise_on_error=True):
+    def execute(self, raise_on_error=True, use_multi=False):
         """
         """
         stack = self.command_stack
@@ -99,7 +99,7 @@ def execute(self, raise_on_error=True):
             return []
 
         try:
-            return self.send_cluster_commands(stack, raise_on_error)
+            return self.send_cluster_commands(stack, raise_on_error, use_multi=use_multi)
         finally:
             self.reset()
 
@@ -136,7 +136,7 @@ def reset(self):
         # self.connection = None
 
     @clusterdown_wrapper
-    def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True):
+    def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True, use_multi=False):
         """
         Send a bunch of cluster commands to the redis cluster.
 
@@ -157,6 +157,7 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True):
             # command should route to.
             slot = self._determine_slot(*c.args)
             node = self.connection_pool.get_node_by_slot(slot)
+            c.slot = slot
 
             # little hack to make sure the node name is populated. probably could clean this up.
             self.connection_pool.nodes.set_node_name(node)
@@ -165,7 +166,11 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True):
             # we can build a list of commands for each node.
             node_name = node['name']
             if node_name not in nodes:
-                nodes[node_name] = NodeCommands(self.parse_response, self.connection_pool.get_connection_by_node(node))
+                nodes[node_name] = NodeCommands(
+                    self.parse_response,
+                    self.connection_pool.get_connection_by_node(node),
+                    response_callbacks=self.response_callbacks,
+                    use_multi=use_multi)
 
             nodes[node_name].append(c)
 
@@ -201,7 +206,13 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True):
         # collect all the commands we are allowed to retry.
         # (MOVED, ASK, or connection errors or timeout errors)
         attempt = sorted([c for c in attempt if isinstance(c.result, ERRORS_ALLOW_RETRY)], key=lambda x: x.position)
-        if attempt and allow_redirections:
+        if attempt and allow_redirections and use_multi:
+            # Some slots moved, so refresh the node-slot table as soon as
+            # possible by running a harmless command that forces a fetch.
+            self.refresh_table_asap = True
+            super(ClusterPipeline, self).execute_command('cluster', 'keyslot', 'a')
+
+        if attempt and allow_redirections and not use_multi:
             # RETRY MAGIC HAPPENS HERE!
             # send these remaing comamnds one at a time using `execute_command`
             # in the main client. This keeps our retry logic in one place mostly,
@@ -217,6 +228,11 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True):
             # If a lot of commands have failed, we'll be setting the
             # flag to rebuild the slots table from scratch. So MOVED errors should
             # correct themselves fairly quickly.
+            #
+            # not use_multi: when use_multi is True, the commands for a given
+            # slot are meant to execute atomically. An automatic retry here
+            # would break that atomicity, so we skip it and let the caller
+            # handle the retry instead.
             self.connection_pool.nodes.increment_reinitialize_counter(len(attempt))
             for c in attempt:
                 try:
@@ -371,18 +387,23 @@ def __init__(self, args, options=None, position=None):
         self.result = None
         self.node = None
         self.asking = False
+        self.slot = None
 
 
 class NodeCommands(object):
     """
     """
 
-    def __init__(self, parse_response, connection):
+    def __init__(self, parse_response, connection,
+                 response_callbacks=None, use_multi=False):
         """
         """
         self.parse_response = parse_response
         self.connection = connection
-        self.commands = []
+        self.response_callbacks = response_callbacks or {}
+        self.commands = []
+        self.response_indices = []
+        self.use_multi = use_multi  # if True, wrap same-slot runs in MULTI/EXEC
 
     def append(self, c):
         """
@@ -404,12 +425,69 @@ def write(self):
         # build up all commands into a single request to increase network perf
         # send all the commands and catch connection and timeout errors.
         try:
-            connection.send_packed_command(connection.pack_commands([c.args for c in commands]))
+            if self.use_multi:
+                cmds = self.add_transaction(commands)
+            else:
+                cmds = [c.args for c in commands]
+            connection.send_packed_command(connection.pack_commands(cmds))
         except (ConnectionError, TimeoutError) as e:
             for c in commands:
                 c.result = e
 
+    def add_transaction(self, commands):
+        cmds = []
+        response_indices = []  # where each response should be routed
+
+        num_commands = len(commands)
+        if num_commands < 2:
+            # nothing to wrap in a transaction
+            self.response_indices = list(range(num_commands))
+            return [c.args for c in commands]
+
+        run_indices = []  # indices of the commands in the current run
+        in_run = False  # inside a run of commands that share a slot?
+        for i in range(num_commands - 1):
+            c1 = commands[i]
+            c2 = commands[i + 1]
+            # starting a new run?
+            if not in_run and c1.slot == c2.slot:
+                in_run = True
+                cmds.append(('MULTI',))
+                response_indices.append(None)  # discard this response (it will be "OK")
+            # add the actual command
+            in_run = self.add_command(cmds, c1, i, response_indices, run_indices, in_run, c1.slot == c2.slot)
+
+        # remember to add the last command (c2 from the final iteration)
+        in_run = self.add_command(cmds, c2, i + 1, response_indices, run_indices, in_run, False)
+
+        self.response_indices = response_indices
+        return cmds
+
+    def add_command(self, cmds, cmd, i, response_indices, run_indices, in_run, same_slot):
+        cmds.append(cmd.args)
+        if in_run:
+            # encode the index as a str: in the good case this response is
+            # just "QUEUED" and should be discarded.
+            response_indices.append(str(i))
+            run_indices.append(i)
+        else:
+            response_indices.append(i)  # this response belongs to cmds[i]
+
+        # ending a run?
+        if in_run and not same_slot:
+            in_run = False
+            cmds.append(('EXEC',))
+            # [:] creates a shallow copy, because run_indices is cleared next
+            response_indices.append(run_indices[:])
+            # clear in place, since the caller owns run_indices; `del` rather
+            # than .clear() keeps Python 2 compatibility
+            del run_indices[:]
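+        # Worked example (illustrative): for three commands on slots
+        # [3, 3, 7], add_transaction packs the stream
+        #     MULTI, cmd0, cmd1, EXEC, cmd2
+        # and self.response_indices ends up as
+        #     [None, '0', '1', [0, 1], 2]
+        # so read_multi_exec() can route each response back to its command.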
+        return in_run
+
+    def read(self):
+        if self.use_multi:
+            self.read_multi_exec()
+        else:
+            self.read_plain()
+
-    def read(self):
+    def read_plain(self):
         """
         """
         connection = self.connection
@@ -434,3 +512,76 @@ def read(self):
                     return
             except RedisError:
                 c.result = sys.exc_info()[1]
+
+    def read_multi_exec(self):
+        """
+        """
+        # much of the error handling is copied from
+        # redis.client.Pipeline._execute_transaction
+        connection = self.connection
+        commands = self.commands
+        # self.response_indices routes each response to its command
+        for index in self.response_indices:
+            try:
+                rsp = self.parse_response(connection, '_')
+            except (ConnectionError, TimeoutError) as e:
+                for c in self.commands:
+                    c.result = e
+                return
+            except RedisError:
+                rsp = sys.exc_info()[1]
+
+            if index is None:
+                # Eat this response (the 'OK' from MULTI).
+                # We used to eat the 'QUEUED' responses for commands inside
+                # MULTI/EXEC as well, but those can also be MOVED or other
+                # errors, in which case we want to record the error on the
+                # command. That is handled in the next section.
+                # TODO: what if the response to MULTI is not 'OK'?
+                continue
+
+            if isinstance(index, str):
+                # For commands that are part of a MULTI/EXEC transaction the
+                # index is encoded as a str (it is an int everywhere else).
+                # If the response is "QUEUED" (the good case) we can discard
+                # it; if it is an error, we record it on the command.
+                if rsp in (b'QUEUED', 'QUEUED'):
+                    # good case; 'QUEUED' is a str when decode_responses is on
+                    continue
+                index = int(index)
+                cmd = commands[index]
+                if cmd.result is None:
+                    cmd.result = rsp  # error case, record the error
+                continue
+
+            if isinstance(index, int):
+                # one response for one plain command
+                index_list = [index]
+                rsp_list = [rsp]
+            else:
+                # a list of responses from EXEC (as in MULTI/EXEC)
+                index_list = index
+                rsp_list = rsp
+
+            if isinstance(rsp, ExecAbortError):
+                # EXEC aborted, e.g. because a slot moved. The errors were
+                # already recorded on the commands, so there is no need to
+                # do it again here in the EXEC step.
+                continue
+
+            if len(rsp_list) != len(index_list):
+                self.connection.disconnect()
+                raise ResponseError("Wrong number of response items from "
+                                    "pipeline execution")
+
+            for i, r in zip(index_list, rsp_list):
+                cmd = commands[i]
+                if cmd.result is not None:
+                    continue  # this command already has a result
+
+                # We have to run the response callbacks manually.
+                if not isinstance(r, Exception):
+                    args = cmd.args
+                    options = cmd.options
+                    command_name = args[0]
+                    if command_name in self.response_callbacks:
+                        r = self.response_callbacks[command_name](r, **options)
+
+                cmd.result = r
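
For reviewers, a minimal sketch of how the new flag might be exercised (the node address and key names are illustrative, not part of the patch). The `{user:1}` hash tag forces the first two keys into the same slot, so they form a run that write() wraps in MULTI/EXEC on their node, while the third key stays a plain pipelined command:

    from rediscluster import RedisCluster

    rc = RedisCluster(host='127.0.0.1', port=7000)

    pipe = rc.pipeline()
    pipe.set('{user:1}:name', 'alice')   # same slot as the next command...
    pipe.incr('{user:1}:visits')         # ...so both go inside one MULTI/EXEC
    pipe.get('some-other-key')           # different slot, sent as a plain command
    results = pipe.execute(use_multi=True)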
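
Since automatic redirect retries are skipped under use_multi (to preserve atomicity, per the comment in send_cluster_commands), the caller is expected to rebuild and resend the pipeline itself; execute() resets the command stack, so the commands must be re-queued on every attempt. One possible shape, with hypothetical helper names and an illustrative backoff:

    import time

    from rediscluster.exceptions import AskError, MovedError, TryAgainError

    def execute_with_retry(rc, queue_commands, retries=3):
        # queue_commands(pipe) re-adds the commands to a fresh pipeline,
        # since the previous stack was cleared by reset().
        for attempt in range(retries):
            pipe = rc.pipeline()
            queue_commands(pipe)
            try:
                return pipe.execute(use_multi=True)
            except (AskError, MovedError, TryAgainError):
                if attempt == retries - 1:
                    raise  # out of attempts, surface the error
                time.sleep(0.05 * (attempt + 1))  # brief backoff while the slot table refreshes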