Redis持久化机制包括AOF和RDB两种:
- RDB保存二进制形式的数据库快照。
- AOF以协议文本的方式,记录数据库写入的指令。
本文详细介绍这两种方式的实现,以及涉及到主从复制的情况。由于持久化涉及Redis文件系统RIO,所以也会对RIO进行介绍。
作为Redis源码分析的系列文章,本文使用的版本和Redis底层对象实现原理分析、Redis Sentinel实现原理分析等文章是相同的。
RIO
rioInitWithFile
从FILE
创建一个rio
对象。
1 | void rioInitWithFile(rio *r, FILE *fp) { |
解释一下剩下来的两个参数:
autosync
表示在写入autosync
个字节之后,就进行fsync
。
可以通过rioSetAutoSync
函数进行设置。
bio
Redis将耗时的io操作放到后台的线程来执行。因此叫做background io。
创建一个io任务
可以将下列的任务给bio做
BIO_CLOSE_FILE
等于延迟了的close(2)
BIO_AOF_FSYNC
等于延迟了的AOF fsyncBIO_LAZY_FREE
等于延迟了的内存释放
对于每一种类型,维护一个任务队列bio_jobs[type]
,一个互斥量bio_jobs[type]
和一个条件变量bio_newjob_cond[type])
。
创建io任务很简单,首先获得对应任务类型的锁,然后将任务job
加到对应列表bio_jobs[type]
的尾部,然后通知条件变量。1
2
3
4
5
6
7
8
9
10
11
12
13void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
struct bio_job *job = zmalloc(sizeof(*job));
job->time = time(NULL);
job->arg1 = arg1;
job->arg2 = arg2;
job->arg3 = arg3;
pthread_mutex_lock(&bio_mutex[type]);
listAddNodeTail(bio_jobs[type],job);
bio_pending[type]++;
pthread_cond_signal(&bio_newjob_cond[type]);
pthread_mutex_unlock(&bio_mutex[type]);
}
后台处理
从bioInit
里面可以看到,这个void *arg
,实际上传入的是int类型的type
。Redis会为每一种任务创建一个线程专门来处理。
1 | void *bioProcessBackgroundJobs(void *arg) { |
这个函数接受一个字符串,类似”0,2,3”, “0,2-3”, “0-20:2”这样。表示设置对某些CPU的亲和性。
此外,还需要让线程可以异步终止。
1 | redisSetCpuAffinity(server.bio_cpulist); |
下面是处理信号机制,在这里面需要对bio_mutex[type]
加锁的。
1 | pthread_mutex_lock(&bio_mutex[type]); |
下面的循环是一个经典的生产者消费者模型,我们这个函数是消费者。因此,如果我们检查到自己的队列是空的,那么就在条件变量bio_newjob_cond[type]
上面等待,我们还需要同时传入bio_mutex[type]
,因为条件变量的实现需要对这个mutex加锁或者解锁。如果说队列不是空的,就读取队头,但是不实际pop,并且解锁。
1 | while(1) { |
下面就是根据任务类型,去做相应的工作。
1 | /* Process the job accordingly to its type. */ |
等我们处理完了,再把对应的节点pop出来。
1 | /* Lock again before reiterating the loop, if there are no longer |
RDB
RDB机制的调用链(从下到上)如下所示:
startSaving
rdbSave
flushAllDataAndResetRDB
flushallCommand
FLUSHALL
指令
saveCommand
SAVE
指令rdbSaveBackground
bgsaveCommand
BGSAVE
指令
rdbSave
观察函数签名,将一个结构rsi
存到文件filename
里面
1 | int rdbSave(char *filename, rdbSaveInfo *rsi) |
首先是尝试创建临时的rdb文件,这里先创建临时文件,可能是为了防止RDB过程执行到一半宕掉了,导致写的RDB文件不全或者有问题。这样等到确定成功再改名会好一点?
1 | snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); |
如果文件创建失败,会产生错误日志
1 | serverLog(LL_WARNING, |
下面就是真正的dump过程。首先创建一个rio对象rdb,并且调用函数startSaving
1 | rioInitWithFile(&rdb,fp); |
这个函数根据传入的rdbflags
,向Redis发送事件。有关事件模块的内容,我们不在这里进行论述。需要注意,函数中额外检查了pid,从而确定是同步RDB还是异步RDB。
1 | void startSaving(int rdbflags) { |
对应的rdbflags
有下面的取值
RDBFLAGS_NONE
在rdbSave
中调用RDBFLAGS_AOF_PREAMBLE
是否用于AOF机制RDBFLAGS_REPLICATION
是否用于主从复制RDBFLAGS_ALLOW_DUP
这是一个选项
如果开启了rdb_save_incremental_fsync
增量写盘,就设置一下rio的autosync字段,REDIS_AUTOSYNC_BYTES
默认是32MB。容易看出,写32MB才刷盘,如果此时系统宕机,Redis的持久性是得不到保障的,这个在我们对InnoDB的介绍中也出现过。
1 | if (server.rdb_save_incremental_fsync) |
下面是核心逻辑rdbSaveRio
,我们在后面专门讨论
1 | if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) { |
下面执行fflush,将C库缓冲区写到内核缓冲区,再调用fsync强制落盘。由于RDB类似于写checkpoint而不是写日志,所以这边写完直接刷盘,不需要统计autosync。
1 | /* Make sure data will not remain on the OS's output buffers */ |
下面调用rename
转换成正式的名字,调用stopSaving(1)
发送成功事件。如果rename失败,就发送失败事件,并且调用unlink
删除临时文件。
1 | /* Use RENAME to make sure the DB file is changed atomically only |
rdbSaveRio
首先看一下dump.rdb
的内容,他通常位于redis的安装目录下。
1 | REDIS0006þ^@^@^AcÀ^B^@^AbÀ^A^@^AaÀ^@ÿ<92>?6Äx^B±Ä |
照例查看函数声明。
1 | int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { |
首先写入magic,和全局以及所有模块的辅助信息
1 | if (server.rdb_checksum) |
下面对于每一个数据库j
,进行dump写入
1 | for (j = 0; j < server.dbnum; j++) { |
这里获得一个安全迭代器,也就是说在这个迭代器存在的时候是停止Rehash的。
1 | di = dictGetSafeIterator(d); |
写入RDB_OPCODE_SELECTDB
这个op,并保存一些元数据:
- 当前db的编号
- 当前db的size
- 当前db的expires链表的size
这些元数据会通过提前写入的RDB_OPCODE_
进行区分。
1 | if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr; |
下面,遍历迭代器,以存储实际的数据。
1 | /* Iterate this DB writing every entry */ |
函数getExpire
是用来获取key的过期时间的,我们需要同时将过期时间也写到RDB里面。而过期时间是单独存放在db->expires
里面的,所以这里需要额外取出来,再存进去。
1 | expire = getExpire(db,&key); |
下面这些代码不太清楚是什么
1 | /* If we are storing the replication information on disk, persist |
最后,存入一个EOF和CRC64校验码。
1 | /* EOF opcode */ |
flushAllDataAndResetRDB
1 | void flushAllDataAndResetRDB(int flags) { |
AOF
AOF机制的调用链(从下到上)如下所示:
feedAppendOnlyFile
propagate
通常在各个模块中被带有PROPAGATE_AOF|PROPAGATE_REPL
参数地调用
在key过期时,propagateExpire
会被调用,从而发送过期消息给AOF。
写入
feedAppendOnlyFile
首先看参数,dictid
实际上表示当前redis数据库的id
1 | void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { |
下面的这些cat...Command
方法,实际上都是根据操作去重新组装回命令。
1 | sds buf = sdsempty(); |
函数sdscatlen
将buf
追加到server.aof_buf
末尾,类似于concat,但这个取名有点迷惑,让人觉得是category的简写。
1 | /* Append to the AOF buffer. This will be flushed on disk just before |
如果BGREWRITEAOF
正在进行,还需要将命令追加到重写缓存中,记录当前正在重写的AOF文件和数据库当前状态的差异。这个命令用于异步执行一个AOF文件重写操作,重写会创建一个当前AOF文件的体积优化版本。即使BGREWRITEAOF
执行失败,也不会有任何数据丢失,因为旧的AOF在BGREWRITEAOF
成功之前不会被修改。
为什么要支持AOF重写呢?考虑下面的情形:对一个计数器调用了100次INCR
,AOF文件需要使用100个条目来记录。但实际上只使用一条SET
保存最后的值就行了。所以BGREWRITEAOF
可以在不打断服务客户端的情况下,重建AOF文件,这个文件包含重建当前数据集所需的最少命令。
1 | if (server.aof_child_pid != -1) |
aofRewriteBufferAppend
1 | /* Append data to the AOF rewrite buffer, allocating new blocks if needed. */ |
catAppendOnlyExpireAtCommand
我们抽取一个cat...Command
进行分析
1 | sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, robj *seconds) { |
Redis中的引用计数规则,让人觉得有点难懂,原因是有的对象是由被调用者而不是调用者释放的,但在这里的代码基本都是由调用者释放(调用decrRefCount
)的。
1 | decrRefCount(seconds); |
刷盘
flushAppendOnlyFile
服务器先写AOF,再返回给客户端。因为客户端进行写操作的机会是在event loop中,我们需要将所有的AOF写先缓存起来,并且在重新进入event loop前进行刷盘。
目前AOF刷盘有几种策略:
- 每个命令刷盘一次
这也是最安全和最慢的 - 每秒刷盘一次(everysec)
- 从不刷盘
当采用everysec方式的时候,如果后台线程有在fsync,那么会延迟这次fsync,这是因为Linux上,write(2)
调用也会被后台的fsync阻塞。当这种情况发生时,说明要尽快刷AOF缓存。所以会尝试在serverCron()
里面刷。但是如果force是1,那么无论是否fsync,都强行写入。
1 |
|