精华内容
下载资源
问答
  • CAP原则

    2019-12-12 11:25:01
    CAP 原则又称 CAP 定理,指的是在一个分布式系统中, Consistency(一致性)、Availability(可用性)、Partition tolerance(分区容错性),三者不可得兼,最多只能同时满足其中的 2 个。 一致性(Consistency)...

    Overview

    CAP 原则又称 CAP 定理,指的是在一个分布式系统中, Consistency(一致性)、Availability(可用性)、Partition tolerance(分区容错性),三者不可得兼,最多只能同时满足其中的 2 个。

    20191212103254

    • 一致性(Consistency

      在分布式系统中的所有数据备份,在同一时刻是否同样的值。(严格的一致性,所有节点访问同一份最新的数据副本)

    • 可用性(Availability

      在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据更新具备高可用性,不保证获取的数据为最新数据,但是保证最终一致性)

    • 分区容错性(Partition tolerance

      分布式系统在遇到任何网络分区故障的时候,仍然能够对外提供满足一致性和可用性的服务,除非整个网络环境都发生了故障。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在 CA 之间做出选择。

    CAP原则论证

    基本场景

    在一个CAP的基本场景中,网络中有两个节点N1和N2,可以简单的理解N1和N2分别是两台计算机,他们之间网络可以连通,N1中有一个应用程序A,和一个数据库V,N2也有一个应用程序B2和一个数据库V。现在,A和B是分布式系统的两个部分,V是分布式系统的数据存储的两个子数据库。

    20191212110038

    • 在满足一致性的时候,N1和N2中的数据是一样的,V0=V0。
    • 在满足可用性的时候,用户不管是请求N1或者N2,都会得到立即响应。
    • 在满足分区容错性的情况下,N1和N2有任何一方宕机,或者网络不通的时候,都不会影响N1和N2彼此之间的正常运作。

    网络正常运行,实际上同时满足CAP

    20191212111643

    用户向N1机器请求数据更新,程序A更新数据库V0为V1。分布式系统将数据进行同步操作M,将V1同步的N2中V0,使得N2中的数据V0也更新为V1,N2中的数据再响应N2的请求。

    • 一致性:N1和N2的数据库V之间的数据是否完全一样。
    • 可用性:N1和N2的对外部的请求能否做出正常的响应。
    • 分区容错性:N1和N2之间的网络是否互通。

    网络异常,CAP只能同时满足其中2个

    ]

    假设在N1和N2之间网络断开的时候,有用户向N1发送数据更新请求,那N1中的数据V0将被更新为V1。由于网络是断开的,所以分布式系统同步操作M,所以N2中的数据依旧是V0。这个时候,有用户向N2发送数据读取请求,由于数据还没有进行同步,应用程序没办法立即给用户返回最新的数据V1,怎么办呢?
    这里有两种选择:

    • 第一:牺牲数据一致性,保证可用性。响应旧的数据V0给用户。
    • 第二:牺牲可用性,保证数据一致性。阻塞等待,直到网络连接恢复,数据更新操作M完成之后,再给用户响应最新的数据V1。

    总结

    实际上,对于分布式系统来说,并不是 CAP 只能同时满足其中的 2 个,而是当网络出现问题时:因为 P 必须满足,所以只能再 AC 中二选一。

    取舍策略

    因为 CAP 最多只能同时满足其中的 2 个,所以不得不做个取舍。

    • CA without P

      如果不要求P(不允许分区),则C(强一致性)和A(可用性)是可以保证的。但放弃P的同时也就意味着放弃了系统的扩展性,也就是分布式节点受限,没办法部署子节点,这是违背分布式系统设计的初衷的。传统的关系型数据库RDBMS:Oracle、MySQL就是CA。

    • CP without A

      如果不要求A(可用),相当于每个请求都需要在服务器之间保持强一致,而P(分区)会导致同步时间无限延长(也就是等待数据同步完才能正常访问服务),一旦发生网络故障或者消息丢失等情况,就要牺牲用户的体验,等待所有数据全部一致了之后再让用户访问系统。设计成CP的系统其实不少,最典型的就是分布式数据库,如Redis、HBase等。对于这些分布式数据库来说,数据的一致性是最基本的要求,因为如果连这个标准都达不到,那么直接采用关系型数据库就好,没必要再浪费资源来部署分布式数据库。

    • AP wihtout C

      要高可用并允许分区,则需放弃一致性。一旦分区发生,节点之间可能会失去联系,为了高可用,每个节点只能用本地数据提供服务,而这样会导致全局数据的不一致性。典型的应用就如某米的抢购手机场景,可能前几秒你浏览商品的时候页面提示是有库存的,当你选择完商品准备下单的时候,系统提示你下单失败,商品已售完。这其实就是先在 A(可用性)方面保证系统可以正常的服务,然后在数据的一致性方面做了些牺牲,虽然多少会影响一些用户体验,但也不至于造成用户购物流程的严重阻塞。

    主流分布式系统是如何选择的

    EurekaConsulZookeeperNacosEtcd
    CAPAPCPCPAP/CPCP
    一致性算法RaftZAB(类 PAXOS 协议)RaftRaft

    对于AP 来说实际上,不用关心一致性算法,所以 Eureka 中没有使用任何的数据强一致性算法保证不同集群间的 Server 的数据一致,仅通过数据拷贝的方式争取注册中心数据的最终一致性。

    而像 Zookeeper 这种分布式协调组件,数据的一致性是他们最最基本的要求。所以在极端环境下, ZooKeeper 可能会丢弃一些请求,消费者程序需要重新请求才能获得结果,也要保证数据一致性。

    对于 Nacos 来说,实现 AP 的同时,也使用了一致性算法 RaftNacos 是如何同时实现AP与CP的

    延伸阅读

    对于一个分布式系统来说。 P 是一个基本要求, CAP 三者中,只能在 CA 两者之间做权衡,并且要想尽办法提升 P
    某些情况下,APCP 的选择,可以看下这些公司是如何选择的:

    参考

    展开全文
  • 分布式之CAP原则详解

    万次阅读 多人点赞 2019-07-11 23:37:53
    CAP原则又称CAP定理,指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容忍性(Partition tolerance)。CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾。 1998...

    一、概述

    CAP原则又称CAP定理,指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)

    CAP原则的精髓就是要么AP,要么CP,要么AC,但是不存在CAP。

                                    

    It states, that though its desirable to have Consistency, High-Availability and Partition-tolerance in every system, unfortunately no system can achieve all three at the same time.
    在分布式系统的设计中,没有一种设计可以同时满足一致性,可用性,分区容错性 3个特性 

    1998年,加州大学的计算机科学家 Eric Brewer 提出,分布式系统有三个指标。

    1. 一致性(C):在分布式系统中的所有数据备份,在同一时刻是否同样的值,即写操作之后的读操作,必须返回该值。(分为弱一致性、强一致性和最终一致性
    2. 可用性(A):在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据更新具备高可用性)
    3. 分区容忍性(P):以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。

    二、取舍策略

    CAP三个特性只能满足其中两个,那么取舍的策略就共有三种:

    2.1、CA without P:如果不要求P(不允许分区),则C(强一致性)和A(可用性)是可以保证的。但放弃P的同时也就意味着放弃了系统的扩展性,也就是分布式节点受限,没办法部署子节点,这是违背分布式系统设计的初衷的。传统的关系型数据库RDBMS:Oracle、MySQL就是CA。

    2.2、CP without A:如果不要求A(可用),相当于每个请求都需要在服务器之间保持强一致,而P(分区)会导致同步时间无限延长(也就是等待数据同步完才能正常访问服务),一旦发生网络故障或者消息丢失等情况,就要牺牲用户的体验,等待所有数据全部一致了之后再让用户访问系统。设计成CP的系统其实不少,最典型的就是分布式数据库,如Redis、HBase等。对于这些分布式数据库来说,数据的一致性是最基本的要求,因为如果连这个标准都达不到,那么直接采用关系型数据库就好,没必要再浪费资源来部署分布式数据库。

    2.3、 AP wihtout C:要高可用并允许分区,则需放弃一致性。一旦分区发生,节点之间可能会失去联系,为了高可用,每个节点只能用本地数据提供服务,而这样会导致全局数据的不一致性。典型的应用就如某米的抢购手机场景,可能前几秒你浏览商品的时候页面提示是有库存的,当你选择完商品准备下单的时候,系统提示你下单失败,商品已售完。这其实就是先在 A(可用性)方面保证系统可以正常的服务,然后在数据的一致性方面做了些牺牲,虽然多少会影响一些用户体验,但也不至于造成用户购物流程的严重阻塞。

    三、主要矛盾-Consistency和Availability

    CAP理论就是说在分布式存储系统中,最多只能实现上面的两点。而由于网络硬件肯定会出现延迟丢包等问题,所以分区容错性是我们必须需要实现的。所以我们只能在一致性和可用性之间进行权衡,没有NoSQL系统能同时保证这三点。对于web2.0网站来说,关系数据库的很多主要特性却往往无用武之地

    1. 数据库事务一致性需求  —— 很多web实时系统并不要求严格的数据库事务,对读一致性的要求很低,有些场合对写一致性要求并不高。允许实现最终一致性。
    2. 数据库的写实时性和读实时性需求 —— 对关系数据库来说,插入一条数据之后立刻查询,是肯定可以读出来这条数据的,但是对于很多web应用来说,并不要求这么高的实时性,比方说发一条消息之 后,过几秒乃至十几秒之后,我的订阅者才看到这条动态是完全可以接受的。
    3. 对复杂的SQL查询,特别是多表关联查询的需求  —— 任何大数据量的web系统,都非常忌讳多个大表的关联查询,以及复杂的数据分析类型的报表查询,特别是SNS类型的网站,从需求以及产品设计角 度,就避免了这种情况的产生。往往更多的只是单表的主键查询,以及单表的简单条件分页查询,SQL的功能被极大的弱化了。

    与数据库的关系

    传统的关系型数据库(CA)在功能支持上通常很宽泛,从简单的键值查询,到复杂的多表联合查询再到事务机制的支持。而与之不同的是,NoSQL系统通常注重性能和扩展性,而非事务机制(事务就是强一致性的体现)。
      传统的SQL数据库的事务通常都是支持ACID的强事务机制。A代表原子性,即在事务中执行多个操作是原子性的,要么事务中的操作全部执行,要么一个都不执行;C代表一致性,即保证进行事务的过程中整个数据库的状态是一致的,不会出现数据花掉的情况;I代表隔离性,即两个事务不会相互影响,覆盖彼此数据等;D表示持久化,即事务一旦完成,那么数据应该是被写到安全的,持久化存储的设备上(比如磁盘)。
      NoSQL系统仅提供对行级别的原子性保证,也就是说同时对同一个Key下的数据进行的两个操作,在实际执行的时候是会串行的执行,保证了每一个Key-Value对不会被破坏。

    四、解决方案——BASE

    BASE是Basically Available(基本可用)、Soft state(软状态)和Eventually consistent(最终一致性)三个短语的简写,BASE是对CAP中一致性和可用性权衡的结果。

    核心思想:即使无法做到强一致性(Strong consistency),但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性(Eventual consistency)。

    4.1、基本可用Basically Available

    基本可用是指分布式系统在出现不可预知故障的时候,允许损失部分可用性——但请注意,这绝不等价于系统不可用,以下两个就是“基本可用”的典型例子。

    • 响应时间上的损失:正常情况下,一个在线搜索引擎需要0.5秒内返回给用户相应的查询结果,但由于出现异常(比如系统部分机房发生断电或断网故障),查询结果的响应时间增加到了1~2秒。
    • 功能上的损失:正常情况下,在一个电子商务网站上进行购物,消费者几乎能够顺利地完成每一笔订单,但是在一些节日大促购物高峰的时候,由于消费者的购物行为激增,为了保护购物系统的稳定性,部分消费者可能会被引导到一个降级页面。

    4.2、软状态Soft state

    软状态也称弱状态,和硬状态相对,是指允许系统中的数据存在中间状态,并认为该中间状态的存在不会影响系统的整体可用性,即允许系统在不同节点的数据副本之间进行数据同步的过程存在延时

    4.3、最终一致性Eventually consistent

    最终一致性强调的是系统中所有的数据副本,在经过一段时间的同步后,最终能够达到一个一致的状态。因此,最终一致性的本质是需要系统保证最终数据能够达到一致,而不需要实时保证系统数据的强一致性。

    亚马逊首席技术官Werner Vogels在于2008年发表的一篇文章中对最终一致性进行了非常详细的介绍。他认为最终一致性时一种特殊的弱一致性:系统能够保证在没有其他新的更新操作的情况下,数据最终一定能够达到一致的状态,因此所有客户端对系统的数据访问都能够胡渠道最新的值。同时,在没有发生故障的前提下,数据达到一致状态的时间延迟,取决于网络延迟,系统负载和数据复制方案设计等因素。

    在实际工程实践中,最终一致性存在以下五类主要变种。

    因果一致性:

            因果一致性是指,如果进程A在更新完某个数据项后通知了进程B,那么进程B之后对该数据项的访问都应该能够获取到进程A更新后的最新值,并且如果进程B要对该数据项进行更新操作的话,务必基于进程A更新后的最新值,即不能发生丢失更新情况。与此同时,与进程A无因果关系的进程C的数据访问则没有这样的限制。

    读己之所写:

            读己之所写是指,进程A更新一个数据项之后,它自己总是能够访问到更新过的最新值,而不会看到旧值。也就是说,对于单个数据获取者而言,其读取到的数据一定不会比自己上次写入的值旧。因此,读己之所写也可以看作是一种特殊的因果一致性。

    会话一致性:

            会话一致性将对系统数据的访问过程框定在了一个会话当中:系统能保证在同一个有效的会话中实现“读己之所写”的一致性,也就是说,执行更新操作之后,客户端能够在同一个会话中始终读取到该数据项的最新值。

    单调读一致性:

            单调读一致性是指如果一个进程从系统中读取出一个数据项的某个值后,那么系统对于该进程后续的任何数据访问都不应该返回更旧的值。

    单调写一致性:

             单调写一致性是指,一个系统需要能够保证来自同一个进程的写操作被顺序地执行。

    以上就是最终一致性的五类常见的变种,在时间系统实践中,可以将其中的若干个变种互相结合起来,以构建一个具有最终一致性的分布式系统。事实上,可以将其中的若干个变种相互结合起来,以构建一个具有最终一致性特性的分布式系统。事实上,最终一致性并不是只有那些大型分布式系统才设计的特性,许多现代的关系型数据库都采用了最终一致性模型。

    在现代关系型数据库中,大多都会采用同步和异步方式来实现主备数据复制技术。在同步方式中,数据的复制通常是更新事务的一部分,因此在事务完成后,主备数据库的数据就会达到一致。而在异步方式中,备库的更新往往存在延时,这取决于事务日志在主备数据库之间传输的时间长短,如果传输时间过长或者甚至在日志传输过程中出现异常导致无法及时将事务应用到备库上,那么很显然,从备库中读取的的数据将是旧的,因此就出现了不一致的情况。当然,无论是采用多次重试还是认为数据订正,关系型数据库还是能搞保证最终数据达到一致——这就是系统提供最终一致性保证的经典案例。

    总的来说,BASE理论面向的是大型高可用可扩展的分布式系统,和传统事务的ACID特性使相反的,它完全不同于ACID的强一致性模型,而是提出通过牺牲强一致性来获得可用性,并允许数据在一段时间内是不一致的,但最终达到一致状态。但同时,在实际的分布式场景中,不同业务单元和组件对数据一致性的要求是不同的,因此在具体的分布式系统架构设计过程中,ACID特性与BASE理论往往又会结合在一起使用。

    小结:计算机系统从集中式向分布式的变革随着包括分布式网络、分布式事务和分布式数据一致性等在内的一系列问题与挑战,同时也催生了一大批诸如ACID、CAP和BASE等经典理论的快速发展。

    展开全文
  • CAP原则什么是CAP原则 什么是CAP原则 CAP原则是分布式里很重要的原则,具体如下: C(Consistency)一致性原则: 对于一个写操作,在任何一个节点都应当可以正确的读到该数据,各个节点都需要保证数据的一致性。 A ...


    什么是CAP原则

    CAP原则是分布式里很重要的原则,具体如下:

    1. C(Consistency)一致性原则: 对于一个写操作,在任何一个节点都应当可以正确的读到该数据,各个节点都需要保证数据的一致性。
    2. A (Availability) 可用性:对数据更新具备高可用性,请求能够及时处理,不会一直等待,即使出现节点失效。
    3. P (Partition tolerance) 分区容错性 : 能容忍网络分区,在网络断开的情况下,被分隔的节点仍能正常对外提供服务,简而言之,部分服务宕机不会影响整体的正确性。

    例如一个服务有两个副本,如果两个副本中间网络出现问题导致不能通信,那么会出现如下情况:

    1. 如果一个副本更新成功就返回成功,那么就保证了服务的可用性(A),但是牺牲了一致性(C),因为另一个副本没有得到更新。
    2. 如果必须等到两个副本都执行了更新命令,才会返回服务结果。这样可以保证一致性(C),但是牺牲了可用性(A)
    3. 如果一个副本更新成功返回成功,但是记录本次未同步的副本,等到副本直接通讯恢复,再执行数据同步,这样保证了可用性,保证了最终一致性,但存在了副本之间一段时间内的数据不一致(很多AP模型都是采用了这种方式保证最终一致性)。

    著名的一致性算法 Paxos , ZAB ,Raft 等

    CP模型

    常见的CP模型: zookeeper(ZAB算法)

    例如:数据库含有主从节点。

    在这里插入图片描述

    1. 客户端发起更新命令
    2. 主数据库发起同步数据命令,然后等待从库更新。
    3. 从数据库更新完成,告知主数据库数据同步完成。
    4. 主数据库得到从数据库的更新结果,然后告知客户端数据库更新完成。

    以上流程是一个简单的CP模型,它保证了数据的一致性。但是如果从节点这时与主节点是不能通信的,也就是说数据第二步的同步请求是无法发送的,自然也无法得到从节点的返回,这样主数据库就会返回客户端失败,丢失了可用性


    AP模型

    常见的AP模型:Nacos(同时支持CP模型),Eureka

    例如:同样一个更新操作
    在这里插入图片描述

    1. 客户端发送更新命令。
    2. 主数据库异步调用从数据库更新,但是不等待从数据库返回,直接将更新结果返回给客户端。

    以上流程中如果从数据库与主数据库不能通信,依旧可以保证数据库可用,保证了可用性,但是丢失主从数据库之间的一致性.

    必须三者取其二吗?

    其实并不是,自古忠孝难两全,但是那是指定特殊时期,和平时期是可以保证忠孝两全的。

    1. 副本不能通信造成分区的情况很好发生,在不存在分区问题时CA都是可以保证的。
    2. 即便发生了分区问题,服务治理算法也可以做部分分区可用,故障分区停用,保证可用分区的一致性可用性。牺牲掉部分故障分区,但是依旧能保证整体的C和A。
    3. 另外一致性和可用性并不是水火不容的关系,而是在故障时更倾向于保证什么而已。

    行业应用实例

    以下是当前主流技术栈对CAP原则的支持。一下实例可以发现,CAP原则如何选择在于它们应对的场景,CP模型和AP模型本质并没有优劣的区别。只是它们在解决不同的问题,它们的解决方案绝对它们应该去使用那种模型。


    Eureka

    Eureka使用AP模型,保证可用性作为首要条件。Eureka是服务发现中心,中心存在宕机时间,对服务提供服务有很大影响。另外Eureka客户端会对一致性做一次筛选,因此在短时间内的数据不一致是不会出现问题的。

    Nacos

    阿里巴巴用来替代Eureka的产品,它默认使用AP模型,但是支持CP模型。其原因并不是Nacos更强大,而是它们解决的问题不同。Nacos不只是发现中心,也是配置中心(配置中心一致性是数据库保证的),以后可能也会有更远大的设计目标,需要保证数据的强一致性。

    RocketMQ

    rocketMQ自己实现了一个服务治理服务NameServer,它采用了AP模型,原因是它只是个服务治理服务,它的实现相对简陋,应该说相当简洁,所以效率也相当的高,具体可以参考 : RocketMq总决式-NameServer源码1

    Zookeeper

    zookeeper采用CP模型保证数据一致性。缺点存在服务拒绝服务间隔(选举),好处数据一致性。原因是:zookeeper还提供了分布式锁的功能,数据一致性是必须的。

    展开全文
  • CAP原则 CAP原则又称CAP定理,指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)这三个要素最多只能同时实现两点,不可能三者兼顾。 C:一致性...


    1. CAP原则

            CAP原则又称CAP定理,指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)这三个要素最多只能同时实现两点,不可能三者兼顾。

    • C:一致性(Consistency)
      • 一致性要求各个节点查询的数据都一致,如果某个节点由于出现了分区,不能及时同步其他节点的数据,那么这个节点要么停止使用,要么整个系统停止使用。所以如果要保证强一致性,则会牺牲掉节点的可用性!比如zookeeper,当发生网络分区时,为了保证数据一致性,非Leader分区下的节点将变为不可用,并重新进入选举状态。
    • A:可用性(Availability)
      • 可用性要求所有节点尽量可用,就算出现了网络分区,不同节点之间的数据出现了不一致,但是仍然让该节点可用,所以会牺牲数据一致性。因为从不同节点读取到的数据可能不同!比如eurak、nacos都支持AP架构,当发生网络分区时,所有节点仍然可以读数据,但不保证读到的数据是最新的,不过也不用担心,最终会通过心跳保证数据的最终一致性!
    • P:分区容错性(Partition tolerance)
      • CAP协议的P是一定要保证的!不能说发生了网络分区,系统就不能提供服务了
      • 分区:分区是指网络分区。在多节点部署的系统中,由于网络原因,节点之间无法通信、无法同步数据,就出现了网络分区
      • 容错:容错是指当因为网络原因,系统节点出现了分区,对外仍然要提供服务。

    各种分布式中间件使用的架构如下:

    • mysql单机:CA架构
    • eureka集群:AP架构
    • zookeeper集群:CP架构
    • nacos集群:AP或CP架构,可根据实例的(临时、持久)类型,来选择AP或CP
    • redis集群:主要是AP架构,也可通过配置min-slaves-to-write = x(大于1个节点)去模拟CP架构


    2. BASE原则

    • BA:基本可用(Basically Available)
    • S:软状态(Soft State)
    • E:最终一致性(Eventual Consistency)

            CAP原则是三选二,BASE原则是CAP的折中,C,A,P三个都要,但不用100%的保证每一个原则。分布式系统肯定优先保证P,多数时候是在CA之间做权衡选择!

            满足AP的系统在一定程度上也可以说是符合 BASE原则的,比如eurka集群,三个节点挂了两个,系统还是基本可用的(BA)。此时如果有系统来注册了,因为挂了两个节点,这时整个系统各个节点的数据是不一致的,但是等挂掉的两个节点恢复了,数据会同步过去,保证最终一致性(E),对于中间数据暂时不一致的状态可以称为软状态(S)!


    3. CP架构是如何防止脑裂问题的

    AP架构

    • 向一个节点A写入数据成功后,立刻给客户端响应写成功的信号。
    • 如果此时集群节点之间网络断开了,由于其可用性,其他节点仍然提供服务,但是A节点的数据还未写入到其他节点,当访问除A之外的其他节点时,就会出现数据不一致的问题,当网络恢复后,才会通过心跳保证最终一致性!

    CP架构

    • 在向一个节点A写入数据成功后,并不是马上给客户端响应写成功的信号,而是等待数据同步到其他节点后(个数取决于配置),才响应客户端,表示此次写数据成功了!这在一定程度上保证了数据一致性。为了防止数据混乱,写数据时只允许往Leader节点写,读数据时可以从所有节点读取!
    • CP架构下具有特殊的Leader - Flower机制,当发生网络分区时,非Leader分区下的节点会变成不可用,重新进入选举状态,

    nacos和zookeeper是如何防止脑裂的?

    • 集群的脑裂通常是发生在集群之间通信不可达(分区)的情况下,一个大集群会分裂成不同的小集群,小集群中又各自选举出自己的master节点,导致原先的集群出现多个master节点对外提供服务的情况!
    • leader选举时,要求节点获取到的投票数量 > 总节点数量/2,有了这个选举原则,当发生网络分区时,无论如何最多只有一个小集群选出leader,避免集群发生脑裂。

    集群节点个数为什么推荐是奇数个?

    • 在集群启动时,偶数个节点的集群一旦节点对半分区(比如4个节点分区成2个节点和2个节点的情况),整个集群无法选出leader,集群无法提供服务,无法满足CAP中的P
    • 容错能力相同的情况下,奇数节点比偶数节点更节省资源,比如5个节点最多挂掉2个节点还能选leader,6个节点最多也只能挂掉2个节点才能保证可以选leader


    4. Nacos的CP架构源码解析

            Nacos的 CP 和 AP 架构的选择,取决于我们配置的服务实例是临时实例还是持久实例

    spring:
      cloud:
        nacos:
          discovery:
            server-addr: 127.0.0.1:8848
            group:  mall-order
            cluster-name: SH
            ephemeral: false   //持久化实例,使用 CP架构
            ephemeral: true	   //临时实例,使用 AP架构        
    

            AP架构的源码解析之前已经发表过一篇文章,可以点击查看:Nacos的AP架构下,服务的注册与发现!本节主要解释一下Nacos的CP架构的以下几点:其他功能与AP架构类似,不做赘述!

    • 持久化服务注册
    • Leader选举
    • 发送心跳、同步数据


    4.1 注册持久化服务,同步其他节点!

            CP架构下的服务的注册与AP架构不同点在于:向nacos服务端注册实例时的consistencyService.put(key, instances)方法实现不同!

        public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
                throws NacosException {
    
            //获取实例的key。key分为临时实例 和 持久化实例
            //根据入参ephemeral去判断,ephemeral默认为true,默认是临时实例
            String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    
            Service service = getService(namespaceId, serviceName);
    
            synchronized (service) {
                //更新或者新增(临时、持久)实例
                List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
    
                Instances instances = new Instances();
                instances.setInstanceList(instanceList);
                //把(临时、持久)实例放入队列
                //注意:此处会根据实例类型 选择AP架构或者CP架构 的存储方式!
                consistencyService.put(key, instances);
            }
        }
    

    在执行consistencyService.put方法时,nacos会根据不同的实例类型选择不同的架构

    1. 临时实例,选择AP架构,使用Distro协议,分布式协议的一种,阿里内部的协议,服务是放在内存中!
    2. 持久实例,选择CP架构,使用Raft协议来实现,点击查看Raft协议详情!服务是放在磁盘中!

    在这里插入图片描述
    由于本节探讨的CP架构,使用的Raft协议,所以进入RaftConsistencyServiceImpl类中,查看真正的注册逻辑

    • 由于Raft协议规定,写操作只能由Leader节点去操作你,所以要先查看本节点是否是leader,如果当前节点不是leader,需要把这个写请求发给Leader,让Leader节点去操作,进行服务注册!
    • 如果本节点就是Leader,则开始服务注册
      • 服务注册时,先加同步锁,再向磁盘中写入文件,文件内容就是当前服务
      • 把服务写入磁盘中nacos/data/naming/datas/public/xxx服务文件 目录下,并发布服务变更事件,nacos监听到此事件并修改服务列表
      • Leader节点写完后,需要把数据同步到其他节点,使用CountDownLatch(集群节点数/2 + 1)保证集群半数节点以上同步成功

    代码如下:

    com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore#RaftCore 类中signalPublish方法如下:

        public void signalPublish(String key, Record value) throws Exception {
            if (stopWork) {
                throw new IllegalStateException("old raft protocol already stop work");
            }
            //如果当前server节点不是leader
            if (!isLeader()) {
                ObjectNode params = JacksonUtils.createEmptyJsonNode();
                params.put("key", key);
                params.replace("value", JacksonUtils.transferToJsonNode(value));
                Map<String, String> parameters = new HashMap<>(1);
                parameters.put("key", key);
    
                //获取leader节点
                final RaftPeer leader = getLeader();
                //把当前服务的写入请求,发给leader节点,让leader去做
                raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
                return;
            }
            //如果当前节点是leader,先加锁,再向磁盘写入文件,文件内容就是当前服务
            OPERATE_LOCK.lock();
            try {
                final long start = System.currentTimeMillis();
                final Datum datum = new Datum();
                datum.key = key;
                datum.value = value;
                if (getDatum(key) == null) {
                    datum.timestamp.set(1L);
                } else {
                    datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
                }
    
                ObjectNode json = JacksonUtils.createEmptyJsonNode();
                json.replace("datum", JacksonUtils.transferToJsonNode(datum));
                json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
                //把服务写入磁盘,并发布服务变动事件,
                // 监听器监听到事件后,更新内存中服务列表(双层map结构)
                //注意:如果严格按照Raft的协议来做的话,应该是写入本地磁盘文件后,立马通知其他集群节点
                // 但在这里的顺序是: 写磁盘--更新内存服务列表--通知集群其他节点 
                onPublish(datum, peers.local());
    
                final String content = json.toString();
                //CP架构下,leader写完后需要同步给其他节点
                //使用CountDownLatch来做同步
                //peers.majorityCount() = 集群节点数/2 + 1,代表集群半数节点同步成功
                final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
                //peers.allServersIncludeMyself():遍历包括当前节点在内的节点
                for (final String server : peers.allServersIncludeMyself()) {
                    if (isLeader(server)) {
                        //如果是当前节点 CountDownLatch-1
                        latch.countDown();
                        //继续循环
                        continue;
                    }
                    //如果是其他节点,调API发送服务信息
                    final String url = buildUrl(server, API_ON_PUB);
                    //异步发送
                    HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                        @Override
                        public void onReceive(RestResult<String> result) {
                            //发送回调
                            if (!result.ok()) {
                                Loggers.RAFT
                                        .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                                datum.key, server, result.getCode());
                                return;
                            }
                            //发送完 CountDownLatch-1
                            latch.countDown();
                        }
    
                        @Override
                        public void onError(Throwable throwable) {
                            Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                        }
    
                        @Override
                        public void onCancel() {
    
                        }
                    });
    
                }
                // await 等待CountDownLunch执行完毕!
                //如果超时 ,抛异常!
                //但是这里有个bug:往其他节点写数据出现问题时,这里跑了异常,但是主节点却保存服务成功了!!理论上主节点应该同时保存失败的!
                //新版本使用了jRaft协议来替换,使用两段式提交的方式避免了这个bug
                if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
                    // only majority servers return success can we consider this update success
                    Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
                    throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
                }
    
                long end = System.currentTimeMillis();
                Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
            } finally {
                OPERATE_LOCK.unlock();
            }
        }
    

            其中向磁盘写入服务,并发布服务变更时事件ValueChangeEventonPublish方法如下:该服务变更时事件ValueChangeEvent被监听到后会触发updateIps(),该方法与nacos的AP架构中的介绍一致!可自行前往查看!

        public void onPublish(Datum datum, RaftPeer source) throws Exception {
        
    		。。。。。 //省略代码
    	
            // if data should be persisted, usually this is true:
            if (KeyBuilder.matchPersistentKey(datum.key)) {
                //向磁盘写入服务文件,目录为:nacos/data/naming/datas/public/xxx服务文件
                raftStore.write(datum);
            }
            
    		。。。。。 //省略代码
    	
            //服务写完后,发布服务变动事件,nacos监听到此事件并修改服务列表
            NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
            Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
        }
    

    接下来总结一下使用CP架构时,nacos服务端的服务注册逻辑:

    1. Leader节点把服务写入磁盘中nacos/data/naming/datas/public/xxx服务文件 目录下
    2. 同时发布服务变动事件,由其他线程监听事件,更新nacos server端的服务列表(双层map结构)
    3. 通过异步的方式同步集群中其他节点,其他节点与Leader节点的操作一致!


    4.2 Leader选举

            上面介绍了CP架构下的服务注册,接下来看一下nacos集群启动时是如何进行Leader选举的!由于nacos的CP架构使用的Raft协议,所以在Nacos集群启动时,也会经过半数选举机制为集群选择一个Leader节点,负责接收数据,同步数据!Raft协议中只是简单画出了Leader选举示意图,点击可查看!.,接下来看一下Ncaos底层是如何实践Raft协议的:

            Nacos的leader选举是发生在RaftCore类中的!但是这个类在源码中使用了@Deprecated标识,说明这个类可能在未来会过期,被新的实现替换掉。目前还是先研究一下这个类吧!

    com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore

            RaftCore类中有一个init方法,使用了@PostConstruct标识,说明这个方法会在RaftCore类初始化完成时调用,进入init方法:

       //leader选举方法
        @PostConstruct
        public void init() throws Exception {
            Loggers.RAFT.info("initializing Raft sub-system");
            final long start = System.currentTimeMillis();
            //从服务存储目录中加载服务文件到内存中
            raftStore.loadDatums(notifier, datums);
    
            setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
    
            Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
    
            initialized = true;
    
            Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
    
            //两个延时定时线程池 执行两个任务
            //1.leader选举任务
            masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
            //2.心跳任务
            heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());
    
            versionJudgement.registerObserver(isAllNewVersion -> {
                stopWork = isAllNewVersion;
                if (stopWork) {
                    try {
                        shutdown();
                        raftListener.removeOldRaftMetadata();
                    } catch (NacosException e) {
                        throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
                    }
                }
            }, 100);
    
            NotifyCenter.registerSubscriber(notifier);
    
            Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
                    GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
        }
    
    

    可以看到在init方法内部有两个延时定时线程池,分别执行了leader选举任务 和 心跳任务!

     	//Leader选举线程池 ,立即执行一次选择,然后每500ms执行一次
     	//TICK_PERIOD_MS : 500 毫秒
        public static ScheduledFuture registerMasterElection(Runnable runnable) {
            return NAMING_TIMER_EXECUTOR.scheduleAtFixedRate(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS);
        }
    
    	//心跳任务线程池: 与leader选举一致
        public static ScheduledFuture registerHeartbeat(Runnable runnable) {
            return NAMING_TIMER_EXECUTOR.scheduleWithFixedDelay(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS);
        }
    

            我们先看Leader选举任务是如何执行的,进入new MasterElection()类中,由于MasterElection类实现了Runnable接口,所以直接进入其run()方法内部!

       public class MasterElection implements Runnable {
    		
    		//进入run方法!
            @Override
            public void run() {
                try {
                    if (stopWork) {
                        return;
                    }
                    //选举完成后进入,直接return,不会一直选举
                    if (!peers.isReady()) {
                        return;
                    }
    
                    RaftPeer local = peers.local();
                    //随机休眠
                    local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
    
                    if (local.leaderDueMs > 0) {
                        return;
                    }
    
                    // reset timeout
                    local.resetLeaderDue();
                    local.resetHeartbeatDue();
                    //休眠结束 进行投票,进入下面的投票方法
                    sendVote();
                } catch (Exception e) {
                    Loggers.RAFT.warn("[RAFT] error while master election {}", e);
                }
    
            }
            
            ==========================================
            
    		//投票方法
            private void sendVote() {
    
                RaftPeer local = peers.get(NetUtils.localServer());
                Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()),
                        local.term);
    
                peers.reset();
                //选举周期 +1
                local.term.incrementAndGet();
                //voteFor先投给自己
                local.voteFor = local.ip;
                //设置状态为:候选者CANDIDATE,候选者才能参与投票
                local.state = RaftPeer.State.CANDIDATE;
    
                Map<String, String> params = new HashMap<>(1);
                params.put("vote", JacksonUtils.toJson(local));
                //遍历除了自己之外的节点
                for (final String server : peers.allServersWithoutMySelf()) {
                    //调用nacos提供的API给除自己之外的其他节点发送选票信息
                    final String url = buildUrl(server, API_VOTE);
                    try {
                        //异步发送
                        HttpClient.asyncHttpPost(url, null, params, new Callback<String>() {
                            @Override
                            //发送后回调,回调主要是获取其他节点给当前节点的投票结果
                            public void onReceive(RestResult<String> result) {
                                if (!result.ok()) {
                                    Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url);
                                    return;
                                }
                                //从别处收到的投票结果
                                //如果当前节点随机时间最先走完,这里收到选票结果肯定其他节点都投的是自己(当前节点)!
                                RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class);
    
                                Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));
                                //根据收到的选票个数看是否大于集群半数,来决定当前节点是否被选为leader
                                peers.decideLeader(peer);
    
                            }
    
                            @Override
                            public void onError(Throwable throwable) {
                                Loggers.RAFT.error("error while sending vote to server: {}", server, throwable);
                            }
    
                            @Override
                            public void onCancel() {
    
                            }
                        });
                    } catch (Exception e) {
                        Loggers.RAFT.warn("error while sending vote to server: {}", server);
                    }
                }
            }
        }
    
    

    其中sendVote方法内部的decideLeader方法中会比较选票个数是否大于集群节点个数的一半,进而决定当前节点是否当选leader

        public RaftPeer decideLeader(RaftPeer candidate) {
        
    		。。。。。。//省略代码
    		
            //如果选票大于集群半数,设置State状态为leader
            // majorityCount() = peers.size() / 2 + 1
            if (maxApproveCount >= majorityCount()) {
                RaftPeer peer = peers.get(maxApprovePeer);
                //设置State状态为leader
                peer.state = RaftPeer.State.LEADER;
    
                if (!Objects.equals(leader, peer)) {
                    leader = peer;
                    ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local()));
                    Loggers.RAFT.info("{} has become the LEADER", leader.ip);
                }
            }
    
            return leader;
        }
    


    4.3 发送心跳

            nacos的发送心跳也是由上文的定时线程池去触发的,每隔5秒发一次心跳,进入HeartBeatrun方法中

       public class HeartBeat implements Runnable {
    
            @Override
            public void run() {
                try {
                    if (stopWork) {
                        return;
                    }
                    if (!peers.isReady()) {
                        return;
                    }
    
                    RaftPeer local = peers.local();
                    local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                    if (local.heartbeatDueMs > 0) {
                        return;
                    }
                    //心跳间隔5s
                    local.resetHeartbeatDue();
                    //发送心跳
                    sendBeat();
                } catch (Exception e) {
                    Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
                }
            }
    

    发送心跳的主要逻辑就是sendBeat()方法,主要内容有

    • 检查当前节点是否是Leader,只有Leader才能发心跳!
    • 遍历所有服务,把所有服务的key组装起来,并压缩一下,然后发送给出自己以外所有节点
           private void sendBeat() throws IOException, InterruptedException {
                RaftPeer local = peers.local();
                //raft协议规定:如果不是leader,不能发心跳
                if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {
                    return;
                }
                if (Loggers.RAFT.isDebugEnabled()) {
                    Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());
                }
    
                local.resetLeaderDue();
    
                // build data 构建数据包packet
                ObjectNode packet = JacksonUtils.createEmptyJsonNode();
                packet.replace("peer", JacksonUtils.transferToJsonNode(local));
                //数据数组,这个数组会放在数据包packet中
                ArrayNode array = JacksonUtils.createEmptyArrayNode();
    
                if (switchDomain.isSendBeatOnly()) {
                    Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", switchDomain.isSendBeatOnly());
                }
    
                if (!switchDomain.isSendBeatOnly()) {
                    //遍历所有的之前已经加载到内存中的服务
                    //注意:一开始会把磁盘的服务加载到内存中去
                    for (Datum datum : datums.values()) {
                        //要发送的数据元素
                        ObjectNode element = JacksonUtils.createEmptyJsonNode();
    
                        if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                            //只获取服务的key,不发送整个服务,目的是为了更轻量
                            element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
                        } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                            element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
                        }
                        //添加时间戳
                        element.put("timestamp", datum.timestamp.get());
                        //把每一个服务的key放入数据数组中
                        array.add(element);
                    }
                }
    
                //把数据数组放进数据包packet中
                packet.replace("datums", array);
                // broadcast
                Map<String, String> params = new HashMap<String, String>(1);
                params.put("beat", JacksonUtils.toJson(packet));
    
                String content = JacksonUtils.toJson(params);
    
                //发送数据的输出流
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                //压缩输出流
                GZIPOutputStream gzip = new GZIPOutputStream(out);
                gzip.write(content.getBytes(StandardCharsets.UTF_8));
                gzip.close();
    
                byte[] compressedBytes = out.toByteArray();
                String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
    
                if (Loggers.RAFT.isDebugEnabled()) {
                    Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}", content.length(),
                            compressedContent.length());
                }
                //遍历除了自己以外的节点,发送心跳,请求接口 /raft/beat
                for (final String server : peers.allServersWithoutMySelf()) {
                    try {
                        final String url = buildUrl(server, API_BEAT);
                        if (Loggers.RAFT.isDebugEnabled()) {
                            Loggers.RAFT.debug("send beat to server " + server);
                        }
                        //异步发送
                        HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>() {
                            @Override
                            public void onReceive(RestResult<String> result) {
                                if (!result.ok()) {
                                    Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", result.getCode(), server);
                                    MetricsMonitor.getLeaderSendBeatFailedException().increment();
                                    return;
                                }
    
                                peers.update(JacksonUtils.toObj(result.getData(), RaftPeer.class));
                                if (Loggers.RAFT.isDebugEnabled()) {
                                    Loggers.RAFT.debug("receive beat response from: {}", url);
                                }
                            }
    
    		。。。。。。。 //省略代码!
    

    Leader发送心跳会请求nacos服务端的/raft/beat接口, 其他节点在接收到心跳后会做什么呢,让我们看一下/raft/beat接口的逻辑,

        //接受心跳接口
        @PostMapping("/beat")
        public JsonNode beat(HttpServletRequest request, HttpServletResponse response) throws Exception {
            if (versionJudgement.allMemberIsNewVersion()) {
                throw new IllegalStateException("old raft protocol already stop");
            }
            //解码、解压缩
            String entity = new String(IoUtils.tryDecompress(request.getInputStream()), StandardCharsets.UTF_8);
            String value = URLDecoder.decode(entity, "UTF-8");
            value = URLDecoder.decode(value, "UTF-8");
    
            JsonNode json = JacksonUtils.toObj(value);
    
            //接收心跳
            RaftPeer peer = raftCore.receivedBeat(JacksonUtils.toObj(json.get("beat").asText()));
    
            return JacksonUtils.transferToJsonNode(peer);
        }
    
    

    receivedBeat是真正的接受心跳方法,具体接受心跳逻辑如下:

    • 先把心跳内容解压缩出来,然后接受心跳
    • 判断发送者是否是Leader,以及接受者是否是Flower,如不是,抛异常
    • 遍历leader发过来的服务的key信息,并做批量处理
    • 因为发心跳时为了轻量,只发了服务的key信息,Flower还需要根据服务的key信息请求Leader节点的ip,并根据Key信息拉取完整的服务数据,保存在Flower本地!

    具体源码如下:

       public RaftPeer receivedBeat(JsonNode beat) throws Exception {
            if (stopWork) {
                throw new IllegalStateException("old raft protocol already stop work");
            }
            //拿到心跳信息
            final RaftPeer local = peers.local();
            final RaftPeer remote = new RaftPeer();
            JsonNode peer = beat.get("peer");
            remote.ip = peer.get("ip").asText();
            remote.state = RaftPeer.State.valueOf(peer.get("state").asText());
            remote.term.set(peer.get("term").asLong());
            remote.heartbeatDueMs = peer.get("heartbeatDueMs").asLong();
            remote.leaderDueMs = peer.get("leaderDueMs").asLong();
            remote.voteFor = peer.get("voteFor").asText();
    
            //如果发送者不是leader,抛异常
            if (remote.state != RaftPeer.State.LEADER) {
                Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", remote.state,
                        JacksonUtils.toJson(remote));
                throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
            }
            //如果当前节点票数大于发送者的票数,抛异常
            if (local.term.get() > remote.term.get()) {
                Loggers.RAFT
                        .info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}",
                                remote.term.get(), local.term.get(), JacksonUtils.toJson(remote), local.leaderDueMs);
                throw new IllegalArgumentException(
                        "out of date beat, beat-from-term: " + remote.term.get() + ", beat-to-term: " + local.term.get());
            }
            //如果当前节点不是FOLLOWER节点,直接把自己变成FOLLOWER节点,因为FOLLOWER节点才能接收心跳
            if (local.state != RaftPeer.State.FOLLOWER) {
    
                Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JacksonUtils.toJson(remote));
                // mk follower
                //设置自己为FOLLOWER
                local.state = RaftPeer.State.FOLLOWER;
                local.voteFor = remote.ip;
            }
    
            final JsonNode beatDatums = beat.get("datums");
            local.resetLeaderDue();
            local.resetHeartbeatDue();
    
            peers.makeLeader(remote);
    
            if (!switchDomain.isSendBeatOnly()) {
    
                //创建一个map,用于接受心跳中带来的服务的key信息
                Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());
    
                for (Map.Entry<String, Datum> entry : datums.entrySet()) {
                    receivedKeysMap.put(entry.getKey(), 0);
                }
    
                // now check datums
                List<String> batch = new ArrayList<>();
    
                int processedCount = 0;
                if (Loggers.RAFT.isDebugEnabled()) {
                    Loggers.RAFT
                            .debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
                                    beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
                }
                //flower节点获取服务步骤如下:
                //遍历leader心跳发过来的数据
                for (Object object : beatDatums) {
                    processedCount = processedCount + 1;
    
                    JsonNode entry = (JsonNode) object;
                    //发过来的心跳包括:服务的key信息,
                    // 为什么心跳会发送服务的key呢?,因为key足够小,并且经过了压缩,理论上一次心跳可以发送几万个服务的key信息
                    String key = entry.get("key").asText();
                    final String datumKey;
    
                    if (KeyBuilder.matchServiceMetaKey(key)) {
                        datumKey = KeyBuilder.detailServiceMetaKey(key);
                    } else if (KeyBuilder.matchInstanceListKey(key)) {
                        datumKey = KeyBuilder.detailInstanceListkey(key);
                    } else {
                        // ignore corrupted key:
                        continue;
                    }
    
                    long timestamp = entry.get("timestamp").asLong();
    
                    receivedKeysMap.put(datumKey, 1);
    
                    try {
    
                        if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp
                                && processedCount < beatDatums.size()) {
                            continue;
                        }
                        //批量处理服务的key信息,先把key分批
                        if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
                            batch.add(datumKey);
                        }
                        //一批key组成的数组可以有50个key,如果不够继续添加,够50个了再去处理
                        if (batch.size() < 50 && processedCount < beatDatums.size()) {
                            continue;
                        }
    
                        String keys = StringUtils.join(batch, ",");
    
                        if (batch.size() <= 0) {
                            continue;
                        }
    
                        Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}"
                                        + ", datums' size is {}, RaftCore.datums' size is {}", getLeader().ip, batch.size(),
                                processedCount, beatDatums.size(), datums.size());
    
                        // 找到leader的url,准备根据服务的key拉取服务的完整信息。
                        // 因为此时Flower节点只是通过心跳拿到了服务的key而已,并没有服务的完整信息,所以要回调leader
                        String url = buildUrl(remote.ip, API_GET);
                        Map<String, String> queryParam = new HashMap<>(1);
                        queryParam.put("keys", URLEncoder.encode(keys, "UTF-8"));
    
                        //最终flower节点会根据key信息 ,自己去leader的ip里拉取对应的服务实例,并保存在磁盘中
                        HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>() {
                            @Override
                            public void onReceive(RestResult<String> result) {
                                if (!result.ok()) {
                                    return;
                                }
                                
    			。。。。。。。 //省略代码
    


    5. 集群、分布式与微服务的区别

    • 集群:同一个业务,部署在多个服务器上
    • 分布式:分为 分布式部署 和 分布式存储
      • 分布式部署:一个业务拆分成多个子业务,每个业务分别存储在不同的服务器上
      • 分布式存储:存储一台机器上的数据被拆分成多份,存储在不同的服务器上
    • 微服务:微服务就是一种分布式部署架构!
    展开全文
  • 图解: 分布式的CAP原则

    千次阅读 2019-11-04 21:30:18
    CAP原则又称CAP定理,指的是在一个分布式系统中, Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可得兼。 CAP原则是NOSQL数据库的基石。 分布式系统的CAP理论:...
  • 大数据CAP原则(CAP定理)、BASE理论

    千次阅读 2018-07-28 00:22:20
    CAP原则(CAP定理)、BASE理论  CAP原则又称CAP定理,指的是在一个分布式系统中, Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可得兼。  CAP原则是NOSQL数据库的...
  • 大白话讲解分布式里面的Cap原则

    千次阅读 2019-08-02 09:37:39
    什么叫做cap Cap分别指可用性,分区容错性,一致性 ...所以在cap原则里面,分区容错性是必须要有的 一致性 一致性的意思是,写操作之后的读操作,必须返回该修改后的值。举例来说,某条记录...
  • 首先我们需要了解一下什么是CAP原则,C代表一致性,A代表高可用,P代表分区容错性,简单地来说CAP原则就是最多只能满足两个。鱼和熊掌不能兼得。那么为什么会造就成这样的情况呢?下面且听子月娓娓道来。 首先我们...
  • 【Java】分布式CAP原则

    2021-02-12 10:23:43
    CAP原则介绍CAP原则又称CAP定理,指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)。CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾。CAP...
  • 分布式系统的CAP原则

    2020-11-24 21:18:50
    CAP原则 分布式系统的三大原则 1、C:一致性 每个节点中的数据是一致的 2、A:可用性 当一部分节点瘫痪掉以后,集群是否还能响应客户端的请求 3、P:分区容错性 系统是否可以容忍节点之间的通信故障 在分布式...
  • 分布式系统-CAP原则

    2021-02-23 21:19:17
    分布式CAP原则 C(Consistency)一致性:分布式系统中所有主机在同一时刻是否可以保证数据一致,即写入一台主机后其他主机均写成功则存在一致性; A(Availability)可用性:在集群中部分节点发生故障是否会影响到...
  • CAP原则又称CAP定理,指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)。CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾。 简单...
  • CAP原则又称CAP定理,指的是在一个分布式系统中, Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可得兼。 CAP原则是NOSQL数据库的基石。Consistency(一致性)。 ...
  • CAP原则概述

    2020-02-03 14:52:47
    简介:讲解分布式核心知识CAP理论 CAP定理: 指的是在一个分布式系统中,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可同时获得。 一致性(C): 在分布式系统中的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 18,666
精华内容 7,466
关键字:

cap原则

友情链接: 10.22.rar