精华内容
下载资源
问答
  • 镜像队列
    2020-12-23 14:24:58

    RabbitMQ 经典镜像队列

    1. What’s Queue Mirroring

    • 镜像队列机制可以将队列镜像到集群中的其他Broker节点之上。

    • 一般来说,RabbitMQ中一个队列的内容保存在其声明的节点中。交换器和绑定起保存在所有的节点中。

    • 一个镜像队列由一个主队列和许多镜像队列组成,主队列通常所在的节点通常被认作主节点。每个队列都有自己的主节点。对一个镜像队列的操作会首先作用在其主节点,后传播到其镜像上。

    • 镜像队列只能用于集群的节点之间,不推荐在广域网上使用。

    • 消息会先发送到主队列,之后复制到其他队列上面。消费者无论连接哪一个节点,都会先消费主节点上的消息,镜像队列会丢弃已经被确认的消息。队列镜像机制强化了可用性,但没用在节点间分配负载、没有负载均衡机制。

    • 主队列节点挂掉以后,资历最老的镜像在同步完成后装配提升为主队列,

    2. How Mirroring is Configured

    • 镜像队列使用policy进行配置。policy通过正则表达式匹配一个或多个队列,其包含一个可以被加到整个匹配队列属性上的定义。

    • 和操控镜像相关的队列参数

      1. ha-mode

      2. ha-params

      3. ha-sync-mode 可选automatic或mannual

        • 将新节点加入已存在的镜像队列,默认情况下ha-sync-mode的取值为mannual,镜像队列中的消息不会主动同步到新的slave中,除非显示调用同步命令。当调用同步命令后,队列开始阻塞,无法对其进行其他操作,直到同步完成。
        • 当ha-sync-mode设置为automatic时,新加入的slave会默认同步已知的镜像队列。由于同步过程的限制,所以不建议对正在使用的队列进行操作。
      4. ha-promote-on-shutdown 决定未同步的镜像队列是否会接管关闭的主队列

        • 默认为when-synced,在所有slave均未同步的情况下,如果master因为主动原因停止,那么slave不会接管master,也就是此时镜像队列不可用。如果master因为被动原因停止,那么slave会接管master。保证消息可靠不丢失,放弃可用性。
        • 如果设置为always,那么不论master以何种原因停止,slave都会接管master,有限保证可用性,不过消息可能会丢失。
      5. ha-sync-batch-size 镜像同步批处理消息的数量。默认为1 ,一次只同步一条消息,

    • 平时使用rabbitmqctl list_queues只能看到本节点内的队列,当集群内某个节点挂掉时,可以看到该节点的内容。如果开启了镜像模式,则该队列的数据也会同步传送过来。

    • 排他队列不可以创建镜像

    3. Using Mirror Queue

    1. 使用rabbitmqctl set_policy --apply-to queues 指令来进行设置

      ##设置为vhost / 中的node2.queue1队列启用镜像队列,在所有添加进来的节点中为其增加镜像,同步策略为自动同步
      rabbitmqctl set_policy --apply-to queues mirrorP2 "node2.queue1" \
      > '{
      > "ha-mode":"all",
      > "ha-sync-mode":"automatic"}'
      
      Setting policy "mirrorP2" for pattern "node2.queue1" to "{
      "ha-mode":"all"
      ,"ha-sync-mode":"automatic"}" with priority "0" for vhost "/" ...
      

      设置成功后,rabbitmq会在日志中添加

      2020-12-22 22:00:10.409 [info] <0.15827.0> Successfully set policy 'mirrorP2' matching queues names in virtual host '/' using pattern 'node2.queue1'
      2020-12-22 22:00:10.411 [info] <0.7027.0> Mirrored queue 'node2.queue1' in vhost '/': Adding mirror on node rabbit@node2: <46700.11962.0>
      2020-12-22 22:00:10.430 [info] <0.7027.0> Mirrored queue 'node2.queue1' in vhost '/': Synchronising: 0 messages to synchronise
      2020-12-22 22:00:10.430 [info] <0.7027.0> Mirrored queue 'node2.queue1' in vhost '/': Synchronising: batch size: 4096
      2020-12-22 22:00:10.445 [info] <0.15836.0> Mirrored queue 'node2.queue1' in vhost '/': Synchronising: all mirrors already synced
      
    2. 检测镜像队列状态,不管是否已经同步

      rabbitmqctl list_queues name slave_pids synchronised_slave_pids
      
    3. 手动同步镜像队列

      rabbitmqctl sync_queue {name}
      
    4. 手动取消同步镜像

      rabbitmqctl cancel_sync_queue {name}
      
    更多相关内容
  • RabbitMQ 镜像队列 使用和原理详解

    千次阅读 2022-04-11 00:29:32
    文章目录1. 背景2. 镜像队列概述3. 使用方法和注意事项3.1 配置方法3.1.1 管理界面配置3.1.2 命令行3.1.3 HTTP API3.2 镜像队列... 镜像队列原理4.1 镜像队列的数据流4.1.1 客户端连接主节点4.1.2 客户端连接从节点4.

    1. 背景

    单节点的 RabbitMQ 存在性能上限,可以通过垂直或者水平扩容的方式增加 RabbitMQ 的吞吐量。垂直扩容指的是提高 CPU 和内存的规格;水平扩容指部署 RabbitMQ 集群。

    通过将单个节点的队列相对平均地分配到集群的不同节点,单节点的压力被分散,RabbitMQ 可以充分利用多个节点的计算和存储资源,以提升消息的吞吐量。

    但是多节点的集群并不意味着有更好的可靠性——每个队列仍只存在于一个节点,当这个节点故障,这个节点上的所有队列都不再可用。

    在 3.8 以前的版本,RabbitMQ 通过镜像队列(Classic Queue Mirroring)来提供高可用性。但镜像队列存在很大的局限性,在 3.8 之后的版本 RabbitMQ 推出了 Quorum queues 来替代镜像队列,在之后的版本中镜像队列将被移除。

    镜像队列通过将一个队列镜像(消息广播)到其他节点的方式来提升消息的高可用性。当主节点宕机,从节点会提升为主节点继续向外提供服务。

    本文将讲解镜像队列的使用方法和原理。

    2. 镜像队列概述

    RabbitMQ 以队列维度提供高可用的解决方案——镜像队列。

    配置镜像队列规则后,新创建的队列按照规则成为镜像队列。每个镜像队列都包含一个主节点(Leader)和若干个从节点(Follower),其中只有主节点向外提供服务(生产消息和消费消息),从节点仅仅接收主节点发送的消息。

    从节点会准确地按照主节点执行命令的顺序执行动作,所以从节点的状态与主节点应是一致的。

    3. 使用方法和注意事项

    3.1 配置方法

    3.1.1 管理界面配置

    使用策略(Policy)来配置镜像策略,策略使用正则表达式来配置需要应用镜像策略的队列名称,以及在参数中配置镜像队列的具体参数。

    按此步骤创建镜像策略,该策略为所有 mirror_ 开头的队列创建 3 副本镜像

    创建完的策略如下图显示

    参数解释:

    • Name: policy的名称,用户自定义。

    • Pattern: queue的匹配模式(正则表达式)。^表示所有队列都是镜像队列。

    • Definition: 镜像定义,包括三个部分ha-sync-mode、ha-mode、ha-params。

      • ha-mode: 指明镜像队列的模式,有效取值范围为all/exactly/nodes。
        • all:表示在集群所有的代理上进行镜像。
        • exactly:表示在指定个数的代理上进行镜像,代理的个数由ha-params指定。
        • nodes:表示在指定的代理上进行镜像,代理名称通过ha-params指定。
      • ha-params: ha-mode模式需要用到的参数。
      • ha-sync-mode: 表示镜像队列中消息的同步方式,有效取值范围为:automatic,manually。
        • automatic:表示自动向master同步数据。
        • manually:表示手动向master同步数据。
    • Priority: 可选参数, policy的优先级。

    3.1.2 命令行

    rabbitmqctl set_policy [-p vhost] [–priority priority] [–apply-to apply-to] name pattern definition

    例如,对队列名称以“queue_”开头的所有队列进行镜像,并在集群的两个节点上完成进行,policy的设置命令为:

    rabbitmqctl set_policy --priority 0 --apply-to queues mirror_queue "^mirror_" '{"ha-mode":"exactly","ha-params":3,"ha-sync-mode":"automatic"}'
    

    3.1.3 HTTP API

    https://www.rabbitmq.com/ha.html#examples

    PUT /api/policies/%2f/ha-two
    {
      "pattern":"^mirror_",
      "definition": {
        "ha-mode":"exactly",
        "ha-params":3,
        "ha-sync-mode":"automatic"
      }
    }
    

    3.2 镜像队列配置观测

    配置完 Policy 后,创建新的队列,或者原有的的队列,如果队列名称符合 Policy 的匹配规则,则该队列会自动创建为镜像队列。

    下图中 mirror_queue 匹配之前创建的镜像策略,为镜像队列。normal_queue 为普通队列

    镜像队列显示的蓝色 +2 表示同步副本数为 2 个。此处如果用红色显示,则表示为同步副本数

    显示的 mirror-policy 为该队列应用的镜像策略。

    点击队列名称可以进入查看队列详细信息,从中可以看出队列的主节点、从节点和镜像策略

    3.3 配置参数

    镜像队列有许多配置参数,表达了镜像队列的镜像策略和异常后的晋升策略。

    下面来详细解释一下这些配置参数的意义

    3.3.1 镜像策略

    ha-modeha-params结果
    exactlycount集群中队列副本的数量(主队列加上镜像)。count值为1表示一个副本:只有主节点。如果主节点不可用,则其行为取决于队列是否持久化。count值为2表示两个副本:一个队列主队列和一个队列镜像。换句话说:“镜像数=节点数-1”。如果运行队列主服务器的节点变得不可用,队列镜像将根据配置的镜像提升策略自动提升到主服务器。如果集群中的可用节点数少于count,则将队列镜像到所有节点。如果集群中有多个计数节点,并且一个包含镜像的节点宕机,那么将在另一个节点上创建一个新镜像。使用’ exactly ‘模式和’ ha-promot-on-shutdown ': ’ always '可能是危险的,因为队列可以跨集群迁移,并在停机时变得不同步。
    all不设置队列跨集群中的所有节点镜像。当一个新节点被添加到集群中时,队列将被镜像到该节点。这个设置非常保守。建议设置的副本值为大多数节点N / 2 + 1。镜像到所有节点会给所有集群节点带来额外的负担,包括网络I/O、磁盘I/O和磁盘空间的使用。
    nodes节点名称队列被镜像到节点名中列出的节点。节点名是在rabbitmqctl cluster_status中出现的Erlang节点名;它们的形式通常是“rabbit@hostname”。如果这些节点名中有任何一个不是集群的一部分,则不构成错误。如果在声明队列时列表中的节点都不在线,则将在声明客户机连接的节点上创建队列。

    3.3.2 新镜像同步策略

    ha-sync-mode说明
    manual这是默认模式。新队列镜像将不接收现有消息,它只接收新消息。一旦使用者耗尽了仅存在于主服务器上的消息,新的队列镜像将随着时间的推移成为主服务器的精确副本。如果主队列在所有未同步的消息耗尽之前失败,则这些消息将丢失。您可以手动完全同步队列,详情请参阅未同步的镜像部分。
    automatic当新镜像加入时,队列将自动同步。值得重申的是,队列同步是一个阻塞操作。如果队列很小,或者您在RabbitMQ节点和ha-sync-batch-size之间有一个快速的网络,那么这是一个很好的选择。

    3.3.3 从节点晋升策略

    镜像队列主节点出现故障时,最老的从节点会被提升为新的主节点。如果新提升为主节点的这个副本与原有的主节点并未完成数据的同步,那么就会出现数据的丢失,而实际应用中,出现数据丢失可能会导致出现严重后果。

    rabbitmq 提供了 ha-promote-on-shutdownha-promote-on-failure 两个参数让用户决策是保证队列的可用性,还是保证队列的一致性;两个参数分别控制正常关闭、异常故障情况下从节点是否提升为主节点,其可设置的值为 when-syncedalways

    ha-promote-on-shutdown/ha-promote-on-failure说明
    when-synced从节点与主节点完成数据同步,才会被提升为主节点
    always无论什么情况下从节点都将被提升为主节点

    这里要注意的是ha-promote-on-failure设置为always,插拔网线模拟网络异常的两个测试场景:当网络恢复后,其中一个会重新变为mirror,具体是哪个变为mirror,受cluster_partition_handling处理策略的影响。

    例如两台节点A,B组成集群,并且cluster_partition_handling设置为autoheal,队列的master位于节点A上,具有全量数据,mirror位于节点B上,并且还未完成消息的同步,此时出现网络异常,网络异常后两个节点交互决策:如果节点A节点成为赢家,此时B节点内部会重启,这样数据全部保留不会丢失;相反如果B节点成为赢家,A需要重启,那么由于ha-prromote-on-failure设置为always,B节点上的mirror提升为master,这样就出现了数据丢失。

    3.3.4 主队列选择策略

    RabbitMQ中的每个队列都有一个主队列。该节点称为队列主服务器。所有队列操作首先经过主队列,然后复制到镜像。这对于保证消息的FIFO排序是必要的。

    通过在策略中设置 queue-master-locator 键的方法可以定义主队列选择策略,这是常用的方法。

    此外,也可以用队列参数 x-queue-master-locator 或配置文件中定义 queue_master_locator 的方式指定,此处不再赘述。

    下面是该策略的可选参数列表

    queue-master-locator说明
    min-masters选择承载最小绑定主机数量的节点
    client-local选择客户机声明队列连接到的节点
    min-masters随机选择一个节点

    3.4 注意事项

    3.4.1 多少个镜像才是最优的

    镜像到所有节点会增加所有集群节点的负载,包括网络 I/O、磁盘 I/O 和磁盘空间的使用。

    在大多数情况下,在每个节点上都有一个副本是不必要的。对于3个或更多节点的集群,建议复制到(N/2+1)个节点,例如 3 个节点集群中的 2 个节点或 5 个节点集群中的 3 个节点。

    由于某些数据可能天生是短暂的或对时间非常敏感,因此对某些队列使用较少的镜像(甚至不使用任何镜像)是完全合理的。

    3.4.2 生产者确认和事务

    镜像队列同时支持生产者确认和事务机制。在事务机制中,只有当前事务在全部镜像中执行之后,客户端才会收到 Tx.Commit-OK 的消息。

    同样的,在生产者确认机制中,生产者进行当前消息确认的前提是该消息被全部镜像接收。

    3.4.3 流控

    RabbitMQ 使用信用证机制限制消息生产的速度。当生产者收到队列的所有镜像授予的信用时,才允许发送新的消息。(这里的信用指的时发送许可。)如果有镜像没有授予生产者信用,会导致生产者生产阻塞。生产者会一直被阻塞,直到所有镜像都授予它信用值,或者有的镜像从集群中断开。

    Erlang 通过定时向所有节点发送心跳的方式检测断开的情况。发送心跳的间隔可以用 net_ticktime 来控制。

    3.4.4 主节点失效和消费者取消

    从镜像队列中消费的客户端可能希望知道他们所消费的队列已经失败转移。当镜像队列发生故障时,哪些消息被发送到哪个消费者的信息就丢失了,因此所有未被确认的消息都会被重新发送,并设置了 redelivered 的标志。消费者可能希望知道这将会发生。

    如果是这样,他们可以使用参数 x-cancel-on-ha-failover 设置为 true。然后,它们的消费将在故障转移时被取消,并发送消费者取消通知。然后消费者就有责任重新发行基本版。消费来重新开始消费。

    Channel channel = ...;
    Consumer consumer = ...;
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-cancel-on-ha-failover", true);
    channel.basicConsume("my-queue", false, args, consumer);
    

    这将使用参数集创建一个新的消费者。

    4. 镜像队列原理

    4.1 镜像队列的数据流

    4.1.1 客户端连接主节点

    首先看生产者消费者直接与主节点连接的情况。该情况下队列的主副本所在的节点与生产者/消费者直接连接,效率较高。

    生产者,消费者连接到 RabbitMQ 后,在 RabbitMQ 内部会创建对应的 Connection,Channel 进程。

    Connecton 进程从 socket 上接收生产者发送的消息后投递到 Channel 进程。

    在 Channel 进程中,根据消息发送的 exchange 与消息的 routing-key,在内部数据库的路由表中,查找所有匹配的 Queue 的进程 PID,然后将消息投递到Queue 的进程中。在镜像队列的情况下,Channel 进程除了将消息发送给队列的 Leader 进程外,还会将消息发送给队列所有的 Follower 进程,而 Follower 进程都在远端节点上,因此这里就多了一次集群间的网络交互。

    镜像队列的 Leader 进程收到消息后,需要将消息同步给所有的 Follower 进程。RabbitMQ 采用 GM(组播)算法实现,镜像队列中的 Leader 和所有 Follower 都会发送一次消息和接收一次消息,同时还会发送一次对消息的 ACK,和接收一次消息的 ACK。

    综上所述,生产者发送一条消息,队列 Leader 进程所在节点会收到两次:一次是生产者发送的,一次是队列 Follower 进程发送的;同样也会将消息对外发送两次:一次是生产者对应的 Channel 进程将消息发送给队列的 Follower 进程;一次是队列的 Leader 进程进行广播同步将消息发送给 Follower 进程。此外,镜像队列的GM算法实现 ,每条消息还会有额外的确认消息在集群间进行发送。

    再结合图中的情况,一条消息从生产者到消费,Node1节点是2进3出的流量,Node2节点是2进1出的流量。

    4.1.2 客户端连接从节点

    如果生产者和消费者连接的是从节点,根据镜像队列的机制,只有主节点向外提供服务,所以镜像队列的消费需要由 node2 的队列消费消息。

    一条消息从生产到消费,生产者消费者连接的节点是3进3出,队列master进程所在的节点是2进2出

    4.2 镜像队列的实现原理

    4.2.1 普通队列结构

    通常队列由两部分组成

    1. amqqueue_process ,负责协议相关的消息处理,即接收生产者发布的消息、向消费者投递消息、处理消息 confirm、acknowledge 等等
    2. backing_queue,它提供了相关的接口供 amqqueue_process 调用,完成消息的存储以及可能的持久化工作等。

    4.2.2 镜像队列结构

    镜像队列同样由这两部分组成,amqqueue_process 仍旧进行协议相关的消息处理,backing_queue 则是由 master 节点和 slave 节点组成的一个特殊的 backing_queue。Leader 节点和 Follower 节点都由一组进程组成,一个负责消息广播的 GM,一个负责对 GM 收到的广播消息进行回调处理。

    在 Leader 节点上回调处理是 coordinator,在slave节点上则是 mirror_queue_slave。mirror_queue_slave 中包含了普通的 backing_queue 进行消息的存储,Leader 节点中 backing_queue 包含在 mirror_queue_master 中由 amqqueue_process 进行调用。

    4.2.3 GM(Guaranteed Multicast)

    GM 模块实现的是一种可靠的组播通信协议,该协议能够保证组播消息的原子性,即保证组中活着的节点要么都收到消息要么都收不到。

    它的实现大致为:将所有的节点形成一个循环链表,每个节点都会监控位于自己左右两边的节点,当有节点新增时,相邻的节点保证当前广播的消息会复制到新的节点上 : 当有节点失效时,相邻的节点会接管以保证本次广播的消息会复制到所有的节点。在 Leader 和 Follower 上的这些 GM 形成一个组 (gm_group) ,这个组的信息会记录在 Mnesia 中。不同的镜像队列形成不同的组。操作命令从 Leader 对应的 GM 发出后,顺着链表传送到所有的节点。由于所有节点组成了一个循环链表, Leader 对应的 GM 最终会收到自己发送的操作命令,这个时候 Leader 就知道该操作命令都同步到了所有的 slave 上。

    4.2.4 消息的广播

    消息从 Leader 节点发出,顺着节点链表发送。在这期间,所有的 Follower 节点都会对消息进行缓存,当 Leader 节点收到自己发送的消息后,会再次广播 ack 消息,同样 ack 消息会顺着节点链表经过所有的 Follower 节点,其作用是通知 Follower 节点可以清除缓存的消息,当 ack 消息回到 Leader 节点时对应广播消息的生命周期结束。

    下图为一个简单的示意图,A 节点为 Leader 节点,广播一条内容为 test 的消息。1 表示消息为广播的第一条消息;id=A表示消息的发送者为节点 A。右边是Follower 节点记录的状态信息。

    为什么所有的节点都需要缓存一份发布的消息呢?

    master发布的消息是依次经过所有slave节点,在这期间的任何时刻,有可能有节点失效,那么相邻的节点可能需要重新发送给新的节点。例如,A->B->C->D->A形成的循环链表,A为master节点,广播消息发送给节点B,B再发送给C,如果节点C收到B发送的消息还未发送给D时异常结束了,那么节点B感知后节点C失效后需要重新将消息发送给D。同样,如果B节点将消息发送给C后,B,C节点中新增了E节点,那么B节点需要再将消息发送给新增的E节点。

    5. 镜像队列实践

    在 RabbitMQ 3.8 中发布了新的 Quorum Queues,旨在完全代替原有的镜像队列。

    在许多情况下,仲裁队列将是比传统队列镜像更好的选择。鼓励读者熟悉仲裁队列,并考虑它们而不是经典的镜像队列

    5.1 镜像队列的缺点

    镜像队列最大的问题是其同步算法造成的低性能。镜像队列有如下几个设计缺陷

    5.1.1 设计缺陷 1:broker 离线后重新上线

    基本的问题是,当 broker 离线并再次恢复时,它在镜像中的任何数据都将被丢弃。这是关键的设计缺陷。现在,镜像已恢复在线,但为空,管理员需要做出决定:是否同步镜像。“同步”意味着将当前消息从 leader 复制到镜像。

    5.1.2 设计缺陷 2:同步阻塞

    此时第二个致命的设计缺陷显露了出来。如果要同步消息,会阻塞整个队列,让这个队列不可用。当队列比较短的时候这通常不是什么问题,但当队列很长或者消息总大小很大的时候,同步将会需要很长时间。不仅如此,同步会导致集群中与内存相关的问题,有时甚至会导致同步卡住,需要重新启动。

    默认情况下,所有镜像队列都会自动同步,但也有人用户不同步镜像。这样,所有新消息都将被复制,老消息都不会被复制,这将减少冗余,会使消息丢失的概率加大。

    这个问题也引发滚动升级的问题,因为重新启动的 broker 将丢弃其所有数据,并需要同步来恢复全部数据冗余。

    参考资料

    展开全文
  • 一、镜像队列使用 1.镜像队列作用 ​ RabbitMQ默认集群模式,并不包管队列的高可用性,尽管队列信息,交换机、绑定这些可以复制到集群里的任何一个节点,然则队列内容不会复制,固然该模式解决一项目组节点压力,但...

    一、镜像队列使用

    1.镜像队列作用

    ​ RabbitMQ默认集群模式,并不包管队列的高可用性,尽管队列信息,交换机、绑定这些可以复制到集群里的任何一个节点,然则队列内容不会复制,固然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能守候重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,须要创建镜像队列。

    2.策略设置

    镜像队列设置可以基于策略设置,策略设置可以通过如下两种方法:

    (1)RabbitMQ 管理后台

    (2)rabbitmqctl 设置

    policy 添加命令:

     

    rabbitmqctl set_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern>  <definition>
    

    指令参数详情

    参数名称描述
    -p可选参数,针对指定 vhost 下的exchange或 queue
    --priority可选参数,policy 的优先级
    --apply-to可选参数,策略适用的对象类型,其值可为 "queues", "exchanges" 或 "all".默认是"all"
    namepolicy 的名称
    pattern匹配模式(正则表达式)
    definition镜像定义,json 格式,包括三部分(ha-mode,ha-params,ha-sync-mode)具体配置见下表

    definition参数详情

    参数名称描述
    ha-mode指名镜像队列模式,其值可为"all","exactly"或"nodes",all:表示在集群所有节点上进行镜像;exactly:表示在指定个数的节点上镜像,节点个数由 ha-params 指定;nodes:表示在指定节点上进行镜像,节点名称通过ha-params 指定。
    ha-paramsha-mode模式需要用到的参数:exactly 模式下为数字表述镜像节点数,nodes 模式下为节点列表表示需要镜像的节点。
    ha-sync-mode镜像队列中消息的同步方式,其值可为"automatic"或"manually".

    例如:对队列名称为 hello 开头的所有队列镜像镜像,并且在集群的节点 rabbit@10.18.195.57上进行镜像,队列消息自动同步,policy 的设置命令:

     

    rabbitmqctl set_policy --apply-to queues hello-ha "^hello" '{"ha-mode":"nodes","ha-params":["rabbit@10.18.195.57"],"ha-sync-mode":"automatic"}'
    

    3.ha 策略确认

    镜像队列策略是否生效可以通过如下两种方式验证:

    (1)RabbitMQ 管理后台

    可以通过策略管理验证策略是否配置正确

    通过队列列表也可以查看队列应用的策略,如果是镜像策略,可以看到当前队列副本数

    通过队列详情可以查看镜像队列当前主副本在哪个节点,从副本在哪几个节点

    (2)rabbitmqctl 查看

    查看策略详情指令:

     

    rabbitmqctl list_policies
    

    返回:

    查看队列是否镜像指令:

     

    rabbitmqctl list_queues name pid slave_pids
    

    返回:

    二、镜像队列实现原理

    1.整体介绍

    ​ 通常队列由两部分组成:一部分是 amqqueue_process, 负责协议相关的消息处理,即接收生产者发布的消息,向消费者投递消息,处理消息 confirm,ack 等等;另外一部分是 backing_queue, 作为消息存储的具体形式和引擎,提供了相关接口供进程amqqueue_process调用,用来完成消息的存储及可能的持久化工作等。

    ​ 镜像队列和普通队列组成有所不同,镜像队列存在两类进程:master队列进程为 amqqueue_process,slave 队列进程为 rabbit_mirror_queue_slave,每个进程会创建一个 gm(guaranteed multicast)进程,镜像队列中所有 gm 进程会组成一个进程组用于广播和接收消息。同时和普通队列一样,每个进程都包含一个用于处理消息逻辑的队列 backing_queue(默认为rabbit_variable_queue)。集群中每个有客户端连接的节点都会启动若干个channel进程,channel进程中记录着镜像队列中master和所有slave进程的Pid,以便直接与队列进程通信。整体结构如下:

    ​ gm 负责消息广播,至于广播消息处理,master 队列上回掉处理是通过coordinator,消息相关协议操作是通过amqqueue_process处理,而 slave 队列都是由rabbit_mirror_queue_slave进行处理。

    注意:消息的发布和消费都是通过 master 队列完成,master 队列对消息进行处理同时将消息的处理动作通过 gm 广播给所有 slave 队列,slave 的 gm 收到消息后,通过回调交由 rabbit_mirror_queue_slave 进行实际处理。

    2.gm(Guaranteed Muticast)

    ​ 镜像队列 gm 组通过将所有 gm 进程形成一个循环链表,每个 gm 都会监控位于自己左右两边的 gm,当有 gm 新增时,相邻的 gm 保证当前广播的消息会通知到新的 gm 上;当有 gm 失效时,相邻的 gm 会接管保证本次广播消息会通知到所有 gm。

    ​ gm 组信息会记录在本地数据库(mnesia)中,不同的镜像队列行程的 gm 组也是不同的。

    ​ 消息从 master 队列对应的 gm 发出后,顺着链表依次传送到所有 gm 进程,由于所有 gm 进程组成一个循环链表,master 队列的 gm 线程最终会收到自己发送的消息,这个时候 master 队列就知道消息已经复制到所有 slave 队列了。

    3.重要的数据结构

    queue 队列相关信息

     

    -record(q, 
            { q,                    %% 队列信息数据结构amqqueue
              exclusive_consumer,   %% 当前队列的独有消费者
              has_had_consumers,    %% 当前队列中是否有消费者的标识
              backing_queue,        %% backing_queue对应的模块名字
              backing_queue_state,  %% backing_queue对应的状态结构
              consumers,            %% 消费者存储的优先级队列
              expires,              %% 当前队列未使用就删除自己的时间
              sync_timer_ref,       %% 同步confirm的定时器,当前队列大部分接收一次消息就要确保当前定时器的存在(200ms的定时器)
              rate_timer_ref,       %% 队列中消息进入和出去的速率定时器
              expiry_timer_ref,     %% 队列中未使用就删除自己的定时器
              stats_timer,          %% 向rabbit_event发布信息的数据结构状态字段
              msg_id_to_channel,    %% 当前队列进程中等待confirm的消息gb_trees结构,里面的结构是Key:MsgId Value:{SenderPid, MsgSeqNo}
              ttl,                  %% 队列中设置的消息存在的时间
              ttl_timer_ref,        %% 队列中消息存在的定时器
              ttl_timer_expiry,     %% 当前队列头部消息的过期时间点
              senders,              %% 向当前队列发送消息的rabbit_channel进程列表
              dlx,                  %% 死亡消息要发送的exchange交换机(通过队列声明的参数或者policy接口来设置)
              dlx_routing_key,      %% 死亡消息要发送的路由规则(通过队列声明的参数或者policy接口来设置)
              max_length,           %% 当前队列中消息的最大上限(通过队列声明的参数或者policy接口来设置)
              max_bytes,            %% 队列中消息内容占的最大空间
              args_policy_version,  %% 当前队列中参数设置对应的版本号,每设置一次都会将版本号加一
              status                %% 当前队列的状态
            }).
    
    

    state 记录 gm 进程状态

     

    -record(state,
            { self,                 %% gm本身的ID
              left,                 %% 该节点左边的节点
              right,                %% 该节点右边的节点
              group_name,           %% group名称与队列名一致
              module,               %% 回调模块rabbit_mirror_queue_slave或者rabbit_mirror_queue_coordinator
              view,                 %% group成员列表视图信息,记录了成员的ID及每个成员的左右邻居节点(组装成一个循环列表)
              pub_count,            %% 当前已发布的消息计数
              members_state,        %% group成员状态列表 记录了广播状态:[#member{}]
              callback_args,        %% 回调函数的参数信息,rabbit_mirror_queue_slave/rabbit_mirror_queue_coordinator进程PID
              confirms,             %% confirm列表
              broadcast_buffer,     %% 缓存待广播的消息
              broadcast_buffer_sz,  %% 当前缓存带广播中消息实体总的大小
              broadcast_timer,      %% 广播消息定时器
              txn_executor          %% 操作Mnesia数据库的操作函数
            }).
    
    

    gm_group 整个镜像队列群组的信息,该信息会存储到Mnesia数据库

     

    -record(gm_group, 
            { name,    %% group的名称,与queue的名称一致
              version, %% group的版本号, 新增节点/节点失效时会递增
              members  %% group的成员列表, 按照节点组成的链表顺序进行排序
            }).
    
    

    view_member 镜像队列群组视图成员数据结构

     

    -record(view_member, 
            { id,       %% 单个镜像队列(结构是{版本号,该镜像队列的Pid})
              aliases,  %% 记录id对应的左侧死亡的GM进程列表
              left,     %% 当前镜像队列左边的镜像队列(结构是{版本号,该镜像队列的Pid})
              right     %% 当前镜像队列右边的镜像队列(结构是{版本号,该镜像队列的Pid})
            }).
    

    三、镜像队列组群维护

    1.节点新加入组群

    目前已有节点 A,B,C,新加入节点 B,如图:

    节点加入集群流程如下:

    (1)新增节点先从 gm_group 中获取对应 group 成员信息;

    (2)随机选择一个节点并向这个节点发送加入请求;

    (3)集群节点收到新增节点请求后,更新 gm_group 对应信息,同时更新左右节点更新邻居信息(调整对左右节点的监控);

    (4)集群节点回复通知新增节点成功加入 group;

    (5)新增节点收到回复后更新 rabbit_queue 中的相关信息,同时根据策略同步消息。

    核心流程详解:

    (1)新增节点 D 的 GM 进程请求加入组群

     

    %% 同步处理将自己加入到镜像队列的群组中的消息
    handle_cast(join, State = #state { self          = Self,
                                       group_name    = GroupName,
                                       members_state = undefined,
                                       module        = Module,
                                       callback_args = Args,
                                       txn_executor  = TxnFun })->
        %% join_group函数主要执行逻辑
        %% 1.判断时候有存活节点,如果没有存活,则重新创建gm_group数据库数据
        %% 2.如果有存活GM进程,随机选择一个GM进程
        %% 3.将当前新增节点GM进程加入到选择的GM进程右侧
        %% 4.将所有存活的镜像队列组装成镜像队列循环队列视图A->D->B->C->A
        View = join_group(Self, GroupName, TxnFun),
        MembersState =
            %% 获取镜像队列视图的所有key列表
            case alive_view_members(View) of
                %% 如果是第一个GM进程的启动则初始化成员状态数据结构
                [Self] -> blank_member_state();
                %% 如果不是第一个GM进程加入到Group中,则成员状态先不做初始化,让自己左侧的GM进程发送过来的信息进行初始化
                _      -> undefined
            end,
        %% 检查当前镜像队列的邻居信息(根据消息镜像队列的群组循环视图更新自己最新的左右两边的镜像队列)
        State1 = check_neighbours(State #state { view = View, members_state = MembersState }),
        %% 通知启动该GM进程的进程已经成功加入镜像队列群组(rabbit_mirror_queue_coordinator或rabbit_mirror_queue_slave模块回调)
        handle_callback_result(
          {Module:joined(Args, get_pids(all_known_members(View))), State1});
    

    (2)GM 进程 A 处理新增 GM 进程到自己右侧

     

    %% 处理将新的镜像队列加入到本镜像队列的右侧的消息
    handle_call({add_on_right, NewMember}, _From,
                State = #state { self          = Self,
                                 group_name    = GroupName,
                                 members_state = MembersState,
                                 txn_executor  = TxnFun }) ->
        %% 记录将新的镜像队列成员加入到镜像队列组中,将新加入的镜像队列写入gm_group结构中的members字段中(有新成员加入群组的时候,则将版本号增加一)
        Group = record_new_member_in_group(NewMember, Self, 
                                           GroupName, TxnFun),
        %% 根据组成员信息生成新的镜像队列视图数据结构
        View1 = group_to_view(Group),
        %% 删除擦除的成员
        MembersState1 = remove_erased_members(MembersState, 
                                              View1),
        %% 向新加入的成员即右边成员发送加入成功的消息
        ok = send_right(NewMember, View1,
                        {catchup, Self,          
                         prepare_members_state(MembersState1)}),
        %% 根据新的镜像队列循环队列视图和老的视图修改视图,同时根据镜像队列循环视图更新自己左右邻居信息
        {Result, State1} = change_view(View1, State #state {
                                                            members_state = MembersState1 }),
        %% 向请求加入的镜像队列发送最新的当前镜像队列的群组信息
        handle_callback_result({Result, {ok, Group}, State1}).
    

    (3) GM进程 D 处理 GM 进程 A 发送过来成员状态信息

     

    %% 左侧的GM进程通知右侧的GM进程最新的成员状态(此情况是本GM进程是新加入Group的,等待左侧GM进程发送过来的消息进行初始化成员状态)
    handle_msg({catchup, Left, MembersStateLeft},
               State = #state { self          = Self,
                                left          = {Left, _MRefL},
                                right         = {Right, _MRefR},
                                view          = View,
                                %% 新加入的GM进程在加入后是没有初始化成员状态,是等待左侧玩家发送消息来进行初始化
                                members_state = undefined }) ->
        %% 异步向自己右侧的镜像队列发送最新的所有成员信息,让Group中的所有成员更新成员信息
        ok = send_right(Right, View, {catchup, Self, MembersStateLeft}),
        %% 将成员信息转化成字典数据结构
        MembersStateLeft1 = build_members_state(MembersStateLeft),
        %% 新增加的GM进程更新最新的成员信息
        {ok, State #state { members_state = MembersStateLeft1 }};
    

    2.节点失效

    ​ 当 Slave 节点失效时,仅仅是相邻节点感知,然后重新调整邻居节点信息,更新 rabbit_queue, gm_group的记录。

    ​ 当 Master 节点失效时流程如下:

    (1)由于所有 mirror_queue_slave进程会对 amqqueue_process 进程监控,如果 Master 节点失效,mirror_queue_slave感知后通过 GM 进行广播;

    (2)存活最久的 Slave 节点会提升自己为 master 节点;

    (3)该节点会创建出新的 coordinator,并通知 GM 进程修改回调处理器为 coordinator;

    (4)原来的 mirror_queue_slave 作为 amqqueue_process 处理生产发布的消息,向消费者投递消息。

    核心流程详解:

    (1)GM 进程挂掉处理

     

    %% 接收到自己左右两边的镜像队列GM进程挂掉的消息
    handle_info({'DOWN', MRef, process, _Pid, Reason},
                State = #state { self          = Self,
                                 left          = Left,
                                 right         = Right,
                                 group_name    = GroupName,
                                 confirms      = Confirms,
                                 txn_executor  = TxnFun }) ->
        %% 得到挂掉的GM进程
        Member = case {Left, Right} of
                     %% 左侧的镜像队列GM进程挂掉的情况
                     {{Member1, MRef}, _} -> Member1;
                     %% 右侧的镜像队列GM进程挂掉的情况
                     {_, {Member1, MRef}} -> Member1;
                     _                    -> undefined
                 end,
        case {Member, Reason} of
            {undefined, _} ->
                noreply(State);
            {_, {shutdown, ring_shutdown}} ->
                noreply(State);
            _ -> timer:sleep(100),
                %% 先记录有镜像队列成员死亡的信息,然后将所有存活的镜像队列组装镜像队列群组循环队列视图
                %% 有成员死亡的时候会将版本号增加一,record_dead_member_in_group函数是更新gm_group数据库表中的数据,将死亡信息写入数据库表
                View1 = group_to_view(record_dead_member_in_group(
                                        Member, GroupName, 
                                        TxnFun)),
                handle_callback_result(
                  case alive_view_members(View1) of
                      %% 当存活的镜像队列GM进程只剩自己的情况
                      [Self] -> maybe_erase_aliases(
                                  State #state {
                                                members_state = blank_member_state(),
                                                confirms      = purge_confirms(Confirms) },
                                               View1);
                      %% 当存活的镜像队列GM进程不止自己(根据新的镜像队列循环队列视图和老的视图修改视图,同时根据镜像队列循环视图更新自己左右邻居信息)
                      %% 同时将当前自己节点的消息信息发布到自己右侧的GM进程
                      _      -> change_view(View1, State)
                  end)
        end.
    

    (2)主镜像队列回调 rabbit_mirror_queue_coordinator处理 GM 进程挂掉

     

    %% 处理循环镜像队列中有死亡的镜像队列(主镜像队列接收到死亡的镜像队列不可能是主镜像队列死亡的消息,它监视的左右两侧的从镜像队列进程)
    handle_cast({gm_deaths, DeadGMPids},
                State = #state { q  = #amqqueue { name = QueueName, pid = MPid } })
      when node(MPid) =:= node() ->
        %% 返回新的主镜像队列进程,死亡的镜像队列进程列表,需要新增加镜像队列的节点列表
        case rabbit_mirror_queue_misc:remove_from_queue(
               QueueName, MPid, DeadGMPids) of
            {ok, MPid, DeadPids, ExtraNodes} ->
                %% 打印镜像队列死亡的日志
                rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,
                                                       DeadPids),
                %% 异步在ExtraNodes的所有节点上增加QName队列的从镜像队列
                rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes, async),
                noreply(State);
            {error, not_found} ->
                {stop, normal, State}
        end;
    

    (3)从镜像队列回调 rabbit_mirror_queue_coordinator处理 GM 进程挂掉

     

    %% 从镜像队列处理有镜像队列成员死亡的消息(从镜像队列接收到主镜像队列死亡的消息)
    handle_call({gm_deaths, DeadGMPids}, From,
                State = #state { gm = GM, q = Q = #amqqueue {
                                                             name = QName, pid = MPid }}) ->
        Self = self(),
        %% 返回新的主镜像队列进程,死亡的镜像队列进程列表,需要新增加镜像队列的节点列表
        case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of
            {error, not_found} -> gen_server2:reply(From, ok),
            {stop, normal, State};
            {ok, Pid, DeadPids, ExtraNodes} ->
                %% 打印镜像队列死亡的日志(Self是副镜像队列)
                rabbit_mirror_queue_misc:report_deaths(Self, false, QName, DeadPids),
                case Pid of
                    %% 此情况是主镜像队列没有变化
                    MPid ->
                        gen_server2:reply(From, ok),
                        %% 异步在ExtraNodes的所有节点上增加QName队列的副镜像队列
                        rabbit_mirror_queue_misc:add_mirrors(
                          QName, ExtraNodes, async),
                        noreply(State);
                    %% 此情况是本从镜像队列成为主镜像队列
                    Self ->
                        %% 将自己这个从镜像队列提升为主镜像队列
                        QueueState = promote_me(From, State),
                        %% 异步在ExtraNodes的所有节点上增加QName队列的副镜像队列
                        rabbit_mirror_queue_misc:add_mirrors(
                          QName, ExtraNodes, async),
                        %% 返回消息,告知自己这个从镜像队列成为主镜像队列
                        {become, rabbit_amqqueue_process, QueueState, hibernate};
                    _ ->
                        %% 主镜像队列已经发生变化
                        gen_server2:reply(From, ok),
                        [] = ExtraNodes,
                        %% 确认在主节点宕机时否有为完成传输的数据,确认所有从节点都接收到主节点宕机的消息,然后传输未传输的消息。
                        ok = gm:broadcast(GM, process_death),
                        noreply(State #state { q = Q #amqqueue { pid = Pid } })
                end
        end;
    

    (4)主镜像队列挂掉否选取新的主镜像队列

     

    %% 返回新的主镜像队列进程,死亡的镜像队列进程列表,需要新增加镜像队列的节点列表
    remove_from_queue(QueueName, Self, DeadGMPids) ->
        rabbit_misc:execute_mnesia_transaction(
          fun () ->
                   %% 代码运行到这一步有可能队列已经被删除
                   case mnesia:read({rabbit_queue, QueueName}) of
                       [] -> {error, not_found};
                       [Q = #amqqueue { pid        = QPid,
                                        slave_pids = SPids,
                                        gm_pids    = GMPids }] ->
                           %% 获得死亡的GM列表和存活的GM列表
                           {DeadGM, AliveGM} = lists:partition(
                                                 fun ({GM, _}) ->
                                                          lists:member(GM, DeadGMPids)
                                                 end, GMPids),
                           %% 获得死亡的实际进程的Pid列表
                           DeadPids  = [Pid || {_GM, Pid} <- DeadGM],
                           %% 获得存活的实际进程的Pid列表
                           AlivePids = [Pid || {_GM, Pid} <- AliveGM],
                           %% 获得slave_pids字段中存活的队列进程Pid列表
                           Alive     = [Pid || Pid <- [QPid | SPids],
                                               lists:member(Pid, AlivePids)],
                           %% 从存活的镜像队列提取出第一个镜像队列进程Pid,它是最老的镜像队列,它将作为新的主镜像队列进程
                           {QPid1, SPids1} = promote_slave(Alive),
                           Extra =
                               case {{QPid, SPids}, {QPid1, SPids1}} of
                                   {Same, Same} ->
                                       [];
                                   %% 此处的情况是主镜像队列没有变化,或者调用此接口的从镜像队列成为新的主镜像队列
                                   _ when QPid =:= QPid1 orelse QPid1 =:= Self ->
                                       %% 主镜像队列已经变化,当前从队列变更为主队列,信息更新到数据库(mnesia)
                                       Q1 = Q#amqqueue{pid        = QPid1,
                                                       slave_pids = SPids1,
                                                       gm_pids    = AliveGM},
                                       store_updated_slaves(Q1),
                                       
                                       %% 根据队列的策略如果启动的从镜像队列需要自动同步,则进行同步操作
                                       maybe_auto_sync(Q1),
                                       %% 根据当前集群节点和从镜像队列进程所在的节点得到新增加的节点列表
                                   slaves_to_start_on_failure(Q1, DeadGMPids);
                                   %% 此处的情况是主镜像队列已经发生变化,且调用此接口的从镜像队列没有成为新的主镜像队列
                                   _ ->
                                       %% 更新最新的存活的从镜像队列进程Pid列表和存活的GM进程列表
                                       Q1 = Q#amqqueue{slave_pids = Alive,
                                                       gm_pids    = AliveGM},
                                       %% 存储更新队列的从镜像队列信息
                                       store_updated_slaves(Q1),
                                       []
                               end,
                           {ok, QPid1, DeadPids, Extra}
                   end
          end).
    

    四、镜像队列消息同步

    1.消息广播

    消息广播流程如下:

    (1)Master 节点发出消息,顺着镜像队列循环列表发送;

    (2)所有 Slave 节点收到消息会对消息进行缓存(Slave 节点缓存消息用于在广播过程中,有节点失效或者新增节点,这样左侧节点感知变化后会重新将消息推送给右侧节点);

    (3)当 Master 节点收到自己发送的消息后意味着所有节点都收到了消息,会再次广播 Ack 消息;

    (4)Ack 消息同样会顺着循环列表经过所有 Slave 节点,通知 Slave 节点可以清除缓存消息;

    (5)当 Ack 消息回到 Master 节点,对应消息的广播结束。

    核心流程详解:

    (1)GM 组群中消息广播

     

    %% 节点挂掉的情况或者新增节点发送给自己右侧GM进程的信息
    %% 左侧的GM进程通知右侧的GM进程最新的成员状态(此情况是有新GM进程加入Group,但是自己不是最新加入的GM进程,但是自己仍然需要更新成员信息)
    handle_msg({catchup, Left, MembersStateLeft},
               State = #state { self = Self,
                                left = {Left, _MRefL},
                                view = View,
                                members_state = MembersState })
      when MembersState =/= undefined ->
        %% 将最新的成员信息转化成字典数据结构
        MembersStateLeft1 = build_members_state(MembersStateLeft),
        %% 获取左侧镜像队列传入的成员信息和自己进程存储的成员信息的ID的去重
        AllMembers = lists:usort(?DICT:fetch_keys(MembersState) ++
                            ?DICT:fetch_keys(MembersStateLeft1)),
        %% 根据左侧GM进程发送过来的成员状态和自己GM进程里的成员状态得到需要广播给后续GM进程的信息
        {MembersState1, Activity} =
            lists:foldl(
              fun (Id, MembersStateActivity) ->
                       %% 获取左侧镜像队列传入Id对应的镜像队列成员信息
                       #member { pending_ack = PALeft, last_ack = LA } =
                                   find_member_or_blank(Id, MembersStateLeft1),
                       with_member_acc(
                         %% 函数的第一个参数是Id对应的自己进程存储的镜像队列成员信息
                         fun (#member { pending_ack = PA } = Member, Activity1) ->
                                  %% 发送者和自己是一个人则表示消息已经发送回来,或者判断发送者是否在死亡列表中
                                  case is_member_alias(Id, Self, View) of
                                      %% 此情况是发送者和自己是同一个人或者发送者已经死亡
                                      true ->
                                          %% 根据左侧GM进程发送过来的ID最新的成员信息和本GM进程ID对应的成员信息得到已经发布的信息
                                          {_AcksInFlight, Pubs, _PA1} = find_prefix_common_suffix(PALeft, PA),
                                          %% 重新将自己的消息发布
                                          {Member #member { last_ack = LA },
                                        %% 组装发送的内容和ack消息结构
                                          activity_cons(Id, pubs_from_queue(Pubs), [], Activity1)};
                                      false ->
                                          %% 根据左侧GM进程发送过来的ID最新的成员信息和本GM进程ID对应的成员信息得到Ack和Pub列表
                                          %% 上一个节点少的消息就是已经得到确认的消息,多出来的是新发布的消息
                                          {Acks, _Common, Pubs} =
                          find_prefix_common_suffix(PA, PALeft),
                                          {Member,
                                           %% 组装发送的发布和ack消息结构
                                           activity_cons(Id, pubs_from_queue(Pubs), acks_from_queue(Acks), Activity1)}
                                  end
                         end, Id, MembersStateActivity)
              end, {MembersState, activity_nil()}, AllMembers),
        handle_msg({activity, Left, activity_finalise(Activity)},
                   State #state { members_state = MembersState1 });
    

    (2) GM 进程内部广播

     

    %% GM进程内部广播的接口(先调用本GM进程的回调进程进行处理消息,然后将广播数据放入广播缓存中)
    internal_broadcast(Msg, SizeHint,
                       State = #state { self                = Self,
                                        pub_count           = PubCount,
                                        module              = Module,
                                        callback_args       = Args,
                                        broadcast_buffer    = Buffer,
                                        broadcast_buffer_sz = BufferSize }) ->
        %% 将发布次数加一
        PubCount1 = PubCount + 1,
        {%% 先将消息调用回调模块进行处理
         Module:handle_msg(Args, get_pid(Self), Msg),
         %% 然后将广播消息放入广播缓存
         State #state { pub_count           = PubCount1,
                        broadcast_buffer    = [{PubCount1, Msg} | Buffer],
                        broadcast_buffer_sz = BufferSize + SizeHint}}.
    
    

    (3)缓存消息发送定时器

     

    %% 确保广播定时器的关闭和开启,当广播缓存中有数据则启动定时器,当广播缓存中没有数据则停止定时器
    %% 广播缓存中没有数据,同时广播定时器不存在的情况
    ensure_broadcast_timer(State = #state { broadcast_buffer = [],
                                            broadcast_timer  = undefined }) ->
        State;
    %% 广播缓存中没有数据,同时广播定时器存在,则直接将定时器删除掉
    ensure_broadcast_timer(State = #state { broadcast_buffer = [],
                                            broadcast_timer  = TRef }) ->
        erlang:cancel_timer(TRef),
        State #state { broadcast_timer = undefined };
    %% 广播缓存中有数据且没有定时器的情况
    ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->
        TRef = erlang:send_after(?BROADCAST_TIMER, self(), flush),
        State #state { broadcast_timer = TRef };
    ensure_broadcast_timer(State) ->
        State.
    

    注:当处理消息时,缓存中的内容大小超过100M 则不会等定时器触发,会立刻将消息发给自己右侧的 GM 进程。

    2.消息同步

    ​ 配置镜像队列时有一个属性ha-sync-mode,支持两种模式 automatic 或 manually 默认为 manually。

    ​ 当 ha-sync-mode = manually,新节点加入到镜像队列组后,可以从左节点获取当前正在广播的消息,但是在加入之前已经广播的消息无法获取,所以会处于镜像队列之间数据不一致的情况,直到加入之前的消息都被消费后,主从镜像队列数据保持一致。当加入之前的消息未全部消费完之前,主节点宕机,新节点选为主节点时,这部分消息将丢失。

    ​ 当 ha-sync-mode = automatic,新加入组群的 Slave 节点会自动进行消息同步,使主从镜像队列数据保持一致。



    作者:jaredCoder
    链接:https://www.jianshu.com/p/f917067bcee3
    来源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

    展开全文
  • RabbitMQ(六)镜像队列

    2020-10-10 15:43:20
    一、镜像队列 默认情况下,RabbitMQ集群中的队列只会存储在某一个节点上,就是队列声明的那个节点上。当访问集群中的其他节点时,会把请求转发给这个节点来进行处理。当这个节点故障时,集群中的这个队列就表现为不...
    一、镜像队列
    默认情况下,RabbitMQ集群中的队列只会存储在某一个节点上,就是队列声明的那个节点上。当访问集群中的其他节点时,会把请求转发给这个节点来进行处理。当这个节点故障时,集群中的这个队列就表现为不可用。队列可以在多个节点中复制镜像以保障可用性,称之为镜像队列。
     
    每一个镜像队列由一个master和若干个slave组成。队列的master通常存储在集群的主节点上,没个队列有自己的主节点,镜像队列的所有操作都会首先在mastEr上执行然后广播给其他镜像。包括消息入队,推送给消费者、和消费确认等。
     
    生产者发送的消息会在所有的镜像中存储一份副本,消费者不论连接哪个节点最终都会在master上操作,一旦master确认消费(ack)以后,镜像队列会丢弃这条消息。因此镜像队列虽然增加了可用性(存在多个可用副本),但是多个节点间并没有分摊负载,因为所有节点都会处理全量的消息。
     
    如果镜像队列的master宕机了,最老的镜像将会晋升为 新的master。未同步的镜像也可以晋升为master,取决于队列的镜像参数。
     
    二、如何配置镜像队列
    镜像参数通过policy来配置,一个policy通过正则表达式匹配一个或多个队列。
     
    命令行设置:e.g.:
    rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
    控制台设置:
     
    参数说明
    ha-mode
    ha-params
    Result
    exactly
    count
    Number of queue replicas (master plus mirrors) in the cluster. A  count  value of 1 means a single replica: just the queue master. If the node running the queue master becomes unavailable,  the behaviour depends on queue durability . A  count  value of 2 means 2 replicas: 1 queue master and 1 queue mirror. In other words: `NumberOfQueueMirrors = NumberOfNodes - 1`. If the node running the queue master becomes unavailable, the queue mirror will be automatically promoted to master according to the  mirror promotion strategy  configured. If there are fewer than  count  nodes in the cluster, the queue is mirrored to all nodes. If there are more than  count  nodes in the cluster, and a node containing a mirror goes down, then a new mirror will be created on another node. Use of `exactly` mode with  `"ha-promote-on-shutdown": "always"`  can be dangerous since queues can migrate across a cluster and become unsynced as it is brought down.
    all
    (none)
    Queue is mirrored across all nodes in the cluster. When a new node is added to the cluster, the queue will be mirrored to that node. This setting is very conservative. Mirroring to a quorum (N/2 + 1) of cluster nodes is  recommended instead . Mirroring to all nodes will put additional strain on all cluster nodes, including network I/O, disk I/O and disk space usage.
    nodes
    node names
    Queue is mirrored to the nodes listed in  node names . Node names are the Erlang node names as they appear in  rabbitmqctl cluster_status ; they usually have the form "rabbit@hostname". If any of those node names are not a part of the cluster, this does not constitute an error. If none of the nodes in the list are online at the time when the queue is declared then the queue will be created on the node that the declaring client is connected to.
    三、多少个镜像是最优的
    在所有的节点上设置镜像是最保守的策略,可用性最高,但是会给集群中的所有节点带来额外的压力,包括网络IO、磁盘IO和硬盘空间占用。大多数场景下都没有必要在每个节点上都存储一份镜像。
    一般来说推荐设置过半节点的镜像,例如3节点集群设置2个镜像,5节点集群设置3个镜像。
    一些瞬变的数据或者时间敏感的数据,比如股票净值数据,最好设置少量的镜像甚至不要使用镜像。
     
    四、怎么检查队列是否是镜像状态
    镜像队列会在后台管理页面显示策略名称和额外的副本数量,例如下面这个队列存在两个副本,即一主一从
     
    队列详情
     
    如果仅有的一个镜像节点宕机了
     
    当添加了一个镜像队列的时候,会打印如下日志
    2018-03-01 07:26:33.121 [info] <0.1360.0> Mirrored queue 'two.replicas' in vhost '/': Adding mirror on node hare@warp10: <37324.1148.0>
     
    5.队列master定位节点
    rabbitmq中的每个队列有一个primary副本,那个副本所在的节点称之为队列master。所有队列的操作都要首选通过master执行然后传播给其他镜像,为了保证消息的FIFO顺序。
    队列master可以使用几种不同的策略分布在集群的节点上, 三种声明策略的方式如下:
    1. 使用x-queue-master-locator队列声明参数
    2. 设置queue-master-locator策略key
    3. 在配置文件中定义queue_master_locator
    可选择的策略类型:
    • min-masters 选择承载了队列master数量最少的节点
    • client-local 选择客户端声明队列时连接上的那个节点
    • random 随机选择一个节点
     
    6.节点策略和迁移master
    如果新的策略里面没有指定原来的master所在的节点,设置和修改队列策略可能导致存在的队列master下线。为了保证消息不丢失,rabbitmq会保留现有的master直到至少有一个镜像已经同步,即使同步需要很长时间。一旦同步成功,原master就会下线,随着原来的队列master下线,消费者将会从原来的master丢失连接并且重连。
    例如:原来队列在【A,B】节点,A是master,B是mirror。如果我们这个时候设置了新的policy定位在【C,D】节点,设置完成以后,队列将会存在【A,C, D】节点上,等到C或D节点上的镜像队列同步完成,A上面的队列将会下线。
     
    7.排他队列的镜像
    当声明排他队列的连接关闭的时候,排他队列将会被删除,所以对一个排他队列做镜像或者持久化是没有意义的。因为一旦承载他的节点宕机了,声明他的连接就会被关闭,随之队列将会被删除。
    所以排他队列永远不会配置镜像副本或者持久化。
     
    8.集群中的非镜像队列
    非镜像队列如果队列的master可用的时候(队列存储的那个节点),客户端可以连接任意一个节点对当前队列进行操作,包括声明、绑定、消费管理和消息路由,对当前队列的操作将会被集群路由到对应master节点上进行执行。一旦master节点不可用,
    如果队列是持久化的队列,队列将会保持不可用状态直到节点恢复。所有对队列的操作将会失败
    如果队列没有持久化,队列将会被删除。
    如果想要保证在任何时候队列依然可用,可以把镜像队列配置为  promoted to master even when not in sync 即使镜像没有同步也可以晋升为master
     
    9.配置镜像队列的几种方式
    例如:想要声明一个名称为“ha-two”的策略,策略内容是匹配队列名以“two.”开头的队列,在集群中任意2个节点上保持镜像,并且自动同步
    9.1命令行方式
         rabbitmqctl set_policy ha-two "^two\." \
    '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
     
    9.2发送HTTP请求
         PUT /api/policies/%2f/ha-two
    {
    "pattern":"^two\.",
    "definition": {
    "ha-mode":"exactly",
    "ha-params":2,
    "ha-sync-mode":"automatic"
    }
    }
     
    9.3后台管理页面上声明
    • Navigate to Admin > Policies > Add / update a policy.
    • Enter "ha-two" next to Name and "^two\." next to Pattern.
    • Enter "ha-mode" = "exactly" in the first line next to Policy, then "ha-params" = 2 in the second line, then "ha-sync-mode" = "automatic" in the third, and set the type on the second line to "Number".
    • Click Add policy.
    10.镜像队列主从切换
    当队列master不可用
    1.运行时间最长的镜像将会晋升为master,因为这个镜像最后可能从master全量同步。如果没有镜像完成了全量同步,那么仅存在于master上的消息将会丢失
    2.之前连上镜像节点的所有客户端连接都会突然中断。所有发送给客户端但是还没有ACK的消息将会重新入队。这里面包括客户端已经发出ACK但是在发送给master过程中master宕机的,也包括从master广播给镜像队列过程中丢失的消息。无论是哪种情况,新晋升的master只能重新入队还没有ACK的消息。
    3.那些监听了cancel事件的消费者将会被通知队列下线
    4.因为第二步过程中消息的重新入队,消费者有可能会重复消费到之前已经消费过的消息
    5.随着选择的镜像晋升成为master以后,这时发送给镜像队列的消息就不会丢失了。消息发送给镜像节点的操作将会路由到队列master然后广播给其他镜像,如果master再宕机,一旦新的镜像晋升为master以后,发送给镜像的消息将会重新加入到队列中,重复这个过程。
    6.在消息正在发布并且客户端已经接受到确认的时候,如果master或镜像节点下线了,同步还没有完成,客户端发送的消息依然会被确认。从这点上来看,发送给镜像队列和非镜像队列没有区别
     
    如果消费者使用自动签收模式,消息可能会丢失,这一点和非镜像队列没有区别。因为broker认为自动签收模式下,消息只要发送到消费者了就认为是确认发送成功了。
     
    镜像队列的master如果突然挂了,客户端的连接突然中断,如果消费者是自动签收模式,这个时候正在发送给客户端的消息可能永远也不会被消费者接收到了
     
    11.master节点故障消费感知
    消费者正在消费镜像队列的消息时,如果发生了故障转移,主从切换,哪条消息(发送中的)被发送到哪个消费者的记录将会丢失,因此所有未确认的消息被会被标记 redelivered并且重新发送。
    如果消费者想要感知这一行为,可以设置 x-cancel-on-ha-failover参数为true,然后在故障转移的时候,消费行为将会被取消,将会接收到cancel通知。
     
    java代码示例
     
    Channel channel = ...;
    Consumer consumer = ...;
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-cancel-on-ha-failover", true);
    channel.basicConsume("my-queue", false, args, consumer);
     
     
    展开全文
  • “对于 RabbitMQ 的节点来说,有单节点模式和集群模式两种,其中集群模式又分为普通集群模式和镜像队列集群模式,而镜像队列集群模式的搭建步骤和普通集群模式是基本相同的,唯一不同的是,镜...
  • RabbitMQ集群实现镜像队列(高可用)

    千次阅读 2021-12-14 16:16:03
    镜像队列 为什么要使用镜像? 如果 RabbitMQ 集群中只有一个 Broker 节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的durable属性...
  • 推荐阅读: 这套Github上40K+star学习笔记,可以帮你搞定95%以上的Java面试 毫不夸张的说,这份SpringBoot学习指南能解决你遇到的98%的问题 给跪了!这套万人期待的 SQL 成神之路...Mirror镜像队列,目的是为了保证...
  • 1.使用镜像的原因 如果 RabbitMQ 集群中只有一个 Broker 节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的durable...引入镜像队列
  • RabbitMQ镜像队列与负载均衡

    万次阅读 2021-07-18 20:15:18
    镜像队列 RabbitMQ集群是由多个broker节点构成的,那么从服务的整体可用性上来讲,该集群对于单点失效是有弹性的,但是同时也需要注意:尽管exchange和binding能够在单点失效问题上幸免于难,但是queue和其上持有的...
  • 配置镜像队列有两种方式,一是在代码中进行配置,二是使用管理命令进行配置。 一、在代码中进行配置: 1、创建一个fanout类型交换器的生产者,连接node1节点: 2、创建一个消费者,消费生产者生产的消息,消费者有...
  • RabbitMQ的镜像队列配置

    千次阅读 2021-12-14 20:08:32
    1、浏览器登录rabbitmq的web管理界面 ...2、 找到Add/update a policy 3、 设置如下:Pattern的设置一定要这样 4、 完成添加后 ... //测试镜像队列 public static final String QUEUE_NAME = "mirrior_
  • 镜像队列

    2019-06-03 20:31:00
    可以将所有的消息都设置为持久化,并且对应的队列也可以将durable属性设置为true,但是这样仍然无法避免由于缓存的问题:因为在消息发送后和被写入磁盘并执行刷盘动作之间存在一个短暂却会产生问题的时间窗。...
  • RabbitMQ之惰性队列与镜像队列

    千次阅读 多人点赞 2020-04-04 00:58:18
    文章目录1、惰性队列1.1、使用场景1.2、定义1.3、队列模式1.4、工作流程1.5、总结2、镜像队列2.1、消息流转过程2.2、负载均衡2.3、消息的可靠性2.4、GM协议2.5、镜像队列宕机2.6、镜像队列启动与停止顺序在这里插入...
  • 对于Queue来说,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会...
  • app)6、从节点加入集群(主节点host名称为a)7、管理工具查看集群节点8、设置镜像队列三、问题1、.erlang.cookie2、集群启动命令报错(rabbitmqctl join_cluster)3、使用rpm -i 安装erlang报错:error: Failed ...
  • RabbitMQ集群+ 镜像队列,并借助HAProxy 实现负载均衡的集群 一、集群管理 1. 环境介绍 节点名称 地址信息 mq1 192.168.80.16 mq2 192.168.80.17 mq3 192.168.80.18 单机部署 单机版安装地址:...
  • RabbitMQ之镜像队列

    万次阅读 2017-05-02 19:39:56
    镜像队列的设置 镜像队列的配置通过添加policy完成,policy添加的命令为: rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority] -p Vhost: 可选参数,针对指定vhost下的queue进行设置 Name: ...
  • RabbitMQ进阶 RabbitMQ进阶,是对RabbitMQ基础进行一个...镜像队列 在RabbitMQ集群模式下,创建的队列是某一个节点下的,并不是集群节点之间共享的,所以当某个节点宕机那么该节点下的所有队列也无法访问了,通过镜像
  • 目录1、集群架构1.1、普通集群(副本集群)1.2、普通集群搭建1.3、镜像集群1.4、 镜像集群怎么搭? 1、集群架构 1.1、普通集群(副本集群) 1、master主节点上的所有数据都会同步到Slave从节点上,但是有一个意外,就是...
  • RabbitMQ的mirror queue(镜像队列)机制是最简单的队列HA方案,它通过在cluster的基础上增加ha-mode、ha-param等policy选项,可以根据需求将cluster中的队列镜像到多个节点上,从而实现高可用,消除cluster模式中队列...
  • 图解集群 一个好消息是,RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式...我们只需在搭建好集群后,添加一个镜像队列的策略即可。 ❷ 如何实现多个节点的透明代理?如何实现各个节
  • 镜像队列不是负载均衡,镜像队列无法提升消息的传输效率,或者更进一步说,由于镜像队列会在不同节点之间进行同步,会消耗消息的传输效率。 对exclusive队列设置镜像并不会有任何作用,因为exclusive队列是连接...
  • 5、设置镜像队列策略 在以上任意节点执行 $ rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}' 这行命令创建了一个策略,策略名称为ha-all,“^”为正则表达式,表示所有匹配所有队列名称,策略...
  • 第三节课:Rabbitmq镜像队列搭建 开发应用场景一:集群节点安装1.1)集群节点安装(老师的是192.168.159.86;在这里老师是以我的86服务器单台
  • RabbitMQ——镜像队列问题(一)

    千次阅读 2019-10-23 23:48:54
    最近在使用镜像队列的过程中遇到了一些坑,通过阅读相关源码,大量的测试,不敢说对其中的原理掌握得非常透彻, 但基本能分析定位问题的原因,并且能自圆其说。这里整理总结下, 方便后续的回溯。欢...
  • RabbitMQ中队列有两种模式  1.默认 Default   2.镜像 Mirror 【类似于mongoDB,从一直在通过主的操作日志来进行同步】 *如果将队列定义为镜像模式,那么这个队列也将区分主从,从而做到...如何配置镜像队列...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 56,596
精华内容 22,638
关键字:

镜像队列