Redis事务的实现

本文详细介绍Redis事务的实现,以及涉及到主从复制的情况。由于持久化涉及Redis文件系统RIO,所以也会对RIO进行介绍。
这是Redis源码分析的系列文章的第四篇,前三篇分别是

  1. Redis底层对象实现原理分析
  2. Redis Sentinel实现原理分析
  3. Redis持久化机制实现

简介

事务在执行期间不会主动中断,也就是说服务器在执行完事务中的所有命令之后,才会继续处理其他客户端的其他命令。
需要注意的是,Redis并没有原子性。原因之一是Redis不支持事务回滚。有人说Redis不是有DISCARD的么?但这个DISCARD只是在命令入队的阶段,等到真正执行事务的时候,Redis并不支持执行到一般就退出。

multi

总的来说,就是multi开始事务,最后exec提交事务或者discard放弃事务。在发送完multi后,后面发送的命令都不会被立即执行,而是返回QUEUED状态。当收到exec后,事务会将这个队列中的命令按照fifo的顺序执行。并发结果放入一个回复队列中返回给客户端。

watch

在multi前监控一些key,如果这些key被修改,则事务回滚。

主要结构

multiState

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct multiState {
multiCmd *commands;
int count;
int cmd_flags; /* The accumulated command flags OR-ed together.
So if at least a command has a given flag, it
will be set in this field. */
int cmd_inv_flags; /* Same as cmd_flags, OR-ing the ~flags. so that it
is possible to know if all the commands have a
certain flag. */
int minreplicas; /* MINREPLICAS for synchronous replication */
time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
} multiState;

主要涉及下面一些字段:

  1. commands
    这个是multiCmd数组,表示事务中的FIFO的队列。
  2. count
    multi命令的数量,表示commands的长度?
  3. cmd_flags

multiCmd

1
2
3
4
5
typedef struct multiCmd {
robj **argv;
int argc;
struct redisCommand *cmd;
} multiCmd;

redisCmd

事务开始和结束的实现

普通命令

可以看到,在processCommand中,如果被设置了MULTI,那么就不会调用call,而是调用queueMultiCommand将指令入队。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
int processCommand(client *c) {
...
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();
}
...
}

开始

multi指令,会给客户端打上CLIENT_MULTI这个flag。

1
2
3
4
5
6
7
8
void multiCommand(client *c) {
if (c->flags & CLIENT_MULTI) {
addReplyError(c,"MULTI calls can not be nested");
return;
}
c->flags |= CLIENT_MULTI;
addReply(c,shared.ok);
}

discard

1
2
3
4
5
6
7
8
void discardCommand(client *c) {
if (!(c->flags & CLIENT_MULTI)) {
addReplyError(c,"DISCARD without MULTI");
return;
}
discardTransaction(c);
addReply(c,shared.ok);
}

discardTransaction

先调用discardTransaction清空multiCmd数组。
再调用initClientMultiState清除multiState里面的其他内容。
再取消所有事务相关的flag。这边的flag都是什么意思?

  1. CLIENT_MULTI
  2. CLIENT_DIRTY_CAS
    表示被watch的字段发生了修改。
  3. CLIENT_DIRTY_EXEC
    flagTransaction产生,表示在先前操作中,出现了错误,所以不能exec。包含下面情况:
    1. rejectCommand
      在下面的场景下被调用:
      1. shared.noautherr
      2. shared.oomerr
      3. shared.bgsaveerr
      4. 等等
    2. rejectCommandFormat
    3. processCommand中,涉及Redis Cluster相关。
      1
      2
      3
      4
      5
      6
      void discardTransaction(client *c) {
      freeClientMultiState(c);
      initClientMultiState(c);
      c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
      unwatchAllKeys(c);
      }

exec

1
2
3
4
5
6
7
8
9
10
11
12
void execCommand(client *c) {
int j;
robj **orig_argv;
int orig_argc;
struct redisCommand *orig_cmd;
int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
int was_master = server.masterhost == NULL;

if (!(c->flags & CLIENT_MULTI)) {
addReplyError(c,"EXEC without MULTI");
return;
}

检查flags,discard掉有问题的事务:

  1. 对于CLIENT_DIRTY_CAS返回空数组
    因为这个不能算作是错误,只能作为一种特殊的表现。
  2. 对于CLIENT_DIRTY_EXEC返回EXECABORT
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    /* 
    * A failed EXEC in the first case returns a multi bulk nil object
    * (technically it is not an error but a special behavior), while
    * in the second an EXECABORT error is returned. */
    if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
    addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
    shared.nullarray[c->resp]);
    discardTransaction(c);
    goto handle_monitor;
    }

