精华内容
下载资源
问答
  • 异步 解耦 解耦前: 削峰

    异步体现在异步消费消息;解耦体现在不用修改代码进行扩展

    异步

    同步发送:

    异步消费:

    解耦

    解耦前:

    解耦后:

    总结:通过MQ的发布-订阅模型,系统A和其他系统彻底解耦了

    削峰

    削峰前后:

    消息队列带来的问题:

    1、系统可用性降低:
    系统可用性在某种程度上降低,为什么这样说呢?在加入MQ之前,你不用考虑消息丢失或者说MQ挂掉等等的情况,但是,引入MQ之后你就需要去考虑了!

    2、系统复杂性提高:
    加入MQ之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题!

    3、一致性问题:
    我上面讲了消息队列可以实现异步,消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了

     

    展开全文
  • 现实世界的例子 消息队列主要由以下作用:解耦削峰异步,其实还有一个作用是提高接收着性能。 我们以一个快递员送快递的栗子来描述下队列的作用。 送快递送出了烦心事 快递员给小明送快递分为几步? 分为3步, ...

    随着技术的发展分布式系统已经成为标配,分布式系统就存在着各式各样的进程间通信。消息对列实际上就是进程间通信方式的一种,是生产者消费者模式在分布式场景下的实现。

    现实世界的例子

    消息队列主要由以下作用:解耦,削峰,异步,其实还有一个作用是提高接收着性能。

    我们以一个快递员送快递的栗子来描述下队列的作用。

    送快递送出了烦心事

    快递员给小明送快递分为几步?

    分为3步,

    第一步把快递拿到小明家门口(省略了前n步,从小明家楼下开始)

    第二步敲门(类比编程世界的调用第三方接口)

    第三步小明开门拿走快递(第三方接口执行过程)

    好了上边是送快递最简单的三步,让我们想想,这简单的三步会有什么问题?

    耦合:

    快递员什么时候完成这一单或者是否能顺利完成,十分依赖于小明的相应速度。如果小明还没起床,听见敲门声再穿衣服开门,可能消耗很多时间。如果小明没在家呢?那就要配送失败了,如何判断配送失败呢?快递员需要判断等多久开门(超时时间),打电话判断是否在家(健康检查),最终郁闷的离开,下次再来一次(重试)。
    快递员直接与小明交互,对小明的状态强依赖,产生了耦合现象。那有办法避免这种耦合呢?

    同步影响性能

    快递员的配送速度收到小明的响应速度影响极大,有一两个需要长时间等待的快件,快递员的配送效率(吞吐率)会收到很大影响。

    高峰期负载很高

    双11,618,每次到购物节的时候,快递员都很烦躁。快递太多,来的比送得快,这可如何是好。
    小明也很烦躁,一天要收100个快递,可是家里的空间都满了,要边收拾出地方边进一件快递。

    接收方还有其他事情

    如果小明准备和女朋友告白,此时来了一阵敲门声,你好,快递。
    – 还双11,小明买了100件商品,明天不定时一件件送到,小明这一天都要搭进去了。

    接收快递也成为了一件烦心事,好想把其他事情处理完再收快递,也好想一块收100件快递。

    该消息队列登场了

    现在快递员和小明都很烦躁,这个时候有个叫X巢(没收广告费)的快递柜出现了,快递员可以把快递放到柜子里,发条短信通知小明过来取快递。小明看到短信可以先做自己的事情,有空的时候过来拿走快递。

    终于,我们再看到小明和快递员的时候每个人都笑容满面。

    这里的快递柜就相当于是编程世界的消息队列,让我们看看消息队列到底起到了什么作用。

    解偶

    此时,快递员只需要把快递放到柜子里,不需要关心小明是否在家,是否在睡觉。小明也不需要一直等待给快递员开门,两个人解耦了。

    异步

    快递员把快递放到柜子里发个信息就可以去送下一件,不需同步等待结果。

    这样每个快递的处理速度(响应时间)都变得极短,每天送的快递数量(吞吐量)也变多了。

    削峰。

    这次又到了双十一,小明还是一天要到100个快递,由于小明一天只能消化10个快递,剩下的就放在了柜子里,等10天后才拿完。

    快递员由于是异步送快递,双11根本不是事,这点吞吐量完全搞得定。

    提高消费端性能

    小明以前需要一件一件收取快递,现在放在了柜子(队列)里,那等攒够了10件去取一次(buffer->reduce),好省时间!其他时间都可以快快乐乐约会了。

    总结

    让我们再来总结一下异步消息队列的作用

    解耦,生产端和消费端不需要相互依赖
    异步,生产端不需要等待消费端响应,直接返回,提高了响应时间和吞吐量
    削峰,打平高峰期的流量,消费端可以以自己的速度处理,同时也无需在高峰期增加太多资源,提高资源利用率
    提高消费端性能。消费端可以利用buffer等机制,做批量处理,提高效率。

    展开全文
  • 如何解耦异步为什么需要异步如何异步削峰为什么要削峰如何削峰 解耦 项目中为什么需要解耦? 讲一个真实的项目案例,项目是一个根据国际黄金行情进行实时交易的系统,分了两个微服务,一个是纸黄金实时交易系统,一...

    解耦

    项目中为什么需要解耦?

    讲一个真实的项目案例,项目是一个根据国际黄金行情进行实时交易的系统,分了两个微服务,一个是纸黄金实时交易系统,一个是实物黄金交易系统,两个系统都需要接收实时黄金行情数据,得到用户的交易价格,并且都有一个需求,能人工控制开市和闭市,也就是什么时候可以交易,什么不可以交易,那么这个时候就需要在后台管理系统进行控制,如果没有解耦,那么后台管理系统就需要直接通过dubbo调用纸黄金交易系统和实物黄金交易系统接口,告诉两个系统要开市还是要闭市,这样做的问题在于,如果因为公司业务的拓展,如果又增加一个其他的系统(例如实物白银交易系统),那我们怎么办,我们只能修改后台管理系统的代码,再调用实物白银交易系统的接口,告诉它开闭市,那么要是再多一个呢,中间又因为某一些原因,减少了一个系统呢,维护后台管理系统的同学可能要崩溃了,需要不停的修改代码,这就是没有解耦的极端的情况,无论是增加微服务还是减少都比较麻烦,系统越多越不好控制,越难维护,那我们怎么办?就是通过mq来进行解耦,下面是没有使用mq时的场景
    在这里插入图片描述

    如何解耦

    那么如何解耦?在后台管理系统和其他系统之间增加一层mq,无论是开市指令还是闭市指令,都直接发送到队列中,然后其他系统需要开闭市操作的就要订阅监听这个队列,这样无论是增加其他的业务系统还是减少业务系统,都不需要修改后台管理系统,负责后台管理系统的同学就轻松多了,其他的系统只要接收到指令进行相应的操作就可以了
    在这里插入图片描述

    异步

    为什么需要异步

    消息队列的主要特点是异步处理,主要目的是减少请求响应时间,实现非核心流程异步化,提高系统响应性能。

    所以典型的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作,作为消息放入消息队列。

    如何异步

    就像上面讲的为什么需要异步一样,我理解的异步主要是一些非核心的流程可以进行异步话,比如一个系统的注册方法,注册的时候,发短信通知,发邮件通知,也可能会发一些优惠券,那么就可以异步的处理这些操作,尤其是发短信,发邮件,如果是同步的,可能会因为短信服务,邮件服务出现问题导致注册失败,这些本来是锦上添花的操作却影响了主流程,如果使用异步处理,不仅缩短了相应时间,即使他们短暂的出现问题,也不会影响主流程,所以很多情况下,使用mq进行异步的处理还是非常有必要的
    下面是一般的做法,不仅发邮件和短信有可能影响主流程,也增加了主流程的相应时间
    传统的做法
    改造过之后的,接口的相应时间也会缩短
    改造后的流程图
    这里需要注意的是,需要考虑消息丢失造成的影响,也就是说不管什么原因,造成邮件服务和短信服务没有接收到消息,一般的做法是通过其他的分布式定时任务来定时的扫描,超过一定时间没有进行异步处理操作,如果在一定时间里没有处理发邮件和发短信,那么通过定时任务及时弥补,做到万无一失

    削峰

    为什么要削峰

    流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。

    应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。

    如何削峰

    在这里插入图片描述
    在这里我们增加一个消息队列,这样的话不管你请求来多少,我先存入消息队列,然后我再让系统慢慢的处理你的请求(如右图),这样很好的减缓了数据库的访问压力。
    这里需要注意的是这里是一个队列上对应多个消费者,每个消费者每次只消费一条消息,是一个工作队列模式,这样才能有效控制一下子所有的请求都跑到数据库中,下一篇文章,可以更加深入的在研究讨论一下如果做削峰

    展开全文
  • 为什么要引入MQ消息中间件 ...能够削峰: 设置流量缓冲池,可以让后端系统按照自身吞吐能力进行消费,不被冲垮. 能够异步: 强弱依赖梳理能将非关键调用链路的操作异步化并提升整体系统的吞吐能力. MQ的处理.
    1. 为什么要引入MQ消息中间件
    • 传统的系统之间直接调用在实际工程落地中存在许多问题
      • 系统之间耦合比较严重.
      • 面对大流量并发时,容易被冲垮.(每个接口模块的吞吐能力是有限的)
      • 等待同步存在性能问题
    • 为了解决上述的问题,所以引入MQ达到以下几个目标:
      • 能够解耦: 要做到系统解耦,当新的模块接进来时,可以做到代码改动最小.
      • 能够削峰: 设置流量缓冲池,可以让后端系统按照自身吞吐能力进行消费,不被冲垮.
      • 能够异步: 强弱依赖梳理能将非关键调用链路的操作异步化并提升整体系统的吞吐能力.
    1. MQ的处理流程
      发送者把消息发送给消息服务器,消息服务器把消息存放在若干队列/主题topic中,在合适的时候,消息服务器会将消息转发给接受者,在这个过程中,发送和接受是异步的,也就是发送无需等待,而且发送者和接受者的生命周期没有必然关系;尤其是在发布pub/订阅sub模式下,也可以完成一对多的通信,即让一个消息有多个接受者.

    2. activeMQ两个端口

    • java用的tcp端口: 61616
    • activeMQ图形化后台管理系统界面端口:8161
    1. ActiveMQ启动暂停的方式
    • 开启ActiveMQ

      • 普通启动: 在ActiveMQ的bin目录下执行
      ./activemq start
      
      • 按照不同的conf配置文件启动
      ./activemq start xbean:file:配置文件目录
      
      • 带运行日志的启动方式: ActiveMQ的bin目录下执行
      ./activemq start > ../data/activemq.log
      
    • 关闭ActiveMQ
      在ActiveMQ的bin目录下执行./activemq stop

    1. ActiveMQ图形化界面
    • 访问ActiveMQ图形化界面命令: 启动ActiveMQ后,访问http://ip: port, 本机上一般为localhost:8161,用户名密码为admin
    1. ActiveMQ中队列和主题topic的区别
    • 在点对点的消息传送域中,目的地被称为队列queue
    • 在发布订阅消息传送域中,目的地被称为主题topic
      在这里插入图片描述
    1. topic模式和queue模式比较
      在这里插入图片描述

    2. activeMQ的broker
      相当于一个activeMQ的服务器实例.
      broker其实就是实现了用代码的形式启动activeMQ将MQ嵌入到java代码中,以便随时用随时启动,在用的时候再去启动,这样能节省资源,也保证了可靠性.

    3. activeMQ broker使用代码

      package com.atguigu.activemq.Embed;
      import org.apache.activemq.broker.BrokerService;
      
      public class EmbedBroker {
          public static void main(String[] args) throws Exception {
              // 将迷你版的activeMQ嵌入到java程序中
              BrokerService brokerService = new BrokerService();
              // jmx: Java Management Extensions 它是一个Java平台的管理和监控接口
              brokerService.setUseJmx(true);
              brokerService.addConnector("tcp://localhost:61616");
              brokerService.start();
          }
      }
      

      注: 启动该activeMQ服务,然后启动生产者和消费者便能消费消息.

    4. activemq传输协议(重点是TCP和NIO)

    • TCP(Transmission Control Protocal 传输控制协议,默认)

      • 默认的Broker(MQ服务器实例)配置,TCP的client监听端口为61616
      • 在网路传输数据前,必须要序列化数据,消息是通过一个叫wire protocal的协议来序列化成字节流.默认情况下ActiveMQ把wire protocal叫做OpenWire,它的目的是促使网络上的数据快速交互.
      • TCP的连接URL形式为: tcp:hostname:port?key=value&key=value 后面参数可选
      • TCP传输的优点
        • 可靠性高、稳定性强
        • 高效性: 字节流方式传递,效率高
        • 有效性、可用性: 应用广泛,支持任何平台
      • 关于TCP协议的可配置参数参考activemq官网
    • NIO(New I/O API Protocal)

      • NIO协议和TCP协议类似但是NIO协议更侧重于底层的访问操作,它允许开发人员对同一资源可有更多的client调用和服务端有更多的负载.
      • 适合NIO协议的场景
        • 可能有大量的client去连接Broker,一般情况下,大量的client去连接Broker是被操作系统的线程所限制的.因此,NIO的实现比TCP需要更少线程去运行,所以建议使用NIO协议.
        • 可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能.
      • NIO的连接URI形式: nio://hostname:port?key=value
      • Transport Connector配置示例,参考activemq官网
      • 使用NIO需要在activemq.xml配置文件中配置如下内容
      <broker>
        ...
        <transportConnectors>
          <transportConnector name="nio" uri="nio://0.0.0.0:61616"/>  
        </<transportConnectors>
        ...
      </broker>
      
    • AMQP(Advanced Message Queuing Protocal 高级消息队列协议)

      • 一个提供同一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.基于此协议的客户端和消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制.
        比TCP协议的适用性更广.
    • stomp(Streaming Text Orientated Message Protocal 流文本定向消息协议)
      一种为MOM(Message Oriented Middleware 面向消息的中间件)设计的简单文本协议.

    • Secure Sockets Layer Protocal(SSL 安全套接字协议)

      • URL形式: ssl://hostname:port?key=value
      • 使用SSL需要在activemq.xml配置文件中配置如下内容
      <broker>
        ...
        <transportConnectors>
          <transportConnector name="ssl" uri="ssl://localhost:61618?trace=true"/>  
        </<transportConnectors>
        ...
      </broker>
      
    • mqtt(Message Queuing Telemetry Transport 消息队列遥测传输协议)
      IBM开发的一个即使通讯协议,有可能成为物联网的重要组成部分.该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当作传感器和制动器(比如通过Twitter让房屋联网)的通信协议

    • ws(WebSockets)协议
      用于前端

    • 总结
      在这里插入图片描述

    1. 使用activemq的NIO传输协议
    • 修改activemq.xml配置文件
      在这里插入图片描述
      注: 端口不能冲突.原本openwire(即TCP)的端口为61616,因为使用了docker,容器端口映射没添加成功,所以为了演示NIO,NIO使用了61616端口.

    • 生产者和消费者代码只需要修改URL即可.其余代码参考:springboot整合activemq的queue/topic代码

    server:
      port: 6666
    
    spring:
      activemq:
      	# 只需要修改该行
        broker-url: nio://localhost:61616
        user: admin
        password: admin
      jms:
        pub-sub-domain: true   # true代表topic,false代表queue
    
    # 自己定义的主题名称
    mytopic:  boot-activemq-topic
    
    1. NIO加强
    • 默认的NIO是基于TCP的,想要让NIO支持其它协议,可以使用auto+nio

    • 修改activemq.xml配置文件(注意端口不要冲突)

      <transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61606?maximumConnections=1000" />
      
    • 在代码中修改url为tcp/nio或其他协议即可使用相应协议

      server:
        port: 6666
      
      spring:
        activemq:
        	# 切换BIO的TCP协议和NIO的TCP协议只需要修改下面一行即可,如果使用其他协议,java代码不太一样.
          # broker-url: nio://localhost:61616
          broker-url: tcp://localhost:61616
          user: admin
          password: admin
        jms:
          pub-sub-domain: true   # true代表topic,false代表queue
      
      # 自己定义的主题名称
      mytopic:  boot-activemq-topic
      

    注: 其余代码参考:springboot整合activemq的queue/topic代码

    1. activemq消息持久化: mq服务器宕机了,消息不会丢失的机制
    • 目的: 为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都采用持久化机制
    • activemq持久化机制: JDBC、AMQ、KahaDB、LevelDB
    • 持久化流程
      在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件,内存数据库或者远程数据库等,再试图将消息发送给接收者,成功则将消息从存储中删除,失败则继续尝试发送.
      消息中心启动后首先要检查指定的存储位置,如果有未发送成功的消息,则需要将消息发送出去.
    1. activemq持久化机制
    • 官网

    • KahaDB消息存储: 基于日志文件,从ActiveMQ5.4开始默认的持久化插件

      • 说明
        • KahaDB是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。
        • 消息存储使用一个事务日志(正文)和仅仅用一个索引文件(目录)来存储它所有的地址。
        • KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模型进行了优化。
        • 数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。
      • KahaDB的存储原理(4+1: 4类文件1把锁)
        KahaDB在消息保存的目录中有4类文件和一个lock,跟ActiveMQ的其他几种文件存储引擎相比,这就非常简洁了。
        在这里插入图片描述
      • db-number.log
        KahaDB存储消息到预定大小的数据纪录文件中,文件名为db-number.log。当数据文件已满时,一个新的文件会随之创建,number数值也会随之递增,它随着消息数量的增多,如每32M一个文件,文件名按照数字进行编号,如db-1.log,db-2.log······。当不再有引用到数据文件中的任何消息时,文件会被删除或者归档。
      • db.data
        该文件包含了持久化的BTree索引,索引了消息数据记录中的消息,它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-number.log里面存储的消息。
      • db.free
        当前db.data文件里面哪些页面是空闲的,文件具体内容是所有空闲页的ID
      • db.redo
        用来进行消息恢复,如果KahaDB消息存储在强制退出后(可能有些内存中的数据没有存储到磁盘)启动,用于恢复BTree索引。
        记录的内容为物理数据页面修改的信息,用数据库中的操作举例(该处不适用,只为解释),如执行下面语句:
        update table set a = 1 where id = 1;
        
        就会记录一条日志:
        把第10表空间的第90号页面的偏移量为1024处的值更新为1
        
      • lock
        文件锁,表示当前获得kahadb读写权限的broker。
    • JDBC消息存储

      • 添加mysql数据库的驱动包到activemq的lib文件夹下(-P ./ 表示下载到当前目录下)
        wget  -P ./ https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.17/mysql-connector-java-8.0.17.jar
        
      • activemq.xml配置文件之jdbcPersistenceAdapter配置
        修改前:
        <persistenceAdapter>
                <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>
        
        修改后:
        <persistenceAdapter> 
            <jdbcPersistenceAdapter dataSource="#mysql-ds" /> 
        </persistenceAdapter>
        

      注: dataSource:是指定将要引用的持久化数据库的bean名称。
      createTableOnStartup:是否在启动的时候创建数据表,默认是true,这样每次启动都会去创建表了,一般是第一次启动的时候设置为true,然后再去改成false。

      • activemq.xml配置文件之数据库连接池配置
        在这里插入图片描述
      • 创建数据库和数据表
        • 创建数据库: create database activemq
        • 创建数据表: 配置文件中的createTableOnStartup默认为true,重启activemq就会自动创建三张表.
        • ACTIVEMQ_MSGS(queue和topic消息都存在该表中)
          在这里插入图片描述
        • ACTIVEMQ_ACKS(存储订阅关系.如果是持久化topic,订阅者和服务者的订阅关系在这个表保存)
          在这里插入图片描述
        • ACTIVEMQ_LOCK
          在集群环境下才有用,只有一个Broker可以获取消息,称为Master Broker,其他的只能作为备份等待.Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker
          字段有两个: ID、BrokerName
      • 对于queue而言,开启生产者生产消息,数据库中便持久化未消费的消息,当开启消费者消费消息时,数据库中已消费的消息清除.
      • 对于topic而已,先开启消息消费者,再开启消息生产者,消息即使被消费了也会存储到数据表中.
    • AMQ消息存储: 基于文件的存储方式,以前的默认消息存储,现在不用了
      AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件中,文件的默认大小为32M,当一个文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本

    • LevelDB消息存储(了解)
      这种文件系统是从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库存储形式,但是它提供比KahaDB更快的持久性。但它不使用自定义B-Tree实现来索引独写日志,而是使用基于LevelDB的索引
      默认配置如下:

      <persistenceAdapter>
            <levelDB directory="activemq-data"/>
      </persistenceAdapter>
      
    1. activemq消息持久化之JDBC with journal
    • ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。避免每次生产者产生新消息,都需要写库和读库(对于queue,如果数据库中的某条消息消费掉,则从库中删除).当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。
    • 举例说明: 生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。一般会等待7-10分钟,如果消息还没有被消费,便会写入数据库中.
    • activemq.xml配置(将之前配置的persistenceAdapter注释掉了)
      在这里插入图片描述
    1. 基于zookeeper + replicated-leveldb-store(带复制的leveldb持久化)搭建ActiveMQ集群
    • 集群仅提供主备方式的高可用集群功能,避免单点故障

    • 集群原理示意图
      在这里插入图片描述

    • 解释

      • 使用Zookeeper集群注册所有的ActiveMQ Broker但只有其中一个Broker可以提供服务,它将被视为Master,其他的Broker处于待机状态被视为Slave。
      • 如果Master因故障而不能提供服务,Zookeeper会从Slave中选举出一个Broker充当Master。Slave连接Master并同步他们的存储状态,Slave不接受客户端连接。所有的存储操作都将被复制到连接至Maste的Slaves。
      • 如果Master宕机得到了最新更新的Slave会变成Master。故障节点在恢复后会重新加入到集群中并连接Master进入Slave模式。
      • 所有需要同步的消息操作都将等待存储状态被复制到其他法定节点的操作完成才能完成。
      • 所以,如给你配置了replicas=3,那么法定大小是(3/2)+1 = 2。Master将会存储更新然后等待(2-1)=1个Slave存储和更新完成,才汇报success。
      • 有一个node要作为观察者存在。当一个新的Master被选中,你需要至少保障一个法定mode在线以能够找到拥有最新状态的ode,这个ode才可以成为新的Master。
      • 推荐运行至少3个replica nodes以防止一个node失败后服务中断。
    • 部署步骤

      • 环境:

        • mac11.1
        • jdk1.8
        • zookeeper
        • activemq
      • 集群部署规划列表

      主机 zookeeper集群端口 AMQ集群bind端口 AMQ消息tcp端口 AMQ管理控制台端口 AMQ节点安装目录
      192.168.145.3 2181 bind=“tcp://0.0.0.0:63631” 61616 8161 /mq_cluster/mq_node01
      192.168.145.3 2182 bind=“tcp://0.0.0.0:63632” 61617 8162 /mq_cluster/mq_node02
      192.168.145.3 2183 bind=“tcp://0.0.0.0:63633” 61618 8163 /mq_cluster/mq_node03
      • 关闭防火墙并保证可以ping通activemq服务器

      • 要求具备zookeeper集群并可以成功启动(配置见zookeeper)

        • 写一个shell脚本: zk_batch_start.sh,直接执行该脚本即可启动zookeeper集群.
        #!/bin/sh
        cd /usr/local/zk-cluster/zk1/bin
        ./zkServer.sh start
        
        cd /usr/local/zk-cluster/zk2/bin
        ./zkServer.sh start
        
        cd /usr/local/zk-cluster/zk3/bin
        ./zkServer.sh start
        
        • 写一个shell脚本: zk_batch_stop.sh,直接执行该脚本即可关闭zookeeper集群
        把上面的脚本内容中的start改为stop即可
        
        • 给上述两个脚本执行权限
        sudo chmod 700 ./zk_batch_start.sh
        sudo chmod 700 ./zk_batch_stop.sh
        
      • 创建3台activemq集群目录

        mkdir mq_cluster
        cd mq_cluster
        sudo cp -r /usr/local/apache-activemq-5.16.0/ mq_node01
        sudo cp -r /usr/local/apache-activemq-5.16.0/ mq_node02
        sudo cp -r /usr/local/apache-activemq-5.16.0/ mq_node03
        
      • 修改管理控制台ip地址和端口(jetty.xml)

        • host默认为localhost,导致不能使用ip地址访问activemq图形化界面.而activemq集群部署在虚拟机中,所以本机无法访问.所以要将该处的ip地址改为虚拟机的ip地址.
        • mqnode01不变,管理控制台(图形化展示界面)端口为8161.mqnode02管理控制台端口改为8162,mqnode03管理控制台端口改为8163.
          在这里插入图片描述
      • hostname名字映射(修改ubuntu虚拟机中的/etc/hosts文件)

        # 192.168.145.3为ubuntu虚拟机的ip地址
        192.168.145.3 mq-server
        
      • activemq集群配置

        • 3个节点的BrokerName全部一致(activemq.xml)
          在这里插入图片描述
        • 3个节点的持久化配置(activemq.xml)
          将默认的kahaDB注释掉,然后粘贴如下代码.bind端口号分别为63631、63632、63633,其余配置一样
          <persistenceAdapter>
              <replicatedLevelDB
              	# 持久化数据存放地址
                  directory="${activemq.data}/leveldb"
                  # 集群节点个数
                  replicas="3"
                  # 集群通信端口. 当该节点成为master后,它将绑定已配置的地址和端口来为复制协议提供服务
                  bind="tcp://0.0.0.0:63631"
                  # zookeeper集群地址
                  zkAddress="192.168.145.3:2181,192.168.145.3:2182,192.168.145.3:2183"
                  # 主机名(在etc/hosts文件中对ip地址进行映射)
                  hostname="mq-server"
                  # 同步到本地磁盘
                  sync="local_disk"
                  # zookeeper数据挂载点
                  zkPath="/activemq/leveldb-stores"/>
          </persistenceAdapter>
          
      • 修改各节点的消息端口(activemq.xml)
        mq_node01消息端口为61616,不需要改.
        mq_node02消息端口为61617:

         <transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        

        mq_node03消息端口为61618:

        <transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        
      • 按顺序启动3个activemq节点(前提是zk集群已经成功启动运行)

        • 写一个shell脚本: amq_batch_start.sh,直接执行该脚本即可启动activemq集群
        #!/bin/sh
        cd /usr/local/mq_cluster/mq_node01/bin
        ./activemq start
        cd /usr/local/mq_cluster/mq_node02/bin
        ./activemq start
        cd /usr/local/mq_cluster/mq_node03/bin
        ./activemq start
        
        • 写一个shell脚本: amq_batch_stop.sh,直接执行该脚本即可关闭activemq集群
        把上面的脚本内容中的start改为stop即可
        
        • 给上述两个脚本执行权限
        sudo chmod 700 ./amq_batch_start.sh
        sudo chmod 700 ./amq_batch_stop.sh
        
      • zk集群节点状态说明

        # 任意启动一台zookeeper client
        ./zkCli.sh -server localhost:2181
        # ls /查看节点
        [zk: localhost:2181(CONNECTED) 4] ls /       
        [activemq, zookeeper]
        [zk: localhost:2183(CONNECTED) 1] ls /activemq
        [leveldb-stores]
        [zk: localhost:2183(CONNECTED) 2] ls /activemq/leveldb-stores
        [00000000006, 00000000007, 00000000005]
        

        下面截图中可以看到以13为结尾的节点elected有值,而另外两个节点elected为null,所以节点13为master,另外两个为slaver
        在这里插入图片描述
        在这里插入图片描述
        在这里插入图片描述

    • activemq管理控制台展示(activemq的客户端只能访问master的Broker,根据bind端口号63631确定master管理控制台端口为8161)
      在这里插入图片描述

    1. 集群可用性测试
    • 测试使用该链接中的代码
    • 当有activemq的master节点宕机时,会自动切换到另外一个存活的节点
      • activemq的客户端只能访问master的broker(activemq服务器实例),其他处于slave的broker不能访问,所以客户端连接的broker应该使用failover协议(失败转移)
      • 当一个activemq节点挂掉或者一个zookeeper节点挂掉,activemq服务依然正常运转,如果仅剩一个activemq节点,由于不能选举master,所以activemq不能正常运行.
      • 同样,zookeeper如果仅剩一个节点,不管activemq各节点是否存活,activemq也不能提供正常服务.因为activemq集群的高可用依赖于zookeeper集群的高可用.
    • 测试: 3台机器中的activemq只会有一个mq可以被客户端连接使用,在测试时把master关掉,然后再重试客户端消息发送和消费还可以正常使用,则说明集群搭建正常.
    • 查看master
      在这里插入图片描述
    • 从上图可以看出端口63632对应的是mq_node02(客户端端口为8162),所以停止mq_node02服务,看是否会进行失败转移
      在这里插入图片描述
      在这里插入图片描述
      在这里插入图片描述
    • 从上图可以看出该集群搭建正常,满足可用性
    1. 集群可用性测试中碰到的问题
    • 在关闭activemq master服务器的时候碰到不能进行失败转移, 如下图查看activemq节点时,address处为null,但是exected不为空(下图是成功进行失败转移图).而且在activemq.log中报错:No IOExceptionHandler registered, ignoring IO exception | org.apache.activemq.broker.BrokerService | LevelDB IOException handler. java.io.IOException:
      在这里插入图片描述
    • 解决办法:
      将activemq节点中/data/leveldb目录删除掉,再重新启动zookeeper集群和activemq集群.参考链接
    1. activemq引入消息队列后如何保证其高可用性(前面有详细解答)
    • 持久化
    • 事务
    • 签收
    • zookeeper+replicated-leveldb-stored(带复制的leveldb存储)主从集群
    1. 异步投递发送async sends
    • activemq默认使用异步发送.除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息
    • 如果没有使用事务且发送的是持久化的消息,每一次发送都是同步发送的且会阻塞 producer 直到 Broker 返回一个确认,表示消息己经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端带来了很大的延时。很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息。
    • 异步投递可以最大化 produer 端的发送效率。通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升 producer 性能;不过这也带来了额外的问题,就是需要消耗较多的 Client 端内存同时也会导致 Broker 端性能消耗增加;此外它不能有效的确保消息的发送成功。
    • 开启同步投递的方式(详细代码见activemq代码)
      • 方法1
      private static final String ACTIVEMQ_URL = "cp://localhost:61616?jms.useAsyncSend=true";
      ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
      ...
      
      • 方法2
      ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
      activeMQConnectionFactory.setUseAsyncSend(true);
      ...
      
      • 方法3
      ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
      ActiveMQConnection connection = (ActiveMQConnection) activeMQConnectionFactory.createConnection();
      connection.setUseAsyncSend(true);
      ...
      
    1. 异步发送怎么确保能够发送成功
    • 异步发送丢失的场景是生产者设置useAysncSend=true,使用produce.send(msg)持续发送消息.由于消息不堵塞,生产者会认为所有发送的消息均被成功发送至MQ.如果MQ突然宕机,那么生产者端内存中尚未发送到MQ的消息全部丢失.
      所以正确的异步发送方式是需要回调的.

    • 同步发送和异步发送的区别在此: 同步发送send()不堵塞了就表示一定发送成功了,异步发送需要接受回调并由客户端再判断一次是否发送成功.

    • 异步发送+回调代码:

      package com.atguigu.activemq.queue;
      import lombok.extern.log4j.Log4j;
      import org.apache.activemq.ActiveMQConnectionFactory;
      import org.apache.activemq.ActiveMQMessageProducer;
      import org.apache.activemq.AsyncCallback;
      
      import javax.jms.*;
      import java.util.UUID;
      
      @Log4j
      public class JmsSyncProduce {
      //    单机
      //    private static final String ACTIVEMQ_URL = "tcp://localhost:61616";
      //    private static final String QUEUE_NAME = "queue01";
      
      //    集群
      //    failover:失败转移协议,当某一个activemq节点挂掉时转移到另一个,如果仅剩一个activemq节点,由于不能选举master,所以activemq不能正常运行.
          private static final String ACTIVEMQ_URL = "failover:(tcp://192.168.145.3:61616,tcp://192.168.145.3:61617,tcp://192.168.145.3:61618)?randomize=false";
          private static final String QUEUE_NAME = "queue-cluster";
      
          public static void main(String[] args) throws JMSException {
              // 1。创建连接工厂,按照给定的url地址,采用默认用户名和密码
              ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
              // 设置同步发送
              activeMQConnectionFactory.setUseAsyncSend(true);
              // 2。通过连接工厂,获得连接connection并启动访问
              Connection connection = activeMQConnectionFactory.createConnection();
              connection.start();
              // 3。创建会话
              // 两个参数,第一个叫事务,第二个叫签收
              Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
              // 4。创建目的地(队列或主题)
              Queue queue = session.createQueue(QUEUE_NAME);
              // 5。创建消息生产者
              ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);
              // 6。通过使用messageProducer生产3条消息发送到MQ的队列中
              TextMessage textMessage = null;
              for(int i = 1; i <= 3; i++){
                  // 7。 创建消息
                  textMessage = session.createTextMessage("cluster msg---" + i);  //理解为字符串
                  textMessage.setJMSMessageID(UUID.randomUUID().toString() + "---orderAtguigu");
                  String msgId = textMessage.getJMSMessageID();
                  // 8。通过messageProducer发送给mq
                  activeMQMessageProducer.send(textMessage, new AsyncCallback() {
                      @Override
                      public void onSuccess() {
                          log.info(msgId + "has send ok");
                      }
      
                      @Override
                      public void onException(JMSException e) {
                          log.info(msgId + "send failed");
                      }
                  });
              }
              // 9。关闭资源
              activeMQMessageProducer.close();
              session.close();
              connection.close();
              log.info("******消息发送到MQ完成");
          }
      }
      
    1. 延时投递和定时投递
    • 应用场景
      邮件发送等

    • 四大属性
      在这里插入图片描述

    • 具体操作

      • 在activemq.xml中配置schedulerSupport属性为true
        在这里插入图片描述
      • java代码里面封装的辅助消息类型: ScheduledMessage
      • 延时投递生产者代码
        package com.atguigu.activemq.queue;
        import lombok.extern.log4j.Log4j;
        import org.apache.activemq.ActiveMQConnectionFactory;
        import org.apache.activemq.ScheduledMessage;
        import javax.jms.*;
        
        @Log4j
        public class JmsProduct_DelayAndSchedule {
        //    单机
        //    private static final String ACTIVEMQ_URL = "tcp://localhost:61616";
        //    private static final String QUEUE_NAME = "queue01";
        
        //    集群
        //    failover:失败转移协议,当某一个activemq节点挂掉时转移到另一个,如果仅剩一个activemq节点,由于不能选举master,所以activemq不能正常运行.
            private static final String ACTIVEMQ_URL = "failover:(tcp://192.168.145.3:61616,tcp://192.168.145.3:61617,tcp://192.168.145.3:61618)?randomize=false";
            private static final String QUEUE_NAME = "queue-cluster";
            public static void main(String[] args) throws JMSException {
                // 1。创建连接工厂,按照给定的url地址,采用默认用户名和密码
                ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
                // 2。通过连接工厂,获得连接connection并启动访问
                Connection connection = activeMQConnectionFactory.createConnection();
                connection.start();
                // 3。创建会话
                // 两个参数,第一个叫事务,第二个叫签收
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 4。创建目的地(队列或主题)
                Queue queue = session.createQueue(QUEUE_NAME);
                // 5。创建消息生产者
                MessageProducer messageProducer = session.createProducer(queue);
                // 延时投递时间
                long delay = 3 * 1000;
                // 重复投递的时间间隔,即每4s投递一次
                long period = 4 * 1000;
                // 重复投递次数
                int repeat = 5;
                // 6。通过使用messageProducer生产3条消息发送到MQ的队列中
                for(int i = 1; i <= 3; i++){
                    // 7。 创建消息
                    TextMessage textMessage = session.createTextMessage("cluster msg---" + i);  //理解为字符串
                    textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
                    textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
                    textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
                    // 8。通过messageProducer发送给mq
                    messageProducer.send(textMessage);
                }
                // 9。关闭资源
                messageProducer.close();
                session.close();
                connection.close();
                log.info("******消息发送到MQ完成");
            }
        }
        
    1. activemq消费重试机制
    • 那些情况会引发消息重发

      • client使用了事务transactions且在session中调用了rollback()回滚
      • client使用了事务transactions且在调用commit()之前关闭或者没有commit
      • client在CLIENT_ACKNOWLEDGE的传递模式下,在session中调用了recover()
    • 消息重发时间间隔和最大重发次数(只算重发,不算第一次发送的)
      间隔1 次数6

    • 有毒消息Poison ACK
      一个消息被重发超过默认最大重发次数(6次)时,消费端会给MQ发送一个“poison ack”表示这个消息有毒,告诉broker不要再发了,这个时候broker会把消息放到死信队列(DLQ, dead letter queue)

    • 属性说明
      在这里插入图片描述

    • spring整和消息重发
      在这里插入图片描述

    1. 死信队列
      一个消息被重发超过默认最大重发次数(6次)时,消费端会给MQ发送一个“poison ack”表示这个消息有毒,告诉broker不要再发了,这个时候broker会把消息放到死信队列(DLQ, dead letter queue).开发人员可以在这个队列中查看出错的信息,进行人工干预.
      在这里插入图片描述
    • 死信队列的使用: 处理失败的消息
      在这里插入图片描述
      在这里插入图片描述
    1. 如何保证消息不被重复消费,谈谈幂等性
    • 幂等性
      任意多次执行所产生的影响均与一次执行的影响相同就可以称为幂等
    • 消息幂等性
      当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响
    • 背景
      网络延迟传输中,会造成MQ消息重试,在重试过程中,可能会造成重复消费.比如正常业务情况下,我们是不允许同个订单重复支付,这种业务场景我们就需要确保幂等性
    • 解决
      • 方法1: 如果消息是做数据库的插入操作,给这个消息一个唯一主键,那么就算出现重复消费的情况,会导致主键冲突,避免数据库中出现脏数据.
      • 方法2: 准备一个第三方做消费记录,比如redis.给消息分配一个全局id,只要消费过该消息,将<id, message>以K-V的形式写入redis,那消费者开始消费前,去redis中查询有没有消费记录即可.
    展开全文
  • 为什么需要消息队列?如何做消息队列的选型? 消息队列——解耦异步削峰
  • 消息队列作用(解耦异步削峰)图详解

    千次阅读 多人点赞 2020-05-16 16:41:40
    文章目录一、消息队列简介二、解耦三、异步四、流量削峰五、消息通讯六、日志处理七 .使用消息队列带来的一些问题 一、消息队列简介 百度百科 消息队列概念 MQ全称为Message Queue,消息队列(MQ)是一种应用...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 2,041
精华内容 816
关键字:

解耦异步削峰