当前位置: 代码迷 >> 综合 >> redis 源码系列(17):分身术 --- replication
  详细解决方案

redis 源码系列(17):分身术 --- replication

热度:20   发布时间:2024-01-25 12:29:12.0

单点服务在生产环境是绝对无法接受的,但是数据库服务,要实现多节点或者说分布式部署,面临的问题比 stateless 服务要多的多。数据的同步方式、一致性和可用性的妥协诸多限制,必须都加以考虑。

今天我们来学习一下 redis 主从同步相关内容,本节内容是 redis 实现高可用、数据安全、数据分区的基石。如果在节点之间没有一个可靠的数据同步方法,那么上述的一切都成为空中阁楼。

主节点在任意时刻只有一个,从节点可以有若干个。主从节点需要保持链接,主节点异步的将数据同步到从节点。

Master

主从同步,就是将主节点的数据同步到从节点。同步的大体流程如下:

  1. 主节点在接受到同步请求后,与从节点进行全量同步,启动 BGSAVE (如果之前已经有可用 BGSAVE 在执行,即不需要启动)
  2. 主节点需要将后续导致数据变化的命令(或者数据过期)发送到从节点,持续同步数据

全量同步

主节点在启动的时候,并不知道从节点的任何信息。当收到 sync 或者 psync 命令后,与从节点进行开始同步:

