消息队列 订阅
“消息队列”是在消息的传输过程中保存消息的容器。 展开全文
“消息队列”是在消息的传输过程中保存消息的容器。
信息
外文名
Message queue
对    象
两台计算机间
特    点
消息可以非常简单也可以更复杂
含    义
消息的传输过程中保存消息的容器
中文名
消息队列
消息队列消息简介
“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。消息被发送到队列中。“消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。
收起全文
精华内容
下载资源
问答
  • 2022-02-17 13:52:30


    简介

    MQ(message queue)消息队列,也叫消息中间件。

    消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。

    它是类似于数据库一样需要独立部署在服务器上的一种应用,提供接口给其他系统调用。

    JMS规范

    消息中间件是遵守JMS(java message service)规范的一种软件(大多数消息中间件遵守JMS规范)。

    要使用Java消息服务,你必须要有一个JMS提供者,管理会话和队列。现在既有开源的提供者也有专有的提供者。
    开源的提供者包括:Apache ActiveMQ、Kafka、WebMethods、阿里的RocketMQ等。

    专业术语

    • 提供者:实现JMS规范的中间件服务器。
    • 客户端:发送或者接受消息的应用程序。
    • 生产者:创建并发送消息的客户端。
    • 消费者:接受并处理消息的客户端。
    • 消息:应用程序之间传递的内容。
    • 队列:一个容纳那些被发送的等待阅读的消息的区域,一旦消息被消费,将被从队列中移走。
    • 主题 :一种支持发送消息给多个订阅者的机制。
    • 消息模式:在客户端之间传递消息的方式,JSM中定义了点对点模式(发送者接收者)和发布订阅模式(发布者订阅者)。

    消息模式

    点对点模式:Point-to-Point(P2P)

    消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。

    消息被消费以后,queue中不再存储,所以消息消费者不可能消费到已经被消费的消息。 Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
    在这里插入图片描述

    1. 每个消息只有一个消费者。一旦被消费,消息就不再在消息队列中。

    2. 提供者和消费者之间在时间上没有依赖性。当提供者发送了消息之后,不管消费者有没有正在运行,它不会影响到消息被发送到队列。

    3. 每条消息仅会传送给一个消费者。可能会有多个消费者在一个队列中侦听,但是每个队列中的消息只能被队列中的一个消费者所消费。

    4. 消息存在先后顺序。一个队列会按照消息服务器将消息放入队列中的顺序,把它们传送给消费者。当已被消费时,就会从队列头部将它们删除(除非使用了消息优先级)。

    5. 消费者在成功接收消息之后需向队列应答成功。

    queue实现了负载均衡,将producer生产的消息发送到消息队列中,由多个消费者消费。但一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有一个可用的消费者。

    发布订阅模式:Publish/Subscribe(Pub/Sub)

    消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。
    在这里插入图片描述

    1. 每个消息可以有多个消费者。
    2. 发布者和订阅者之间有时间上的依赖性。针对某个主题的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。
    3. 为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
    4. 每条消息都会传送给称为订阅者的多个消息消费者。订阅者有许多类型,包括持久型、非持久型和动态型。
    5. 发布者通常不会知道哪一个订阅者正在接收主题消息。
    6. 消息被推送给消费者。这意味着消息会传送给消费者,而无须请求。

    topic实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到一个消息的拷贝。

    消息消费方式

    1. 同步

      订阅者或消费者调用receive方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞。

    2. 异步
      订阅者或消费者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。

    JMS规范接口

    • ConnectionFactor接口(连接工厂)
      用于创建连接到消息中间件的连接工厂。

      创建Connection对象的工厂,根据消息类型的不同,用户将使用队列连接工厂QueueConnectionFactory或者主题连接工厂TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。

    • Connection接口(连接)
      Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装),代表了应用程序和消息服务器之间的通信链路。

      Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。

    • Destination接口(目标)
      Destination是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。

      它是消息生产者的消息发送目标或者说消息消费者的消息来源。

      • 对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);
      • 对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。

      所以,Destination实际上就是两种类型的对象:Queue、Topic可以通过JNDI来查找Destination。

    • Session接口(会话)
      Session是我们操作消息的接口。表示一个单线程的上下文,用于发送和接收消息。

      由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。
      可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当我们需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。

    • MessageProducer接口(消息生产者)
      消息生产者由Session创建,并用于将消息发送到Destination。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。

      同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。

    • MessageConsumer接口(消息消费者)
      消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。

      可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。

    • Message接口(消息)
      是在消费者和生产者之间传送的对象,也就是说从一个应用程序创送到另一个应用程序。一个消息有三个主要部分。

      1. 消息头(必须):包含用于识别和为消息寻找路由的操作设置。
      2. 一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。
      3. 一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。消息接口非常灵活,并提供了许多方式来定制消息的内容。

      消息接口非常灵活,并提供了许多方式来定制消息的内容。

    • MessageListener(监听器)
      消息监听器,如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。

      EJB中的MDB(Message-Driven Bean)就是一种MessageListener。

    消息中间件作用

    1.系统解耦

    系统之间没有直接的调用关系,只是通过消息传输,故系统侵入性不强,耦合度低。

    2.异步通信

    消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

    对于一些非必须及时处理的业务,通过消息队列可以优化系统响应时间。提升系统性能。

    3.流量削峰

    使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

    4.数据采集

    分布式系统产生的海量数据流,如:业务日志、监控数据、用户行为等,针对这些数据流进行实时或批量采集汇总,然后进行大数据分析是当前互联网的必备技术,通过消息队列完成此类数据收集是最好的选择。

    5.可恢复性

    有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。

    许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

    6.可扩展性

    在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。通过消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,当应用发生变化时,可以独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

    7.顺序保证

    在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。

    消息中间件协议

    1.AMQP协议

    AMQP(Advanced Message Queuing Protocol)高级消息队列协议,一个提供统一消息服务的应用层标准协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

    基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。

    优点:可靠、通用

    部分相关产品:

    • RabbitMQ
      一个独立的开源实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。RabbitMQ发布在Ubuntu、FreeBSD平台。
    • OpenAMQ
      AMQP的开源实现,用C语言编写,运行于Linux、AIX、Solaris、Windows、OpenVMS。
    • Apache Qpid
      Apache的开源项目,支持C++、Ruby、Java、JMS、Python和.NET。
    • Zyre
      一个Broker,实现了RestMS协议和AMQP协议,提供了RESTful HTTP访问网络AMQP的能力。

    2.MQTT协议

    MQTT(Message Queuing Telemetry Transport)消息队列遥测传输,是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。

    该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。

    优点:格式简洁、占用带宽小、移动端通信、PUSH、嵌入式系统

    3.STOMP协议

    STOMP(Streaming Text Orientated Message Protocol)流文本定向消息协议,是一种为MOM(Message Oriented Middleware)面向消息的中间件设计的简单文本协议。STOMP提供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。

    优点:命令模式(非topic\queue模式)

    部分相关产品:

    • ActiveMQ

    4.XMPP协议

    XMPP(Extensible Messaging and Presence Protocol)可扩展消息处理现场协议,是基于可扩展标记语言(XML)的协议,多用于即时消息(IM)以及在线现场探测。适用于服务器之间的准即时操作。

    核心是基于XML流传输,这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息,即使其操作系统和浏览器不同。

    优点:通用公开、兼容性强、可扩展、安全性高,但XML编码格式占用带宽大

    5.基于TCP/IP自定义协议

    有些特殊框架(如:redis、kafka、zeroMq等)根据自身需要未严格遵循MQ规范,而是基于TCP\IP自行封装了一套协议,通过网络socket接口进行传输,实现了MQ的功能。

    主流消息中间件

    ActiveMQ

    • 非常成熟,功能比较完备,大量公司使用;
    • 社区越来越不活跃,维护越来越少,几个月才发一次版;
    • 偶尔会有较低概率丢失消息;
    • 多数使用目的主要是用于解耦和异步通信,较少在大规模吞吐的场景中使用。

    RabbitMQ

    • 比较成熟,功能比较完备,大量公司使用;
    • Erlang语言开发,性能极其好,延时很低;
    • 比较好用,社区活跃,几乎每个月都发布几个版本;
    • 吞吐量万级,和其他相比会略低一些,这是因为他做的实现机制比较重;
    • Erlang开发,语言难度大,很难读源码,很难定制和掌控。基本只能依赖于开源社区的快速维护和修复bug。
    • 集群动态扩展会很麻烦,这主要是erlang语言本身带来的问题。

    RocketMQ

    • 文档相对来说简单一些,接口简单易用(接口不是按照标准JMS规范);
    • 阿里大规模应用,有保障(阿里日处理消息上百亿之多),可以做到大规模吞吐,性能也非常好;
    • 分布式扩展也很方便;
    • 社区比较活跃,维护还可以;
    • 可靠性和可用性都不错;
    • 支撑大规模的topic数量;
    • 支持复杂MQ业务场景;
    • Java语言编写,我们可以自己阅读源码。

    Kafka

    • 仅提供较少的核心功能;

    • 提供超高的吞吐量;

    • ms级的延迟;

    • 极高的可用性以及可靠性;

    • 分布式可以任意扩展;

    • 一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用;

    • topic的大幅增加会导致吞吐量的大幅度下降;

      所以尽量保证topic数量不要过多,以保证其超高吞吐量。如果要支撑大规模topic,需要增加更多的机器资源

    • 消息有可能重复消费;

    • 天然适合大数据实时计算以及日志收集,在大数据领域中以及日志采集得以广泛使用。

    4种MQ对比

    特性ActiveMQRabbitMQRocketMQKafka
    成熟度成熟成熟比较成熟成熟日志领域
    社区活跃度较高
    开发语言JavaErlangJavaScala
    跨语言支持,Java优先语言无关只支持Java支持,Java优先
    支持协议AMQP、MQTT、STOMP、OpenWireAMQP、MQTT、STOMPMQTT、TCPKafka
    JMS规范支持支持支持得不够好不支持
    持久化内存、文件、数据库内存、文件磁盘文件磁盘文件
    可用性高(主从)高(主从)非常高(分布式)非常高(分布式)
    单机吞吐量万级万级万级十万级
    消息延迟毫秒级微秒级毫秒级毫秒级
    可靠性有较低的概率丢失数据有较低的概率丢失数据经过参数优化配置,可以做到0丢失经过参数优化配置,消息可以做到0丢失
    事务支持支持支持支持
    集群支持支持支持支持
    负载均衡支持支持支持支持
    文档完备完备完备完备
    是否开源开源开源开源开源
    所属社区/公司ApacheRabbitApacheApache
    消息服务默认端口616165672109119092
    管理后台单独部署
    管理后台默认端口8161156728080-
    部署方式独立、嵌入独立独立独立
    评价产品成熟,功能齐全,大量公司使用;有较低概率丢失消息;社区不够活跃,版本维护较少,公司产品重心不在该产品上Erlang开发,性能好,延迟低;大量公司使用;社区比较活跃;但erlang语言难度大,集群动态扩容很麻烦功能较为完善,社区比较活跃;还是分布式的,扩展性好功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用

    消息分发策略对比:

    消息分发策略ActiveMQRabbitMQRocketMQKafka
    发布订阅支持支持支持支持
    轮询分发支持支持-支持
    公平分发-支持-支持
    重发支持支持支持-
    消息拉取-支持支持支持

    MQ的选择

    最早大家用ActiveMQ。但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃。

    后来大家用RabbitMQ。但是确实erlang语言阻止了大量的java工程师去深入研究和掌控他,对公司而言,几乎处于不可控的状态,但是确实人是开源的,比较稳定的支持,活跃度也高。

    现在确实越来越多的公司会去用RocketMQ。

    • 对于中小型公司,技术实力较为一般,技术挑战不是特别高,用RabbitMQ是不错的选择;
    • 对于大型公司,基础架构研发实力较强,用RocketMQ是很好的选择;
    • 大数据领域的实时计算、日志采集等场景,用Kafka是业内标准的,绝对没问题。社区活跃度很高,何况Kafka几乎是全世界这个领域的规范制定者。
    更多相关内容
  • 本文主要介绍了两种java实现消息队列的方式,利用Spring消息模板发送消息和Apache ActiveMQ官方实例发送消息,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
  • C++封装实现的异步加锁消息队列,支持多线程,完美封装,可用于消息接收、处理
  • 消息队列

    千次阅读 多人点赞 2019-09-19 21:42:59
    消息队列消息队列”是在消息的传输过程中保存消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高...

    消息队列

    “消息队列”是在消息的传输过程中保存消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。

    消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。

    为什么要用消息队列

    (1) 通过异步处理提高系统性能(削峰、减少响应所需时间)
    在这里插入图片描述
    如上图,在不使用消息队列服务器的时候,用户的请求数据直接写入数据库,在高并发的情况下数据库压力剧增,使得响应速度变慢。但是在使用消息队列之后,用户的请求数据发送给消息队列之后立即 返回,再由消息队列的消费者进程从消息队列中获取数据,异步写入数据库。由于消息队列服务器处理速度快于数据库(消息队列也比数据库有更好的伸缩性),因此响应速度得到大幅改善。

    (2) 降低系统耦合性
    我们知道如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展性无疑更好一些。
    我们最常见的事件驱动架构类似生产者消费者模式,在大型网站中通常用利用消息队列实现事件驱动结构。如下图所示:
    在这里插入图片描述
    消息队列使利用发布-订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息。 从上图可以看到消息发送者(生产者)和消息接受者(消费者)之间没有直接耦合,消息发送者将消息发送至分布式消息队列即结束对消息的处理,消息接受者从分布式消息队列获取该消息后进行后续处理,并不需要知道该消息从何而来。对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计。

    消息接受者对消息进行过滤、处理、包装后,构造成一个新的消息类型,将消息继续发送出去,等待其他消息接受者订阅该消息。因此基于事件(消息对象)驱动的业务架构可以是一系列流程。

    另外为了避免消息队列服务器宕机造成消息丢失,会将成功发送到消息队列的消息存储在消息生产者服务器上,等消息真正被消费者服务器处理后才删除消息。在消息队列服务器宕机后,生产者服务器会选择分布式消息队列服务器集群中的其他服务器发布消息。

    备注: 不要认为消息队列只能利用发布-订阅模式工作,只不过在解耦这个特定业务环境下是使用发布-订阅模式的。除了发布-订阅模式,还有点对点订阅模式(一个消息只有一个消费者),我们比较常用的是发布-订阅模式。 另外,这两种消息模型是 JMS 提供的,AMQP 协议还提供了 5 种消息模型。

    RocketMq

    RocketMq是一个由阿里巴巴开源的消息中间件,脱胎去阿里每部使用的MetaQ,在设计上借鉴了Kafka。2012年开源,2017年成为apache顶级项目

    RocketMQ 是什么?
    在这里插入图片描述
    上图是一个典型的消息中间件收发消息的模型,RocketMQ也是这样的设计,简单说来,RocketMQ具有以下特点:
    是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。
    Producer、Consumer、队列都可以分布式。
    Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合。
    能够保证严格的消息顺序
    提供丰富的消息拉取模式
    高效的订阅者水平扩展能力
    实时的消息订阅机制
    亿级消息堆积能力
    较少的依赖

    RocketMQ 物理部署结构
    在这里插入图片描述
    如上图所示, RocketMQ的部署结构有以下特点:
    Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
    Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。
    Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
    Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

    RocketMQ 逻辑部署结构
    在这里插入图片描述

    如上图所示,RocketMQ的逻辑部署结构有Producer和Consumer两个特点。
    Producer Group
    用来表示一个发送消息应用,一个Producer Group下包含多个Producer实例,可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个Producer对象。一个Producer Group可以发送多个Topic消息,Producer Group作用如下:1.标识一类Producer
    2.可以通过运维工具查询这个发送消息应用下有多个Producer实例
    3.发送分布式事务消息时,如果Producer中途意外宕机,Broker会主动回调Producer Group内的任意一台机器来确认事务状态。
    Consumer Group
    用来表示一个消费消息应用,一个Consumer Group下包含多个Consumer实例,可以是多台机器,也可以是多个进程,或者是一个进程的多个Consumer对象。一个Consumer Group下的多个Consumer以均摊方式消费消息,如果设置为广播方式,那么这个Consumer Group下的每个实例都消费全量数据。

    RocketMQ 数据存储结构
    在这里插入图片描述
    如上图所示,RocketMQ采取了一种数据与索引分离的存储方法。有效降低文件资源、IO资源,内存资源的损耗。即便是阿里这种海量数据,高并发场景也能够有效降低端到端延迟,并具备较强的横向扩展能力。

    JMS和AMQP的区别

    JMS
    JMS(JAVA Message Service,java消息服务)是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。

    JMS(JAVA Message Service,Java消息服务)API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

    ActiveMQ 就是基于 JMS 规范实现的。

    JMS两种消息模型

    ①点到点(P2P)模型
    在这里插入图片描述
    使用队列(Queue)作为消息通信载体;满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。比如:我们生产者发送100条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)

    ② 发布/订阅(Pub/Sub)模型
    在这里插入图片描述
    发布订阅模型(Pub/Sub) 使用主题(Topic)作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到该条消息的。

    JMS 五种不同的消息正文格式
    JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

    • StreamMessage – Java原始值的数据流
    • MapMessage–一套名称-值对
    • TextMessage–一个字符串对象
    • ObjectMessage–一个序列化的 Java对象
    • BytesMessage–一个字节的数据流

    AMQP
    ​ AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准 高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。

    RabbitMQ 就是基于 AMQP 协议实现的。
    在这里插入图片描述
    总结:

    • AMQP 为消息定义了线路层(wire-level protocol)的协议,而JMS所定义的是API规范。在 Java 体系中,多个client均可以通过JMS进行交互,不需要应用修改代码,但是其对跨平台的支持较差。而AMQP天然具有跨平台、跨语言特性。
    • JMS 支持TextMessage、MapMessage 等复杂的消息类型;而 AMQP 仅支持 byte[] 消息类型(复杂的类型可序列化后发送)。
    • 由于Exchange 提供的路由算法,AMQP可以提供多样化的路由方式来传递消息到消息队列,而 JMS 仅支持 队列 和 主题/订阅 方式两种。

    常见的消息队列对比

    在这里插入图片描述
    总结:

    • ActiveMQ 的社区算是比较成熟,但是较目前来说,ActiveMQ 的性能比较差,而且版本迭代很慢,不推荐使用。
    • RabbitMQ 在吞吐量方面虽然稍逊于 Kafka 和 RocketMQ ,但是由于它基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 erlang 开发,所以国内很少有公司有实力做erlang源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),那这四种消息队列中,RabbitMQ 一定是你的首选。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
    • RocketMQ 阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的MQ,并且 RocketMQ 有阿里巴巴的实际业务场景的实战考验。RocketMQ 社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准 JMS 规范走的有些系统要迁移需要修改大量代码。还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用RocketMQ 挺好的
    • kafka 的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集。

    消息队列应用场景

    1、异步处理
    场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式
    a、串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。
    在这里插入图片描述
    b、并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
    在这里插入图片描述
    假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。
    因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)
    小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?

    引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:
    在这里插入图片描述
    按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。

    2、应用解耦
    场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图:
    在这里插入图片描述
    传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合

    如何解决以上问题呢?引入应用消息队列后的方案,如下图:
    在这里插入图片描述
    订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
    库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
    假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦

    3、流量削锋
    流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
    应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
    a、可以控制活动的人数
    b、可以缓解短时间内高流量压垮应用
    在这里插入图片描述
    用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。
    秒杀业务根据消息队列中的请求信息,再做后续处理

    2.4日志处理
    日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下
    在这里插入图片描述
    日志采集客户端,负责日志数据采集,定时写受写入Kafka队列
    Kafka消息队列,负责日志数据的接收,存储和转发
    日志处理应用:订阅并消费kafka队列中的日志数据

    5、消息通讯
    消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等
    点对点通讯:
    在这里插入图片描述
    客户端A和客户端B使用同一队列,进行消息通讯。

    聊天室通讯:
    在这里插入图片描述
    客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。
    以上实际是消息队列的两种消息模式,点对点或发布订阅模式。模型为示意图,供参考。

    消息中间件示例
    电商系统
    在这里插入图片描述
    消息队列采用高可用,可持久化的消息中间件。比如Active MQ,Rabbit MQ,Rocket Mq。
    (1)应用将主干逻辑处理完成后,写入消息队列。消息发送是否成功可以开启消息的确认模式。(消息队列返回消息接收成功状态后,应用再返回,这样保障消息的完整性)
    (2)扩展流程(发短信,配送处理)订阅队列消息。采用推或拉的方式获取消息并处理。
    (3)消息将应用解耦的同时,带来了数据一致性问题,可以采用最终一致性方式解决。比如主数据写入数据库,扩展应用根据消息队列,并结合数据库方式实现基于消息队列的后续处理。

    日志收集系统
    在这里插入图片描述
    分为Zookeeper注册中心,日志收集客户端,Kafka集群和Storm集群(OtherApp)四部分组成。
    Zookeeper注册中心,提出负载均衡和地址查找服务
    日志收集客户端,用于采集应用系统的日志,并将数据推送到kafka队列
    Kafka集群:接收,路由,存储,转发等消息处理
    Storm集群:与OtherApp处于同一级别,采用拉的方式消费队列中的数据

    参考文章:
    https://www.jianshu.com/p/36a7775b04ec
    https://blog.csdn.net/HD243608836/article/details/80217591

    展开全文
  • windows 消息队列 实现:发送消息、接收消息、多线程监听消息
  • 可用于调试MSMQ、RabbitMQ、ActiveMQ三种消息队列 其中MSMQ支持Active、Binary、XML格式(要勾选事务) RabbitMQ支持逐条接发、批量接发、RPC回调模式、新建队列、建立持久化队列、连接测试等功能。
  • 什么是消息队列你了解过么?

    你知道的越多,你不知道的越多

    点赞再看,养成习惯

    GitHub上已经开源 https://github.com/JavaFamily 有一线大厂面试点脑图、个人联系方式,欢迎Star和完善

    面试开始

    一个风度翩翩,穿着格子衬衣的中年男子,拿着一个满是划痕的mac向你走来,看着铮亮的头,心想着肯定是尼玛顶级架构师吧!但是我们看过暖男敖丙的系列,腹有诗书气自华,虚都不虚。

    小伙子之前问了你这么多Redis的知识,你不仅对答如流,你还能把各自场景的解决方案,优缺点说得这么流畅,说你是不是看过敖丙写的《吊打面试官》系列呀?

    惊!!!老师你怎么知道的,我看了他的系列根本停不下来啊。

    呵呵,Redis没难住你,但是我问个新的技术栈我还怕难不住你?我问问你你项目中用过消息队列么?你为啥用消息队列?

    噗此,这也叫问题?别人用了我能不用么?别人用了我就用了呗,我就是为了用而用。

    你心里嘀咕就好了,千万别说出来哈,说出来了没拿到Offer别到时候就在那说,敖丙那个渣男教我说的!

    面试官你好:我们公司本身的业务体量很小,所以直接单机一把梭啥都能搞定了,但是后面业务体量不断扩大,采用微服务的设计思想分布式的部署方式,所以拆分了很多的服务,随着体量的增加以及业务场景越来越复杂了,很多场景单机的技术栈和中间件以及不够用了,而且对系统的友好性也下降了,最后做了很多技术选型的工作,我们决定引入消息队列中间件

    哦?你说到业务场景越来越复杂,你那说一下你都在什么场景用到了消息队列?

    嗯,我从三个方面去说一下我使用的场景吧。

    Tip:这三个场景也是消息队列的经典场景,大家基本上要烂熟于心那种,就是一说到消息队列你脑子就要想到异步、削峰、解耦,条件反射那种。

    异步:

    我们之前的场景里面有很多步骤都是在一个流程里面需要做完的,就比如说我的下单系统吧,本来我们业务简单,下单了付了钱就好了,流程就走完了。

    但是后面来了个产品经理,搞了个优惠券系统,OK问题不大,流程里面多100ms去扣减优惠券。

    后来产品经理灵光一闪说我们可以搞个积分系统啊,也行吧,流程里面多了200ms去增减积分。

    再后来后来隔壁的产品老王说:下单成功后我们要给用户发短信,也将就吧,100ms去发个短信。

    再后来。。。(敖丙你有完没完!!!)

    反正就流程有点像这样 ↓

    你们可以看到这才加了三个,我可以斩钉截铁的告诉你真正的下单流程涉及的系统绝对在10个以上(主流电商),越大的越多。

    这个链路这样下去,时间长得一批,用户发现我买个东西你特么要花几十秒,垃圾电商我不在你这里买了,不过要是都像并夕夕这么便宜,真香

    但是我们公司没有夕夕的那个经济实力啊,那只能优化系统了。

    Tip:我之前在的电商老东家要求所有接口的RtResponseTime响应时间)在200ms内,超出的全部优化,我现在所负责的系统QPS也是9W+就是抖动一下网络集群都可能炸锅那种,RT基本上都要求在50ms以内。

    大家感受一下这个QPS。

    嗯不错,链路长了就慢了,那你怎么解决的?

    那链路长了就慢了,但是我们发现上面的流程其实可以同时做的呀,你支付成功后,我去校验优惠券的同时我可以去增减积分啊,还可以同时发个短信啊。

    那正常的流程我们是没办法实现的呀,怎么办,异步

    你对比一下是不是发现,这样子最多只用100毫秒用户知道下单成功了,至于短信你迟几秒发给他他根本不在意是吧。

    小伙子我打断你一下,你说了异步,那我用线程,线程池去做不是一样的么?

    诶呀,面试官你不要急嘛,我后面还会说到的,骚等。

    解耦:

    既然面试官这么问了,我就说一下为啥我们不能用线程去做,因为用线程去做,你是不是要写代码?

    你一个订单流程,你扣积分,扣优惠券,发短信,扣库存。。。等等这么多业务要调用这么多的接口,每次加一个你要调用一个接口然后还要重新发布系统,写一次两次还好,写多了你就说:老子不干了!

    而且真的全部都写在一起的话,不单单是耦合这一个问题,你出问题排查也麻烦,流程里面随便一个地方出问题搞不好会影响到其他的点,小伙伴说我每个流程都try catch不就行了,相信我别这么做,这样的代码就像个定时炸弹💣,你不知道什么时候爆炸,平时不炸偏偏在你做活动的时候炸,你就领个P0故障收拾书包提前回家过年吧。

    Tip:P0—PN 是互联网大厂经常用来判定事故等级的机制,P0是最高等级了。

    但是你用了消息队列,耦合这个问题就迎刃而解了呀。

    哦,帅丙怎么说?

    且听我娓娓道来:

    你下单了,你就把你支付成功的消息告诉别的系统,他们收到了去处理就好了,你只用走完自己的流程,把自己的消息发出去,那后面要接入什么系统简单,直接订阅你发送的支付成功消息,你支付成功了我监听就好了

    那你的流程走完了,你不用管别人是否成功么?比如你下单了积分没加,优惠券没扣怎么办?

    问题是个好问题,但是没必要考虑,业务系统本身就是自己的开发人员维护的,你积分扣失败关我下单的什么事情?你管好自己下单系统的就好了。

    Tip:话是这么说,但是这其实是用了消息队列的一个缺点,涉及到分布式事务的知识点,我下面会提到。

    削峰:

    就拿我上一期写的秒杀来说(暗示新同学看我上一期),你平时流量很低,但是你要做秒杀活动00 :00的时候流量疯狂怼进来,你的服务器,RedisMySQL各自的承受能力都不一样,你直接全部流量照单全收肯定有问题啊,直接就打挂了。

    那怎么办?

    简单,把请求放到队列里面,然后至于每秒消费多少请求,就看自己的服务器处理能力,你能处理5000QPS你就消费这么多,可能会比正常的慢一点,但是不至于打挂服务器,等流量高峰下去了,你的服务也就没压力了。

    你看阿里双十一12:00的时候这么多流量瞬间涌进去,他有时候是不是会慢一点,但是人家没挂啊,或者降级给你个友好的提示页面,等高峰过去了又是一条好汉了。

    为了这个图特意打高一台服务的流量
    为了这个图特意打高一台服务的流量

    听你说了辣么多,怎么都是好处,那我问你使用了消息队列有啥问题么?

    诶,看过前面我写的文章的人才都知道,我经常说的就是,技术是把双刃剑

    没错面试官,我使用他是因为他带给我们很多好处,但是使用之后问题也是接踵而至

    同样的暖男我呀,也从三个点介绍他主要的缺点:

    系统复杂性

    本来蛮简单的一个系统,我代码随便写都没事,现在你凭空接入一个中间件在那,我是不是要考虑去维护他,而且使用的过程中是不是要考虑各种问题,比如消息重复消费消息丢失消息的顺序消费等等,反正用了之后就是贼烦。

    我插一句嘴,上面的问题(重复消费、消息丢失、顺序消费)你能分别介绍一下,并且说一下分别是怎么解决的么?

    不要!我都说了敖丙下一章写啥?

    其实不是暖男我不想在这里写,这三个问题我想了下,统统都是MQ重点问题,单独拿一个出来就是一篇文章了,篇幅实在太长了,我会在下一章挨个介绍一遍的。

    数据一致性

    这个其实是分布式服务本身就存在的一个问题,不仅仅是消息队列的问题,但是放在这里说是因为用了消息队列这个问题会暴露得比较严重一点。

    就像我开头说的,你下单的服务自己保证自己的逻辑成功处理了,你成功发了消息,但是优惠券系统,积分系统等等这么多系统,他们成功还是失败你就不管了?

    我说了保证自己的业务数据对的就好了,其实还是比较不负责任的一种说法,这样就像个渣男,没有格局这样呀你的路会越走越窄的

    所有的服务都成功才能算这一次下单是成功的,那怎么才能保证数据一致性呢?

    分布式事务:把下单,优惠券,积分。。。都放在一个事务里面一样,要成功一起成功,要失败一起失败。

    Tip:分布式事务在互联网公司里面实在常见,我也不在这里大篇幅介绍了,后面都会专门说的。

    可用性

    你搞个系统本身没啥问题,你现在突然接入一个中间件在那放着,万一挂了怎么办?我下个单MQ挂了,优惠券不扣了,积分不减了,这不是杀一个程序员能搞定的吧,感觉得杀一片。

    至于怎么保证高可用,还是那句话也不在这里展开讨论了,我后面一样会写,像写Redis那样写出来的。

    放心敖丙我不是渣男来的,我肯定会对你们负责的。点赞!

    看不出来啊,你有点东西呀,那我问一下你,你们是怎么做技术选型的?

    目前在市面上比较主流的消息队列中间件主要有,Kafka、ActiveMQ、RabbitMQ、RocketMQ 等这几种。

    不过敖丙我想说的是,ActiveMQRabbitMQ这两着因为吞吐量还有GitHub的社区活跃度的原因,在各大互联网公司都已经基本上绝迹了,业务体量一般的公司会是有在用的,但是越来越多的公司更青睐RocketMQ这样的消息中间件了。

    KafkaRocketMQ一直在各自擅长的领域发光发亮,不过写这篇文章的时候我问了蚂蚁金服,字节跳动和美团的朋友,好像大家用的都有点不一样,应该都是各自的中间件,可能做过修改,也可能是自研的,大多没有开源

    就像我们公司就是是基于KafkaRocketMQ两者的优点自研的消息队列中间件,吞吐量、可靠性、时效性等都很可观。

    我们回归正题,我这里用网上找的对比图让大家看看差距到底在哪里:

    大家其实一下子就能看到差距了,就拿吞吐量来说,早期比较活跃的ActiveMQRabbitMQ基本上不是后两者的对手了,在现在这样大数据的年代吞吐量是真的很重要

    比如现在突然爆发了一个超级热点新闻,你的APP注册用户高达亿数,你要想办法第一时间把突发全部推送到每个人手上,你没有大吞吐量的消息队列中间件用啥去推?

    再说这些用户大量涌进来看了你的新闻产生了一系列的附带流量,你怎么应对这些数据,很多场景离开消息队列基本上难以为继

    部署方式而言前两者也是大不如后面两个天然分布式架构的哥哥,都是高可用的分布式架构,而且数据多个副本的数据也能做到0丢失。

    我们再聊一下RabbitMQ这个中间件其实还行,但是这玩意开发语言居然是erlang,我敢说绝大部分工程师肯定不会为了一个中间件去刻意学习一门语言的,开发维护成本你想都想不到,出个问题查都查半天。

    至于RocketMQ(阿里开源的),git活跃度还可以。基本上你push了自己的bug确认了有问题都有阿里大佬跟你试试解答并修复的,我个人推荐的也是这个,他的架构设计部分跟同样是阿里开源的一个RPC框架是真的很像(Dubbo)可能是因为师出同门的原因吧。

    Tip:Dubbo等我写到RPC我会详细介绍的。

    Kafka我放到最后说,你们也应该知道了,压轴的这是个大哥,大数据领域,公司的日志采集,实时计算等场景,都离不开他的身影,他基本上算得上是世界范围级别的消息队列标杆了。

    以上这些都只是一些我自己的个人意见,真正的选型还是要去深入研究的,不然那你公司一天UV就1000你告诉我你要去用Kafka我只能说你吃饱撑的。

    记住,没有最好的技术,只有最适合的技术,不要为了用而用

    面试结束

    嗯,小伙子不错不错,分析得很到位,那你记得下期来说一下消息队列的高可用,重复消费、消息丢失、消息顺序、分布式事务等问题?

    嗯嗯好的面试官,不过不确定能不能一口气说完,毕竟敖丙还没开始写,而且读者还有可能白嫖,动力不一定够。

    嗯嗯这倒是个问题,不过啊在看的都是人才肯定会给你点赞👍的!

    我也这么认为。

    总结

    消息队列的基础知识我就先介绍这么多,消息队列在面试里面基本上也是跟我前面写的Redis一样必问的。

    面试的思路还是一样,要知其然,也要知其所以然,就是要知道为啥用,用了有啥好处,有啥坑。

    面试官不喜欢只知道用的,你只会用那哪天线上出问题怎么办?你难道在旁边拜佛?

    Tip:本来有很多我准备的资料的,但是都是外链,或者不合适的分享方式,博客的运营小姐姐提醒了我,所以大家去公众号回复【资料】好了。

    鸣谢

    之前的文章写了很多人加我,然后有个人才说是他蚂蚁金服的Leader推荐的我,我突然意识到我文章的受众好像慢慢变广了,之后不严谨的点要杜绝掉。

    所以之后我的文章经常会有大厂的小伙伴Review,也希望帮助我更好的监督自己的文章吧。

    这次是 某阿里系电商跟我一起做过活动小组的 佩恩 帮我Review的文章,感谢!

    絮叨

    另外,敖丙把自己的面试文章整理成了一本电子书,共 1630页!目录如下

    现在免费送给大家,在我的公众号三太子敖丙回复 【888】 即可获取。

    我是敖丙,一个在互联网苟且偷生的程序员。

    你知道的越多,你不知道的越多人才们的 【三连】 就是丙丙创作的最大动力,我们下期见!

    注:如果本篇博客有任何错误和建议,欢迎人才们留言!


    文章持续更新,可以微信搜索「 三太子敖丙 」第一时间阅读,回复【资料】有我准备的一线大厂面试资料和简历模板,本文 GitHub https://github.com/JavaFamily 已经收录,有大厂面试完整考点,欢迎Star。

    展开全文
  • SpringCloud笔记(四)消息队列

    千次阅读 2022-04-22 10:51:44
    消息队列 经过前面的学习,我们已经了解了我们之前的技术在分布式环境下的应用,接着我们来看最后一章的内容。 那么,什么是消息队列呢? 我们之前如果需要进行远程调用,那么一般可以通过发送HTTP请求来完成,而...

    image-20220415163559986

    消息队列

    经过前面的学习,我们已经了解了我们之前的技术在分布式环境下的应用,接着我们来看最后一章的内容。

    那么,什么是消息队列呢?

    我们之前如果需要进行远程调用,那么一般可以通过发送HTTP请求来完成,而现在,我们可以使用第二种方式,就是消息队列,它能够将发送方发送的信息放入队列中,当新的消息入队时,会通知接收方进行处理,一般消息发送方称为生产者,接收方称为消费者。

    image-20220415165805716

    这样我们所有的请求,都可以直接丢到消息队列中,再由消费者取出,不再是直接连接消费者的形式了,而是加了一个中间商,这也是一种很好的解耦方案,并且在高并发的情况下,由于消费者能力有限,消息队列也能起到一个削峰填谷的作用,堆积一部分的请求,再由消费者来慢慢处理,而不会像直接调用那样请求蜂拥而至。

    那么,消息队列具体实现有哪些呢:

    • RabbitMQ - 性能很强,吞吐量很高,支持多种协议,集群化,消息的可靠执行特性等优势,很适合企业的开发。
    • Kafka - 提供了超高的吞吐量,ms级别的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。
    • RocketMQ - 阿里巴巴推出的消息队列,经历过双十一的考验,单机吞吐量高,消息的高可靠性,扩展性强,支持事务等,但是功能不够完整,语言支持性较差。

    我们这里,主要讲解的是RabbitMQ消息队列。

    RabbitMQ 消息队列

    **官方网站:**https://www.rabbitmq.com

    RabbitMQ拥有数万计的用户,是最受欢迎的开源消息队列之一,从T-MobileRuntastic,RabbitMQ在全球范围内用于小型初创企业和大型企业。

    RabbitMQ轻量级,易于在本地和云端部署,它支持多种消息协议。RabbitMQ可以部署在分布式和联合配置中,以满足大规模、高可用性要求。

    RabbitMQ在许多操作系统和云环境中运行,并为大多数流行语言提供了广泛的开发者工具

    我们首先还是来看看如何进行安装。

    安装消息队列

    **下载地址:**https://www.rabbitmq.com/download.html

    由于除了消息队列本身之外还需要Erlang环境(RabbitMQ就是这个语言开发的)所以我们就在我们的Ubuntu服务器上进行安装。

    首先是Erlang,比较大,1GB左右:

    sudo apt install erlang
    

    接着安装RabbitMQ:

    sudo apt install rabbitmq-server
    

    安装完成后,可以输入:

    sudo rabbitmqctl status
    

    来查看当前的RabbitMQ运行状态,包括运行环境、内存占用、日志文件等信息:

    Runtime
    
    OS PID: 13718
    OS: Linux
    Uptime (seconds): 65
    Is under maintenance?: false
    RabbitMQ version: 3.8.9
    Node name: rabbit@ubuntu-server-2
    Erlang configuration: Erlang/OTP 23 [erts-11.1.8] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:64]
    Erlang processes: 280 used, 1048576 limit
    Scheduler run queue: 1
    Cluster heartbeat timeout (net_ticktime): 60
    

    这样我们的RabbitMQ服务器就安装完成了,要省事还得是Ubuntu啊。

    可以看到默认有两个端口名被使用:

    Listeners
    
    Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
    Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
    

    我们一会主要使用的就是amqp协议的那个端口5672来进行连接,25672是集群化端口,之后我们也会用到。_ ta

    接着我们还可以将RabbitMQ的管理面板开启,这样话就可以在浏览器上进行实时访问和监控了:

    sudo rabbitmq-plugins enable rabbitmq_management
    

    再次查看状态,可以看到多了一个管理面板,使用的是HTTP协议:

    Listeners
    
    Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
    Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
    Interface: [::], port: 15672, protocol: http, purpose: HTTP API
    

    我们打开浏览器直接访问一下:

    image-20220415203431587

    可以看到需要我们进行登录才可以进入,我们这里还需要创建一个用户才可以,这里就都用admin:

    sudo rabbitmqctl add_user 用户名 密码
    

    将管理员权限给予我们刚刚创建好的用户:

    sudo rabbitmqctl set_user_tags admin administrator
    

    创建完成之后,我们登录一下页面:

    image-20220415203728664

    进入了之后会显示当前的消息队列情况,包括版本号、Erlang版本等,这里需要介绍一下RabbitMQ的设计架构,这样我们就知道各个模块管理的是什么内容了:

    image-20220416103043845

    • **生产者(Publisher)和消费者(Consumer):**不用多说了吧。
    • **Channel:**我们的客户端连接都会使用一个Channel,再通过Channel去访问到RabbitMQ服务器,注意通信协议不是http,而是amqp协议。
    • **Exchange:**类似于交换机一样的存在,会根据我们的请求,转发给相应的消息队列,每个队列都可以绑定到Exchange上,这样Exchange就可以将数据转发给队列了,可以存在很多个,不同的Exchange类型可以用于实现不同消息的模式。
    • **Queue:**消息队列本体,生产者所有的消息都存放在消息队列中,等待消费者取出。
    • **Virtual Host:**有点类似于环境隔离,不同环境都可以单独配置一个Virtual Host,每个Virtual Host可以包含很多个Exchange和Queue,每个Virtual Host相互之间不影响。

    使用消息队列

    我们就从最简的的模型开始讲起:

    image-20220417103647609

    (一个生产者 -> 消息队列 -> 一个消费者)

    生产者只需要将数据丢进消息队列,而消费者只需要将数据从消息队列中取出,这样就实现了生产者和消费者的消息交互。我们现在来演示一下,首先进入到我们的管理页面,这里我们创建一个新的实验环境,只需要新建一个Virtual Host即可:

    image-20220419143014974

    添加新的虚拟主机之后,我们可以看到,当前admin用户的主机访问权限中新增了我们刚刚添加的环境:

    image-20220419143115507

    现在我们来看看交换机:

    image-20220419143338487

    交换机列表中自动为我们新增了刚刚创建好的虚拟主机相关的预设交换机,一共7个,这里我们首先介绍一下前面两个direct类型的交换机,一个是(AMQP default)还有一个是amq.direct,它们都是直连模式的交换机,我们来看看第一个:

    image-20220419143612318

    第一个交换机是所有虚拟主机都会自带的一个默认交换机,并且此交换机不可删除,此交换机默认绑定到所有的消息队列,如果是通过默认交换机发送消息,那么会根据消息的routingKey(之后我们发消息都会指定)决定发送给哪个同名的交换机,同时也不能显示地将消息队列绑定或解绑到此交换机。

    我们可以看到,详细信息中,当前交换机特性是持久化的,也就是说就算机器重启,那么此交换机也会保留,如果不是持久化,那么一旦重启就会消失。实际上我们在列表中看到D的字样,就表示此交换机是持久化的,包含一会我们要讲解的消息队列列表也是这样,所有自动生成的交换机都是持久化的。

    我们接着来看第二个交换机,这个交换机是一个普通的直连交换机:

    image-20220419144200533

    这个交换机和我们刚刚介绍的默认交换机类型一致,并且也是持久化的,但是我们可以看到它是具有绑定关系的,如果没有指定的消息队列绑定到此交换机上,那么这个交换机无法正常将信息存放到指定的消息队列中,也是根据routingKey寻找消息队列(但是可以自定义)

    我们可以在下面直接操作,让某个队列绑定,这里我们先不进行操作。

    介绍完了两个最基本的交换机之后(其他类型的交换机我们会在后面进行介绍),我们接着来看消息队列:

    image-20220419144508881

    可以看到消息队列列表中没有任何的消息队列,我们可以来尝试添加一个新的消息队列:

    image-20220419144553817

    第一行,我们选择我们刚刚创建好的虚拟主机,在这个虚拟主机下创建此消息队列,接着我们将其类型定义为Classic类型,也就是经典类型(其他类型我们会在后面逐步介绍)名称随便起一个,然后持久化我们选择Transient暂时的(当然也可以持久化,看你自己)自动删除我们选择No(需要至少有一个消费者连接到这个队列,之后,一旦所有与这个队列连接的消费者都断开时,就会自动删除此队列)最下面的参数我们暂时不进行任何设置(之后会用到)

    现在,我们就创建好了一个经典的消息队列:

    image-20220419145109450

    点击此队列的名称,我们可以查看详细信息:

    image-20220419145238458

    详细相信中包括队列的当前负载状态、属性、消息队列占用的内存,消息数量等,一会我们发送消息时可以进一步进行观察。

    现在我们需要将此消息队列绑定到上面的第二个直连交换机,这样我们就可以通过此交换机向此消息队列发送消息了:

    image-20220419145520844

    这里填写之前第二个交换机的名称还有我们自定义的routingKey(最好还是和消息队列名称一致,这里是为了一会演示两个交换机区别用)我们直接点击绑定即可:

    image-20220419145635179

    绑定之后我们可以看到当前队列已经绑定对应的交换机了,现在我们可以前往交换机对此消息队列发送一个消息:

    image-20220419145725499

    回到交换机之后,可以卡到这边也是同步了当前的绑定信息,在下方,我们直接向此消息队列发送信息:

    image-20220419145808450

    点击发送之后,我们回到刚刚的交换机详细页面,可以看到已经有一条新的消息在队列中了:

    image-20220419145903723

    我们可以直接在消息队列这边获取消息队列中的消息,找到下方的Get message选项:

    image-20220419145936160

    可以看到有三个选择,首先第一个Ack Mode,这个是应答模式选择,一共有4个选项:

    image-20220419150053926

    • Nack message requeue true:拒绝消息,也就是说不会将消息从消息队列取出,并且重新排队,一次可以拒绝多个消息。
    • Ack message requeue false:确认应答,确认后消息会从消息队列中移除,一次可以确认多个消息。
    • Reject message requeue true/false:也是拒绝此消息,但是可以指定是否重新排队。

    这里我们使用默认的就可以了,这样只会查看消息是啥,但是不会取出,消息依然存在于消息队列中,第二个参数是编码格式,使用默认的就可以了,最后就是要生效的操作数量,选择1就行:

    image-20220419150712314

    可以看到我们刚刚的消息已经成功读取到。

    现在我们再去第一个默认交换机中尝试发送消息试试看:

    image-20220419150913859

    如果我们使用之前自定义的routingKey,会显示没有路由,这是因为默认的交换机只会找对应名称的消息队列,我们现在向yyds发送一下试试看:

    image-20220419151016735

    可以看到消息成功发布了,我们来接收一下看看:

    image-20220419151058659

    可以看到成功发送到此消息队列中了。

    当然除了在交换机发送消息给消息队列之外,我们也可以直接在消息队列这里发:

    image-20220419151155264

    效果是一样的,注意这里我们可以选择是否将消息持久化,如果是持久化消息,那么就算服务器重启,此消息也会保存在消息队列中。

    最后如果我们不需要再使用此消息队列了,我们可以手动对其进行删除或是清空:

    image-20220419151548923

    点击Delete Queue删除我们刚刚创建好的yyds队列,到这里,我们对应消息队列的一些简单使用,就讲解完毕了。

    使用Java操作消息队列

    现在我们来看看如何通过Java连接到RabbitMQ服务器并使用消息队列进行消息发送(这里一起讲解,包括Java基础版本和SpringBoot版本),首先我们使用最基本的Java客户端连接方式:

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.14.2</version>
    </dependency>
    

    依赖导入之后,我们来实现一下生产者和消费者,首先是生产者,生产者负责将信息发送到消息队列:

    public static void main(String[] args) {
        //使用ConnectionFactory来创建连接
        ConnectionFactory factory = new ConnectionFactory();
    
        //设定连接信息,基操
        factory.setHost("192.168.0.12");
        factory.setPort(5672);  //注意这里写5672,是amqp协议端口
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/test");
      
     		//创建连接
        try(Connection connection = factory.newConnection()){
            
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    

    这里我们可以直接在程序中定义并创建消息队列(实际上是和我们在管理页面创建一样的效果)客户端需要通过连接创建一个新的通道(Channel),同一个连接下可以有很多个通道,这样就不用创建很多个连接也能支持分开发送了。

    try(Connection connection = factory.newConnection();
        Channel channel = connection.createChannel()){   //通过Connection创建新的Channel
      	//声明队列,如果此队列不存在,会自动创建
        channel.queueDeclare("yyds", false, false, false, null);
      	//将队列绑定到交换机
        channel.queueBind("yyds", "amq.direct", "my-yyds");
      	//发布新的消息,注意消息需要转换为byte[]
        channel.basicPublish("amq.direct", "yyds", null, "Hello World!".getBytes());
    }catch (Exception e){
        e.printStackTrace();
    }
    

    其中queueDeclare方法的参数如下:

    • queue:队列的名称(默认创建后routingKey和队列名称一致)
    • durable:是否持久化。
    • exclusive:是否排他,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。排他队列是基于Connection可见,同一个Connection的不同Channel是可以同时访问同一个连接创建的排他队列,并且,如果一个Connection已经声明了一个排他队列,其他的Connection是不允许建立同名的排他队列的,即使该队列是持久化的,一旦Connection关闭或者客户端退出,该排他队列都会自动被删除。
    • autoDelete:是否自动删除。
    • arguments:设置队列的其他一些参数,这里我们暂时不需要什么其他参数。

    其中queueBind方法参数如下:

    • queue:需要绑定的队列名称。
    • exchange:需要绑定的交换机名称。
    • routingKey:不用多说了吧。

    其中basicPublish方法的参数如下:

    • exchange: 对应的Exchange名称,我们这里就使用第二个直连交换机。
    • routingKey:这里我们填写绑定时指定的routingKey,其实和之前在管理页面操作一样。
    • props:其他的配置。
    • body:消息本体。

    执行完成后,可以在管理页面中看到我们刚刚创建好的消息队列了:

    image-20220419153630431

    并且此消息队列已经成功与amq.direct交换机进行绑定:

    image-20220419154618613

    那么现在我们的消息队列中已经存在数据了,怎么将其读取出来呢?我们来看看如何创建一个消费者:

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.37.129.4");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/test");
    
        //这里不使用try-with-resource,因为消费者是一直等待新的消息到来,然后按照
        //我们设定的逻辑进行处理,所以这里不能在定义完成之后就关闭连接
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        //创建一个基本的消费者
        channel.basicConsume("yyds", false, (s, delivery) -> {
            System.out.println(new String(delivery.getBody()));
            //basicAck是确认应答,第一个参数是当前的消息标签,后面的参数是
            //是否批量处理消息队列中所有的消息,如果为false表示只处理当前消息
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            //basicNack是拒绝应答,最后一个参数表示是否将当前消息放回队列,如果
            //为false,那么消息就会被丢弃
            //channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
            //跟上面一样,最后一个参数为false,只不过这里省了
            //channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
        }, s -> {});
    }
    

    其中basicConsume方法参数如下:

    • queue - 消息队列名称,直接指定。
    • autoAck - 自动应答,消费者从消息队列取出数据后,需要跟服务器进行确认应答,当服务器收到确认后,会自动将消息删除,如果开启自动应答,那么消息发出后会直接删除。
    • deliver - 消息接收后的函数回调,我们可以在回调中对消息进行处理,处理完成后,需要给服务器确认应答。
    • cancel - 当消费者取消订阅时进行的函数回调,这里暂时用不到。

    现在我们启动一下消费者,可以看到立即读取到我们刚刚插入到队列中的数据:

    image-20220419155938158

    我们现在继续在消息队列中插入新的数据,这里直接在网页上进行操作就行了,同样的我们也可以在消费者端接受并进行处理。

    现在我们把刚刚创建好的消息队列删除。

    官方文档:https://docs.spring.io/spring-amqp/docs/current/reference/html/

    前面我们已经完成了RabbitMQ的安装和简单使用,并且通过Java连接到服务器。现在我们来尝试在SpringBoot中整合消息队列客户端,首先是依赖:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    接着我们需要配置RabbitMQ的地址等信息:

    spring:
      rabbitmq:
        addresses: 192.168.0.4
        username: admin
        password: admin
        virtual-host: /test
    

    这样我们就完成了最基本信息配置,现在我们来看一下,如何像之前一样去声明一个消息队列,我们只需要一个配置类就行了:

    @Configuration
    public class RabbitConfiguration {
        @Bean("directExchange")  //定义交换机Bean,可以很多个
        public Exchange exchange(){
            return ExchangeBuilder.directExchange("amq.direct").build();
        }
    
        @Bean("yydsQueue")     //定义消息队列
        public Queue queue(){
            return QueueBuilder
              				.nonDurable("yyds")   //非持久化类型
              				.build();
        }
    
        @Bean("binding")
        public Binding binding(@Qualifier("directExchange") Exchange exchange,
                               @Qualifier("yydsQueue") Queue queue){
          	//将我们刚刚定义的交换机和队列进行绑定
            return BindingBuilder
                    .bind(queue)   //绑定队列
                    .to(exchange)  //到交换机
                    .with("my-yyds")   //使用自定义的routingKey
                    .noargs();
        }
    }
    

    接着我们来创建一个消费者,这里我们直接编写在测试用例中:

    @SpringBootTest
    class SpringCloudMqApplicationTests {
    
      	//RabbitTemplate为我们封装了大量的RabbitMQ操作,已经由Starter提供,因此直接注入使用即可
        @Resource
        RabbitTemplate template;
    
        @Test
        void publisher() {
          	//使用convertAndSend方法一步到位,参数基本和之前是一样的
          	//最后一个消息本体可以是Object类型,真是大大的方便
            template.convertAndSend("amq.direct", "my-yyds", "Hello World!");
        }
    
    }
    

    现在我们来运行一下这个测试用例:

    image-20220419221426545

    可以看到后台自动声明了我们刚刚定义好的消息队列和交换机以及对应的绑定关系,并且我们的数据也是成功插入到消息队列中:

    image-20220419221532673

    现在我们再来看看如何创建一个消费者,因为消费者实际上就是一直等待消息然后进行处理的角色,这里我们只需要创建一个监听器就行了,它会一直等待消息到来然后再进行处理:

    @Component  //注册为Bean
    public class TestListener {
    
        @RabbitListener(queues = "yyds")   //定义此方法为队列yyds的监听器,一旦监听到新的消息,就会接受并处理
        public void test(Message message){
            System.out.println(new String(message.getBody()));
        }
    }
    

    接着我们启动服务器:

    image-20220419230223151

    可以看到控制台成功输出了我们之前放入队列的消息,并且管理页面中也显示此消费者已经连接了:

    image-20220419230315376

    接着我们再通过管理页面添加新的消息看看,也是可以正常进行接受的。

    当然,如果我们需要确保消息能够被消费者接受并处理,然后得到消费者的反馈,也是可以的:

    @Test
    void publisher() {
      	//会等待消费者消费然后返回响应结果
        Object res = template.convertSendAndReceive("amq.direct", "my-yyds", "Hello World!");
        System.out.println("收到消费者响应:"+res);
    }
    

    消费者这边只需要返回一个对应的结果即可:

    @RabbitListener(queues = "yyds")
    public String receiver(String data){
        System.out.println("一号消息队列监听器 "+data);
        return "收到!";
    }
    

    测试没有问题:

    image-20220421142425891

    那么如果我们需要直接接收一个JSON格式的消息,并且希望直接获取到实体类呢?

    @Data
    public class User {
        int id;
        String name;
    }
    
    @Configuration
    public class RabbitConfiguration {
      	...
    
        @Bean("jacksonConverter")   //直接创建一个用于JSON转换的Bean
        public Jackson2JsonMessageConverter converter(){
            return new Jackson2JsonMessageConverter();
        }
    }
    

    接着我们只需要指定转换器就可以了:

    @Component
    public class TestListener {
    
      	//指定messageConverter为我们刚刚创建的Bean名称
        @RabbitListener(queues = "yyds", messageConverter = "jacksonConverter")
        public void receiver(User user){  //直接接收User类型
            System.out.println(user);
        }
    }
    

    现在我们直接在管理页面发送:

    {"id":1,"name":"LB"}
    

    image-20220416225912100

    可以看到成功完成了转换,并输出了用户信息:

    image-20220416225829807

    同样的,我们也可以直接发送User,因为我们刚刚已经配置了Jackson2JsonMessageConverter为Bean,所以直接使用就可以了:

    @Test
    void publisher() {
        template.convertAndSend("amq.direct", "yyds", new User());
    }
    

    可以看到后台的数据类型为:

    image-20220419232715025

    image-20220416231709750

    这样,我们就通过SpringBoot实现了RabbitMQ的简单使用。

    死信队列

    消息队列中的数据,如果迟迟没有消费者来处理,那么就会一直占用消息队列的空间。比如我们模拟一下抢车票的场景,用户下单高铁票之后,会进行抢座,然后再进行付款,但是如果用户下单之后并没有及时的付款,这张票不可能一直让这个用户占用着,因为你不买别人还要买呢,所以会在一段时间后超时,让这张票可以继续被其他人购买。

    这时,我们就可以使用死信队列,将那些用户超时未付款的或是用户主动取消的订单,进行进一步的处理,以下类型的消息都会被判定为死信:

    • 消息被拒绝(basic.reject / basic.nack),并且requeue = false
    • 消息TTL过期
    • 队列达到最大长度

    image-20220419112336088

    那么如何构建这样的一种使用模式呢?实际上本质就是一个死信交换机+绑定的死信队列,当正常队列中的消息被判定为死信时,会被发送到对应的死信交换机,然后再通过交换机发送到死信队列中,死信队列也有对应的消费者去处理消息。

    这里我们直接在配置类中创建一个新的死信交换机和死信队列,并进行绑定:

    @Configuration
    public class RabbitConfiguration {
    
        @Bean("directDlExchange")
        public Exchange dlExchange(){
            //创建一个新的死信交换机
            return ExchangeBuilder.directExchange("dlx.direct").build();
        }
    
        @Bean("yydsDlQueue")   //创建一个新的死信队列
        public Queue dlQueue(){
            return QueueBuilder
                    .nonDurable("dl-yyds")
                    .build();
        }
    
        @Bean("dlBinding")   //死信交换机和死信队列进绑定
        public Binding dlBinding(@Qualifier("directDlExchange") Exchange exchange,
                               @Qualifier("yydsDlQueue") Queue queue){
            return BindingBuilder
                    .bind(queue)
                    .to(exchange)
                    .with("dl-yyds")
                    .noargs();
        }
    
    		...
    
        @Bean("yydsQueue")
        public Queue queue(){
            return QueueBuilder
                    .nonDurable("yyds")
                    .deadLetterExchange("dlx.direct")   //指定死信交换机
                    .deadLetterRoutingKey("dl-yyds")   //指定死信RoutingKey
                    .build();
        }
      
      	...
    }
    

    接着我们将监听器修改为死信队列监听:

    @Component
    public class TestListener {
        @RabbitListener(queues = "dl-yyds", messageConverter = "jacksonConverter")
        public void receiver(User user){
            System.out.println(user);
        }
    }
    

    配置完成后,我们来尝试启动一下吧,注意启动之前记得把之前的队列给删了,这里要重新定义。

    image-20220420103846981

    队列列表中已经出现了我们刚刚定义好的死信队列,并且yyds队列也支持死信队列发送功能了,现在我们尝试向此队列发送一个消息,但是我们将其拒绝:

    image-20220420105359931

    可以看到拒绝后,如果不让消息重新排队,那么就会变成死信,直接被丢进死信队列中,可以看到在拒绝后:

    image-20220420105455291

    现在我们来看看第二种情况,RabbitMQ支持将超过一定时间没被消费的消息自动删除,这需要消息队列设定TTL值,如果消息的存活时间超过了Time To Live值,就会被自动删除,自动删除后的消息如果有死信队列,那么就会进入到死信队列中。

    现在我们将yyds消息队列设定TTL值(毫秒为单位):

    @Bean("yydsQueue")
    public Queue queue(){
        return QueueBuilder
                .nonDurable("yyds")
                .deadLetterExchange("dlx.direct")
                .deadLetterRoutingKey("dl-yyds")
                .ttl(5000)   //如果5秒没处理,就自动删除
                .build();
    }
    

    现在我们重启测试一下,注意修改了之后记得删除之前的yyds队列:

    image-20220420110317997

    可以看到现在yyds队列已经具有TTL特性了,我们现在来插入一个新的消息:

    image-20220420110504022

    可以看到消息5秒钟之后就不见了,而是被丢进了死信队列中。

    最后我们来看一下当消息队列长度达到最大的情况,现在我们将消息队列的长度进行限制:

    @Bean("yydsQueue")
    public Queue queue(){
        return QueueBuilder
                .nonDurable("yyds")
                .deadLetterExchange("dlx.direct")
                .deadLetterRoutingKey("dl-yyds")
                .maxLength(3)   //将最大长度设定为3
                .build();
    }
    

    现在我们重启一下,然后尝试连续插入4个消息:

    image-20220420135316458

    可以看到yyds消息队列新增了Limit特性,也就是限定长度:

    @Test
    void publisher() {
        for (int i = 0; i < 4; i++) 
            template.convertAndSend("amq.direct", "my-yyds", new User());
    }
    

    image-20220420135419673

    可以看到因为长度限制为3,所以有一个消息直接被丢进了死信队列中,为了能够更直观地观察消息队列的机制,我们为User类新增一个时间字段:

    @Data
    public class User {
        int id;
        String name;
        String date = new Date().toString();
    }
    

    接着每隔一秒钟插入一个:

    @Test
    void publisher() throws InterruptedException {
        for (int i = 0; i < 4; i++) {
            Thread.sleep(1000);
            template.convertAndSend("amq.direct", "my-yyds", new User());
        }
    }
    

    再次进行上述实验,可以发现如果到达队列长度限制,那么每次插入都会把位于队首的消息丢进死信队列,来腾出空间给新来的消息。

    工作队列模式

    **注意:**XX模式只是一种设计思路,并不是指的具体的某种实现,可以理解为实现XX模式需要怎么去写。

    前面我们了解了最简的一个消费者一个生产者的模式,接着我们来了解一下一个生产者多个消费者的情况:

    image-20220420151258324

    实际上这种模式就非常适合多个工人等待新的任务到来的场景,我们的任务有很多个,一个一个丢进消息队列,而此时工人有很多个,那么我们就可以将这些任务分配个各个工人,让他们各自负责一些任务,并且做的快的工人还可以做完成一些(能者多劳)。

    非常简单,我们只需要创建两个监听器即可:

    @Component
    public class TestListener {
        @RabbitListener(queues = "yyds")
        public void receiver(String data){   //这里直接接收String类型的数据
            System.out.println("一号消息队列监听器 "+data);
        }
    
        @RabbitListener(queues = "yyds")
        public void receiver2(String data){
            System.out.println("二号消息队列监听器 "+data);
        }
    }
    

    可以看到我们发送消息时,会自动进行轮询分发:

    image-20220420154602883

    那么如果我们一开始就在消息队列中放入一部分消息在开启消费者呢?

    image-20220420154654901

    可以看到,如果是一开始就存在消息,会被一个消费者一次性全部消耗,这是因为我们没有对消费者的Prefetch count(预获取数量,一次性获取消息的最大数量)进行限制,也就是说我们现在希望的是消费者一次只能拿一个消息,而不是将所有的消息全部都获取。

    image-20220420160253144

    因此我们需要对这个数量进行一些配置,这里我们需要在配置类中定义一个自定义的ListenerContainerFactory,可以在这里设定消费者Channel的PrefetchCount的大小:

    @Resource
    private CachingConnectionFactory connectionFactory;
    
    @Bean(name = "listenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPrefetchCount(1);   //将PrefetchCount设定为1表示一次只能取一个
        return factory;
    }
    

    接着我们在监听器这边指定即可:

    @Component
    public class TestListener {
        @RabbitListener(queues = "yyds",  containerFactory = "listenerContainer")
        public void receiver(String data){
            System.out.println("一号消息队列监听器 "+data);
        }
    
        @RabbitListener(queues = "yyds", containerFactory = "listenerContainer")
        public void receiver2(String data){
            System.out.println("二号消息队列监听器 "+data);
        }
    }
    

    现在我们再次启动服务器,可以看到PrefetchCount被限定为1了:

    image-20220420164702864

    再次重复上述的实现,可以看到消息不会被一号消费者给全部抢走了:

    image-20220420164827502

    当然除了去定义两个相同的监听器之外,我们也可以直接在注解中定义,比如我们现在需要10个同样的消费者:

    @Component
    public class TestListener {
        @RabbitListener(queues = "yyds",  containerFactory = "listenerContainer", concurrency = "10")
        public void receiver(String data){
            System.out.println("一号消息队列监听器 "+data);
        }
    }
    

    可以看到在管理页面中出现了10个消费者:

    image-20220420170349298

    至此,有关工作队列模式就讲到这里。

    发布订阅模式

    前面我们已经了解了RabbitMQ客户端的一些基本操作,包括普通的消息模式,接着我们来了解一下其他的模式,首先是发布订阅模式,它支持多种方式:

    image-20220420172252440

    比如我们在阿里云买了云服务器,但是最近快到期了,那么就会给你的手机、邮箱发送消息,告诉你需要去续费了,但是手机短信和邮件发送并不一定是同一个业务提供的,但是现在我们又希望能够都去执行,所以就可以用到发布订阅模式,简而言之就是,发布一次,消费多个。

    实现这种模式其实也非常简单,但是如果使用我们之前的直连交换机,肯定是不行的,我们这里需要用到另一种类型的交换机,叫做fanout(扇出)类型,这时一种广播类型,消息会被广播到所有与此交换机绑定的消息队列中。

    这里我们使用默认的交换机:

    image-20220420225300171

    这个交换机是一个fanout类型的交换机,我们就是要它就行了:

    @Configuration
    public class RabbitConfiguration {
    
        @Bean("fanoutExchange")
        public Exchange exchange(){
          	//注意这里是fanoutExchange
            return ExchangeBuilder.fanoutExchange("amq.fanout").build();
        }
    
        @Bean("yydsQueue1")
        public Queue queue(){
            return QueueBuilder.nonDurable("yyds1").build();
        }
    
        @Bean("binding")
        public Binding binding(@Qualifier("fanoutExchange") Exchange exchange,
                               @Qualifier("yydsQueue1") Queue queue){
            return BindingBuilder
                    .bind(queue)
                    .to(exchange)
                    .with("yyds1")
                    .noargs();
        }
    
        @Bean("yydsQueue2")
        public Queue queue2(){
            return QueueBuilder.nonDurable("yyds2").build();
        }
    
        @Bean("binding2")
        public Binding binding2(@Qualifier("fanoutExchange") Exchange exchange,
                               @Qualifier("yydsQueue2") Queue queue){
            return BindingBuilder
                    .bind(queue)
                    .to(exchange)
                    .with("yyds2")
                    .noargs();
        }
    }
    

    这里我们将两个队列都绑定到此交换机上,我们先启动看看效果:

    image-20220420230954785

    绑定没有什么问题,接着我们搞两个监听器,监听一下这两个队列:

    @Component
    public class TestListener {
        @RabbitListener(queues = "yyds1")
        public void receiver(String data){
            System.out.println("一号消息队列监听器 "+data);
        }
    
        @RabbitListener(queues = "yyds2")
        public void receiver2(String data){
            System.out.println("二号消息队列监听器 "+data);
        }
    }
    

    现在我们通过交换机发送消息,看看是不是两个监听器都会接收到消息:

    image-20220420231113658

    可以看到确实是两个消息队列都能够接受到此消息:

    image-20220420231145578

    这样我们就实现了发布订阅模式。

    路由模式

    路由模式实际上我们一开始就已经实现了,我们可以在绑定时指定想要的routingKey只有生产者发送时指定了对应的routingKey才能到达对应的队列。

    image-20220420232826848

    当然除了我们之前的一次绑定之外,同一个消息队列可以多次绑定到交换机,并且使用不同的routingKey,这样只要满足其中一个都可以被发送到此消息队列中:

    @Configuration
    public class RabbitConfiguration {
    
        @Bean("directExchange")
        public Exchange exchange(){
            return ExchangeBuilder.directExchange("amq.direct").build();
        }
    
        @Bean("yydsQueue")
        public Queue queue(){
            return QueueBuilder.nonDurable("yyds").build();
        }
    
        @Bean("binding")   //使用yyds1绑定
        public Binding binding(@Qualifier("directExchange") Exchange exchange,
                               @Qualifier("yydsQueue") Queue queue){
            return BindingBuilder
                    .bind(queue)
                    .to(exchange)
                    .with("yyds1")
                    .noargs();
        }
    
        @Bean("binding2")   //使用yyds2绑定
        public Binding binding2(@Qualifier("directExchange") Exchange exchange,
                               @Qualifier("yydsQueue") Queue queue){
            return BindingBuilder
                    .bind(queue)
                    .to(exchange)
                    .with("yyds2")
                    .noargs();
        }
    }
    

    启动后我们可以看到管理面板中出现了两个绑定关系:

    image-20220420233606749

    这里可以测试一下,随便使用哪个routingKey都可以。

    主题模式

    实际上这种模式就是一种模糊匹配的模式,我们可以将routingKey以模糊匹配的方式去进行转发。

    image-20220420233721239

    我们可以使用*#来表示:

    • * - 表示任意的一个单词
    • # - 表示0个或多个单词

    这里我们来测试一下:

    @Configuration
    public class RabbitConfiguration {
    
        @Bean("topicExchange")  //这里使用预置的Topic类型交换机
        public Exchange exchange(){
            return ExchangeBuilder.topicExchange("amq.topic").build();
        }
    
        @Bean("yydsQueue")
        public Queue queue(){
            return QueueBuilder.nonDurable("yyds").build();
        }
    
        @Bean("binding")
        public Binding binding2(@Qualifier("topicExchange") Exchange exchange,
                               @Qualifier("yydsQueue") Queue queue){
            return BindingBuilder
                    .bind(queue)
                    .to(exchange)
                    .with("*.test.*")
                    .noargs();
        }
    }
    

    启动项目,可以看到只要是满足通配符条件的都可以成功转发到对应的消息队列:

    image-20220421103753962

    接着我们可以再试试看#通配符。

    除了我们这里使用的默认主题交换机之外,还有一个叫做amq.rabbitmq.trace的交换机:

    image-20220421104035463

    可以看到它也是topic类型的,那么这个交换机是做什么的呢?实际上这是用于帮助我们记录和追踪生产者和消费者使用消息队列的交换机,它是一个内部的交换机,那么如果使用呢?首先创建一个消息队列用于接收记录:

    image-20220421104619325

    接着我们需要在控制台将虚拟主机/test的追踪功能开启:

    sudo rabbitmqctl trace_on -p /test
    

    开启后,我们将此队列绑定到上面的交换机上:

    image-20220421104843224

    image-20220421105141144

    由于发送到此交换机上的routingKey为routing key为 publish.交换机名称 和 deliver.队列名称,分别对应生产者投递到交换机的消息,和消费者从队列上获取的消息,因此这里使用#通配符进行绑定。

    现在我们来测试一下,比如还是往yyds队列发送消息:

    image-20220421105242770

    可以看到在发送消息,并且消费者已经处理之后,trace队列中新增了两条消息,那么我们来看看都是些什么消息:

    image-20220421105528532

    通过追踪,我们可以很明确地得知消息发送的交换机、routingKey、用户等信息,包括信息本身,同样的,消费者在取出数据时也有记录:

    image-20220421105638715

    我们可以明确消费者的地址、端口、具体操作的队列以及取出的消息信息等。

    到这里,我们就已经了解了3种类型的交换机。

    第四种交换机类型

    通过前面的学习,我们已经介绍了三种交换机类型,现在我们来介绍一下第四种交换机类型header,它是根据头部信息来决定的,在我们发送的消息中是可以携带一些头部信息的(类似于HTTP),我们可以根据这些头部信息来决定路由到哪一个消息队列中。

    @Configuration
    public class RabbitConfiguration {
    
        @Bean("headerExchange")  //注意这里返回的是HeadersExchange
        public HeadersExchange exchange(){
            return ExchangeBuilder
                    .headersExchange("amq.headers")  //RabbitMQ为我们预置了两个,这里用第一个就行
                    .build();
        }
    
        @Bean("yydsQueue")
        public Queue queue(){
            return QueueBuilder.nonDurable("yyds").build();
        }
    
        @Bean("binding")
        public Binding binding2(@Qualifier("headerExchange") HeadersExchange exchange,  //这里和上面一样的类型
                               @Qualifier("yydsQueue") Queue queue){
            return BindingBuilder
                    .bind(queue)
                    .to(exchange)   //使用HeadersExchange的to方法,可以进行进一步配置
              			//.whereAny("a", "b").exist();   这个是只要存在任意一个指定的头部Key就行
                    //.whereAll("a", "b").exist();   这个是必须存在所有指定的的头部Key
                    .where("test").matches("hello");   //比如我们现在需要消息的头部信息中包含test,并且值为hello才能转发给我们的消息队列
          					//.whereAny(Collections.singletonMap("test", "hello")).match();  传入Map也行,批量指定键值对
        }
    }
    

    现在我们来启动一下试试看:

    image-20220421110926077

    结果发现,消息可以成功发送到消息队列,这就是使用头部信息进行路由。

    这样,我们就介绍完了所有四种类型的交换机。

    集群搭建

    前面我们对于RabbitMQ的相关内容已经基本讲解完毕了,最后我们来尝试搭建一个集群,让RabbitMQ之间相互进行数据复制(镜像模式)稍微有点麻烦,跟着视频走吧。

    可能会用到的一些命令:

    sudo rabbitmqctl stop_app
    sudo rabbitmqctl join_cluster rabbit@ubuntu-server
    sudo rabbitmqctl start_app
    

    实现复制即可。


    SpringCloud 消息组件

    前面我们已经学习了如何使用RabbitMQ消息队列,接着我们来简单介绍一下SpringCloud为我们提供的一些消息组件。

    SpringCloud Stream

    **官方文档:**https://docs.spring.io/spring-cloud-stream/docs/3.2.2/reference/html/

    前面我们介绍了RabbitMQ,了解了消息队列相关的一些操作,但是可能我们会遇到不同的系统在用不同的消息队列,比如系统A用的Kafka、系统B用的RabbitMQ,但是我们现在又没有学习过Kafka,那么怎么办呢?有没有一种方式像JDBC一样,我们只需要关心SQL和业务本身,而不用关心数据库的具体实现呢?

    SpringCloud Stream能够做到,它能够屏蔽底层实现,我们使用统一的消息队列操作方式就能操作多种不同类型的消息队列。

    image-20220421225215709

    它屏蔽了RabbitMQ底层操作,让我们使用统一的Input和Output形式,以Binder为中间件,这样就算我们切换了不同的消息队列,也无需修改代码,而具体某种消息队列的底层实现是交给Stream在做的。

    这里我们创建一个新的项目来测试一下:

    image-20220421215534386

    依赖如下:

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>2021.0.1</version>
        <type>pom</type>
        <scope>import</scope>
    </dependency>
    
    <dependencies>
        <!--  RabbitMQ的Stream实现  -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
    

    首先我们来编写一下生产者,首先是配置文件:

    server:
      port: 8001
    spring:
      cloud:
        stream:
          binders:   #此处配置要绑定的rabbitmq的服务信息
            local-server: #绑定名称,随便起一个就行
              type: rabbit #消息组件类型,这里使用的是RabbitMQ,就填写rabbit
              environment:  #服务器相关信息,按照下面的方式填写就行,爆红别管
                spring:
                  rabbitmq:
                    host: 192.168.0.6
                    port: 5672
                    username: admin
                    password: admin
                    virtual-host: /test
           bindings:
            test-out-0:
              destination: test.exchange
    

    接着我们来编写一个Controller,一会访问一次这个接口,就向消息队列发送一个数据:

    @RestController
    public class PublishController {
    
        @Resource
        StreamBridge bridge;  //通过bridge来发送消息
    
        @RequestMapping("/publish")
        public String publish(){
            //第一个参数其实就是RabbitMQ的交换机名称(数据会发送给这个交换机,到达哪个消息队列,不由我们决定)
          	//这个交换机的命名稍微有一些规则:
          	//输入:    <名称> + -in- + <index>
          	//输出:    <名称> + -out- + <index>
          	//这里我们使用输出的方式,来将数据发送到消息队列,注意这里的名称会和之后的消费者Bean名称进行对应
            bridge.send("test-out-0", "HelloWorld!");
            return "消息发送成功!"+new Date();
        }
    }
    

    现在我们来将生产者启动一下,访问一下接口:

    image-20220421220955906

    可以看到消息成功发送,我们来看看RabbitMQ这边的情况:

    image-20220421221027145

    新增了一个test-in-0交换机,并且此交换机是topic类型的:

    image-20220421221107547

    但是目前没有任何队列绑定到此交换机上,因此我们刚刚发送的消息实际上是没有给到任何队列的。

    接着我们来编写一下消费者,消费者的编写方式比较特别,只需要定义一个Consumer就可以了,其他配置保持一致:

    @Component
    public class ConsumerComponent {
    
        @Bean("test")   //注意这里需要填写我们前面交换机名称中"名称",这样生产者发送的数据才会正确到达
        public Consumer<String> consumer(){
            return System.out::println;
        }
    }
    

    配置中需要修改一下目标交换机:

    server:
      port: 8002
    spring:
      cloud:
        stream:
        	...
          bindings:
          	#因为消费者是输入,默认名称为 方法名-in-index,这里我们将其指定为我们刚刚定义的交换机
            test-in-0:
              destination: test.exchange
    

    接着我们直接启动就可以了,可以看到启动之后,自动为我们创建了一个新的队列:

    image-20220421221733723

    而这个队列实际上就是我们消费者等待数据到达的队列:

    image-20220421221807577

    可以看到当前队列直接绑定到了我们刚刚创建的交换机上,并且routingKey是直接写的#,也就是说一会消息会直接过来。

    现在我们再来访问一些消息发送接口:

    image-20220421221938730

    image-20220421221952663

    可以看到消费者成功地进行消费了:

    image-20220421222011924

    这样,我们就通过使用SpringCloud Stream来屏蔽掉底层RabbitMQ来直接进行消息的操作了。

    SpringCloud Bus

    **官方文档:**https://cloud.spring.io/spring-cloud-bus/reference/html/

    实际上它就相当于是一个消息总线,可用于向各个服务广播某些状态的更改(比如云端配置更改,可以结合Config组件实现动态更新配置,当然我们前面学习的Nacos其实已经包含这个功能了)或其他管理指令。

    这里我们也是简单使用一下吧,Bus需要基于一个具体的消息队列实现,比如RabbitMQ或是Kafka,这里我们依然使用RabbitMQ。

    我们将最开始的微服务拆分项目继续使用,比如现在我们希望借阅服务的某个接口调用时,能够给用户服务和图书服务发送一个通知,首先是依赖:

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-bus-amqp</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    

    接着我们只需要在配置文件中将RabbitMQ的相关信息配置:

    spring:
      rabbitmq:
        addresses: 192.168.0.6
        username: admin
        password: admin
        virtual-host: /test
    management:
      endpoints:
        web:
          exposure:
            include: "*"    #暴露端点,一会用于提醒刷新
    

    然后启动我们的三个服务器,可以看到在管理面板中:

    image-20220421232118952

    新增了springCloudBug这样一个交换机,并且:

    image-20220421232146646

    自动生成了各自的消息队列,这样就可以监听并接收到消息了。

    现在我们访问一个端口:

    image-20220421233200950

    此端口是用于通知别人进行刷新,可以看到调用之后,消息队列中成功出现了一次消费:

    image-20220421233302328

    现在我们结合之前使用的Config配置中心,来看看是不是可以做到通知之后所有的配置动态刷新了。

    展开全文
  • 消息队列(MQ)

    千次阅读 2021-11-13 14:20:18
    一、消息队列(MQ)概述 消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为: 当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候。...
  • 十二张图,踹开消息队列的大门

    千次阅读 多人点赞 2021-07-08 22:18:24
    消息队列,应用广泛,面试必问。一篇文章,十二张图,我们一起走进消息队列的世界。
  • 搭载FreeRTOS系统,任务一向消息队列填充数字,任务二从消息队列提取数据并发送到串口1,同时有LED灯跟随数据传送亮灭。 这里我们的课程设计内容。 对于STM32和FreeRTOS初学者以及想了解RTOS的任务机制与消息队列的...
  • 2、多个进程可同时向一个消息队列发送消息,也可以同时从一个消息队列中接收消息。发送进程把消息发送到队列尾部,接受进程从消息队列头部读取消息,消息一旦被读出就从队列中删除。 二、结构 1、消息队列中消息本身...
  • 什么是消息队列

    万次阅读 多人点赞 2021-08-17 20:13:42
    一、消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,...
  • 开源队列产品对比 云队列产品对比 调研总结 1.针对自建队列产品: 2.针对云队列产品: 3.综合考虑: 开源队列产品对比 队列名称 ActiveMQ RabbitMQ RocketMQ Kafka 定位 非日志的可靠消息...
  • 一、队列简介 ...由于队列用来传递消息的,所以也称为消息队列。FreeRTOS中的信号量的也是依据队列实现的!所以有必要深入的了解FreeRTOS的队列。 1、数据存储 通常队列采用先进先出(FIFO)的存储缓冲机制,
  • 消息队列,问题与处理方案梳理

    千次阅读 2022-04-10 15:31:19
    1、如何保证消息不被重复消费? 一、为什么会出现重复消费的问题? RabbitMQ、RocketMQ、Kafka 都有可能出现重复消费的问题,导致重复消费的原因可能出现在生产者,也可能出现在 MQ 或 消费者。这里说的重复消费问题...
  • RT-Thread之消息队列

    千次阅读 2022-03-10 14:59:39
    消息队列是常用的线程间通信方式,是一种异步的通信方式。消息队列可以应用于多种场合:线程间的消息交换、使用串口接收不定长数据等。 消息队列的基本概念 队列又称消息队列,是一种常用于线程间通信的数据...
  • 通俗易懂讲消息队列

    万次阅读 多人点赞 2020-03-05 17:00:21
    一、什么是消息队列消息队列不知道大家看到这个词的时候,会不会觉得它是一个比较高端的技术,反正我是觉得它好像是挺牛逼的。 消息队列,一般我们会简称它为MQ(Message Queue),嗯...
  • 15 Redis 实现消息队列

    万次阅读 2021-12-06 21:13:44
    15 Redis 实现消息队列前言一、消息队列的消息存取需求二、基于 Streams 的消息队列解决方案总结 前言 消息队列要能支持组件通信消息的快速读写,而 Redis 本身支持数据的高速访问,正好可以满足消息队列的读写性能...
  • RT-Thread快速入门-消息队列

    千次阅读 2022-03-18 12:47:08
    首发,公众号【一起学嵌入式】 哈哈,RT-Thread 快速入门系列文章登上官方论坛 “今日聚焦” 了,能够得到官方认可,属实受宠若惊。感谢 RT-Thread 的认可,感谢官方提供的...消息队列在实际项目中应用较多,建议初.
  • RabbitMQ—消息队列

    千次阅读 2022-01-26 16:09:57
    RabbitMQ是一个在AMQP( 一个提供统一消息服务的应用层标准高级消息队列协议)基础上完成的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。 主要特性: 保证可靠性:使用一些机制来保证可靠性,如...
  • MQ消息队列

    千次阅读 2022-02-22 10:27:16
    什么是消息队列 我们可以把消息队列看作是一个存放消息的容器,当我们需要使用消息的时候,直接从容器中取出消息供自己使用即可。 消息队列是分布式系统中重要的组件之一。使用消息队列主要是为了通过异步处理提高...
  • 消息队列”是在消息的传输过程中保存消息的容器。 “消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。 消息被发送到队列中。“消息队列”是在...
  • 常用6种消息队列介绍和对比

    千次阅读 2021-03-25 14:37:55
    消息队列是分布式应用间交换信息的重要组件,消息队列可驻留在内存或磁盘上, 队列可以存储消息直到它们被应用程序读走。 通过消息队列,应用程序可以在不知道彼此位置的情况下独立处理消息,或者在处理消息前不需要...
  • FreeRTOS消息队列

    万次阅读 多人点赞 2019-01-31 17:55:04
    任务或者中断服务程序都可以给消息队列发送消息,当发送消息时,如果队列未满或者允许覆盖入队, FreeRTOS 会将消息拷贝到消息队列队尾,否则,会根据用户指定的阻塞超时时间进行阻塞,在这段时间中,如果队列一直不...
  • RabbitMQ消息队列常见面试题总结

    万次阅读 多人点赞 2021-03-22 02:46:39
    RabbitMQ消息队列常见面试题总结; 1、什么是消息队列消息队列的优缺点? 2、Kafka、ActiveMQ、RabbitMQ、RocketMQ的区别? 3、如何保证消息不被重复消费? 4、如何保证消息不丢失,进行可靠性传输? 5、如何保证...
  • 消息队列内容解析

    千次阅读 2022-02-07 08:31:47
    消息队列应用背景 消息队列常见的应用场景有:异步,解耦,削锋。 1. 异步处理数据 异步可以类举生活中的例子,比如说是取送快递,如果快递员需要直接对用户进行签收,那么效率会大大降低,而引入快递柜(消息队列...
  • 消息队列的消息积压解决办法

    千次阅读 2022-03-16 15:57:51
    1、可能你的消息队列集群的磁盘都快写满了,都没人消费,这个时候怎么办? 2、或者是这整个就积压了几个小时,你这个时候怎么办? 3、或者是你积压的时间太长了,导致比如 RabbitMQ 设置了消息过期时间后就没了...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 587,950
精华内容 235,180
关键字:

消息队列