精华内容
下载资源
问答
  • raft算法实现
    千次阅读
    2021-07-01 06:43:37

    1. Raft 算法简介

    1.1 Raft 背景

    在分布式系统中,一致性算法至关重要。在所有一致性算法中,Paxos 最负盛名,它由莱斯利·兰伯特(Leslie Lamport)于 1990 年提出,是一种基于消息传递的一致性算法,被认为是类似算法中最有效的。

    Paxos 算法虽然很有效,但复杂的原理使它实现起来非常困难,截止目前,实现 Paxos 算法的开源软件很少,比较出名的有 Chubby、LibPaxos。此外,Zookeeper 采用的 ZAB(Zookeeper Atomic Broadcast)协议也是基于 Paxos 算法实现的,不过 ZAB 对 Paxos 进行了很多改进与优化,两者的设计目标也存在差异——ZAB 协议主要用于构建一个高可用的分布式数据主备系统,而 Paxos 算法则是用于构建一个分布式的一致性状态机系统。

    由于 Paxos 算法过于复杂、实现困难,极大地制约了其应用,而分布式系统领域又亟需一种高效而易于实现的分布式一致性算法,在此背景下,Raft 算法应运而生。

    Raft 算法在斯坦福 Diego Ongaro 和 John Ousterhout 于 2013 年发表的《In Search of an Understandable Consensus Algorithm》中提出。相较于 Paxos,Raft 通过逻辑分离使其更容易理解和实现,目前,已经有十多种语言的 Raft 算法实现框架,较为出名的有 etcd、Consul 。

    1.2 Raft 角色

    根据官方文档解释,一个 Raft 集群包含若干节点,Raft 把这些节点分为三种状态:Leader、 Follower、Candidate,每种状态负责的任务也是不一样的。正常情况下,集群中的节点只存在 Leader 与 Follower 两种状态。

    Leader(领导者):负责日志的同步管理,处理来自客户端的请求,与Follower保持heartBeat的联系;

    Follower(追随者) :响应 Leader 的日志同步请求,响应Candidate的邀票请求,以及把客户端请求到Follower的事务转发(重定向)给Leader;

    Candidate(候选者) :负责选举投票,集群刚启动或者Leader宕机时,状态为Follower的节点将转为Candidate并发起选举,选举胜出(获得超过半数节点的投票)后,从Candidate转为Leader状态。

    1.3 Raft 三个子问题

    通常,Raft 集群中只有一个 Leader,其它节点都是 Follower。Follower 都是被动的,不会发送任何请求,只是简单地响应来自 Leader 或者 Candidate 的请求。Leader 负责处理所有的客户端请求(如果一个客户端和 Follower 联系,那么 Follower 会把请求重定向给 Leader)。

    为简化逻辑和实现,Raft 将一致性问题分解成了三个相对独立的子问题。

    选举(Leader Election) :当 Leader 宕机或者集群初创时,一个新的 Leader 需要被选举出来;

    日志复制(Log Replication) :Leader 接收来自客户端的请求并将其以日志条目的形式复制到集群中的其它节点,并且强制要求其它节点的日志和自己保持一致;

    安全性(Safety) :如果有任何的服务器节点已经应用了一个确定的日志条目到它的状态机中,那么其它服务器节点不能在同一个日志索引位置应用一个不同的指令。

    2. Raft 算法之 Leader Election 原理

    根据 Raft 协议,一个应用 Raft 协议的集群在刚启动时,所有节点的状态都是 Follower。由于没有 Leader,Followers 无法与 Leader 保持心跳(Heart Beat),因此,Followers 会认为 Leader 已经下线,进而转为 Candidate 状态。然后,Candidate 将向集群中其它节点请求投票,同意自己升级为 Leader。如果 Candidate 收到超过半数节点的投票(N/2 + 1),它将获胜成为 Leader。

    第一阶段:所有节点都是 Follower。

    上面提到,一个应用 Raft 协议的集群在刚启动(或 Leader 宕机)时,所有节点的状态都是 Follower,初始 Term(任期)为 0。同时启动选举定时器,每个节点的选举定时器超时时间都在 100~500 毫秒之间且并不一致(避免同时发起选举)。

    所有节点都是 Follower

    第二阶段:Follower 转为 Candidate 并发起投票。

    没有 Leader,Followers 无法与 Leader 保持心跳(Heart Beat),节点启动后在一个选举定时器周期内未收到心跳和投票请求,则状态转为候选者 Candidate 状态,且 Term 自增,并向集群中所有节点发送投票请求并且重置选举定时器。

    注意,由于每个节点的选举定时器超时时间都在 100-500 毫秒之间,且彼此不一样,以避免所有 Follower 同时转为 Candidate 并同时发起投票请求。换言之,最先转为 Candidate 并发起投票请求的节点将具有成为 Leader 的“先发优势”。

    Follower 转为 Candidate 并发起投票

    第三阶段:投票策略。

    节点收到投票请求后会根据以下情况决定是否接受投票请求(每个 follower 刚成为 Candidate 的时候会将票投给自己):

    请求节点的 Term 大于自己的 Term,且自己尚未投票给其它节点,则接受请求,把票投给它;

    请求节点的 Term 小于自己的 Term,且自己尚未投票,则拒绝请求,将票投给自己。

    投票策略

    第四阶段:Candidate 转为 Leader。

    一轮选举过后,正常情况下,会有一个 Candidate 收到超过半数节点(N/2 + 1)的投票,它将胜出并升级为 Leader。然后定时发送心跳给其它的节点,其它节点会转为 Follower 并与 Leader 保持同步,到此,本轮选举结束。

    注意:有可能一轮选举中,没有 Candidate 收到超过半数节点投票,那么将进行下一轮选举。

    Candidate 转为 Leader

    3. Raft 算法之 Log Replication 原理

    在一个 Raft 集群中,只有 Leader 节点能够处理客户端的请求(如果客户端的请求发到了 Follower,Follower 将会把请求重定向到 Leader),客户端的每一个请求都包含一条被复制状态机执行的指令。Leader 把这条指令作为一条新的日志条目(Entry)附加到日志中去,然后并行得将附加条目发送给 Followers,让它们复制这条日志条目。

    当这条日志条目被 Followers 安全复制,Leader 会将这条日志条目应用到它的状态机中,然后把执行的结果返回给客户端。如果 Follower 崩溃或者运行缓慢,再或者网络丢包,Leader 会不断得重复尝试附加日志条目(尽管已经回复了客户端)直到所有的 Follower 都最终存储了所有的日志条目,确保强一致性。

    第一阶段:客户端请求提交到 Leader。

    如下图所示,Leader 收到客户端的请求,比如存储数据 5。Leader 在收到请求后,会将它作为日志条目(Entry)写入本地日志中。需要注意的是,此时该 Entry 的状态是未提交(Uncommitted),Leader 并不会更新本地数据,因此它是不可读的。

    客户端请求提交到 Leader

    第二阶段:Leader 将 Entry 发送到其它 Follower

    Leader 与 Floolwers 之间保持着心跳联系,随心跳 Leader 将追加的 Entry(AppendEntries)并行地发送给其它的 Follower,并让它们复制这条日志条目,这一过程称为复制(Replicate)。

    有几点需要注意:

    1. 为什么 Leader 向 Follower 发送的 Entry 是 AppendEntries 呢?

    因为 Leader 与 Follower 的心跳是周期性的,而一个周期间 Leader 可能接收到多条客户端的请求,因此,随心跳向 Followers 发送的大概率是多个 Entry,即 AppendEntries。当然,在本例中,我们假设只有一条请求,自然也就是一个Entry了。

    2. Leader 向 Followers 发送的不仅仅是追加的 Entry(AppendEntries)。

    在发送追加日志条目的时候,Leader 会把新的日志条目紧接着之前条目的索引位置(prevLogIndex), Leader 任期号(Term)也包含在其中。如果 Follower 在它的日志中找不到包含相同索引位置和任期号的条目,那么它就会拒绝接收新的日志条目,因为出现这种情况说明 Follower 和 Leader 不一致。

    3. 如何解决 Leader 与 Follower 不一致的问题?

    在正常情况下,Leader 和 Follower 的日志保持一致,所以追加日志的一致性检查从来不会失败。然而,Leader 和 Follower 一系列崩溃的情况会使它们的日志处于不一致状态。Follower可能会丢失一些在新的 Leader 中有的日志条目,它也可能拥有一些 Leader 没有的日志条目,或者两者都发生。丢失或者多出日志条目可能会持续多个任期。

    要使 Follower 的日志与 Leader 恢复一致,Leader 必须找到最后两者达成一致的地方(说白了就是回溯,找到两者最近的一致点),然后删除从那个点之后的所有日志条目,发送自己的日志给 Follower。所有的这些操作都在进行附加日志的一致性检查时完成。

    Leader 为每一个 Follower 维护一个 nextIndex,它表示下一个需要发送给 Follower 的日志条目的索引地址。当一个 Leader 刚获得权力的时候,它初始化所有的 nextIndex 值,为自己的最后一条日志的 index 加 1。如果一个 Follower 的日志和 Leader 不一致,那么在下一次附加日志时一致性检查就会失败。在被 Follower 拒绝之后,Leader 就会减小该 Follower 对应的 nextIndex 值并进行重试。最终 nextIndex 会在某个位置使得 Leader 和 Follower 的日志达成一致。当这种情况发生,附加日志就会成功,这时就会把 Follower 冲突的日志条目全部删除并且加上 Leader 的日志。一旦附加日志成功,那么 Follower 的日志就会和 Leader 保持一致,并且在接下来的任期继续保持一致。

    如何解决 Leader 与 Follower 不一致的问题

    第三阶段:Leader 等待 Followers 回应。

    Followers 接收到 Leader 发来的复制请求后,有两种可能的回应:

    写入本地日志中,返回 Success;

    一致性检查失败,拒绝写入,返回 False,原因和解决办法上面已做了详细说明。

    需要注意的是,此时该 Entry 的状态也是未提交(Uncommitted)。完成上述步骤后,Followers 会向 Leader 发出 Success 的回应,当 Leader 收到大多数 Followers 的回应后,会将第一阶段写入的 Entry 标记为提交状态(Committed),并把这条日志条目应用到它的状态机中。

    Leader 等待 Followers 回应

    第四阶段:Leader 回应客户端。

    完成前三个阶段后,Leader会向客户端回应 OK,表示写操作成功。

    Leader 回应客户端

    第五阶段,Leader 通知 Followers Entry 已提交

    Leader 回应客户端后,将随着下一个心跳通知 Followers,Followers 收到通知后也会将 Entry 标记为提交状态。至此,Raft 集群超过半数节点已经达到一致状态,可以确保强一致性。

    需要注意的是,由于网络、性能、故障等各种原因导致“反应慢”、“不一致”等问题的节点,最终也会与 Leader 达成一致。

    Leader 通知 Followers Entry 已提交

    4. Raft 算法之安全性

    前面描述了 Raft 算法是如何选举 Leader 和复制日志的。然而,到目前为止描述的机制并不能充分地保证每一个状态机会按照相同的顺序执行相同的指令。例如,一个 Follower 可能处于不可用状态,同时 Leader 已经提交了若干的日志条目;然后这个 Follower 恢复(尚未与 Leader 达成一致)而 Leader 故障;如果该 Follower 被选举为 Leader 并且覆盖这些日志条目,就会出现问题,即不同的状态机执行不同的指令序列。

    鉴于此,在 Leader 选举的时候需增加一些限制来完善 Raft 算法。这些限制可保证任何的 Leader 对于给定的任期号(Term),都拥有之前任期的所有被提交的日志条目(所谓 Leader 的完整特性)。关于这一选举时的限制,下文将详细说明。

    4.1 选举限制

    在所有基于 Leader 机制的一致性算法中,Leader 都必须存储所有已经提交的日志条目。为了保障这一点,Raft 使用了一种简单而有效的方法,以保证所有之前的任期号中已经提交的日志条目在选举的时候都会出现在新的 Leader 中。换言之,日志条目的传送是单向的,只从 Leader 传给 Follower,并且 Leader 从不会覆盖自身本地日志中已经存在的条目。

    Raft 使用投票的方式来阻止一个 Candidate 赢得选举,除非这个 Candidate 包含了所有已经提交的日志条目。Candidate 为了赢得选举必须联系集群中的大部分节点。这意味着每一个已经提交的日志条目肯定存在于至少一个服务器节点上。如果 Candidate 的日志至少和大多数的服务器节点一样新(这个新的定义会在下面讨论),那么它一定持有了所有已经提交的日志条目(多数派的思想)。投票请求的限制中请求中包含了 Candidate 的日志信息,然后投票人会拒绝那些日志没有自己新的投票请求。

    Raft 通过比较两份日志中最后一条日志条目的索引值和任期号,确定谁的日志比较新。如果两份日志最后条目的任期号不同,那么任期号大的日志更加新。如果两份日志最后的条目任期号相同,那么日志比较长的那个就更加新。

    4.2 提交之前任期内的日志条目

    如同 4.1 节介绍的那样,Leader 知道一条当前任期内的日志记录是可以被提交的,只要它被复制到了大多数的 Follower 上(多数派的思想)。如果一个 Leader 在提交日志条目之前崩溃了,继任的 Leader 会继续尝试复制这条日志记录。然而,一个 Leader 并不能断定被保存到大多数 Follower 上的一个之前任期里的日志条目 就一定已经提交了。这很明显,从日志复制的过程可以看出。

    鉴于上述情况,Raft 算法不会通过计算副本数目的方式去提交一个之前任期内的日志条目。只有 Leader 当前任期里的日志条目通过计算副本数目可以被提交;一旦当前任期的日志条目以这种方式被提交,那么由于日志匹配特性,之前的日志条目也都会被间接的提交。在某些情况下,Leader 可以安全地知道一个老的日志条目是否已经被提交(只需判断该条目是否存储到所有节点上),但是 Raft 为了简化问题使用了一种更加保守的方法。

    当 Leader 复制之前任期里的日志时,Raft 会为所有日志保留原始的任期号,这在提交规则上产生了额外的复杂性。但是,这种策略更加容易辨别出日志,即使随着时间和日志的变化,日志仍维护着同一个任期编号。此外,该策略使得新 Leader 只需要发送较少日志条目。

    5、后记

    这篇文章我也忘了在哪抄的了,因为写的还不错,所以想粘贴到博客中自己看,侵删。还有一点,这篇博客或者 Raft 论文其实没有描述太多的技术细节,只是做了一些概念的描述,具体细化的东西还得看 Raft 工程实现,并不是说工程实现跟 Raft 有些不一致就说谁错了,具体的工程实现各不相同,掌握核心比较重要。

    链接:https://www.jianshu.com/p/6d9017289cd5

    更多相关内容
  • 一、Raft概念copy一下其他小伙伴写的文章: Raft算法详解不同于Paxos算法直接从分布式一致性问题出发推导出来,Raft算法则是从多副本状态机的角度提出,用于管理多副本状态机的日志复制。Raft实现了和Paxos相同的...

    一、Raft概念

    copy一下其他小伙伴写的文章: Raft算法详解

    不同于Paxos算法直接从分布式一致性问题出发推导出来,Raft算法则是从多副本状态机的角度提出,用于管理多副本状态机的日志复制。Raft实现了和Paxos相同的功能,它将一致性分解为多个子问题:Leader选举(Leader election)、日志同步(Log replication)、安全性(Safety)、日志压缩(Log compaction)、成员变更(Membership change)等。同时,Raft算法使用了更强的假设来减少了需要考虑的状态,使之变的易于理解和实现。

    Raft将系统中的角色分为领导者(Leader)、跟从者(Follower)和候选人(Candidate):

    Leader:接受客户端请求,并向Follower同步请求日志,当日志同步到大多数节点上后告诉Follower提交日志。

    Follower:接受并持久化Leader同步的日志,在Leader告之日志可以提交之后,提交日志。

    Candidate:Leader选举过程中的临时角色。

    c1ebea9add0d69952422f5e8cce5871a.png

    本文不过多赘述 raft 算法是个什么东西... 这里再贴一个十分好理解的文章:The Raft Consensus Algorithm

    二、系统初步设计

    在对raft有一定理解后,我们简单梳理一下在raft选举过程中,我们需要的一些角色,以及角色的司职。

    首先我们需要一个选举控制类,单例实现即可,节点的选举全权交给此选举控制类的实现,我们称其为 ElectOperator。

    先讲一个 raft 中重要的概念:世代,也称为 epoch,但在这篇文章,将其称为 generation(不要纠结这个 = =)。 世代可以认为是一个标记当前发送的操作是否有效的标识,如果收到了小于本节点世代的请求,则可无视其内容,如果收到了大于本世代的请求,则需要更新本节点世代,并重置自己的身份,变为 Follower,类似于乐观锁的设计理念。

    我们知道,raft中一共有三种角色:Follower、Candidate、Leader

    (1)Follower

    Follower 需要做什么呢:

    接收心跳

    Follower 在 ELECTION_TIMEOUT_MS 时间内,若没有收到来自 Leader的心跳,则转变为 Candidate

    接收拉票请求,并返回自己的投票

    好的,Follower非常简单,只需要做三件事即可。

    (2)Candidate

    Candidate 扮演什么样的职能呢:

    接收心跳

    Candidate 在 ELECTION_TIMEOUT_MS 时间内,若没有收到来自 Leader的心跳,则转变为 Candidate

    接收拉票请求,并返回自己的投票

    向集群中的其他节点发起拉票请求

    当收到的投票大于半数( n/2 + 1, n为集群内的节点数量),转变为 Leader

    Candidate 比起 Follower 稍微复杂一些,但前三件事情都是一样的。

    (3)Leader

    Leader 在选举过程中扮演的角色最为简单:

    接收心跳

    向集群内所有节点发送心跳

    Leader 也是可以接收心跳的,当收到大于当前世代的心跳或请求后,Leader 需要转变为 Follower。Leader 不可能收到同世代的心跳请求,因为 (1) 在 raft 算法中,同一世代中,节点仅对同一个节点进行投票。(2) 需要收到过半投票才可以转变为 Leader。

    三、系统初步实现

    简单贴一下选举控制器需要的一些属性代码,下面的注释都说的很清楚了,其中需要补充的一点是定时任务使用了时间轮来实现,不理解没有关系...就是个定时任务,定时任务的一个引用放在 Map taskMap; 中,便于取消任务。

    public class ElectOperator extends ReentrantLocker implements Runnable {

    // 成为 Candidate 的退避时间(真实退避时间需要 randomized to be between 150ms and 300ms )

    private static final long ELECTION_TIMEOUT_MS = ElectConfigHelper.getElectionTimeoutMs();

    // 心跳间隔

    private static final long HEART_BEAT_MS = ElectConfigHelper.getHeartBeatMs();

    /**

    * 该投票箱的世代信息,如果一直进行选举,一直能达到 {@link #ELECTION_TIMEOUT_MS},而选不出 Leader ,也需要15年,generation才会不够用,如果

    * generation 的初始值设置为 Long.Min (现在是0,则可以撑30年,所以完全呆胶布)

    */

    private long generation;

    /**

    * 当前节点的角色

    */

    private NodeRole nodeRole;

    /**

    * 所有正在跑的定时任务

    */

    private Map taskMap;

    /**

    * 投票箱

    */

    private Map box;

    /**

    * 投票给了谁的投票记录

    */

    private Votes voteRecord;

    /**

    * 缓存一份集群信息,因为集群信息是可能变化的,我们要保证在一次选举中,集群信息是不变的

    */

    private List clusters;

    /**

    * 心跳内容

    */

    private HeartBeat heartBeat;

    /**

    * 现在集群的leader是哪个节点

    */

    private String leaderServerName;

    private volatile static ElectOperator INSTANCE;

    public static ElectOperator getInstance() {

    if (INSTANCE == null) {

    synchronized (ElectOperator.class) {

    if (INSTANCE == null) {

    INSTANCE = new ElectOperator();

    ElectControllerPool.execute(INSTANCE);

    }

    }

    }

    return INSTANCE;

    }

    另外,上面罗列的这些值大都是需要在更新世代时重置的,我们先拟定一下更新世代的逻辑,通用的来讲,就是清除投票记录,清除自己的投票箱,更新自己的世代,身份变更为 Follower 等等,我们将这个方法称为 init。

    /**

    * 初始化

    *

    * 1、成为follower

    * 2、先取消所有的定时任务

    * 3、重置本地变量

    * 4、新增成为Candidate的定时任务

    */

    private boolean init(long generation, String reason) {

    return this.lockSupplier(() -> {

    if (generation > this.generation) {// 如果有选票的世代已经大于当前世代,那么重置投票箱

    logger.debug("初始化投票箱,原因:{}", reason);

    // 1、成为follower

    this.becomeFollower();

    // 2、先取消所有的定时任务

    this.cancelAllTask();

    // 3、重置本地变量

    logger.debug("更新世代:旧世代 {} => 新世代 {}", this.generation, generation);

    this.generation = generation;

    this.voteRecord = null;

    this.box = new HashMap<>();

    this.leaderServerName = null;

    // 4、新增成为Candidate的定时任务

    this.becomeCandidateAndBeginElectTask(this.generation);

    return true;

    } else {

    return false;

    }

    });

    }

    (1) Follower的实现

    基于上面的分析,我们可以归纳一下 Follower 需要一些什么样的方法:

    1、转变为 Candidate 的定时任务

    实际上就是 ELECTION_TIMEOUT_MS (randomized to be between 150ms and 300ms) 后,如果没收到 Leader 的心跳,或者自己变为 Candidate 后,在这个时间内没有成功上位,则继续转变为 Candidate。

    为什么我们成为 Candidate 的退避时间需要随机 150ms - 300ms呢?这是为了避免所有节点的选举发起发生碰撞,如果说都是相同的退避时间,每个节点又会优先投自己一票,那么这个集群系统就会陷入无限发起投票,但又无法成为 Leader 的局面。

    简而言之就是我们需要提供一个可刷新的定时任务,如果在一定时间内没刷新这个任务,则节点转变为 Candidate,并发起选举,代码如下。首先取消之前的 becomeCandidate 定时定时任务,然后设定在 electionTimeout 后调用 beginElect(generation) 方法。

    /**

    * 成为候选者的任务,(重复调用则会取消之前的任务,收到来自leader的心跳包,就可以重置一下这个任务)

    *

    * 没加锁,因为这个任务需要频繁被调用,只要收到leader来的消息就可以调用一下

    */

    private void becomeCandidateAndBeginElectTask(long generation) {

    this.lockSupplier(() -> {

    this.cancelCandidateAndBeginElectTask("正在重置发起下一轮选举的退避时间");

    // The election timeout is randomized to be between 150ms and 300ms.

    long electionTimeout = ELECTION_TIMEOUT_MS + (int) (ELECTION_TIMEOUT_MS * RANDOM.nextFloat());

    TimedTask timedTask = new TimedTask(electionTimeout, () -> this.beginElect(generation));

    Timer.getInstance()

    .addTask(timedTask);

    taskMap.put(TaskEnum.BECOME_CANDIDATE, timedTask);

    return null;

    });

    }

    2、接收心跳与心跳回复

    接收心跳十分简单,如果当前心跳大于等于当前世代,且还未认定某个节点为 Leader,则取消所有定时任务,成为Follower,并记录心跳包中 Leader 节点的信息,最后重置一下成为候选者的任务。

    如果已经成为某个 Leader 的 Follower,则直接成为候选者的任务即可。

    另外一个要注意的是,needToSendHeartBeatInfection,是否需要发送心跳感染包,当收到低世代 Leader 的心跳时,如果当前集群已经选出 Leader ,则回复此心跳包,告诉旧 Leader,现在已经是新世代了!(代码中没有展现,其实就是再次封装一个心跳包,带上世代信息和 Leader 节点信息,回复给 Leader 即可)

    public void receiveHeatBeat(String leaderServerName, long generation, String msg) {

    return this.lockSupplier(() -> {

    boolean needToSendHeartBeatInfection = true;

    // 世代大于当前世代

    if (generation >= this.generation) {

    needToSendHeartBeatInfection = false;

    if (this.leaderServerName == null) {

    logger.info("集群中,节点 {} 已经成功在世代 {} 上位成为 Leader,本节点将成为 Follower,直到与 Leader 的网络通讯出现问题", leaderServerName, generation);

    // 取消所有任务

    this.cancelAllTask();

    // 成为follower

    this.becomeFollower();

    // 将那个节点设为leader节点

    this.leaderServerName = leaderServerName;

    }

    // 重置成为候选者任务

    this.becomeCandidateAndBeginElectTask(this.generation);

    }

    return null;

    });

    }

    3、接收拉票请求与回复投票

    我们知道,raft 在一个世代只能投票给一个节点,且发起投票者会首先投票给自己。所以逻辑就很简单了,只有当世代大于等于当前,且还未投票时,则拉票请求成功,返回true即可,否则都视为失败,返回false。

    /**

    * 某个节点来请求本节点给他投票了,只有当世代大于当前世代,才有投票一说,其他情况都是失败的

    *

    * 返回结果

    *

    * 为true代表接受投票成功。

    * 为false代表已经给其他节点投过票了,

    */

    public VotesResponse receiveVotes(Votes votes) {

    return this.lockSupplier(() -> {

    logger.debug("收到节点 {} 的投票请求,其世代为 {}", votes.getServerName(), votes.getGeneration());

    String cause = "";

    if (votes.getGeneration() < this.generation) {

    cause = String.format("投票请求 %s 世代小于当前世代 %s", votes.getGeneration(), this.generation);

    } else if (this.voteRecord != null) {

    cause = String.format("在世代 %s,本节点已投票给 => %s 节点", this.generation, this.voteRecord.getServerName());

    } else {

    this.voteRecord = votes; // 代表投票成功了

    }

    boolean result = votes.equals(this.voteRecord);

    if (result) {

    logger.debug("投票记录更新成功:在世代 {},本节点投票给 => {} 节点", this.generation, this.voteRecord.getServerName());

    } else {

    logger.debug("投票记录更新失败:原因:{}", cause);

    }

    String serverName = InetSocketAddressConfigHelper.getServerName();

    return new VotesResponse(this.generation, serverName, result, serverName.equals(this.leaderServerName), votes.getGeneration());

    });

    }

    (2) Candidate的实现

    可以看出 Follower 十分简单, Candidate 在 Follower 的基础上增加了发起选举的拉票请求,与接收投票,并上位成为Leader两个功能,实际上也十分简单。

    1、发起拉票请求

    回顾一下前面的转变成 Candidate 的定时任务,定时任务实际上就是调用一个方法

    TimedTask timedTask = new TimedTask(electionTimeout, () -> this.beginElect(generation));

    这个 beginElect 就是转变为 Candidate 并发起选举的实现。让我们先想想需要做什么,首先肯定是

    更新一下自己的世代,因为已经长时间没收到 Leader 的心跳包了,我们需要自立门户。

    给自己投一票

    要求其他节点给自己投票

    分析到这里就很明了了。下面首先执行 updateGeneration 方法,实际上就是执行前面所说的 init 方法,传入 generation + 1 的世代,重置一下上个世代各种保存的状态;然后调用 becomeCandidate,实际上就是切换一下身份,将 Follower 或者 Candidate 切换为 Candidate;给自己的 voteRecord 投一票,最后带上自己的节点标识和世代信息,去拉票。

    /**

    * 开始进行选举

    *

    * 1、首先更新一下世代信息,重置投票箱和投票记录

    * 2、成为候选者

    * 3、给自己投一票

    * 4、请求其他节点,要求其他节点给自己投票

    */

    private void beginElect(long generation) {

    this.lockSupplier(() -> {

    if (this.generation != generation) {// 存在这么一种情况,虽然取消了选举任务,但是选举任务还是被执行了,所以这里要多做一重处理,避免上个周期的任务被执行

    return null;

    }

    logger.info("Election Timeout 到期,可能期间内未收到来自 Leader 的心跳包或上一轮选举没有在期间内选出 Leader,故本节点即将发起选举");

    updateGeneration("本节点发起了选举");// this.generation ++

    // 成为候选者

    logger.info("本节点正式开始世代 {} 的选举", this.generation);

    if (this.becomeCandidate()) {

    VotesResponse votes = new VotesResponse(this.generation, InetSocketAddressConfigHelper.getServerName(), true, false, this.generation);

    // 给自己投票箱投票

    this.receiveVotesResponse(votes);

    // 记录一下,自己给自己投了票

    this.voteRecord = votes;

    // 让其他节点给自己投一票

    this.askForVoteTask(new Votes(this.generation, InetSocketAddressConfigHelper.getServerName()), 0);

    }

    return null;

    });

    }

    2、接收投票,并成为 Leader

    如果说在 150ms and 300ms 之间,本节点收到了过半投票,则可上位成 Leader,否则定时任务会再次调用 beginElect,再次更新本节点世代,然后发起新一轮选举。

    接收投票其实十分简单,回忆一下前面接收拉票请求与回复投票,实际上就是拉票成功,就返回true,否则返回flase。

    我们每次都判断一下是否拿到过半的票数,如果拿到,则成为 Leader,另外有一个值得注意的是,为了加快集群恢复可用的进程,类似于心跳感染(如果心跳发到Leader那里去了,Leader会告诉本节点,它才是真正的Leader),投票也存在投票感染,下面的代码由 votesResponse.isFromLeaderNode() 来表示。

    投票的记录也是十分简单,就是把每个投票记录扔到 Map box; 里,true 表示同意投给本节点,flase 则不同意,如果同意达到半数以上,则调用 becomeLeader 成为本世代 Leader。

    /**

    * 给当前节点的投票箱投票

    */

    public void receiveVotesResponse(VotesResponse votesResponse) {

    this.lockSupplier(() -> {

    if (votesResponse.isFromLeaderNode()) {

    logger.info("来自节点 {} 的投票应答表明其身份为 Leader,本轮拉票结束。", votesResponse.getServerName());

    this.receiveHeatBeat(votesResponse.getServerName(), votesResponse.getGeneration(),

    String.format("收到来自 Leader 节点的投票应答,自动将其视为来自 Leader %s 世代 %s 节点的心跳包", heartBeat.getServerName(), votesResponse.getGeneration()));

    }

    if (this.generation > votesResponse.getAskVoteGeneration()) {// 如果选票的世代小于当前世代,投票无效

    logger.info("来自节点 {} 的投票应答世代是以前世代 {} 的选票,选票无效", votesResponse.getServerName(), votesResponse.getAskVoteGeneration());

    return null;

    }

    if (votesResponse.isAgreed()) {

    if (!voteSelf) {

    logger.info("来自节点 {} 的投票应答有效,投票箱 + 1", votesResponse.getServerName());

    }

    // 记录一下投票结果

    box.put(votesResponse.getServerName(), votesResponse.isAgreed());

    List hanabiNodeList = this.clusters;

    int clusterSize = hanabiNodeList.size();

    int votesNeed = clusterSize / 2 + 1;

    long voteCount = box.values()

    .stream()

    .filter(aBoolean -> aBoolean)

    .count();

    logger.info("集群中共 {} 个节点,本节点当前投票箱进度 {}/{}", hanabiNodeList.size(), voteCount, votesNeed);

    // 如果获得的选票已经大于了集群数量的一半以上,则成为leader

    if (voteCount == votesNeed) {

    logger.info("选票过半,准备上位成为 leader 节点", votesResponse.getServerName());

    this.becomeLeader();

    }

    } else {

    logger.info("节点 {} 在世代 {} 的投票应答为:拒绝给本节点在世代 {} 的选举投票(当前世代 {})", votesResponse.getServerName(), votesResponse.getGeneration(), votesResponse.getAskVoteGeneration(), this.generation);

    // 记录一下投票结果

    box.put(votesResponse.getServerName(), votesResponse.isAgreed());

    }

    return null;

    });

    }

    (3) Leader 的实现

    作为 Leader,在 raft 中的实现却是最简单的,我们只需要给子节点发心跳包即可。然后如果收到大于自己世代的心跳感染,则成为新世代的 Follower,接收心跳的逻辑和 Follower 没有区别。

    /**

    * 当选票大于一半以上时调用这个方法,如何去成为一个leader

    */

    private void becomeLeader() {

    this.lockSupplier(() -> {

    long becomeLeaderCostTime = TimeUtil.getTime() - this.beginElectTime;

    this.beginElectTime = 0L;

    logger.info("本节点 {} 在世代 {} 角色由 {} 变更为 {} 选举耗时 {} ms,并开始向其他节点发送心跳包 ......", InetSocketAddressConfigHelper.getServerName(), this.generation, this.nodeRole.name(), NodeRole.Leader.name(),

    becomeLeaderCostTime);

    this.nodeRole = NodeRole.Leader;

    this.cancelAllTask();

    this.heartBeatTask();

    this.leaderServerName = InetSocketAddressConfigHelper.getServerName();

    return null;

    });

    }

    四、运行我们的 raft!

    看到这里,不用怀疑.. 一个 raft 算法已经实现了。至于一些细枝末节的东西,我相信大家都能处理好的.. 比如如何给其他节点发送各种包,包怎么去定义之类的,都和 raft 本身没什么关系。

    一般来说,在集群可用后,我们就可以让 Follower 连接 Leader 的业务端口,开始真正的业务了。 raft作为一个能快速选主的分布式算法,一次选主基本只需要一次 RTT(Round-Trip Time)时间即可,非常迅速。

    运行一下我们的项目,简单测试,我们只用三台机子,想测试多台机子可以自己去玩玩...我们可以看到就像 zookeeper,我们需要配置两个端口,前一个作为选举端口,后一个则作为业务端口。

    本文章只讲了怎么选举,后面的端口可以无视,但是必填...

    e357419fc74025e50e5dd846b6a65799.png

    依次启动 hanabi.1,hanabi.2,hanabi.3

    很快,我们就能看到 hanabi.1 成为了世代28的 Leader,第一次选举耗时久是因为启动的时候有各种初始化 = =

    7af8aac2226e3537314f6eb495a5790b.png

    此时,我们关闭 hanabi.1,因为集群还有2台机器,它们之间完全可以选出新的 Leader,我们关闭 hanabi.1 试试。观察 hanabi.3,我们发现,很快,hanabi.3 就发现 Leader 已经挂掉,并发起了世代 29 的选举。

    在世代29中,仅存的 hanabi.2 拒绝为本节点投票,所以在 ELECTION_TIMEOUT_MS 到期后,hanabi.3 再次发起了选举,此次选举成功,因为 hanabi.2 还未到达 ELECTION_TIMEOUT_MS,所以还在世代 28,收到了世代 29 的拉票请求后,hanabi.2 节点将自己的票投给了 hanabi.3,hanabi.3 成功上位。

    eac80c069359098d79e4b4ab45aa57fe.png

    本项目github地址 : 基于raft算法实现的分布式kv存储框架 (项目实际上还有日志写入,日志提交,日志同步等功能,直接无视它...还没写完 = =)

    展开全文
  • Raft 共识算法的 Python 实现
  • raft算法简单实现-java

    2018-05-18 14:08:47
    raft 是一种类似于 paoxs 的分布式算法,相对于 paxos 算法raft 更容易于理解以及实现,这也是一种典型的 半数协议算法 。这里不详细介绍 raft 算法
  • Raft算法详解

    万次阅读 多人点赞 2021-05-30 20:09:40
    Raft算法属于Multi-Paxos算法,它是在Multi-Paxos思想的基础上,做了一些简化和限制,比如增加了日志必须是连续的,只支持领导者、跟随者和候选人三种状态,在理解和算法实现上都相对容易许多 从本质上说,Raft算法...

    Raft算法属于Multi-Paxos算法,它是在Multi-Paxos思想的基础上,做了一些简化和限制,比如增加了日志必须是连续的,只支持领导者、跟随者和候选人三种状态,在理解和算法实现上都相对容易许多

    从本质上说,Raft算法是通过一切以领导者为准的方式,实现一系列值的共识和各节点日志的一致

    1、领导者选举

    1)、成员身份

    Raft算法支持领导者(Leader)、跟随者(Follower)和候选人(Candidate)3种状态:

    • 跟随者:接收和处理来自领导者的消息,当等待领导者心跳信息超时的时候,就主动站出来,推荐自己当候选人
    • 候选人:候选人将向其他节点发送请求投票(RequestVote)RPC消息,通知其他节点来投票,如果赢得了大多数选票,就晋升当领导者
    • 领导者:负责处理写请求、管理日志复制和不断地发送心跳信息,通知其他节点“我是领导者,我还活着,你们现在不要发起新的选举,找个新领导者来替代我”

    Raft算法是强领导者模型,集群中只能有一个领导者

    2)、选举领导者的过程

    在初始状态下,集群中所有的节点都是跟随者状态

    Raft算法实现了随机超时时间的特性,每个节点等待领导者心跳信息的超时时间间隔是随机的。上图中,集群中没有领导者,而节点A的等待超时时间最小,它会最先因为没有等到领导者的心跳信息,发生超时

    这时,节点A增加自己的任期编号,并推举自己为候选人,先给自己投上一张选票,然后向其他节点发送请求投票RPC消息,请它们选举自己为领导者

    如果其他节点接收到候选人A的请求投票RPC消息,在编号为1的这届任期内,也还没有进行过投票,那么它将把选票投给节点A,并增加自己的任期编号

    如果候选人在选举超时时间内赢得了大多数的选票,那么它就会成为本届任期内新的领导者

    节点A当选领导者后,它将周期性地发送心跳消息,通知其他服务器我是领导者,阻止跟随者发起新的选举

    3)、节点间如何通讯?

    在Raft算法中,服务器节点间的沟通联络采用的是远程过程调用(RPC),在领导者选举中,需要用到这两类的RPC:

    • 请求投票(RequestVote)RPC:是由候选人在选举期间发起,通知各节点进行投票
    • 日志复制(AppendEntries)RPC:是由领导者发起,用来复制日志和提供心跳消息

    4)、什么是任期?

    Raft算法中每个任期由单调递增的数字(任期编号)标识,任期编号是随着选举的举行而变化的

    1. 跟随者在等待领导者心跳信息超时后,推举自己为候选人时,会增加自己的任期编号,比如节点A的任期编号为0,那么在推举自己为候选人时,会将自己的任期编号增加为1
    2. 如果一个服务器节点,发现自己的任期编号比其他节点小,那么它会更新自己的任期编号到较大的编号值,比如节点B的任期编号是0,当收到来自节点A的请求投票RPC消息时,因为消息中包含了节点A的任期编号,且编号为1,那么节点B将把自己的任期编号更新为1
    3. 如果一个候选人或者领导者,发现自己的任期编号比其他节点小,那么它会立即恢复成跟随者状态。比如分区错误恢复后,任期编号为3的领导者节点B,收到来自新领导者的包含任期编号为4的心跳消息,那么节点B将立即恢复成跟随者状态
    4. 如果一个节点接收到一个包含较小的任期编号值的请求,那么它会直接拒绝这个请求。比如节点C的任期编号为4,收到包含任期编号为3的请求投票RPC消息,那么它将拒绝这个消息

    5)、选举有哪些规则?

    1. 领导者周期性地向所有跟随者发送心跳消息(即不包含日志项的日志复制RPC消息),通知大家我是领导者,组织跟随者发起新的选举

    2. 如果在指定时间内,跟随者没有接收到来自领导者的消息,那么它就认为当前没有领导者,推举自己为候选人,发起领导者选举

    3. 在一次选举中,赢得大多数选票的候选人,将晋升为领导者

    4. 在一个任期内,领导者一直都会是领导者,直到它自身出现问题(比如宕机),或者因为网络延迟,其他节点发起一轮新的选举

    5. 在一次选举中,每一个服务器节点最多会对一个任期编号投出一张选票,并且按照先来先服务的原则进行投票。比如节点C的任期编号为3,先收到了一个包含任期编号为4的投票请求(来自节点A),然后又收到了一个包含任期编号为4的投票请求(来自节点B)。那么节点C将会把唯一一张选票投给节点A,当再收到节点B的投票请求RPC消息时,对于编号为4的任期,已没有选票可投了

    1. 日志完整性高的跟随者(也就是最后一条日志项对应的任期编号值更大,索引号更大)拒绝投票给日志完整性低的候选人。比如节点B的任期编号为3,节点C的任期编号为4,节点B的最后一条日志项对应的任期编号为3,而节点C为2,那么当节点C请求节点B投票给自己时,节点B将拒绝投票

    在这里插入图片描述

    选举是跟随者发起的,推举自己为候选人;大多数选票是指集群成员半数以上的选票;大多数选票规则的目标,是为了保证在一个给定的任期内最多只有一个领导者

    6)、随机超时时间是什么?

    Raft算法使用随机选举超时时间的方法,把超时时间都分散开来,在大多数情况下只有一个服务器节点先发起选举,而不是同时发起选举,这样就能减少因选票瓜分导致选举失败的情况

    在Raft算法中,随机超时时间有2种含义:

    1. 跟随者等待领导者心跳信息超时的时间间隔是随机的
    2. 如果候选人在一个随机时间间隔内,没有赢得过半票数,那么选举就无效了,然后候选人发起新一轮的选举,也就是说,等待选举超时的时间间隔是随机的

    7)、补充

    1)Raft算法的强领导者模型选举限制和局限如下:

    1. 读写请求和数据转发压力落在领导者节点,相当于单机,性能和吞吐量也会受到限制
    2. 大规模跟随者的集群,领导者需要承担大量元数据维护和心跳通知的成本
    3. 领导者单点问题,故障后直到新领导者选举出来期间集群不可用
    4. 随着候选人规模增长,收集半数以上投票的成本更大

    2)强领导者模型会限制集群的写性能,有什么办法能突破Raft集群的写性能瓶颈呢?

    参考Kafka的分区和ES的主分片副本分片这种机制,虽然写入只能通过Leader写,但每个Leader可以负责不同的片区,来提高写入的性能

    2、日志复制

    1)、如何理解日志?

    副本数据是以日志的形式存在的,日志是由日志项组成,日志项是一种数据格式,它主要包含用户指定的数据,也就是指令(Command),还包含一些附加信息,比如索引值(Log index)、任期编号(Term)

    • 指令:一条由客户端请求指定的、状态机需要执行的指令,可以理解成客户端指定的数据
    • 索引值:日志项对应的整数索引值,用来标识日志项的,是一个连续的、单调递增的证书号码
    • 任期编号:创建这条日志项的领导者的任期编号

    2)、如何复制日志?

    首先,领导者通过日志复制(AppendEntries)RPC消息,将日志项复制到集群其他节点上

    接着,如果领导者接收到大多数的复制成功响应后,它将日志项应用到它的状态机,并返回成功给客户端。如果领导者没有接收到大多数的复制成功响应,那么就返回错误给客户端

    领导者将日志项应用到它的状态机,怎么没通知跟随者应用日志项呢?

    因为领导者的日志复制RPC消息或心跳消息,包含了当前最大的、将会被提交的日志项索引值。所以通过日志复制RPC消息或心跳消息,跟随者就可以知道领导者的日志提交位置信息

    1. 接收到客户端请求后,领导者基于客户端请求中的指令,创建一个新日志项,并附加到本地日志中
    2. 领导者通过日志复制RPC,将新的日志复制到其他的服务器
    3. 当领导者将日志项成功复制到大多数的服务器上的时候,领导者会将这条日志项应用到它的状态机中
    4. 领导者将执行的结果返回给客户端
    5. 当跟随者接收到心跳消息,或者新的日志复制RPC消息后,如果跟随者发现领导者已经提交了某条日志项,而它还没应用,那么跟随者就将这条日志项应用到本地的状态机上

    3)、如何实现日志的一致?

    在Raft算法中,领导者通过强制跟随者直接复制自己的日志项,处理不一致日志。也就是说,Raft是通过以领导者的日志为准,来实现各节点日志的一致性的

    1. 首先,领导者通过日志复制RPC的一致性检查,找到跟随者节点上与自己相同日志项的最大索引值。也就是说,这个索引值之前的日志,领导者和跟随者是一致的,之后的日志是不一致的
    2. 然后,领导者强制跟随者更新覆盖不一致的日志项,实现日志的一致

    引入2个新变量:

    • PrevLogEntry:表示当前要复制的日志项,前面一条日志项的索引值。比如下图中,如果领导者将索引值为8的日志项发送给跟随者,那么此时PrevLogEntry值为7
    • PrevLogTerm:表示当前要复制的日志项,前面一条日志项的任期编号,比如在图中,如果领导者将索引值为8的日志项发送给跟随者,那么此时PrevLogTerm值为4
    1. 领导者通过日志复制RPC消息,发送当前最新日志项到跟随者,这个消息的PrevLogEntry值为7、PrevLogTerm值为4
    2. 如果跟随者在它的日志中,找不到PrevLogEntry值为7、PrevLogTerm值为4的日志项,也就是说它的日志和领导者的不一致了,那么跟随者就会拒绝接收新的日志项,并返回失败消息给领导者
    3. 这时,领导者会递减要复制的日志项的索引值,并发送新的日志项到跟随者,这个消息的PrevLogEntry值为6、PrevLogTerm值为3
    4. 如果跟随者在它的日志中,找到了PrevLogEntry值为6、PrevLogTerm值为3的日志项,那么日志复制RPC返回成功,这样一来,领导者就知道在PrevLogEntry值为6、PrevLogTerm值为3的位置,跟随者的日志项与自己相同
    5. 领导者通过日志复制RPC复制并更新覆盖该索引值之后的日志项(也就是不一致的日志项),最终实现了集群各节点日志的一致

    领导者通过日志复制RPC一致性检查,找到跟随者节点上与自己相同日志项的最大索引值,然后复制并更新覆盖该索引值之后的日志项,实现了各节点日志的一致。跟随者中的不一致日志项会被领导者的日志覆盖,而且领导者从来不会覆盖或者删除自己的日志

    4)、补充

    1)领导者接收到大多数的“复制成功”响应后,就会将日志应用到它自己的状态机,然后返回“成功”响应客户端。如果此时有个节点不在“大多数”中,也就是说它接收日志项失败,那么在这种情况下,Raft会如何处理实现日志的一致呢?

    处理日志项一致通过RPC一致性检查,找到跟随者中与自己相同日志项的最大索引,然后把后面的日志项同步过去,让跟随者复制更新

    2)Raft在处理日志不一致时会给跟随者发送RPC一致性检查,找到和自己相同日志项的最大值,这里是对每个跟随者而言的还是所有的跟随者而言的?

    日志复制信息对每个跟随者都要单独维护的

    参考:

    07 | Raft算法(一):如何选举领导者?

    08 | Raft算法(二):如何复制日志?

    展开全文
  • Raft Consensus 协议的 C 实现,BSD 许可它是如何工作的? 有关完整文档,请参阅 raft.h。 网络超出了该项目的范围。 实现者需要做所有的管道。 目前,这是通过以下方式完成的: 实现 raft_cbs_t 中的所有回调; 和...
  • 关于raft算法相关细节,可以全看之前的文章 分布式一致性算法,两阶段提交,三阶段提交,Paxos,Raft,zookeeper的选主过程,zab协议,顺序一致性,数据写入流程,节点状态,节点的角色 这里我们说下阿里开源的sofa-jraft...

    关于raft算法相关细节,可以全看之前的文章 分布式一致性算法,两阶段提交,三阶段提交,Paxos,Raft,zookeeper的选主过程,zab协议,顺序一致性,数据写入流程,节点状态,节点的角色

    这里我们说下阿里开源的sofa-jraft的实现。
    首先说明下,在sofa-jraft有几个比较重要的角色

    • Node 代表的就是一个服务节点
    • Ballot 代表的是一次投票的相关信息
    • PeerId 代表的是一个复制组里面的一个参与角色
    • StateMachine 当数据提交到Node之后,会执行其onApply方法

    另外Node中有几个比较重要的定时器:

    • electionTimer 选举定时器,如果当前leader挂了,会进行preVote
    • voteTimer 投票定时器,当投票超时后,会进行preVote
    • stepDownTimer leader使用,判断当前节点是否存活,且检察整个集群是否有节点下线并更新Leader节点的Timestamp

    选主投票

    JRaft的选举投票有两个步骤preVotevote,之所以要增加一个preVote的步骤,是为了解决系统中防止某个节点由于无法和leader同步,不断发起投票,抬升自己的Term,导致自己Term比Leader的Term还大,然后迫使Leader放弃Leader身份,开始新一轮的选举。
    preVote则强调节点必须获得半数以上的投票才能开始发起新一轮的选举。

    JRaft的选举是通过定时器超时开始的,在NodeImpl中(Node的具体实现类),当我们执行NodeImpl.init的时候,会开启electionTimer:

    this.electionTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(),
                TIMER_FACTORY.getElectionTimer(this.options.isSharedElectionTimer(), name)) {
                protected void onTrigger() {
                    handleElectionTimeout();
                }
                protected int adjustTimeout(final int timeoutMs) {
                    return randomTimeout(timeoutMs);
                }
            };
      private void handleElectionTimeout() {
            boolean doUnlock = true;
            this.writeLock.lock();
            try {
                if (this.state != State.STATE_FOLLOWER) {
                    return;
                }
                if (isCurrentLeaderValid()) {
                    return;
                }
                resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT, "Lost connection from leader %s.",
                    this.leaderId));
    
                // Judge whether to launch a election.
                if (!allowLaunchElection()) {
                    return;
                }
    
                doUnlock = false;
                preVote();
    
            } finally {
                if (doUnlock) {
                    this.writeLock.unlock();
                }
            }
        }
    

    handleElectionTimeout中主要就是进行了preVote操作,这里JRaft一次投票的主要几个操作如下:

    preVote ===> handlePreVoteRequest ===> electSelf ===>handleRequestVoteRequest

    我们首先看下preVote:

    private void preVote() {
         .....
            final LogId lastLogId = this.logManager.getLastLogId(true);
            boolean doUnlock = true;
            this.writeLock.lock();
            try {
                // pre_vote need defense ABA after unlock&writeLock
                if (oldTerm != this.currTerm) {
                    LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
                    return;
                }
                this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
                for (final PeerId peer : this.conf.listPeers()) {
                    if (peer.equals(this.serverId)) {
                        continue;
                    }
                    if (!this.rpcService.connect(peer.getEndpoint())) {
                        LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
                        continue;
                    }
                    final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
                    done.request = RequestVoteRequest.newBuilder() //
                        .setPreVote(true) // it's a pre-vote request.
                        .setGroupId(this.groupId) //
                        .setServerId(this.serverId.toString()) //
                        .setPeerId(peer.toString()) //
                        .setTerm(this.currTerm + 1) // next term
                        .setLastLogIndex(lastLogId.getIndex()) //
                        .setLastLogTerm(lastLogId.getTerm()) //
                        .build();
                    this.rpcService.preVote(peer.getEndpoint(), done.request, done);
                }
                this.prevVoteCtx.grant(this.serverId);
                if (this.prevVoteCtx.isGranted()) {
                    doUnlock = false;
                    electSelf();
                }
            } finally {
                if (doUnlock) {
                    this.writeLock.unlock();
                }
            }
        }
    

    可以看到preVote中会对当前出自己以外的节点发送RequestVoteRequest请求,主要设置信息如下:

    RequestVoteRequest.newBuilder() //
                        .setPreVote(true) // it's a pre-vote request.
                        .setGroupId(this.groupId) //
                        .setServerId(this.serverId.toString()) //
                        .setPeerId(peer.toString()) //
                        .setTerm(this.currTerm + 1) // next term
                        .setLastLogIndex(lastLogId.getIndex()) //
                        .setLastLogTerm(lastLogId.getTerm()) //
                        .build();
    

    可以看到,这时候并没有将自己的currTerm设置为currTerm +1,只是在请求的时候发送了一个currTerm+1的值,这和实际选举的时候有差别,实际选举的时候首选会将currTerm++

    我们看下其他节点收到这个请求是怎么处理的:

    public Message handlePreVoteRequest(final RequestVoteRequest request) {
            boolean doUnlock = true;
            this.writeLock.lock();
            try {
                if (!this.state.isActive()) {
                    LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
                    return RpcFactoryHelper //
                        .responseFactory() //
                        .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                            "Node %s is not in active state, state %s.", getNodeId(), this.state.name());
                }
                final PeerId candidateId = new PeerId();
                if (!candidateId.parse(request.getServerId())) {
                    LOG.warn("Node {} received PreVoteRequest from {} serverId bad format.", getNodeId(),
                        request.getServerId());
                    return RpcFactoryHelper //
                        .responseFactory() //
                        .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                            "Parse candidateId failed: %s.", request.getServerId());
                }
                boolean granted = false;
                // noinspection ConstantConditions
                do {
                    if (!this.conf.contains(candidateId)) {
                        LOG.warn("Node {} ignore PreVoteRequest from {} as it is not in conf <{}>.", getNodeId(),
                            request.getServerId(), this.conf);
                        break;
                    }
                    if (this.leaderId != null && !this.leaderId.isEmpty() && isCurrentLeaderValid()) {
                        LOG.info(
                            "Node {} ignore PreVoteRequest from {}, term={}, currTerm={}, because the leader {}'s lease is still valid.",
                            getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, this.leaderId);
                        break;
                    }
                    if (request.getTerm() < this.currTerm) {
                        LOG.info("Node {} ignore PreVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                            request.getServerId(), request.getTerm(), this.currTerm);
                        // A follower replicator may not be started when this node become leader, so we must check it.
                        checkReplicator(candidateId);
                        break;
                    }
                    // A follower replicator may not be started when this node become leader, so we must check it.
                    // check replicator state
                    checkReplicator(candidateId);
    
                    doUnlock = false;
                    this.writeLock.unlock();
    
                    final LogId lastLogId = this.logManager.getLastLogId(true);
    
                    doUnlock = true;
                    this.writeLock.lock();
                    final LogId requestLastLogId = new LogId(request.getLastLogIndex(), request.getLastLogTerm());
                    granted = requestLastLogId.compareTo(lastLogId) >= 0;
    
                    LOG.info(
                        "Node {} received PreVoteRequest from {}, term={}, currTerm={}, granted={}, requestLastLogId={}, lastLogId={}.",
                        getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, granted, requestLastLogId,
                        lastLogId);
                } while (false);
    
                return RequestVoteResponse.newBuilder() //
                    .setTerm(this.currTerm) //
                    .setGranted(granted) //
                    .build();
            } finally {
                if (doUnlock) {
                    this.writeLock.unlock();
                }
            }
        }
    

    这里其他节点收到PreVoteRequest的时候,会进行如下判断:

    1. 如果当前节点的Leader节点依然活着,直接返回本次投票granted=false
    2. 如果请求preVote的term比当前节点term小,直接返回本次投票granted=false
    3. 如果请求的Log信息(index和term比当前小),直接返回本次投票granted=false
    4. 如果上面都不满足,返回granted=true

    这是其他节点收到PreVoteRequest的处理,我们再看发起preVote节点收到其他节点的响应是怎么处理的:

    public void handlePreVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
            boolean doUnlock = true;
            this.writeLock.lock();
            try {
                if (this.state != State.STATE_FOLLOWER) {
                    LOG.warn("Node {} received invalid PreVoteResponse from {}, state not in STATE_FOLLOWER but {}.",
                        getNodeId(), peerId, this.state);
                    return;
                }
                if (term != this.currTerm) {
                    LOG.warn("Node {} received invalid PreVoteResponse from {}, term={}, currTerm={}.", getNodeId(),
                        peerId, term, this.currTerm);
                    return;
                }
                if (response.getTerm() > this.currTerm) {
                    LOG.warn("Node {} received invalid PreVoteResponse from {}, term {}, expect={}.", getNodeId(), peerId,
                        response.getTerm(), this.currTerm);
                    stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                        "Raft node receives higher term pre_vote_response."));
                    return;
                }
                LOG.info("Node {} received PreVoteResponse from {}, term={}, granted={}.", getNodeId(), peerId,
                    response.getTerm(), response.getGranted());
                // check granted quorum?
                if (response.getGranted()) {
                    this.prevVoteCtx.grant(peerId);
                    if (this.prevVoteCtx.isGranted()) {
                        doUnlock = false;
                        electSelf();
                    }
                }
            } finally {
                if (doUnlock) {
                    this.writeLock.unlock();
                }
            }
        }
    

    这里参数中term是投票之前节点的term,peerId是当前节点发送preVote节点的PeerId信息,我们看下其判断逻辑:

    1. 如果当前term和投票前的term不相等,则表明发生了新一轮投票,当前响应作废直接返回
    2. 如果响应中term(响应节点的term)比当前节点term大,表明响应节点比当前节点的投票轮次更高,直接返回
    3. 如果响应允许这次投票,即response.getGranted=true,判断本轮发起的投票同意是否过半。

    在NodeImpl中有两个Ballot,一个是支持preVote的prevVoteCtx,一个是支持vote,在发起preVote的时候,会对prevVoteCtx进行初始化:

    public boolean init(final Configuration conf, final Configuration oldConf) {
            this.peers.clear();
            this.oldPeers.clear();
            this.quorum = this.oldQuorum = 0;
            int index = 0;
            if (conf != null) {
                for (final PeerId peer : conf) {
                    this.peers.add(new UnfoundPeerId(peer, index++, false));
                }
            }
    
            this.quorum = this.peers.size() / 2 + 1;
            if (oldConf == null) {
                return true;
            }
            index = 0;
            for (final PeerId peer : oldConf) {
                this.oldPeers.add(new UnfoundPeerId(peer, index++, false));
            }
    
            this.oldQuorum = this.oldPeers.size() / 2 + 1;
            return true;
        }
    

    可以看到这里init对Ballot的法定人数quorum 设置是当前节点/2+1个。
    这里我们在来看当preVote请求被同意的情况下是怎么判断是否需要发起选举,在handlePreVoteResponse的最后,会执行this.prevVoteCtx.grant(peerId);

    public void grant(final PeerId peerId) {
            grant(peerId, new PosHint());
        }
    public PosHint grant(final PeerId peerId, final PosHint hint) {
            UnfoundPeerId peer = findPeer(peerId, this.peers, hint.pos0);
            if (peer != null) {
                if (!peer.found) {
                    peer.found = true;
                    this.quorum--;
                }
                hint.pos0 = peer.index;
            } else {
                hint.pos0 = -1;
            }
            if (this.oldPeers.isEmpty()) {
                hint.pos1 = -1;
                return hint;
            }
            peer = findPeer(peerId, this.oldPeers, hint.pos1);
            if (peer != null) {
                if (!peer.found) {
                    peer.found = true;
                    this.oldQuorum--;
                }
                hint.pos1 = peer.index;
            } else {
                hint.pos1 = -1;
            }
    
            return hint;
        }
    

    这里的判断逻辑很简单,响应的节点如果同意了这次投票,那么对应的投票信息Ballot法定人数quorum–,同时这里为了防止一个节点多次响应,标记每个节点只能响应一次。然后判断本次preVote投票是否过半:

    public boolean isGranted() {
            return this.quorum <= 0 && this.oldQuorum <= 0;
        }
    

    如果过半,开始正式选举electSelf:

    private void electSelf() {
            long oldTerm;
            try {
                LOG.info("Node {} start vote and grant vote self, term={}.", getNodeId(), this.currTerm);
                if (!this.conf.contains(this.serverId)) {
                    LOG.warn("Node {} can't do electSelf as it is not in {}.", getNodeId(), this.conf);
                    return;
                }
                if (this.state == State.STATE_FOLLOWER) {
                    LOG.debug("Node {} stop election timer, term={}.", getNodeId(), this.currTerm);
                    this.electionTimer.stop();
                }
                resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT,
                    "A follower's leader_id is reset to NULL as it begins to request_vote."));
                this.state = State.STATE_CANDIDATE;
                this.currTerm++;
                this.votedId = this.serverId.copy();
                LOG.debug("Node {} start vote timer, term={} .", getNodeId(), this.currTerm);
                this.voteTimer.start();
                this.voteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
                oldTerm = this.currTerm;
            } finally {
                this.writeLock.unlock();
            }
    
            final LogId lastLogId = this.logManager.getLastLogId(true);
    
            this.writeLock.lock();
            try {
                // vote need defense ABA after unlock&writeLock
                if (oldTerm != this.currTerm) {
                    LOG.warn("Node {} raise term {} when getLastLogId.", getNodeId(), this.currTerm);
                    return;
                }
                for (final PeerId peer : this.conf.listPeers()) {
                    if (peer.equals(this.serverId)) {
                        continue;
                    }
                    if (!this.rpcService.connect(peer.getEndpoint())) {
                        LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
                        continue;
                    }
                    final OnRequestVoteRpcDone done = new OnRequestVoteRpcDone(peer, this.currTerm, this);
                    done.request = RequestVoteRequest.newBuilder() //
                        .setPreVote(false) // It's not a pre-vote request.
                        .setGroupId(this.groupId) //
                        .setServerId(this.serverId.toString()) //
                        .setPeerId(peer.toString()) //
                        .setTerm(this.currTerm) //
                        .setLastLogIndex(lastLogId.getIndex()) //
                        .setLastLogTerm(lastLogId.getTerm()) //
                        .build();
                    this.rpcService.requestVote(peer.getEndpoint(), done.request, done);
                }
    
                this.metaStorage.setTermAndVotedFor(this.currTerm, this.serverId);
                this.voteCtx.grant(this.serverId);
                if (this.voteCtx.isGranted()) {
                    becomeLeader();
                }
            } finally {
                this.writeLock.unlock();
            }
        }
    

    正式投票会进行如下操作:

    1. 当前currTerm++,voteTimer启动,voteCtx初始化
    2. 发送RequestVoteRequest请求,与preVote基本差不多,唯一区别PreVote=false

    我们再看其他节点收到投票怎么处理的:

    public Message handleRequestVoteRequest(final RequestVoteRequest request) {
            boolean doUnlock = true;
            this.writeLock.lock();
            try {
                if (!this.state.isActive()) {
                    LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
                    return RpcFactoryHelper //
                        .responseFactory() //
                        .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                            "Node %s is not in active state, state %s.", getNodeId(), this.state.name());
                }
                final PeerId candidateId = new PeerId();
                if (!candidateId.parse(request.getServerId())) {
                    LOG.warn("Node {} received RequestVoteRequest from {} serverId bad format.", getNodeId(),
                        request.getServerId());
                    return RpcFactoryHelper //
                        .responseFactory() //
                        .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                            "Parse candidateId failed: %s.", request.getServerId());
                }
    
                // noinspection ConstantConditions
                do {
                    // check term
                    if (request.getTerm() >= this.currTerm) {
                        LOG.info("Node {} received RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                            request.getServerId(), request.getTerm(), this.currTerm);
                        // increase current term, change state to follower
                        if (request.getTerm() > this.currTerm) {
                            stepDown(request.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                                "Raft node receives higher term RequestVoteRequest."));
                        }
                    } else {
                        // ignore older term
                        LOG.info("Node {} ignore RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                            request.getServerId(), request.getTerm(), this.currTerm);
                        break;
                    }
                    doUnlock = false;
                    this.writeLock.unlock();
    
                    final LogId lastLogId = this.logManager.getLastLogId(true);
    
                    doUnlock = true;
                    this.writeLock.lock();
                    // vote need ABA check after unlock&writeLock
                    if (request.getTerm() != this.currTerm) {
                        LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
                        break;
                    }
    
                    final boolean logIsOk = new LogId(request.getLastLogIndex(), request.getLastLogTerm())
                        .compareTo(lastLogId) >= 0;
    
                    if (logIsOk && (this.votedId == null || this.votedId.isEmpty())) {
                        stepDown(request.getTerm(), false, new Status(RaftError.EVOTEFORCANDIDATE,
                            "Raft node votes for some candidate, step down to restart election_timer."));
                        this.votedId = candidateId.copy();
                        this.metaStorage.setVotedFor(candidateId);
                    }
                } while (false);
    
                return RequestVoteResponse.newBuilder() //
                    .setTerm(this.currTerm) //
                    .setGranted(request.getTerm() == this.currTerm && candidateId.equals(this.votedId)) //
                    .build();
            } finally {
                if (doUnlock) {
                    this.writeLock.unlock();
                }
            }
        }
    

    收到投票请求节点的处理与preVote请求的处理逻辑上差不多:

    1. 判断请求的term和当前term大小,比请求的term大,返回Granted=false
    2. 判读当前log的位置是否比请求位置小,如果小证明发起请求节点数据位置比当前节点新,如果当前节点没有投票给其他节点,那么设置当前节点term为请求节点的term,同时将当前节点的投票votedId设置为请求节点,表名当前节点将选票投给了请求节点,当前节点会执行stepDown操作,不会进行选举,节点变为STATE_FOLLOWER,同时开启electionTimer定时器
    3. 返回判断当前节点term是否等于请求term且当前节点的选票ID和请求节点是否一致,如果满足上面两个条件,表明当前节点将票投给了请求节点

    接下来看请求节点收到响应节点的响应是如何处理的:

    public void handleRequestVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
            this.writeLock.lock();
            try {
                if (this.state != State.STATE_CANDIDATE) {
                    LOG.warn("Node {} received invalid RequestVoteResponse from {}, state not in STATE_CANDIDATE but {}.",
                        getNodeId(), peerId, this.state);
                    return;
                }
                // check stale term
                if (term != this.currTerm) {
                    LOG.warn("Node {} received stale RequestVoteResponse from {}, term={}, currTerm={}.", getNodeId(),
                        peerId, term, this.currTerm);
                    return;
                }
                // check response term
                if (response.getTerm() > this.currTerm) {
                    LOG.warn("Node {} received invalid RequestVoteResponse from {}, term={}, expect={}.", getNodeId(),
                        peerId, response.getTerm(), this.currTerm);
                    stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                        "Raft node receives higher term request_vote_response."));
                    return;
                }
                // check granted quorum?
                if (response.getGranted()) {
                    this.voteCtx.grant(peerId);
                    if (this.voteCtx.isGranted()) {
                        becomeLeader();
                    }
                }
            } finally {
                this.writeLock.unlock();
            }
        }
    

    大致逻辑如下:

    1. 判断响应的reponse的term和当前节点发起投票前的term是否一致,如果不一致,直接返回
    2. 判断响应的reposen的term是否比当前节点发起投票前的term大,如果满足,直接返回
    3. 判断当前节点已经获取的选票是否过半,如果过半,将当前节点晋升为Leader节点,执行becomeLeader逻辑

    becomeLeader使当前节点晋升为Leader节点,我们看看其实现:

    private void becomeLeader() {
            Requires.requireTrue(this.state == State.STATE_CANDIDATE, "Illegal state: " + this.state);
            LOG.info("Node {} become leader of group, term={}, conf={}, oldConf={}.", getNodeId(), this.currTerm,
                this.conf.getConf(), this.conf.getOldConf());
            // cancel candidate vote timer
            stopVoteTimer();
            this.state = State.STATE_LEADER;
            this.leaderId = this.serverId.copy();
            this.replicatorGroup.resetTerm(this.currTerm);
            // Start follower's replicators
            for (final PeerId peer : this.conf.listPeers()) {
                if (peer.equals(this.serverId)) {
                    continue;
                }
                LOG.debug("Node {} add a replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
                if (!this.replicatorGroup.addReplicator(peer)) {
                    LOG.error("Fail to add a replicator, peer={}.", peer);
                }
            }
    
            // Start learner's replicators
            for (final PeerId peer : this.conf.listLearners()) {
                LOG.debug("Node {} add a learner replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
                if (!this.replicatorGroup.addReplicator(peer, ReplicatorType.Learner)) {
                    LOG.error("Fail to add a learner replicator, peer={}.", peer);
                }
            }
    
            // init commit manager
            this.ballotBox.resetPendingIndex(this.logManager.getLastLogIndex() + 1);
            // Register _conf_ctx to reject configuration changing before the first log
            // is committed.
            if (this.confCtx.isBusy()) {
                throw new IllegalStateException();
            }
            this.confCtx.flush(this.conf.getConf(), this.conf.getOldConf());
            this.stepDownTimer.start();
        }
    

    主要逻辑为:

    1. 停止当前投票定时器
    2. 将所有非当前节点、角色为Follower的节点加入到复制组里面去
    3. 将所有角色为Learner的节点加入到复制组里面去
    4. 重置ballotBox(用来管理选票的)
    5. stepDownTimer启动

    这样Leader节点就被选出来。

    我们在看看如何写入数据的。

    数据写入和复制

    客户端通过RouteTable.getInstance().selectLeader(groupId)能够获取当前分组下的Leader节点信息,拼接待写入数据的Request对象,然后通过CliClientServiceImpl..getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() {}能够向集群Leader节点进行数据写入。

    服务端则通过对应的RpcProcessor来处理写入的请求,获取到请求后取出数据部分封装成Task,然后通过Node.apply将task写入到node中去。
    而Node.apply只是将Task写入到了Disruptor的RingBuffer中去,如果对这块有疑问,可以看看这篇文章高性能队列Disruptor使用入门,原理和代码实现

    数据写入的时候,首先会将task转换成LogEntryAndClosure,同时会将Task.done相关信息放入到BallotBox的pendingMetaQueue和closureQueue队列中去(当数据写入完成之后会通过这两个queue取出task对应的done执行),然后将一批LogEntryAndClosure通过logManagerj将数据持久化写入。
    这里Node.apply传入的是一个Task类型:

    public class Task implements Serializable {
        private static final long serialVersionUID = 2971309899898274575L;
        private ByteBuffer        data             = LogEntry.EMPTY_DATA;
        private Closure           done;
        private long              expectedTerm     = -1;
    }
    

    这里的data就是我们写入的数据,而Closure done则是一个回调接口,当数据被写入到集群1/2+1节点成功之后会调用Closure.run(final Status status)方法。

    而NodeImpl中在写入Task是写入到了RingBuffer中,实际处理在LogEntryAndClosureHandler中:

    public void onEvent(final LogEntryAndClosure event, final long sequence, final boolean endOfBatch)
                                                                                                              throws Exception {
                if (event.shutdownLatch != null) {
                    if (!this.tasks.isEmpty()) {
                        executeApplyingTasks(this.tasks);
                        reset();
                    }
                    final int num = GLOBAL_NUM_NODES.decrementAndGet();
                    event.shutdownLatch.countDown();
                    return;
                }
                this.tasks.add(event);
                if (this.tasks.size() >= NodeImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
                    executeApplyingTasks(this.tasks);
                    reset();
                }
            }
    

    最终在executeApplyingTasks进行实际写入,这块代码有点长,就不贴代码了,大概描述下:

    1. 首先判断当前节点是不是Leader节点,如果不是的话,Closure.run(错误状态)返回
    2. task的term和当前节点term不一致,同上,返回
    3. 调用BallotBoxappendPendingTask,这个逻辑需要注意下:
    public boolean appendPendingTask(final Configuration conf, final Configuration oldConf, final Closure done) {
    		// 每个写入Task都生成一个Ballot ,并放入pendingMetaQueue,后续其他
            final Ballot bl = new Ballot();
            if (!bl.init(conf, oldConf)) {
                LOG.error("Fail to init ballot.");
                return false;
            }
            final long stamp = this.stampedLock.writeLock();
            try {
                if (this.pendingIndex <= 0) {
                    LOG.error("Fail to appendingTask, pendingIndex={}.", this.pendingIndex);
                    return false;
                }
                this.pendingMetaQueue.add(bl);
                this.closureQueue.appendPendingClosure(done);
                return true;
            } finally {
                this.stampedLock.unlockWrite(stamp);
            }
        }
    

    这里为每个写入任务都生成了一个Ballot,还记得上面选主投票的时候,用的也是这个来标记选举是否过半,这里也是一样,后续其他节点复制Leader该Task的数据的时候,会对应更新Leader中对应该Task的Ballot 投票信息,通过该Ballot 能够判断集群是否有过半节点已经完成了该Task的写入。同时也将Task写入集群过半节点成功之后的回调入口Closure 保存在closureQueue中,当其他节点写入Task成功更新对应Task的Ballot 的时候,会判断是否过半节点写入成功,如果成功则会回调对应Task的Closure的run方法。
    4. 调用logManager.appendEntries将数据写入
    5. 本地节点写入完成之后,回调LeaderStableClosure接口,逻辑为:

    public void run(final Status status) {
                if (status.isOk()) {
                    NodeImpl.this.ballotBox.commitAt(this.firstLogIndex, this.firstLogIndex + this.nEntries - 1,
                        NodeImpl.this.serverId);
                } else {
                        this.firstLogIndex + this.nEntries - 1, status);
                }
            }
    

    而ballotBox.commitAt的逻辑如下:

    public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final PeerId peer) {
            final long stamp = this.stampedLock.writeLock();
            long lastCommittedIndex = 0;
            try {
                if (this.pendingIndex == 0) {
                    return false;
                }
                if (lastLogIndex < this.pendingIndex) {
                    return true;
                }
                final long startAt = Math.max(this.pendingIndex, firstLogIndex);
                Ballot.PosHint hint = new Ballot.PosHint();
                for (long logIndex = startAt; logIndex <= lastLogIndex; logIndex++) {
                    final Ballot bl = this.pendingMetaQueue.get((int) (logIndex - this.pendingIndex));
                    hint = bl.grant(peer, hint);
                    if (bl.isGranted()) {
                        lastCommittedIndex = logIndex;
                    }
                }
                if (lastCommittedIndex == 0) {
                    return true;
                }
       
                this.pendingMetaQueue.removeFromFirst((int) (lastCommittedIndex - this.pendingIndex) + 1);
                this.pendingIndex = lastCommittedIndex + 1;
                this.lastCommittedIndex = lastCommittedIndex;
            } finally {
                this.stampedLock.unlockWrite(stamp);
            }
            this.waiter.onCommitted(lastCommittedIndex);
            return true;
        }
    

    这里可以看到,就是每个节点写入成功后,调用Leader节点的.ballotBox.commitAt,更新对应写入数据的投票信息,如果bl.isGranted,即完成了过半节点的写入,那么会调用this.waiter.onCommitted逻辑,这里最终会调用到StateMachineAdapter.onApply方法。
    6. 在节点成为Leader的时候,会初始化日志复制组:this.replicatorGroup.addReplicator(peer),对于集群中的每个除当前节点的节点都会启动一个Replicator进行复制同时会开启心跳超时定时器,开始的时候首先会发送一个空的EmptyEntries给到Follower,获取Follower节点的最新日志位置,获取到Follower节点的最新日志位置之后,会再次发送需要同步的日志
    7. 在Replicator中对Follower的响应进行处理onAppendEntriesReturned,如果Follower写入成功,会调用Node.BallotBox().commitAt 这里和步骤5处理一样

    这样就完成了数据的写入和日志的复制。

    可以看到jraft中的日志复制就是Leader向Follower节点发送数据然后Follower将发送的日志写入到本地。

    展开全文
  • )功能Leader electionLog replicationPersistenceTODOLog compactionTestBenchmark实现一个raft节点有两个线程(即两个EventLoop),一个跑rpc server,另一个跑raft算法以及rpc client。若将这两部分放在一个线程...
  • 笔者开源了自己实现的Java版Raft算法框架raft-core项目链接:https://github.com/wujiuye/delay-scheduler/tree/main/raft...
  • Raft算法的Java实现

    万次阅读 2019-06-06 11:29:20
    自己这几天在学习Redis的Sentinel高可用解决方案,Sentinel选主使用的是著名的Raft一致性算法,本文对Raft的选主作了介绍,具体的算法内容,请参考 Raft 英文Paper Raft的整体结构 Raft 通过选举一个高贵的领导...
  • 原标题:Etcd raft算法实现原理分析 Raft是一种为了管理日志的一致性算法,Raft算法出现之前主要用的是Paxos算法,但是由于Paxos算法的复杂性太高,理解难度较大,Raft在这样一个背景下诞生。Raft算法又是Etcd的核心...
  • broker 在集群模式下,需要实现两个功能点: 1、多节点broker之间commitLog 日志文件内容同步;...自动选主及切换: Zookeeper组件, 以及基于Raft等 分布式一致性算法实现。 基于zookeeper组件,需要额外部署zk集群
  • .NET Core平台的Raft算法实现。 该项目是我在奥维耶多大学的学士期末项目的一部分。 您可以找到使用D3js和ASP.NET的实现的可视化 1.内容 RaftCore :由Raft算法的可扩展实现组成的库项目。 RaftCoreTest :测试...
  • raftjava分布式一致性算法raft的java实现
  • 一致性算法之Raft算法

    千次阅读 2021-11-19 11:15:04
    一致性raft算法简单理解
  • ETCD------Raft算法

    2022-01-08 13:12:27
    相较于 Paxos,Raft 通过逻辑分离使其更容易理解和实现,目前,已经有十多种语言的 Raft 算法实现框架,较为出名的有 etcd、Consul 。 角色 Raft 角色 根据官方文档解释,一个 Raft 集群包含若干节点,Raft 把这些...
  • etcd中raft算法实现原理

    千次阅读 2018-07-12 17:28:01
    它提供了和 Paxos 算法相同的功能和性能,但是它的算法结构和 Paxos 不同,使得 Raft 算法更加容易理解并且更容易构建实际的系统。为了提升可理解性,Raft 将一致性算法分解成了几个关键模块,例如领导人选举、日志...
  • 我们直接看来raft算法可以解决什么问题,比如在分布式场景下,我们的中间件为了达到高可用,那么必定不可能是单体架构,那么有一种常见的架构就是leader-follower架构,如nacos的高可用架构就是采用的raft算法,如下...
  • Raft算法实现复制

    2019-12-03 22:38:24
    二、Raft算法复制 Raft 是由 Atomix 实施以在节点之间共享状态的特定分布式系统协议 。Atomix 核心依赖几种不同类型的协议来进行状态复制,范围从强一致性到弱一致性。 该 Raft 协议是 atomix 用于强一致的,分区...
  • Raft算法实现分布式的一致性什么是分布式一致性?Leader选举日志同步选举超时&心跳超时分裂投票日志同步的一致性 演示地址:http://thesecretlivesofdata.com/raft/ 什么是分布式一致性? 假设我们有一个单节点...
  • 分布式 raft 共识算法 go 实现 一、需要实现的接口 Fsm 客户端状态机接口,在 raft 内部调用此接口来实现状态机的相关操作,比如应用日志,生成快照,安装快照等。 Transport 在 raft 内部调用此接口的各个方法用于...
  • raft算法详解

    千次阅读 2022-01-24 11:05:23
    文章目录一、raft算法的背景1.什么是raft算法1.2Paxos 算法的缺点二、raft算法的基础1.raft角色2.Term3.RPC4.raft 三个子问题5.raft 五个特性三、选举(Leader Election)四、日志复制(Log Replication)1.日志的...
  • Nacos的Raft 算法

    千次阅读 2020-11-09 15:04:44
    Nacos Discovery 集群为了保证集群中数据的一致性,其采用了 Raft 算法。...Raft 通过选举Leader 并由 Leader 节点负责管理日志复制来实现各个节点间数据的一致性。 Raft 算法不是强一致性算法,是最终一致性算法
  • Raft算法

    2021-06-01 09:29:48
    Raft 的设计目标是简化 Paxos,使得算法既容易理解,也容易实现。 Paxos 和 Raft 都是分布式一致性算法,这个过程如同投票选举领袖(Leader),参选者(Candidate)需要说服大多数投票者(Follower)投票给他,一旦...
  • 一文带你领略Raft算法的魅力
  • 早期我们在rabbitmq的基础上搭建了一套可扩展消息中间件CRMQ1.0,由于rabbitmq的GM同步算法在性能等方面存在瓶颈,所以自研了基于raft算法的内部版本CRMQ2.0和腾讯云CMQ,在保证强一致高可靠的前提下,性能和可用性...
  • 分布式系统之Raft算法

    2021-06-22 20:22:36
    Raft 的设计目标是简化 Paxos,使得算法既容易理解,也容易实现。Paxos 和 Raft 都是分布式一致性算法,这个过程如同投票选举领袖(Leader),参选者(Candidate)需要说服大多数投票者(Follower)投票给他,一旦...
  • raft 协议是一个一致性算法,解决多台机器之间数据一致的问题。raft 声称简洁明了,可以取代非常复杂的 PAXOS 算法。然而翻看 raft 的论文后,会发现即便声称简洁明了,自己完整地实现 raft 还是很麻烦的。 etcd是一...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 15,984
精华内容 6,393
关键字:

raft算法实现