void syncCommand(redisClient *c) {// 已经是 SLAVE ,或者处于 MONITOR 模式,返回if (c->flags & REDIS_SLAVE) return;// 如果这是一个从服务器,但与主服务器的连接仍未就绪,那么拒绝 SYNC// redis 从节点也可以有自己的从节点,这里不展开讨论if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {addReplyError(c,"Can't SYNC while not connected with my master");return;}// 要进行 sync 或者 psync 的从节点的发送缓冲区必须是空的。因为我们可能会将// 这个从节点的发送缓冲区(存储的是 BGSAVE 以后的脏数据)拷贝到其他从节点的输出缓冲区if (listLength(c->reply) != 0 || c->bufpos != 0) {addReplyError(c,"SYNC and PSYNC are invalid with pending output");return;}redisLog(REDIS_NOTICE,"Slave asks for synchronization");/*解释一下 psync:psync 代表的是 partial sync,即部分同步。这个机制是为了当已经同步过的主从节点之间因为某些原因断开链接后,当链接重新建立以后,需要重新开始同步数据时,可以避免不必要的全量同步。*/if (!strcasecmp(c->argv[0]->ptr,"psync")) {// 尝试进行 PSYNCif (masterTryPartialResynchronization(c) == REDIS_OK) {// 可执行 PSYNCserver.stat_sync_partial_ok++;return; /* No full resync needed, return. */} else {// 不可执行 PSYNCchar *master_runid = c->argv[1]->ptr;// replication id 为 ? 是强制要求全量同步的意思,所以不需要统计到错误里面if (master_runid[0] != '?') server.stat_sync_partial_err++;}} else {// 旧版实现,设置标识,避免接收 REPLCONF ACK c->flags |= REDIS_PRE_PSYNC;}// 执行 full resynchronization ,增加计数server.stat_sync_full++;// 检查是否有 BGSAVE 在执行if (server.rdb_child_pid != -1) {// 如果有正在进行的 BGSAVE,我们还需要检查这个 BGSAVE 的数据是不是可以用于与从节点同步数据// 为什么会不适合呢?如果住节点的 BGSAVE 发生在有从节点与其链接前,主节点不会把 BGSAVE 启动// 以后的数据保存下来(写入到从节点的输出缓冲区),这样会导致 BGSAVE 启动以后的增量写丢失,// 这种 BGSAVE 产生的 rdb 文件是无法用于与从节点同步数据使用的redisClient *slave;listNode *ln;listIter li;// 只要之前已经有处于 REDIS_REPL_WAIT_BGSAVE_END 的从节点,说明这个 BGSAVE 启动以后的// 增量写数据已经保存下来了,所以不需要重新启动 BGSAVElistRewind(server.slaves,&li);while((ln = listNext(&li))) {slave = ln->value;if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;}if (ln) {// 幸运的情况,可以使用目前 BGSAVE 所生成的 RDB,我们直接把这个等待 BGSAVE 完成的// 从节点的输出缓冲区数据拷贝到当前节点的输出缓冲区(里面存储的就是同步数据,这也是上// 面不允许进行 SYNC 的从节点输出缓冲区内有其他内容的原因copyClientOutputBuffer(c,slave);c->replstate = REDIS_REPL_WAIT_BGSAVE_END;redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");} else {// 启动这个 BGSAVE 的时候,还没有从节点,所以需要等待下一个 BGSAVE 启动c->replstate = REDIS_REPL_WAIT_BGSAVE_START;redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");}} else {// 没有 BGSAVE 在进行,开始一个新的 BGSAVEredisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");addReplyError(c,"Unable to perform background save");return;}// 设置状态c->replstate = REDIS_REPL_WAIT_BGSAVE_END;// 刷新脚本缓存replicationScriptCacheFlush();}// 启用 Nagle 算法,失败也无所谓,所以不检查错误if (server.repl_disable_tcp_nodelay)anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */c->repldbfd = -1;c->flags |= REDIS_SLAVE;// 确保后续传播到从节点的数据会发送 SELECT 命令server.slaveseldb = -1; // 添加到 slave 列表中listAddNodeTail(server.slaves,c);// 如果是第一个 slave ,那么初始化 backlog,只有初始化了 backlog,主节点才会传播命令到从节点if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)createReplicationBacklog();return;
}
/*当 BGSAVE 完成时,会调用这个函数 */
void backgroundSaveDoneHandler(int exitcode, int bysignal) {if (!bysignal && exitcode == 0) {// BGSAVE 成功redisLog(REDIS_NOTICE,"Background saving terminated with success");// dirty 更新为 BGSAVE 以后的脏数据数server.dirty = server.dirty - server.dirty_before_bgsave;server.lastsave = time(NULL);server.lastbgsave_status = REDIS_OK;} else if (!bysignal && exitcode != 0) {// BGSAVE 出错redisLog(REDIS_WARNING, "Background saving error");server.lastbgsave_status = REDIS_ERR;} else {// BGSAVE 被中断redisLog(REDIS_WARNING,"Background saving terminated by signal %d", bysignal);// 移除临时文件rdbRemoveTempFile(server.rdb_child_pid);/* SIGUSR1 is whitelisted, so we have a way to kill a child without* tirggering an error conditon. */if (bysignal != SIGUSR1)server.lastbgsave_status = REDIS_ERR;}// 更新服务器状态server.rdb_child_pid = -1;server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;server.rdb_save_time_start = -1;// 处理正在等待 BGSAVE 完成的从节点updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}void updateSlavesWaitingBgsave(int bgsaveerr) {listNode *ln;int startbgsave = 0;listIter li;// 遍历所有 slavelistRewind(server.slaves,&li);while((ln = listNext(&li))) {redisClient *slave = ln->value;if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {// 之前的 RDB 文件不能被 slave 使用,立马开始一个新 BGSAVEstartbgsave = 1;slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;} else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {// 执行到这里,说明有 slave 在等待 BGSAVE 完成struct redis_stat buf;if (bgsaveerr != REDIS_OK) {// 但是 BGSAVE 执行错误 释放 slavefreeClient(slave);redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");continue;}// 打开 RDB 文件if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||redis_fstat(slave->repldbfd,&buf) == -1) {freeClient(slave);redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));continue;}// 设置偏移量,各种值,准备发送 rdb 文件给从节点slave->repldboff = 0;slave->repldbsize = buf.st_size;slave->replstate = REDIS_REPL_SEND_BULK;slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long) slave->repldbsize);// 清空之前的写事件处理器,注册新的写事件处理器aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {freeClient(slave);continue;}}}// 需要执行新的 BGSAVEif (startbgsave) {// 开始行的 BGSAVE ,并清空脚本缓存replicationScriptCacheFlush();if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {// 启动 BGSAVE 失败的话,断开与从节点的链接listIter li;listRewind(server.slaves,&li);redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");while((ln = listNext(&li))) {redisClient *slave = ln->value;if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)freeClient(slave);}}}
}
// 向从节点发送 rdb 文件时的写回调函数
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {redisClient *slave = privdata;REDIS_NOTUSED(el);REDIS_NOTUSED(mask);char buf[REDIS_IOBUF_LEN];ssize_t nwritten, buflen;// 要发送 rdb 文件的长度到从节点if (slave->replpreamble) {nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));if (nwritten == -1) {redisLog(REDIS_VERBOSE,"Write error sending RDB preamble to slave: %s",strerror(errno));freeClient(slave);return;}sdsrange(slave->replpreamble,nwritten,-1);if (sdslen(slave->replpreamble) == 0) {sdsfree(slave->replpreamble);slave->replpreamble = NULL;/* fall through sending data. */} else {return;}}// 开始发送 rdb 文件lseek(slave->repldbfd,slave->repldboff,SEEK_SET);// 读取 RDB 数据buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);if (buflen <= 0) {redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",(buflen == 0) ? "premature EOF" : strerror(errno));freeClient(slave);return;}// 发送 rdb 数据到从节点if ((nwritten = write(fd,buf,buflen)) == -1) {if (errno != EAGAIN) {redisLog(REDIS_WARNING,"Write error sending DB to slave: %s",strerror(errno));freeClient(slave);}return;}// 更新 offsetslave->repldboff += nwritten;// 如果写入已经完成if (slave->repldboff == slave->repldbsize) {// 关闭 RDB 文件描述符close(slave->repldbfd);slave->repldbfd = -1;// 删除之前绑定的写事件处理器aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);// 只有在传输完 rdb 文件之后,从节点才算处于 online 状态slave->replstate = REDIS_REPL_ONLINE;// 更新响应时间slave->repl_ack_time = server.unixtime;// 在 BGSAVE 启动后的所有需要传播的数据都先保存在从节点的输出缓存中// 现在可以开始发送这些数据了,注册发送缓冲区数据的写回调,个人感觉也许应该先检查一下// 缓冲区非空再注册if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,sendReplyToClient, slave) == AE_ERR) {redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));freeClient(slave);return;}// 刷新低延迟从节点的数量(slave->repl_ack_time 修改后都需要重新计算)refreshGoodSlavesCount();redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");}
}

增量同步

当有写命令需要传播的时候,主节点还需要将这些需要传播的数据发送到从节点:

/*backlog 是一个循环数组,用于存储 BGSAVE 开始以后的增量修改。当有命令需要传播到从节点会先写入到 backlog,然后再发送到从节点。因为循环数组大小有限,这也是是 PSYNC 对于超过范围的 offset 无法支持。这里再介绍一下 redis 中主节点用:<replication_id> <offset> 表示数据库状态,任意两个节点,只要拥有相同的 replication id 和offset,那么就可以说他们拥有相同的数据。replication id 是主节点随机生成的字符串,offset 是增量数据的字节偏移量 */
void feedReplicationBacklog(void *ptr, size_t len) {unsigned char *p = ptr;// server.master_repl_offset 代表全局偏移量,每次传播命令都累加server.master_repl_offset += len;// 环形 buffer ,每次写尽可能多的数据,并在到达尾部时将 idx 重置到头部// 写满以后会覆盖之前的数据,但是没有关系,这个本来就是作为一个后备存储// 传播的命令都会立马发送到客户端while(len) {// 从 idx 到 backlog 尾部的字节数size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;// 如果 idx 到 backlog 尾部这段空间足以容纳要写入的内容// 那么直接将写入数据长度设为 len// 在将这些 len 字节复制之后,这个 while 循环将跳出if (thislen > len) thislen = len;// 将 p 中的 thislen 字节内容复制到 backlogmemcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);// 更新 idx ,指向新写入的数据之后server.repl_backlog_idx += thislen;// 如果写入达到尾部,那么将索引重置到头部if (server.repl_backlog_idx == server.repl_backlog_size)server.repl_backlog_idx = 0;// 减去已写入的字节数len -= thislen;// 将指针移动到已被写入数据的后面,指向未被复制数据的开头p += thislen;// 增加实际长度server.repl_backlog_histlen += thislen;}// server.repl_backlog_histlen 代表当前可用的数据长度,最大不能超过 backlog 的容量// 如果已经超过,代表写入的数据已经发生了覆盖if (server.repl_backlog_histlen > server.repl_backlog_size)server.repl_backlog_histlen = server.repl_backlog_size;// server.repl_backlog_off 是 backlog 中最小的全局偏移值,当从节点进行 PSYNC 时// 只有 offset 在 [server.repl_backlog_off,server.master_repl_offset) 内才// 可能进行 partial resyncserver.repl_backlog_off = server.master_repl_offset) -server.repl_backlog_histlen + 1;
}
/*当启动主从同步以后,主从节点需要持续同步增量数据 */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {listNode *ln;listIter li;int j, len;char llstr[REDIS_LONGSTR_SIZE];// backlog 为空,且没有从服务器,直接返回if (server.repl_backlog == NULL && listLength(slaves) == 0) return;// 如果有从节点,那么必然已经创建了 backlog 存储redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));// 如果有需要的话,发送 SELECT 命令,指定数据库if (server.slaveseldb != dictid) {robj *selectcmd;if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {selectcmd = shared.select[dictid];} else {int dictid_len;dictid_len = ll2string(llstr,sizeof(llstr),dictid);selectcmd = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",dictid_len, llstr));}// 将 SELECT 命令添加到 backlogif (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);// 发送给所有从服务器listRewind(slaves,&li);while((ln = listNext(&li))) {redisClient *slave = ln->value;addReply(slave,selectcmd);}if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)decrRefCount(selectcmd);}server.slaveseldb = dictid;// 将命令写入到 backlogif (server.repl_backlog) {// 构造传播数据char aux[REDIS_LONGSTR_SIZE+3];aux[0] = '*';len = ll2string(aux+1,sizeof(aux)-1,argc);aux[len+1] = '\r';aux[len+2] = '\n';feedReplicationBacklog(aux,len+3);for (j = 0; j < argc; j++) {long objlen = stringObjectLen(argv[j]);// 将参数从对象转换成协议格式aux[0] = '$';len = ll2string(aux+1,sizeof(aux)-1,objlen);aux[len+1] = '\r';aux[len+2] = '\n';feedReplicationBacklog(aux,len+3);feedReplicationBacklogWithObject(argv[j]);feedReplicationBacklog(aux+len+1,2);}}// 将命令发送给所有从节点listRewind(slaves,&li);while((ln = listNext(&li))) {// 指向从服务器redisClient *slave = ln->value;// 不要给正在等待 BGSAVE 开始的从服务器发送命令,因为这个数据对他们来说是无用非法的// 这种节点的缓冲区需要保存的是对他么合法的 BGSAVE 开始以后的增量数据if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;// 添加数据到缓冲区,这些数据只有在已经完成 rdb 传输以后才会发送给从节点addReplyMultiBulkLen(slave,argc);for (j = 0; j < argc; j++)addReplyBulk(slave,argv[j]);}
}

Partial Resync

Partial Resync 是为了避优化已经同步过的主从节点,在链接短暂断开以后重新同步时的开销和效率。假设从节点与主节点同步以后,因为网络原因,与主节点短暂断开了链接,期间从节点会丢失一些主节点的增量更新数据。当主从再次链接以后,如果发现主节点还保存有从节点丢失的信息(在 backlog 中),这次同步就可以跳过刚刚介绍的全量同步的步骤,这无疑可以大大提升同步效率。

需要指出 PSYNC 在一些老版本的 redis 中是不支持的。笔者看的源码 3.0,对照了下现在 redis 5.x 的代码,其实相关部分改动有很多,redis 开发团队对 PSYNC 又做了相当多的优化。但是对基础想法的理解的重要性,要远高于优化,所以笔者还是以 3.0 版本代码为基础讲解:

/*收到 PSYNC 命令后,主节点会尝试看看可不可以进行 parital resync,如果不行的话需要重新进行全量+增量同步过程。PSYNC 命令格式如下:PSYNC replication_id offset代表从节点希望与 replication_id 主节点,从 offset 偏移量开始重新同步数据 */
int masterTryPartialResynchronization(redisClient *c) {long long psync_offset, psync_len;char *master_runid = c->argv[1]->ptr;char buf[128];int buflen;// 检查 master id 是否和 runid 一致,只有一致的情况下才有 PSYNC 的可能if (strcasecmp(master_runid, server.runid)) {if (master_runid[0] != '?') {// replication_id 不匹配,这种情况是因为这个主节点是由从节点提升而来或者主节点重启// 而这些要求 PSYNC 的节点保存的仍然是之前主节点的 replication_id redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: ""Runid mismatch (Client asked for runid '%s', my runid is '%s')",master_runid, server.runid);} else {// replication_id 是 ? 代表从节点自己指定了全量同步redisLog(REDIS_NOTICE,"Full resync requested by slave.");}// 需要 full resyncgoto need_full_resync;}// 取出 psync_offset 参数,从节点要求从这个 offset 开始同步数据if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=REDIS_OK) goto need_full_resync;/*backlog 保存一定 offset 范围内的增量数据,如果从节点要求的 offset在 backlog 保存的 offset 范围内,就可以进行 partial resync*/if (!server.repl_backlog ||psync_offset < server.repl_backlog_off ||psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)){// 执行 FULL RESYNCredisLog(REDIS_NOTICE,"Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);if (psync_offset > server.master_repl_offset) {redisLog(REDIS_WARNING,"Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");}goto need_full_resync;}// 可以进行 partial resync 的从节点,设置为 ONLINE 状态,加入到从节点列表中c->flags |= REDIS_SLAVE;c->replstate = REDIS_REPL_ONLINE;c->repl_ack_time = server.unixtime;listAddNodeTail(server.slaves,c);// 向从服务器发送一个同步 +CONTINUE ,表示 PSYNC 可以执行buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");if (write(c->fd,buf,buflen) != buflen) {freeClientAsync(c);return REDIS_OK;}// 发送 backlog 中的内容(也即是从服务器缺失的那些内容)到从服务器psync_len = addReplyReplicationBacklog(c,psync_offset);redisLog(REDIS_NOTICE,"Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);// 刷新低延迟从服务器的数量refreshGoodSlavesCount();return REDIS_OK; /* The caller can return, no full resync needed. */need_full_resync:// psync 从最新 offset 开始psync_offset = server.master_repl_offset;// 如果还没有创建 repl_backlog,offset 再加1if (server.repl_backlog == NULL) psync_offset++;// 发送 +FULLRESYNC ,表示需要完整重同步buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",server.runid,psync_offset);if (write(c->fd,buf,buflen) != buflen) {freeClientAsync(c);return REDIS_OK;}return REDIS_ERR;
}
// partial resync,发送从节点缺失数据
long long addReplyReplicationBacklog(redisClient *c, long long offset) {long long j, skip, len;redisLog(REDIS_DEBUG, "[PSYNC] Slave request offset: %lld", offset);if (server.repl_backlog_histlen == 0) {redisLog(REDIS_DEBUG, "[PSYNC] Backlog history len is zero");return 0;}redisLog(REDIS_DEBUG, "[PSYNC] Backlog size: %lld",server.repl_backlog_size);redisLog(REDIS_DEBUG, "[PSYNC] First byte: %lld",server.repl_backlog_off);redisLog(REDIS_DEBUG, "[PSYNC] History len: %lld",server.repl_backlog_histlen);redisLog(REDIS_DEBUG, "[PSYNC] Current index: %lld",server.repl_backlog_idx);// server.repl_backlog_off 是 backlog 内保存的 olddest offset// offset - server.repl_backlog_off 即我们需要跳过的字节数skip = offset - server.repl_backlog_off;redisLog(REDIS_DEBUG, "[PSYNC] Skipping: %lld", skip);// 将 j 指向 offset 对应在 backlog 内的地址j = (server.repl_backlog_idx +(server.repl_backlog_size-server.repl_backlog_histlen)) %server.repl_backlog_size;redisLog(REDIS_DEBUG, "[PSYNC] Index of first byte: %lld", j);j = (j + skip) % server.repl_backlog_size;// 发送从节点缺失的数据len = server.repl_backlog_histlen - skip;redisLog(REDIS_DEBUG, "[PSYNC] Reply total length: %lld", len);while(len) {long long thislen =((server.repl_backlog_size - j) < len) ?(server.repl_backlog_size - j) : len;redisLog(REDIS_DEBUG, "[PSYNC] addReply() length: %lld", thislen);addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));len -= thislen;j = 0;}return server.repl_backlog_histlen - skip;
}

Slave

不同于主节点,从节点既要作为服务端,在主从同步的过程中,还要作为一个客户端与主节点通信。通过在配置文件中添加 salveof HOST PORT 指令,节点启动以后即会成为指定主节点的从节点:

   // 解析配置,当 server.masterhost 非空则代表这个节点是从节点} else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {slaveof_linenum = linenum;server.masterhost = sdsnew(argv[1]);server.masterport = atoi(argv[2]);server.repl_state = REDIS_REPL_CONNECT;}

链接主节点

主从同步相关的逻辑主要集中在 replicationCron 中,这个函数每 1s 调用一次:

/*取消链接,这里的链接不是指 TCP 链接,而是主从同步链接。 */
void undoConnectWithMaster(void) {int fd = server.repl_transfer_s;// 连接必须处于正在连接状态redisAssert(server.repl_state == REDIS_REPL_CONNECTING ||server.repl_state == REDIS_REPL_RECEIVE_PONG);aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);close(fd);server.repl_transfer_s = -1;// 回到 CONNECT 状态server.repl_state = REDIS_REPL_CONNECT;
}/*repl_state 状态转移REDIS_REPL_CONNECT -> REDIS_REPL_CONNECTING -> REDIS_REPL_RECEIVE_PONG |REDIS_REPL_CONNECTED <- REDIS_REPL_TRANSFER */
void replicationCron(void) {// 链接到主节点超时,取消链接if (server.masterhost &&(server.repl_state == REDIS_REPL_CONNECTING ||server.repl_state == REDIS_REPL_RECEIVE_PONG) &&(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout){redisLog(REDIS_WARNING,"Timeout connecting to the MASTER...");// 取消连接undoConnectWithMaster();}// RDB 文件的传送已超时if (server.masterhost && server.repl_state ==  &&(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout){redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");// 停止传送,并删除临时文件replicationAbortSyncTransfer();}// 从服务器曾经连接上主服务器,但现在超时(当重新链接的时候,就会触发 PSYNC)if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTED &&(time(NULL)-server.master->lastinteraction) > server.repl_timeout){redisLog(REDIS_WARNING,"MASTER timeout: no data nor PING received...");// 释放主服务器freeClient(server.master);}// 还没有与主节点建立连接if (server.repl_state == REDIS_REPL_CONNECT) {redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",server.masterhost, server.masterport);// 非阻塞的链接主节点if (connectWithMaster() == REDIS_OK) {redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");}}// 如果已经建立了链接,而且主节点支持 PSYNC,发送 ACK 给主节点if (server.masterhost && server.master &&!(server.master->flags & REDIS_PRE_PSYNC))replicationSendAck();// 如果有从服务器,那么隔一段时间发送一个 PING 指令作为心跳,让对方知道我们没有掉线if (!(server.cronloops % (server.repl_ping_slave_period * server.hz))) {listIter li;listNode *ln;robj *ping_argv[1];// 向从节点传播 PINGping_argv[0] = createStringObject("PING",4);replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);decrRefCount(ping_argv[0]);// 因为对于 REDIS_REPL_WAIT_BGSAVE_START 和 REDIS_REPL_WAIT_BGSAVE_END 状态// 的从节点,其不会相应 PING 命令,所以我们发送一个 \n 给从节点,这个没有实际作用,只是// 让对方知道我们没有掉线listRewind(server.slaves,&li);while((ln = listNext(&li))) {redisClient *slave = ln->value;if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START ||slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {if (write(slave->fd, "\n", 1) == -1) {/* Don't worry, it's just a ping. */}}}}// 断开超时从服务器if (listLength(server.slaves)) {listIter li;listNode *ln;// 遍历所有从服务器listRewind(server.slaves,&li);while((ln = listNext(&li))) {redisClient *slave = ln->value;// 略过未 ONLINE 的从服务器if (slave->replstate != REDIS_REPL_ONLINE) continue;// 不检查旧版的从服务器if (slave->flags & REDIS_PRE_PSYNC) continue;// 释放超时从服务器if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout){char ip[REDIS_IP_STR_LEN];int port;if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) != -1) {redisLog(REDIS_WARNING,"Disconnecting timedout slave: %s:%d",ip, slave->slave_listening_port);} // 释放freeClient(slave);}}}// 没有任何从服务器,等待一定时间后就释放 backlog 资源(等待一段时间是为了防止从节点只是暂时掉线)if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&server.repl_backlog){time_t idle = server.unixtime - server.repl_no_slaves_since;if (idle > server.repl_backlog_time_limit) {// 释放freeReplicationBacklog();redisLog(REDIS_NOTICE,"Replication backlog freed after %d seconds ""without connected slaves.",(int) server.repl_backlog_time_limit);}}// 在没有任何从服务器,AOF 关闭的情况下,清空 script 缓存if (listLength(server.slaves) == 0 &&server.aof_state == REDIS_AOF_OFF &&listLength(server.repl_scriptcache_fifo) != 0){replicationScriptCacheFlush();}// 更新符合给定延迟值的从服务器的数量refreshGoodSlavesCount();
}int connectWithMaster(void) {int fd;// 非阻塞地连接主服务器,注意如果 connect 的套接字是非阻塞的,那么 connect 返回的时候// 不代表已经完成了 TCP 三次握手fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);if (fd == -1) {redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",strerror(errno));return REDIS_ERR;}// 监听主服务器 fd 的读和写事件,并绑定文件事件处理器,当完成 3 次握手,fd 应该立马变为可写// 就会调用 syncWithMasterif (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==AE_ERR){close(fd);redisLog(REDIS_WARNING,"Can't create readable event for SYNC");return REDIS_ERR;}// 初始化统计变量server.repl_transfer_lastio = server.unixtime;server.repl_transfer_s = fd;// 将状态改为 REDIS_REPL_CONNECTINGserver.repl_state = REDIS_REPL_CONNECTING;return REDIS_OK;
}void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {char tmpfile[256], *err;int dfd, maxtries = 5;int sockerr = 0, psync_result;socklen_t errlen = sizeof(sockerr);REDIS_NOTUSED(el);REDIS_NOTUSED(privdata);REDIS_NOTUSED(mask);// 如果本从节点在与之前主节点建立连接以后,被提升为主节点,那么 server.repl_state 被设置为 REDIS_REPL_NONE// 这种情况下,我们立刻关闭套接字并且退出,因为我们已经不再是从节点if (server.repl_state == REDIS_REPL_NONE) {close(fd);return;}// 检查套接字错误,非阻塞的 connect 需要检查错误if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)sockerr = errno;if (sockerr) {aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",strerror(sockerr));goto error;}// 如果处于 REDIS_REPL_CONNECTING,代表我们需要进行一次同步(不管是全量还是 partial resync)// 在此之前,先阻塞的发送一个 PING,这个 PING 没有额外的含义,只是希望确定与主节点的网路情况良好if (server.repl_state == REDIS_REPL_CONNECTING) {redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");// 删除写事件监听,但是仍然保留读事件,因为我们要在这个函数里面读取主节点的 PONG 相应aeDeleteFileEvent(server.el,fd,AE_WRITABLE);// 更新状态server.repl_state = REDIS_REPL_RECEIVE_PONG;// 同步发送 PING,用 poll 实现了一个支持超时的阻塞写,具体代码不分析了,比较简单syncWrite(fd,"PING\r\n",6,100);// 返回,下次这个函数被调用的时候是主节点发回相应的时候return;}// 主节点对之前的 PING 进行了相应if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {char buf[1024];// 读事件也删除,后续的读事件回调不会用到这个函数aeDeleteFileEvent(server.el,fd,AE_READABLE);// 阻塞的读取主节点响应buf[0] = '\0';if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1){redisLog(REDIS_WARNING,"I/O error reading PING reply from master: %s",strerror(errno));goto error;}/* 合法响应只有3种:1. +PONG2. —NOAUTH3. -ERR operation not permitted后面两种需要进行鉴权后面会处理,其余响应均为非法*/if (buf[0] != '+' &&strncmp(buf,"-NOAUTH",7) != 0 &&strncmp(buf,"-ERR operation not permitted",28) != 0){// 接收到未验证错误redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);goto error;} else {redisLog(REDIS_NOTICE,"Master replied to PING, replication can continue...");}}// 进行身份验证,阻塞的发送 AUTH 命令,并且等待对方回应if(server.masterauth) {err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);if (err[0] == '-') {// AUTH 失败redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);sdsfree(err);goto error;}sdsfree(err);}// 将从节点作为 redis 服务器的节点通知给主节点,从节点也是可以服务客户端的{sds port = sdsfromlonglong(server.port);err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,NULL);sdsfree(port);// 老版本的 redis 不支持这个命令,所以不是致命错误,记录一下即可if (err[0] == '-') {redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);}sdsfree(err);}// 尝试进行 PSYNCpsync_result = slaveTryPartialResynchronization(fd);// 可以执行 partial resync,退出(新的事件处理函数已经绑定好了)if (psync_result == PSYNC_CONTINUE) {redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");return;}// 如果主服务器并不支持 PSYNC,我们还需要额外发送一个 SYNC 命令来请求同步if (psync_result == PSYNC_NOT_SUPPORTED) {redisLog(REDIS_NOTICE,"Retrying with SYNC...");if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",strerror(errno));goto error;}}// 不管是不支持 PSYNC 还是无法 partial resync,这里都需要进行全量同步,我们需要准备// 接收主节点的 rdb 文件,大家一个临时文件用于保存 rdbwhile(maxtries--) {snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);if (dfd != -1) break;sleep(1);}if (dfd == -1) {redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));goto error;}// 设置一个读事件处理器,来读取主服务器的 RDB 文件if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)== AE_ERR){redisLog(REDIS_WARNING,"Can't create readable event for SYNC: %s (fd=%d)",strerror(errno),fd);goto error;}// 设置状态server.repl_state = REDIS_REPL_TRANSFER;// 更新统计信息server.repl_transfer_size = -1;server.repl_transfer_read = 0;server.repl_transfer_last_fsync_off = 0;server.repl_transfer_fd = dfd;server.repl_transfer_lastio = server.unixtime;server.repl_transfer_tmpfile = zstrdup(tmpfile);return;
error:close(fd);server.repl_transfer_s = -1;server.repl_state = REDIS_REPL_CONNECT;return;
}

全量同步

我们在分析主节点代码时就知道全量同步就是主节点发送 rdb 文件的过程。而从节点在需要全量同步以后会注册读回调函数来处理即将收到的 rdb 文件:

#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {char buf[4096];ssize_t nread, readlen;off_t left;REDIS_NOTUSED(el);REDIS_NOTUSED(privdata);REDIS_NOTUSED(mask);// 全量同步最开始发送的是 rdb 文件的大小,如果 server.repl_transfer_size == -1// 我们需要先解析 rdb 文件大小if (server.repl_transfer_size == -1) {// 调用读函数if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",strerror(errno));goto error;}// 出错?if (buf[0] == '-') {redisLog(REDIS_WARNING,"MASTER aborted replication with an error: %s",buf+1);goto error;} else if (buf[0] == '\0') {// 还记得在 replicationCron 中对与 BGSAVE_START 和 BGSAVE_END 的从节点发送的 \n 吗server.repl_transfer_lastio = server.unixtime;return;} else if (buf[0] != '$') {// 读入的内容出错,和协议格式不符redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);goto error;}// 分析 RDB 文件大小server.repl_transfer_size = strtol(buf+1,NULL,10);redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync: receiving %lld bytes from master",(long long) server.repl_transfer_size);// 不明白为什么这里就要退出了,其实可以继续读吧return;}// 开始正式读取 rdb 文件的内容(这里不太明白了,按说应该处理 EAGAIN 等情况才对)left = server.repl_transfer_size - server.repl_transfer_read;readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);nread = read(fd,buf,readlen);if (nread <= 0) {redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",(nread == -1) ? strerror(errno) : "connection lost");replicationAbortSyncTransfer();return;}// 写入到临时 rdb 文件server.repl_transfer_lastio = server.unixtime;if (write(server.repl_transfer_fd,buf,nread) != nread) {redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));goto error;}// 加上刚读取好的字节数server.repl_transfer_read += nread;// 定期将读入的文件 fsync 到磁盘,以免 buffer 太多,一下子写入时撑爆 IOif (server.repl_transfer_read >=server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC){off_t sync_size = server.repl_transfer_read -server.repl_transfer_last_fsync_off;rdb_fsync_range(server.repl_transfer_fd,server.repl_transfer_last_fsync_off, sync_size);server.repl_transfer_last_fsync_off += sync_size;}// 检查 RDB 是否已经传送完毕if (server.repl_transfer_read == server.repl_transfer_size) {// 完毕,将临时文件改名为 dump.rdbif (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));replicationAbortSyncTransfer();return;}// 先清空旧数据库redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");signalFlushedDb(-1);emptyDb(replicationEmptyDbCallback);// 先删除主服务器的读事件监听,因为 rdbLoad() 会调用 rdbLoadProgressCallback// 这个函数会调用 eventLoop 处理事件,如果不删除,会导致递归调用aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);// 载入 RDBif (rdbLoad(server.rdb_filename) != REDIS_OK) {redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");replicationAbortSyncTransfer();return;}// 关闭临时文件zfree(server.repl_transfer_tmpfile);close(server.repl_transfer_fd);// 为主节点构造客户端资源,createClient 中会绑定好读事件server.master = createClient(server.repl_transfer_s);// 标记这个客户端为主服务器server.master->flags |= REDIS_MASTER;// 标记它为已验证身份server.master->authenticated = 1;// 更新复制状态server.repl_state = REDIS_REPL_CONNECTED;// 设置主服务器的复制偏移量server.master->reploff = server.repl_master_initial_offset;// 保存主服务器的 RUN IDmemcpy(server.master->replrunid, server.repl_master_runid,sizeof(server.repl_master_runid));// 如果 offset 被设置为 -1 ,那么表示主服务器的版本低于 2.8 // 无法使用 PSYNC ,所以需要设置相应的标识值if (server.master->reploff == -1)server.master->flags |= REDIS_PRE_PSYNC;redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");// 重启 AOF,这会导致进行一次 AOF 重写if (server.aof_state != REDIS_AOF_OFF) {int retry = 10;// 关闭stopAppendOnly();// 再重启while (retry-- && startAppendOnly() == REDIS_ERR) {redisLog(REDIS_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second.");sleep(1);}if (!retry) {redisLog(REDIS_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");exit(1);}}}return;error:replicationAbortSyncTransfer();return;
}

