什么是 Raft 算法?
Raft 算法属于 Multi-Paxos 算法,它是在兰伯特 Multi-Paxos 思想的基础上,做了一些简化和限制,比如增加了日志必须是连续的,只支持领导者、跟随者和候选人三种状态。
Raft 算法是现在分布式系统开发首选的共识算法。绝大多数选用 Paxos 算法的系统(比如 Cubby、Spanner)都是在 Raft 算法发布前开发的,当时没得选;而全新的系统大多选择了 Raft 算法(比如 Etcd、Consul)。
如果要用一句话概括 Raft 算法,我觉得是这样的:从本质上说,Raft 算法是通过一切以领导者为准的方式,实现一系列值的共识和各节点日志的一致。
Raft 角色
-
领导者:处理写请求、管理日志复制和不断地发送心跳信息。
-
候选者:候选人将向其他节点发送请求投票 RPC 消息,通知其他节点来投票,如果赢得了大多数选票,就晋升当领导者。
-
跟随者:默默地接收和处理来自领导者的消息,当等待领导者心跳信息超时的时候,就主动站出来,推荐自己当候选人。
Raft 通过保留复制的日志来工作。该日志是仅追加的数据结构,其中添加了新条目,并且只有一个服务器(领导者)负责管理此日志。每个 write 请求都发送到领导者节点,并且该节点会将其分发给跟随者节点,并确保仅在安全存储数据后,客户端才会收到此写入的确认。
共识问题分为三个子问题:领导人选举,复制和安全。
领导人选举
服务器始终以跟随者身份启动,并且期望领导者发出心跳。跟随者将等待此心跳一段时间(定义为 election timeout),如果未收到该心跳 ,则将假定领导者死了并转换为候选状态。进入此状态后,它要做的第一件事是为自己投票,然后向所有其他节点发送投票请求(此请求是一个称为的 RPC RequestVote)。如果它从该群集中的大多数节点(例如 5 个中的 3个)收到对该请求的确认,则转换为 Leader 状态。
1、超时为 150ms 的节点,就增加自己的任期编号,并推举自己为候选人,先给自己投上一张选票,然后向其他节点发送请求投票 RPC 消息,请它们选举自己为领导者。
2、如果其他节点接收到候选人 A 的请求投票 RPC 消息,在编号为 1 的这届任期内,也还没有进行过投票,那么它将把选票投给节点 A,并增加自己的任期编号。
3、如果候选人在选举超时时间内赢得了大多数的选票,那么它就会成为本届任期内新的领导者。
4、节点 A 当选领导者后,它将周期性地发送心跳消息,通知其他服务器我是领导者,阻止跟随者发起新的选举。
存在的问题?
首先如果所有节点都同时启动,那么它们也都将同时超时,这意味着每个节点都将触发该相同的 RequestVote RPC,这使得单个节点很难获得多数票。Raft 通过为每个节点使用随机选举超时来缓解此问题,这意味着一个候选者通常会在其他候选者之前超时,有可能成为新领导者。
即使有这个随机超时,我们仍然可以有一个分裂表决的情况,其中没有一个节点拥有多数表决权。例如,在一个由 5 个节点组成的集群中,当领导者去世时,我们将最终得到 4 个节点,并且如果其中两个节点大致同时超时,那么每个节点都将获得 2 票,因此他们都无法成为领导者。解决方案非常简单:只需等待另一个超时,这很可能会解决问题。当发生此超时并且该周期没有领导者时,将启动一个新的周期,并且每个节点在下一次选举中都将具有一个新的随机超时值,这可能又不一样了。因此,我们将对性能造成损失,但是此超时通常只有几毫秒,并且需要进行分割表决这种情况应该很少见。
在 Raft 算法中约定,如果一个候选人或者领导者,发现自己的任期编号比其他节点小,那么它会立即恢复成跟随者状态。
如果一个节点接收到一个包含较小的任期编号值的请求,那么它会直接拒绝这个请求。
当任期编号相同时,日志完整性高的跟随者,拒绝投票给日志完整性低的候选人
日志复制
当我们选出一个负责人后,每个请求都将发送到该节点。如果跟随者节点收到请求,则可以将其重定向到领导者,或者将错误返回给客户端,指示哪个节点是领导者。
领导者收到请求后,首先将其追加到其日志中,然后将请求发送给每个关注者,以便他们可以执行相同的操作。此RPC 称为 AppendEntries。尽管该消息已附加到日志中,但尚未提交,并且客户端未获得确认操作成功的确认。领导者从大多数节点得到确认后,就可以真正提交消息,知道消息已安全存储,然后响应客户端。当候选者收到下一条心跳消息(只是一个空的 AppendEntries RPC)时,他们知道他们也可以提交此消息。
除了客户端发送的命令外,每个日志条目还具有周期编号和索引。该周期仅定义了一个时间单位,索引是日志中的位置。
日志的格式
指令:一条由客户端请求指定的、状态机需要执行的指令。你可以将指令理解成客户端指定的数据。
索引值:日志项对应的整数索引值。它其实就是用来标识日志项的,是一个连续的、单调递增的整数号码。
任期编号:创建这条日志项的领导者的任期编号。
如何复制日志?
1、接收到客户端请求后,领导者基于客户端请求中的指令,创建一个新日志项,并附加到本地日志中。
2、领导者通过日志复制 RPC,将新的日志项复制到其他的服务器。
3、当领导者将日志项,成功复制到大多数的服务器上的时候,领导者会将这条日志项提交到它的状态机中。
4、领导者将执行的结果返回给客户端。
5、当跟随者接收到心跳信息,或者新的日志复制 RPC 消息后,如果跟随者发现领导者已经提交了某条日志项,而它还没提交,那么跟随者就将这条日志项提交到本地的状态机中。
安全
为确保正确复制每个日志并以相同的顺序执行命令,某些安全机制是必需的。
日志匹配属性
Raft 维护 “日志匹配属性” 属性,即如果两个不同的日志条目具有相同的术语号和相同的索引,则它们将:
- 存储完全相同的命令;
- 在所有前面的条目中都相同。
由于领导者将永远不会在同一周期中创建多个具有相同索引的条目,因此第一个属性已实现。
第二个属性是保证跟随者在收到 AppendEntries RPC 时执行的一致性检查,以确保前面的所有条目都是相同的。
它的工作方式如下:Leader 跟踪其日志中提交的最高索引,并在每个 AppendEntriesRPC(甚至是心跳)中发送该信息。如果跟随者在其本地日志中找不到带有该索引的条目,它将拒绝该请求,因此,如果 AppendEntriesRPC成功返回,则领导者将知道其日志与跟随者的日志相同。
当节点正常运行时,这些日志将始终保持一致。但是,当领导者崩溃时,此日志可能会不一致,这就是AppendEntries 一致性检查将为我们提供帮助的时候。想象一下这种情况:
- 我们有三个节点,N1,N2 和 N3,N1 成为领导者;
- N1 复制信息 term=1; index=1; command=x,并 term=1; index=2; command=y 用 N2,但 N3 从来没有得到这些消息;
- 现在 N1 崩溃了,N2 成为了新的领导者;
- 如果 N2 尝试将消息复制 term=2; index=3; command=z 到 N3,一致性检查将拒绝此消息,因为最高的提交索引(3)不存在于N3的日志中。
- N2 然后将返回日志,并在中的最新条目之后传输所有条目 N3,使日志再次一致。
选举限制
此属性保证如果候选人的日志中没有所有已提交的条目,则该候选人将永远不会赢得领导人选举。由于在大多数节点上都需要存在一个条目才能被视为已提交,因此在进行选举时,至少一个节点将具有最新的已提交条目。如果跟随者节点 RequestVote 从日志中后面的候选项(表示较小的期限编号,或相同的期限编号但索引较小)接收到RPC,则不会将投票授予该候选项。
在上面的示例中,我们有三个日志,每个条目均以其创建时的周期编号表示。 在这种情况下,Node 1 是领导者,并且能够提交到索引 5,在索引 5 中,它得到了大多数节点(自身和 Node 2)的确认。如果 Node 1 逝世并开始新的选举,也许 Node 3 可以成为第一个过渡到候选人并试图成为领导人的人。这将是一个问题,因为其日志没有最新的提交条目(周期 3,索引 5)。当它向发送一个 RequestVote 时 Node 2,此节点将注意到其自己的日志比的日志更新 Node 3,因此不会授予其投票权,从而无法 Node 3 成为领导者。
集群成员和共同共识
成员变更存在的问题
在集群中进行成员变更的最大风险是,可能会同时出现 2 个领导者。
在进行成员变更时,节点 A、B 和 C 之间发生了分区错误,节点 A、B 组成旧配置中的“大多数”,也就是变更前的 3 节点集群中的“大多数”,那么这时的领导者(节点 A)依旧是领导者。
另一方面,节点 C 和新节点 D、E 组成了新配置的“大多数”,也就是变更后的 5 节点集群中的“大多数”,它们可能会选举出新的领导者(比如节点 C)。那么这时,就出现了同时存在 2 个领导者的情况。
最简单的解决办法:重启
先将集群关闭再启动新集群。也就是先把节点 A、B、C 组成的集群关闭,然后再启动节点 A、B、C、D、E 组成的新集群。
这种办法行不通,因为你每次变更都要重启集群,意味着在集群变更期间服务不可用,太影响用户体验了。
常用的解决办法:单节点变更
通过一次变更一个节点实现成员变更。如果需要变更多个节点,那你需要执行多次单节点变更。比如将 3 节点集群扩容为 5 节点集群,这时你需要执行 2 次单节点 变更,先将 3 节点集群变更为 4 节点集群,然后再将 4 节点集群变更为 5 节点集群,就像下图的样子:
具体步骤:
目前的集群配置为[A, B, C],我们先向集群中加入节点 D,这意味着新配置为[A, B, C, D]。成员变更,是通过这么两步实现的:
第一步,领导者(节点 A)向新节点(节点 D)同步数据;
第二步,领导者(节点 A)将新配置[A, B, C, D]作为一个日志项,复制到新配置中所有节点(节点 A、B、C、D)上,然后将新配置的日志项提交到本地状态机,完成单节点变更。
在变更完成后,现在的集群配置就是[A, B, C, D],我们再向集群中加入节点 E,也就是说,新配置为[A, B, C, D, E]。成员变更的步骤和上面类似:
第一步,领导者(节点 A)向新节点(节点 E)同步数据;
第二步,领导者(节点 A)将新配置[A, B, C, D, E]作为一个日志项,复制到新配置中的所有节点(A、B、C、D、E)上,然后再将新配置的日志项提交到本地状态机,完成单节点变更。
这样一来,我们就通过一次变更一个节点的方式,完成了成员变更,保证了集群中始终只有一个领导者,而且集群也在稳定运行,持续提供服务。
不好实现的解决方案:联合共识
当集群中节点的状态发生变化(集群配置发生变化)时,系统容易受到可能导致系统故障的故障的影响。因此,为防止这种情况,Raft 使用了一种称为两阶段的方法来更改群集成员身份。因此,在这种方法中,群集在实现新的群集成员身份配置之前首先更改为中间状态(称为联合共识)。联合共识使系统即使在配置之间进行转换时也可用于响应客户端请求。因此,增加分布式系统的可用性是主要目的。
Raft 和 Paxos
Paxos 和 Raft 采取了非常相似的方式来达成分布式共识,只是在领导人选举上的方式有所不同。最值得注意的是,Raft 仅允许具有最新日志的服务器成为领导者,而 Paxos 允许任何服务器成为领导者,前提是它随后更新其日志以确保它是最新的。
考虑到其简单性,Raft 的方法出奇的高效,因为与 Paxos 不同,它不需要在领导人选举期间交换日志条目。
Raft 规范中一个特别有趣的组件是协调对集群成员资格进行更改的机制。该协议采用一种新颖的方法,其中使用两个重叠的多数(即由新旧集群配置定义的法定人数)达成联合共识,从而在不影响操作的情况下支持动态弹性。
Raft 的出现清楚地表明了软件开发社区的积极拥护,这是近40 种使用各种不同语言的开源实现所证明的。尽管Paxos 在描述分布式共识的本质上非常优雅,但是由于缺乏全面而规范的规范,Paxos 难以使用,并且难以在实际系统中实现。
Raft 在框架中的实现
RocketMQ DLedger 多副本之 Leader 选主
通过 DLedgerLeaderElector 的 startup 方法启动状态管理机,代码如下:
public void startup() {
// 启动状态维护管理器stateMaintainer.start();// 遍历节点角色状态改变监听器并启动它for (RoleChangeHandler roleChangeHandler : roleChangeHandlers) {
roleChangeHandler.startup();}
}
启动状态维护管理器,主要看 run 逻辑,代码如下:
public void run() {
while (running.get()) {
try {
doWork();} catch (Throwable t) {
if (logger != null) {
logger.error("Unexpected Error in running {} ", getName(), t);}}}latch.countDown();
}@Override public void doWork() {
try {
// 判断节点是否参与 Leader 选举,默认是 trueif (DLedgerLeaderElector.this.dLedgerConfig.isEnableLeaderElector()) {
DLedgerLeaderElector.this.refreshIntervals(dLedgerConfig);// 维护者的核心方法DLedgerLeaderElector.this.maintainState();}sleep(10); // 每执行一次选举,休息10毫秒} catch (Throwable t) {
DLedgerLeaderElector.logger.error("Error in heartbeat", t);}
}
maintainState 方法会根据当前节点的角色运行指定的逻辑,代码如下:
private void maintainState() throws Exception {
if (memberState.isLeader()) {
// 领导者,将心跳发送给追随者maintainAsLeader();} else if (memberState.isFollower()) {
// 追随者,接受心跳,当领导没有心跳时更换候选人maintainAsFollower();} else {
// 候选人,提出一个投票maintainAsCandidate();}
}
memberState 初始状态在 MemberState 创建时可以知道是 CANDIDATE,和 Raft 算法不一样,因为 Raft 算法节点初始状态为 Follower。
现在依次来看不同角色运行的不同逻辑是怎样的。
1)候选人角色:maintainAsCandidate 方法
private void maintainAsCandidate() throws Exception {
//for candidateif (System.currentTimeMillis() < nextTimeToRequestVote && !needIncreaseTermImmediately) {
return;}long term; // 投票轮次long ledgerEndTerm; // Leader 节点当前的投票轮次long ledgerEndIndex; // 当前日志的最大序列,即下一条日志的开始 index,日志复制会用到synchronized (memberState) {
if (!memberState.isCandidate()) {
return;}// 上一次的投票结果为 WAIT_TO_VOTE_NEXT || 需要立即开启投票(默认是 false)if (lastParseResult == VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT || needIncreaseTermImmediately) {
long prevTerm = memberState.currTerm();// 获取下一轮投票的任期编号,也就是第几轮投票了term = memberState.nextTerm();logger.info("{}_[INCREASE_TERM] from {} to {}", memberState.getSelfId(), prevTerm, term);lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;} else {
// 维护好当前的任期编号term = memberState.currTerm();}ledgerEndIndex = memberState.getLedgerEndIndex();ledgerEndTerm = memberState.getLedgerEndTerm();}if (needIncreaseTermImmediately) {
// 如果需要立即开启投票,重新设置下一次投票超时时间// 下一次倒计时:当前时间戳 + 上次投票的开销 + 最小投票间隔(300ms) + (1000 - 300)之间的随机值nextTimeToRequestVote = getNextTimeToRequestVote();needIncreaseTermImmediately = false;return;}long startVoteTimeMs = System.currentTimeMillis();final List<CompletableFuture<VoteResponse>> quorumVoteResponses = voteForQuorumResponses(term, ledgerEndTerm, ledgerEndIndex);final AtomicLong knownMaxTermInGroup = new AtomicLong(-1); // 已知的最大投票轮次final AtomicInteger allNum = new AtomicInteger(0); // 所有投票票数final AtomicInteger validNum = new AtomicInteger(0); // 有效投票数final AtomicInteger acceptedNum = new AtomicInteger(0); // 获得的投票数final AtomicInteger notReadyTermNum = new AtomicInteger(0); // 如果对端节点的投票轮次小于发起投票的轮次,则认为对端未准备好投票final AtomicInteger biggerLedgerNum = new AtomicInteger(0); // 最大发起投票的节点final AtomicBoolean alreadyHasLeader = new AtomicBoolean(false); // 是否已经存在 LeaderCountDownLatch voteLatch = new CountDownLatch(1);// 遍历投票结果,收集投票结果for (CompletableFuture<VoteResponse> future : quorumVoteResponses) {
future.whenComplete((VoteResponse x, Throwable ex) -> {
try {
if (ex != null) {
throw ex;}logger.info("[{}][GetVoteResponse] {}", memberState.getSelfId(), JSON.toJSONString(x));// 统计有效节点的数量if (x.getVoteResult() != VoteResponse.RESULT.UNKNOWN) {
validNum.incrementAndGet();}synchronized (knownMaxTermInGroup) {
switch (x.getVoteResult()) {
case ACCEPT: // 赞成票acceptedNum.incrementAndGet();break;case REJECT_ALREADY_VOTED: // 拒绝票,原因是已经投了其他节点的票break;case REJECT_ALREADY__HAS_LEADER: // 拒绝票,原因是因为集群中已经存在 Leader alreadyHasLeader.compareAndSet(false, true);break;case REJECT_TERM_SMALL_THAN_LEDGER: // 拒绝票,如果自己维护的 term 小于远端维护的 ledgerEndTermcase REJECT_EXPIRED_VOTE_TERM: // 拒绝票,如果自己维护的 term 小于远端维护的 termif (x.getTerm() > knownMaxTermInGroup.get()) {
knownMaxTermInGroup.set(x.getTerm());}break;case REJECT_EXPIRED_LEDGER_TERM: // 拒绝票,自己维护的 ledgerTerm 小于对端维护的 ledgerTermcase REJECT_SMALL_LEDGER_END_INDEX: // 拒绝票,自己维护的 dedgerEndIndex 小于对端维护的值biggerLedgerNum.incrementAndGet();break;case REJECT_TERM_NOT_READY: // 拒绝票,对端的投票轮次小于自己的 teamnotReadyTermNum.incrementAndGet();break;default:break;}}if (alreadyHasLeader.get()|| memberState.isQuorum(acceptedNum.get())|| memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
voteLatch.countDown();}} catch (Throwable t) {
logger.error("Get error when parsing vote response ", t);} finally {
allNum.incrementAndGet();if (allNum.get() == memberState.peerSize()) {
voteLatch.countDown();}}});}try {
// 等待收集投票结果,并设置超时时间voteLatch.await(3000 + random.nextInt(maxVoteIntervalMs), TimeUnit.MILLISECONDS);} catch (Throwable ignore) {
}lastVoteCost = DLedgerUtils.elapsed(startVoteTimeMs);VoteResponse.ParseResult parseResult;// 根据收集的投票结果判断是否能成为 Leaderif (knownMaxTermInGroup.get() > term) {
// 如果对端的投票轮次大于发起投票的节点,则该节点使用对端的轮次,重新进入到 Candidate 状态parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;nextTimeToRequestVote = getNextTimeToRequestVote();changeRoleToCandidate(knownMaxTermInGroup.get());} else if (alreadyHasLeader.get()) {
// 已经存在 Leader,该节点重新进入到 Candidate 状态parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;nextTimeToRequestVote = getNextTimeToRequestVote() + heartBeatTimeIntervalMs * maxHeartBeatLeak;} else if (!memberState.isQuorum(validNum.get())) {
// 如果收到的有效票数未超过半数parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;nextTimeToRequestVote = getNextTimeToRequestVote();} else if (memberState.isQuorum(acceptedNum.get())) {
// 如果得到的赞同票超过半数,则成为 LeaderparseResult = VoteResponse.ParseResult.PASSED;} else if (memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
// 如果得到的赞成票加上未准备投票的节点数超过半数,则应该立即发起投票parseResult = VoteResponse.ParseResult.REVOTE_IMMEDIATELY;} else if (memberState.isQuorum(acceptedNum.get() + biggerLedgerNum.get())) {
// 如果得到的赞成票加上对端维护的 ledgerEndIndex 超过半数parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;nextTimeToRequestVote = getNextTimeToRequestVote();} else {
// 其他情况,开启下一轮投票parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;nextTimeToRequestVote = getNextTimeToRequestVote();}lastParseResult = parseResult;logger.info("[{}] [PARSE_VOTE_RESULT] cost={} term={} memberNum={} allNum={} acceptedNum={} notReadyTermNum={} biggerLedgerNum={} alreadyHasLeader={} maxTerm={} result={}",memberState.getSelfId(), lastVoteCost, term, memberState.peerSize(), allNum, acceptedNum, notReadyTermNum, biggerLedgerNum, alreadyHasLeader, knownMaxTermInGroup.get(), parseResult);// 如果投票成功if (parseResult == VoteResponse.ParseResult.PASSED) {
logger.info("[{}] [VOTE_RESULT] has been elected to be the leader in term {}", memberState.getSelfId(), term);// 将节点状态设置为 LeaderchangeRoleToLeader(term);}}
1、比较节点的轮次是否一致,比如 A 节点的轮次处于第二轮,处于第三轮的 B 节点或者第一轮的 C 节点投票给节点 A 都属于拒绝票。
2、比较节点的日志索引大小,比如 A 节点的日志索引为 10,如果日志索引为 20 的 B 节点投票给节点 A 属于拒绝飘。
3、如果得到的赞同票超过半数,则成为 Leader,然后将进入到 maintainAsLeader 方法。
2)领导者角色:maintainAsLeader 方法
private void maintainAsLeader() throws Exception {
// 判断上一次发送心跳的时间与当前时间的差值是否大于心跳包发送间隔,如果超过,则说明需要发送心跳包if (DLedgerUtils.elapsed(lastSendHeartBeatTime) > heartBeatTimeIntervalMs) {
long term;String leaderId;synchronized (memberState) {
if (!memberState.isLeader()) {
// 如果当前不是 leader 节点,则直接返回,二次判断//stop sendingreturn;}term = memberState.currTerm();leaderId = memberState.getLeaderId();lastSendHeartBeatTime = System.currentTimeMillis();}sendHeartbeats(term, leaderId); // 向集群内的所有节点发送心跳包}
}
3)追随者角色:maintainAsFollower 方法
当 Candidate 状态的节点在收到主节点发送的心跳包后,会将状态变更为 follower。
private void maintainAsFollower() {
if (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > 2 * heartBeatTimeIntervalMs) {
synchronized (memberState) {
// 如果 maxHeartBeatLeak (默认为3)个心跳包周期内未收到心跳,则将状态变更为 Candidateif (memberState.isFollower() && (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > maxHeartBeatLeak * heartBeatTimeIntervalMs)) {
logger.info("[{}][HeartBeatTimeOut] lastLeaderHeartBeatTime: {} heartBeatTimeIntervalMs: {} lastLeader={}", memberState.getSelfId(), new Timestamp(lastLeaderHeartBeatTime), heartBeatTimeIntervalMs, memberState.getLeaderId());changeRoleToCandidate(memberState.currTerm());}}}
}
参考资料
扩展阅读
Raft 算法论文
参考资料
极客时间专栏:分布式技术原理与算法解析
Raft: Consensus made simple?
Raft distributed consensus