因为已经检查了,取消客户端对所有键的监视。
因为事务中的命令在执行时可能会修改命令和命令的参数,所以为了正确地传播命令,需要备份这些命令和参数。
下面开始处理multiState下面所有的multiCmd

1
2
3
4
5
6
7
8
9
10
/* Exec all the queued commands */
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
orig_argv = c->argv;
orig_argc = c->argc;
orig_cmd = c->cmd;
addReplyArrayLen(c,c->mstate.count);
for (j = 0; j < c->mstate.count; j++) {
c->argc = c->mstate.commands[j].argc;
c->argv = c->mstate.commands[j].argv;
c->cmd = c->mstate.commands[j].cmd;

当我们遇到第一个不是CMD_READONLY或者CMD_ADMIN的请求时,传播multi指令。【Q】这里readonly又出现了,为啥这么特殊呢?我觉得readonly指令一般指的是get之类的不涉及修改的指令。而一旦涉及修改或者增加等的指令,我们一定要把这个变动propagate出去。
这样,我们可以将MULTI/..../EXEC打包成一整个,并且AOF和Slave都具有相同的一致性和原子性的要求。为啥呢?

1
2
3
4
5
6
7
if (!must_propagate &&
!server.loading &&
!(c->cmd->flags & (CMD_READONLY|CMD_ADMIN)))
{
execCommandPropagateMulti(c);
must_propagate = 1;
}

ACLCheckCommandPerm主要检查权限,包含:

  1. 执行命令的权限
  2. 对某个key的权限

如果检查通过,我们就用call来执行命令。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int acl_keypos;
int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
if (acl_retval != ACL_OK) {
addACLLogEntry(c,acl_retval,acl_keypos,NULL);
addReplyErrorFormat(c,
"-NOPERM ACLs rules changed between the moment the "
"transaction was accumulated and the EXEC call. "
"This command is no longer allowed for the "
"following reason: %s",
(acl_retval == ACL_DENIED_CMD) ?
"no permission to execute the command or subcommand" :
"no permission to touch the specified keys");
} else {
call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);
}

因为执行后命令、命令参数可能会被改变,比如SPOP会被改写为SREM,所以这里需要更新事务队列中的命令和参数,确保Slave和AOF的数据一致性。

1
2
3
4
5
    /* Commands may alter argc/argv, restore mstate. */
c->mstate.commands[j].argc = c->argc;
c->mstate.commands[j].argv = c->argv;
c->mstate.commands[j].cmd = c->cmd;
}

在这个循环结束之后,事务执行完了,我们还原命令,并且清空事务状态。

1
2
3
4
c->argv = orig_argv;
c->argc = orig_argc;
c->cmd = orig_cmd;
discardTransaction(c);

下面,如果我们需要propagate的话,就增加dirty。先前我们知道,在call结束之后,如果有dirty,会去触发检测是否propagate的逻辑。而call也是可以嵌套调用的。
这里有一个特殊情况需要考虑。就是在multi/exec块中,这个实例突然从Master切换成了Slave(可能是来自Sentinel的SLAVEOF指令)。此时,原来Master收到的multi指令已经被传播到了replication backlog里面,但是后面的可能还没来得及传。对于这种情况,我们需要保证至少通过exec来结束这个backlog。【Q】为啥不用discard呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
    /* Make sure the EXEC command will be propagated as well if MULTI
* was already propagated. */
if (must_propagate) {
int is_master = server.masterhost == NULL;
server.dirty++;
if (server.repl_backlog && was_master && !is_master) {
char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
feedReplicationBacklog(execcmd,strlen(execcmd));
}
}

handle_monitor:
/* Send EXEC to clients waiting data from MONITOR. We do it here
* since the natural order of commands execution is actually:
* MUTLI, EXEC, ... commands inside transaction ...
* Instead EXEC is flagged as CMD_SKIP_MONITOR in the command
* table, and we do it here with correct ordering. */
if (listLength(server.monitors) && !server.loading)
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}