增量同步

增量同步其实就是从节点不断接收主节点传播的写指令的过程

Partial Resync

当与主节点进行同步的时候,从节点会优先尝试进行 partial resync:

#define PSYNC_CONTINUE 0
#define PSYNC_FULLRESYNC 1
#define PSYNC_NOT_SUPPORTED 2
int slaveTryPartialResynchronization(int fd) {char *psync_runid;char psync_offset[32];sds reply;// 如果主节点支持 PSYNC,那么我们不管是进行了 FULLRESYNC 还是 partial resync// repl_master_initial_offset 都不会是 -1.只有对于不支持 PSYNC 的主节点// repl_master_initial_offset 才会保持1server.repl_master_initial_offset = -1;if (server.cached_master) {/*之前就提到了,resync 是发生在曾经同步过的主从之间,如果有曾经重新同步过的主节点我们将之前的 replication id 和 offset 发送过去请求 PSYNC*/psync_runid = server.cached_master->replrunid;snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);} else {// 缓存不存在,replication id 设置为 ? 请求全量同步redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");psync_runid = "?";memcpy(psync_offset,"-1",3);}// 向主服务器发送 PSYNC 命令,读取其响应reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);if (!strncmp(reply,"+FULLRESYNC",11)) {// 虽然支持 PSYNC 命令,但是无法进行 partial resync,执行全量同步char *runid = NULL, *offset = NULL;// 分析并记录主服务器的 run idrunid = strchr(reply,' ');if (runid) {runid++;offset = strchr(runid,' ');if (offset) offset++;}// 检查 run id 的合法性if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {redisLog(REDIS_WARNING,"Master replied with wrong +FULLRESYNC syntax.");// 主服务器支持 PSYNC ,但是却发来了异常的 run id// 只好将 run id 设为 0 ,让下次 PSYNC 时失败memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);} else {// 保存 run idmemcpy(server.repl_master_runid, runid, offset-runid-1);server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';// 以及 initial offsetserver.repl_master_initial_offset = strtoll(offset,NULL,10);// 打印日志,这是一个 FULL resyncredisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",server.repl_master_runid,server.repl_master_initial_offset);}// 要开始完整重同步,缓存中的 master 已经没用了,清除它replicationDiscardCachedMaster();sdsfree(reply);// 返回状态return PSYNC_FULLRESYNC;}if (!strncmp(reply,"+CONTINUE",9)) {// 可以进行 partial resync,我们会收到主节点从 offset 之后的数据redisLog(REDIS_NOTICE,"Successful partial resynchronization with master.");sdsfree(reply);// 将缓存中的 master 设为当前 masterreplicationResurrectCachedMaster(fd);// 返回状态return PSYNC_CONTINUE;}// 主节点不支持 PSYNC 命令if (strncmp(reply,"-ERR",4)) {/* If it's not an error, log the unexpected event. */redisLog(REDIS_WARNING,"Unexpected reply to PSYNC from master: %s", reply);} else {redisLog(REDIS_NOTICE,"Master does not support PSYNC or is in ""error state (reply: %s)", reply);}sdsfree(reply);// 清楚缓存的主服务器replicationDiscardCachedMaster();// 主服务器不支持 PSYNCreturn PSYNC_NOT_SUPPORTED;
}void replicationResurrectCachedMaster(int newfd) {// 设置 masterserver.master = server.cached_master;server.cached_master = NULL;server.master->fd = newfd;server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP);server.master->authenticated = 1;server.master->lastinteraction = server.unixtime;// 回到已连接状态server.repl_state = REDIS_REPL_CONNECTED;// 将主节点加入到客户端列表中listAddNodeTail(server.clients,server.master);// 后续只需要想处理其他客户端一样处理主节点传播的消息即可if (aeCreateFileEvent(server.el, newfd, AE_READABLE,readQueryFromClient, server.master)) {redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));freeClientAsync(server.master); /* Close ASAP. */}// 如果有需要发送给主节点的消息,注册写回调if (server.master->bufpos || listLength(server.master->reply)) {if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,sendReplyToClient, server.master)) {redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));freeClientAsync(server.master); /* Close ASAP. */}}
}

当我们要释放某个客户端时候,会检查这个客户端是不是主节点,如果是的话,就会将其缓存下来,用于后续 PSYNC:

void freeClient(redisClient *c) {// 略if (server.master && c->flags & REDIS_MASTER) {redisLog(REDIS_WARNING,"Connection with master lost.");if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP|REDIS_BLOCKED|REDIS_UNBLOCKED))){replicationCacheMaster(c);return;}}
}void replicationCacheMaster(redisClient *c) {listNode *ln;redisAssert(server.master != NULL && server.cached_master == NULL);redisLog(REDIS_NOTICE,"Caching the disconnected master state.");// 从客户端链表中移除主服务器ln = listSearchKey(server.clients,c);redisAssert(ln != NULL);listDelNode(server.clients,ln);// 缓存 masterserver.cached_master = server.master;// 删除事件监视,关闭 socketaeDeleteFileEvent(server.el,c->fd,AE_READABLE);aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);close(c->fd);c->fd = -1;// 删除 peeridif (c->peerid) {sdsfree(c->peerid);c->peerid = NULL;}replicationHandleMasterDisconnection();
}void replicationHandleMasterDisconnection(void) {// 设置 replication 状态,重置 masterserver.master = NULL;server.repl_state = REDIS_REPL_CONNECT;server.repl_down_since = server.unixtime;// 如果这个从节点本身有从节点,断开所有与从节点的链接if (server.masterhost != NULL) disconnectSlaves();
}

我们要注意,数据的同步是单向的,只会从主节点到从节点,任何发生在从节点的写入,最终都会丢失。redis 并不禁止从节点处理写请求。

后续优化

在最新的 redis 代码中针对主从同步还做了很多优化,比如:

  1. 如果主节点是由从节点提升而来,那么它会保存之前主节点的信息,当从节点使用之前主节点的 replication id 进行 PSYNC 的时候,partial resync 仍然可能进行
  2. 全量同步时,主节点将 rdb 落盘再发送给从节点是没有必要的,后面的 redis 支持 diskless replication

总结

  1. 主从同步时数据库服务分布式部署必须面对的问题,redis 使用异步同步,达到最终一致性,但是这也导致了永远存在一个丢失数据的时间窗口(单点也有这个问题)
  2. redis 使用 PSYNC 机制来降低 resync 的成本
  3. 从节点本身也可以有自己的从节点,但是使用场景很少
  4. 通过添加从节点,我们可以扩展 redis 的服务能力,比如添加 read-only 从节点,将读请求分发到从节点,来减轻主节点压力。但是数据同步是单向的,从节点上发生的写入,最终都会丢失
  相关解决方案