rabbitmq_rabbitmq面试题 - CSDN
rabbitmq 订阅
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。 展开全文
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
信息
开发公司
Rabbit
简    称
MQ
构    成
以高性能、健壮以及可伸缩性出名的 Erlang 写成
释    义
一种程序对程序的通信方法
中文名
消息队列
外文名
Message Queue
rabbitmq简介
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
收起全文
精华内容
参与话题
  • 2小时学会SpringBoot整合RabbitMQ详解

    千人学习 2020-02-20 11:37:46
     课程目标: - 深入理解RabbitMQ发送消息的模式 - 熟练使用RabbitMQ五种消息模型 - 如何确定RabbitMQ的消息是否发送成功 - 如何确定RabbitMQ发送的消息是否消费成功。 - 项目实战中如何使用RabbitMQ,以及...
  • RabbitMQ —— 介绍

    万次阅读 2020-07-28 11:03:22
        MQ 全称为 Message Queue,即消息队列,RabbitMQ 是由 Erlang 语言开发,基于 AMQP(Advanced Message Queue Protocol ,高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列...

    前文

    消息中间件 —— 简介

    介绍

        MQ 全称为 Message Queue,即消息队列,RabbitMQ 是由 Erlang 语言开发,基于 AMQP(Advanced Message Queue Protocol ,高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛,RabbitMQ 官方地址 https://www.rabbitmq.com/

    什么是 AMQP?

        AMQP,即 AMQP(Advanced Message Queue Protocol ,高级消息队列协议),一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开发标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端中间件不同产品,不同的开发语言等条件的限制,Erlang 中的实现有 RabbitMQ 等

    JMS 是什么?

        即 Java 消息服务,Java 消息服务(Java Message Service,JMS)应用程序接口是一个 Java 平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java 消息服务是一个与具体平台无关的 API,绝大多数 MOM 提供商都对 JMS 提供支持

    应用场景

    1、任务异步处理

    • 将不需要同步处理的并且耗时长的操作由队列通知消息接收方进行异步处理,提高了应用程序的响应时间

    2、应用程序解耦合

    • MQ 相当于一个中介,生产方通过 MQ 与消费方进行交互,它将应用程序进行解耦合

    市场上还有哪些 MQ?

    ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ、Redis

    为什么使用 RabbitMQ?

    • 使用简单,功能强大
    • 基于 AMQP 协议
    • 社区活跃,文档完善
    • 高并发性能好,这主要得益于 Erlang 语言
    • SpringBoot 默认已集成 RabbitMQ

    快速入门

    RabbitMQ 的工作原理

    在这里插入图片描述
    组成部分说明如下:

    • Broker:消息队列服务进程,此进程包括两个部分:Exchange 和 Queue
    • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过滤
    • Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方
    • Producer:消息生产者,即生产方客户端,生产方客户端将消费发送到 MQ
    • Consumer:消息消费者,即消费方客户端,接收 MQ 转发的消息

    消息发布接收流程

    发送消息

    1、生产者和 Broker 建立 TCP 连接
    2、生产者和 Broker 建立通道
    3、生产者通过通道消息发送 Broker,由 Exchange 将消息进行转发
    4、Exchange 将消息转发到指定的 Queue(队列)

    接收消息

    1、消费者和 Broker 建立 TCP 连接
    2、消费者和 Broker 建立通道
    3、消费者监听指定的 Queue(队列)
    4、当有消息到达 Queue 时 Broker 默认将消息推送给消费者
    5、消费者接收到消息


    相关 MQ 文章阅读

    ActiveMQ 下载、安装

    ActiveMQ —— Java 连接 ActiveMQ(点对点)

    ActiveMQ —— Java 连接 ActiveMQ(发布订阅 Topic)

    ActiveMQ —— Broker

    ActiveMQ —— Spring 整合 ActiveMQ

    SpringBoot 整合 ActiveMQ

    展开全文
  • RabbitMQ教程-RabbitMQ详解

    万次阅读 多人点赞 2020-09-21 16:52:17
    RabbitMQ教程。RabbitMQ详解。消息队列(MQ),本质是个队列,队列中存放的内容是message。MQ用于不同进程Process/线程Thread之间通信。本文介绍RabbitMQ的使用。RabbitMQ实战教程。

    仅需一次订阅RabbitMQ,作者所有专栏都能看

    推荐【SpringBoothttps://blog.csdn.net/hellozpc/article/details/107095951
    推荐【SpringCloudhttps://blog.csdn.net/hellozpc/article/details/83692496
    推荐【Mybatishttps://blog.csdn.net/hellozpc/article/details/80878563

    **欢迎关注**
    **扫一扫**

    RabbitMQ实战教程

    1.什么是MQ

    • 消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。
      其主要用途:不同进程Process/线程Thread之间通信。

    为什么会产生消息队列?有几个原因:

    • 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;

    • 不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;

    • 关于消息队列的详细介绍请参阅:
      《Java帝国之消息队列》
      《一个故事告诉你什么是消息队列》
      《到底什么时候该使用MQ》

    • MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。本文主要介绍RabbitMq

    • 本教程pdf及代码下载地址
      代码:https://download.csdn.net/download/zpcandzhj/10585077
      教程:https://download.csdn.net/download/zpcandzhj/10585092

    2.RabbitMQ

    2.1.RabbitMQ的简介

    这里写图片描述
    开发语言:Erlang – 面向并发的编程语言。

    这里写图片描述

    2.1.1.AMQP
    AMQP是消息队列的一个协议。

    这里写图片描述

    2.2.官网

    这里写图片描述

    2.3.MQ的其他产品

    这里写图片描述

    2.4.学习5种队列

    这里写图片描述

    2.5.安装文档

    这里写图片描述

    3.搭建RabbitMQ环境

    3.1.下载

    下载地址:http://www.rabbitmq.com/download.html

    3.2.windows下安装

    3.2.1.安装Erlang
    下载:http://www.erlang.org/download/otp_win64_17.3.exe
    安装:
    这里写图片描述
    这里写图片描述
    这里写图片描述
    这里写图片描述
    这里写图片描述
    安装完成。

    3.2.2.安装RabbitMQ
    这里写图片描述
    这里写图片描述
    这里写图片描述
    安装完成。

    开始菜单里出现如下选项:
    这里写图片描述

    启动、停止、重新安装等。

    3.2.3.启用管理工具
    1、双击这里写图片描述
    2、进入C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.4.1\sbin输入命令:
    rabbitmq-plugins enable rabbitmq_management
    这里写图片描述

    这样就启动了管理工具,可以试一下命令:
    停止:net stop RabbitMQ
    启动:net start RabbitMQ

    3、在浏览器中输入地址查看:http://127.0.0.1:15672/
    这里写图片描述
    4、使用默认账号登录:guest/ guest

    3.3.Linux下安装

    3.3.1.安装Erlang
    3.3.2.添加yum支持
    cd /usr/local/src/
    mkdir rabbitmq
    cd rabbitmq

    wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
    rpm -Uvh erlang-solutions-1.0-1.noarch.rpm

    rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc

    使用yum安装:
    sudo yum install erlang
    这里写图片描述

    3.3.3.安装RabbitMQ
    上传rabbitmq-server-3.4.1-1.noarch.rpm文件到/usr/local/src/rabbitmq/
    安装:
    rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm

    3.3.4.启动、停止
    service rabbitmq-server start
    service rabbitmq-server stop
    service rabbitmq-server restart
    3.3.5.设置开机启动
    chkconfig rabbitmq-server on
    3.3.6.设置配置文件
    cd /etc/rabbitmq
    cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/

    mv rabbitmq.config.example rabbitmq.config
    3.3.7.开启用户远程访问
    vi /etc/rabbitmq/rabbitmq.config
    这里写图片描述
    注意要去掉后面的逗号。
    3.3.8.开启web界面管理工具
    rabbitmq-plugins enable rabbitmq_management
    service rabbitmq-server restart
    3.3.9.防火墙开放15672端口
    /sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
    /etc/rc.d/init.d/iptables save

    3.4.安装的注意事项

    1、推荐使用默认的安装路径
    2、系统用户名必须是英文
    Win10改名字非常麻烦,具体方法百度
    这里写图片描述
    3、计算机名必须是英文
    这里写图片描述
    4、系统的用户必须是管理员

    如果安装失败应该如何解决:
    1、重装系统 – 不推荐
    2、将RabbitMQ安装到linux虚拟机中
    a)推荐
    3、使用别人安装好的RabbitMQ服务
    a)只要给你开通一个账户即可。
    b)使用公用的RabbitMQ服务,在192.168.50.22
    c)推荐

    常见错误:
    这里写图片描述

    3.5.安装完成后操作

    1、系统服务中有RabbitMQ服务,停止、启动、重启
    这里写图片描述
    2、打开命令行工具
    这里写图片描述
    如果找不到命令行工具,直接cd到相应目录:
    这里写图片描述
    输入命令rabbitmq-plugins enable rabbitmq_management启用管理插件
    这里写图片描述
    查看管理页面
    这里写图片描述
    通过默认账户 guest/guest 登录
    如果能够登录,说明安装成功。
    这里写图片描述

    4.添加用户

    4.1.添加admin用户

    这里写图片描述

    4.2.用户角色

    1、超级管理员(administrator)
    可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
    2、监控者(monitoring)
    可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
    3、策略制定者(policymaker)
    可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
    4、普通管理者(management)
    仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
    5、其他
    无法登陆管理控制台,通常就是普通的生产者和消费者。

    4.3.创建Virtual Hosts

    这里写图片描述

    选中Admin用户,设置权限:
    这里写图片描述
    看到权限已加:
    这里写图片描述

    4.4.管理界面中的功能

    这里写图片描述

    这里写图片描述

    5.学习五种队列

    这里写图片描述

    5.1.导入my-rabbitmq项目

    项目下载地址:
    https://download.csdn.net/download/zpcandzhj/10585077
    这里写图片描述

    5.2.简单队列

    5.2.1.图示
    这里写图片描述

    P:消息的生产者
    C:消息的消费者
    红色:队列

    生产者将消息发送到队列,消费者从队列中获取消息。
    5.2.2.导入RabbitMQ的客户端依赖

    <dependency>
       <groupId>com.rabbitmq</groupId>
       <artifactId>amqp-client</artifactId>
       <version>3.4.1</version>
    </dependency>
    

    5.2.3.获取MQ的连接

    package com.zpc.rabbitmq.util;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    
    public class ConnectionUtil {
    
        public static Connection getConnection() throws Exception {
            //定义连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置服务地址
            factory.setHost("localhost");
            //端口
            factory.setPort(5672);
            //设置账号信息,用户名、密码、vhost
            factory.setVirtualHost("testhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            // 通过工程获取连接
            Connection connection = factory.newConnection();
            return connection;
        }
    }
    

    5.2.4.生产者发送消息到队列

    package com.zpc.rabbitmq.simple;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Send {
    
        private final static String QUEUE_NAME = "q_test_01";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            // 从连接中创建通道
            Channel channel = connection.createChannel();
    
            // 声明(创建)队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 消息内容
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            //关闭通道和连接
            channel.close();
            connection.close();
        }
    }
    

    5.2.5.管理工具中查看消息
    这里写图片描述

    点击上面的队列名称,查询具体的队列中的信息:
    这里写图片描述
    5.2.6.消费者从队列中获取消息

    package com.zpc.rabbitmq.simple;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class Recv {
    
        private final static String QUEUE_NAME = "q_test_01";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            // 从连接中创建通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
    
            // 监听队列
            channel.basicConsume(QUEUE_NAME, true, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [x] Received '" + message + "'");
            }
        }
    }
    

    5.3.Work模式

    这里写图片描述

    5.3.1.图示
    这里写图片描述

    一个生产者、2个消费者。

    一个消息只能被一个消费者获取。
    5.3.2.消费者1

    package com.zpc.rabbitmq.work;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    public class Recv {
    
        private final static String QUEUE_NAME = "test_queue_work";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 同一时刻服务器只会发一条消息给消费者
            //channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,false表示手动返回完成状态,true表示自动
            channel.basicConsume(QUEUE_NAME, true, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [y] Received '" + message + "'");
                //休眠
                Thread.sleep(10);
                // 返回确认状态,注释掉表示使用自动确认模式
                //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.3.3.消费者2

    package com.zpc.rabbitmq.work;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    public class Recv2 {
    
        private final static String QUEUE_NAME = "test_queue_work";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 同一时刻服务器只会发一条消息给消费者
            //channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,false表示手动返回完成状态,true表示自动
            channel.basicConsume(QUEUE_NAME, true, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [x] Received '" + message + "'");
                // 休眠1秒
                Thread.sleep(1000);
                //下面这行注释掉表示使用自动确认模式
                //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.3.4.生产者
    向队列中发送100条消息。

    package com.zpc.rabbitmq.work;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Send {
    
        private final static String QUEUE_NAME = "test_queue_work";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            for (int i = 0; i < 100; i++) {
                // 消息内容
                String message = "" + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
    
                Thread.sleep(i * 10);
            }
    
            channel.close();
            connection.close();
        }
    }
    

    5.3.5.测试
    测试结果:
    1、消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
    2、消费者1和消费者2获取到的消息的数量是相同的,一个是消费奇数号消息,一个是偶数。

    • 其实,这样是不合理的,因为消费者1线程停顿的时间短。应该是消费者1要比消费者2获取到的消息多才对。
      RabbitMQ 默认将消息顺序发送给下一个消费者,这样,每个消费者会得到相同数量的消息。即轮询(round-robin)分发消息。

    • 怎样才能做到按照每个消费者的能力分配消息呢?联合使用 Qos 和 Acknowledge 就可以做到。
      basicQos 方法设置了当前信道最大预获取(prefetch)消息数量为1。消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列,从队列的视角去看,总是会有一批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 参数就是用来限制这批未确认消息数量的。设为1时,队列只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息。prefetchCount 的默认值为0,即没有限制,队列会将所有消息尽快发给消费者。

    • 2个概念

    • 轮询分发 :使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。

    • 公平分发 :虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。

    为了解决这个问题,我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
    还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。

    5.4.Work模式的“能者多劳”

    打开上述代码的注释:

    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    
    //开启这行 表示使用手动确认模式
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    

    同时改为手动确认:

    // 监听队列,false表示手动返回完成状态,true表示自动
    channel.basicConsume(QUEUE_NAME, false, consumer);
    

    测试:
    消费者1比消费者2获取的消息更多。

    5.5.消息的确认模式

    消费者从队列中获取消息,服务端如何知道消息已经被消费呢?

    模式1:自动确认
    只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。
    模式2:手动确认
    消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。

    手动模式:
    这里写图片描述

    自动模式:
    这里写图片描述

    5.6.订阅模式

    这里写图片描述
    5.6.1.图示
    这里写图片描述

    解读:
    1、1个生产者,多个消费者
    2、每一个消费者都有自己的一个队列
    3、生产者没有将消息直接发送到队列,而是发送到了交换机
    4、每个队列都要绑定到交换机
    5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
    注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费
    这里写图片描述

    5.6.2.消息的生产者(看作是后台系统)
    向交换机中发送消息。

    package com.zpc.rabbitmq.subscribe;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Send {
    
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            // 消息内容
            String message = "Hello World!";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    }
    

    注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。
    5.6.3.消费者1(看作是前台系统)

    package com.zpc.rabbitmq.subscribe;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    public class Recv {
    
        private final static String QUEUE_NAME = "test_queue_work1";
    
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [Recv] Received '" + message + "'");
                Thread.sleep(10);
    
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.6.4.消费者2(看作是搜索系统)

    package com.zpc.rabbitmq.subscribe;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    public class Recv2 {
    
        private final static String QUEUE_NAME = "test_queue_work2";
    
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [Recv2] Received '" + message + "'");
                Thread.sleep(10);
    
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.6.5.测试
    测试结果:
    同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。

    在管理工具中查看队列和交换机的绑定关系:

    这里写图片描述

    5.7.路由模式

    这里写图片描述
    5.7.1.图示
    这里写图片描述

    5.7.2.生产者
    这里写图片描述
    5.7.3.消费者1(假设是前台系统)
    这里写图片描述
    5.7.4.消费2(假设是搜索系统)
    这里写图片描述

    5.8.主题模式(通配符模式)

    这里写图片描述

    这里写图片描述

    5.8.1.图示
    这里写图片描述
    同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。

    5.8.2.生产者

    package com.zpc.rabbitmq.topic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    public class Send {
    
        private final static String EXCHANGE_NAME = "test_exchange_topic";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
            // 消息内容
            String message = "Hello World!!";
            channel.basicPublish(EXCHANGE_NAME, "routekey.1", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    }
    

    5.8.3.消费者1(前台系统)

    package com.zpc.rabbitmq.topic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    public class Recv {
    
        private final static String QUEUE_NAME = "test_queue_topic_work_1";
    
        private final static String EXCHANGE_NAME = "test_exchange_topic";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*");
    
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [Recv_x] Received '" + message + "'");
                Thread.sleep(10);
    
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.8.4.消费者2(搜索系统)

    package com.zpc.rabbitmq.topic;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class Recv2 {
    
        private final static String QUEUE_NAME = "test_queue_topic_work_2";
    
        private final static String EXCHANGE_NAME = "test_exchange_topic";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
    
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [Recv2_x] Received '" + message + "'");
                Thread.sleep(10);
    
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    6.Spring-Rabbit

    6.1.Spring项目

    http://spring.io/projects

    这里写图片描述

    6.2.简介

    这里写图片描述
    这里写图片描述

    6.3.使用

    6.3.1.消费者

    package com.zpc.rabbitmq.spring;
    
    /**
     * 消费者
     *
     * @author Evan
     */
    public class Foo {
    
        //具体执行业务的方法
        public void listen(String foo) {
            System.out.println("\n消费者: " + foo + "\n");
        }
    }
    

    6.3.2.生产者

    package com.zpc.rabbitmq.spring;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.support.AbstractApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class SpringMain {
        public static void main(final String... args) throws Exception {
            AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(
                    "classpath:spring/rabbitmq-context.xml");
            //RabbitMQ模板
            RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
            //发送消息
            template.convertAndSend("Hello, 鸟鹏!");
            Thread.sleep(1000);// 休眠1秒
            ctx.destroy(); //容器销毁
        }
    }
    

    6.3.3.配置文件
    1、定义连接工厂

    <!-- 定义RabbitMQ的连接工厂 -->
    <rabbit:connection-factory id="connectionFactory"
       host="127.0.0.1" port="5672" username="admin" password="admin"
       virtual-host="testhost" />
    

    2、定义模板(可以指定交换机或队列)

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
    

    3、定义队列、交换机、以及完成队列和交换机的绑定

    <!-- 定义队列,自动声明 -->
    <rabbit:queue name="zpcQueue" auto-declare="true"/>
    
    <!-- 定义交换器,把Q绑定到交换机,自动声明 -->
    <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
       <rabbit:bindings>
          <rabbit:binding queue="zpcQueue"/>
       </rabbit:bindings>
    </rabbit:fanout-exchange>
    

    4、定义监听

    <rabbit:listener-container connection-factory="connectionFactory">
       <rabbit:listener ref="foo" method="listen" queue-names="zpcQueue" />
    </rabbit:listener-container>
    
    <bean id="foo" class="com.zpc.rabbitmq.spring.Foo" />
    

    5、定义管理,用于管理队列、交换机等:

    <!-- MQ的管理,包括队列、交换器等 -->
    <rabbit:admin connection-factory="connectionFactory" />
    

    完整配置文件rabbitmq-context.xml

    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
    
       <!-- 定义RabbitMQ的连接工厂 -->
       <rabbit:connection-factory id="connectionFactory"
          host="127.0.0.1" port="5672" username="admin" password="admin"
          virtual-host="testhost" />
    
       <!-- 定义Rabbit模板,指定连接工厂以及定义exchange -->
       <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
       <!-- <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
          exchange="fanoutExchange" routing-key="foo.bar" /> -->
    
       <!-- MQ的管理,包括队列、交换器等 -->
       <rabbit:admin connection-factory="connectionFactory" />
    
       <!-- 定义队列,自动声明 -->
       <rabbit:queue name="zpcQueue" auto-declare="true"/>
       
       <!-- 定义交换器,把Q绑定到交换机,自动声明 -->
       <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
          <rabbit:bindings>
             <rabbit:binding queue="zpcQueue"/>
          </rabbit:bindings>
       </rabbit:fanout-exchange>
       
    <!--   <rabbit:topic-exchange name="myExchange">
          <rabbit:bindings>
             <rabbit:binding queue="myQueue" pattern="foo.*" />
          </rabbit:bindings>
       </rabbit:topic-exchange> -->
    
       <!-- 队列监听 -->
       <rabbit:listener-container connection-factory="connectionFactory">
          <rabbit:listener ref="foo" method="listen" queue-names="zpcQueue" />
       </rabbit:listener-container>
    
       <bean id="foo" class="com.zpc.rabbitmq.spring.Foo" />
    
    </beans>
    

    6.4.持久化交换机和队列

    这里写图片描述

    持久化:将交换机或队列的数据保存到磁盘,服务器宕机或重启之后依然存在。
    非持久化:将交换机或队列的数据保存到内存,服务器宕机或重启之后将不存在。

    非持久化的性能高于持久化。

    如何选择持久化?非持久化? – 看需求。

    欢迎关注公众号「程猿薇茑」

    7.Spring集成RabbitMQ一个完整案例

    创建三个系统A,B,C
    A作为生产者,B、C作为消费者(B,C作为web项目启动)
    项目下载地址:https://download.csdn.net/download/zpcandzhj/10585077

    7.1.在A系统中发送消息到交换机

    7.1.1.导入依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
    
       <groupId>com.zpc</groupId>
       <artifactId>myrabbitA</artifactId>
       <version>0.0.1-SNAPSHOT</version>
       <packaging>jar</packaging>
       <name>myrabbit</name>
    
       <dependencies>
          <dependency>
             <groupId>org.springframework.amqp</groupId>
             <artifactId>spring-rabbit</artifactId>
             <version>1.4.0.RELEASE</version>
          </dependency>
    
          <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
             <version>1.2.47</version>
          </dependency>
       </dependencies>
    </project>
    

    7.1.2.队列和交换机的绑定关系
    实现:
    1、在配置文件中将队列和交换机完成绑定
    2、可以在管理界面中完成绑定
    a)绑定关系如果发生变化,需要修改配置文件,并且服务需要重启
    b)管理更加灵活
    c)更容易对绑定关系的权限管理,流程管理
    本例选择第2种方式
    7.1.3.配置
    rabbitmq-context.xml

    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
    
       <!-- 定义RabbitMQ的连接工厂 -->
       <rabbit:connection-factory id="connectionFactory"
          host="127.0.0.1" port="5672" username="admin" password="admin"
          virtual-host="testhost" />
    
       <!-- MQ的管理,包括队列、交换器等 -->
       <rabbit:admin connection-factory="connectionFactory" />
    
       <!-- 定义交换器,暂时不把Q绑定到交换机,在管理界面去绑定 -->
       <!--<rabbit:topic-exchange name="topicExchange" auto-declare="true" ></rabbit:topic-exchange>-->
       <rabbit:direct-exchange name="directExchange" auto-declare="true" ></rabbit:direct-exchange>
       <!--<rabbit:fanout-exchange name="fanoutExchange" auto-declare="true" ></rabbit:fanout-exchange>-->
    
       <!-- 定义Rabbit模板,指定连接工厂以及定义exchange(exchange要和上面的一致) -->
       <!--<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="topicExchange" />-->
       <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="directExchange" />
       <!--<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />-->
    </beans>
    

    7.1.4.消息内容
    方案:
    1、消息内容使用对象做json序列化发送
    a)数据大
    b)有些数据其他人是可能用不到的
    2、发送特定的业务字段,如id、操作类型

    7.1.5.实现
    生产者MsgSender.java:

    package com.zpc.myrabbit;
    
    import com.alibaba.fastjson.JSON;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.support.AbstractApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    
    
    /**
     * 消息生产者
     */
    public class MsgSender {
        public static void main(String[] args) throws Exception {
            AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(
                    "classpath:spring/rabbitmq-context.xml");
            //RabbitMQ模板
            RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
    
            String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
            //发送消息
            Map<String, Object> msg = new HashMap<String, Object>();
            msg.put("type", "1");
            msg.put("date", date);
            template.convertAndSend("type2", JSON.toJSONString(msg));
            Thread.sleep(1000);// 休眠1秒
            ctx.destroy(); //容器销毁
        }
    }
    

    7.2.在B系统接收消息

    7.2.1.导入依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.zpc</groupId>
        <artifactId>myrabbitB</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>war</packaging>
    
        <name>myrabbit</name>
        <properties>
            <spring.version>4.1.3.RELEASE</spring.version>
            <fastjson.version>1.2.46</fastjson.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.4.1</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>1.4.0.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-webmvc</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.47</version>
            </dependency>
        </dependencies>
    
        <build>
            <finalName>${project.artifactId}</finalName>
            <plugins>
                <!-- web层需要配置Tomcat插件 -->
                <plugin>
                    <groupId>org.apache.tomcat.maven</groupId>
                    <artifactId>tomcat7-maven-plugin</artifactId>
                    <configuration>
                        <path>/testRabbit</path>
                        <uriEncoding>UTF-8</uriEncoding>
                        <port>8081</port>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    

    7.2.2.配置

    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
    
       <!-- 定义RabbitMQ的连接工厂 -->
       <rabbit:connection-factory id="connectionFactory"
          host="127.0.0.1" port="5672" username="admin" password="admin"
          virtual-host="testhost" />
    
       <!-- MQ的管理,包括队列、交换器等 -->
       <rabbit:admin connection-factory="connectionFactory" />
    
       <!-- 定义B系统需要监听的队列,自动声明 -->
       <rabbit:queue name="q_topic_testB" auto-declare="true"/>
    
       <!-- 队列监听 -->
       <rabbit:listener-container connection-factory="connectionFactory">
          <rabbit:listener ref="myMQlistener" method="listen" queue-names="q_topic_testB" />
       </rabbit:listener-container>
    
       <bean id="myMQlistener" class="com.zpc.myrabbit.listener.Listener" />
    </beans>
    

    7.2.3.具体处理逻辑

    public class Listener {
        //具体执行业务的方法
        public void listen(String msg) {
            System.out.println("\n消费者B开始处理消息: " + msg + "\n");
        }
    }
    

    7.2.4.在界面管理工具中完成绑定关系
    选中定义好的交换机(exchange)
    这里写图片描述
    1)direct
    这里写图片描述
    2)fanout
    这里写图片描述
    3)topic
    这里写图片描述

    7.3.在C系统中接收消息

    (和B系统配置差不多,无非是Q名和Q对应的处理逻辑变了)

    7.3.1.配置

    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
    
       <!-- 定义RabbitMQ的连接工厂 -->
       <rabbit:connection-factory id="connectionFactory"
          host="127.0.0.1" port="5672" username="admin" password="admin"
          virtual-host="testhost" />
    
       <!-- MQ的管理,包括队列、交换器等 -->
       <rabbit:admin connection-factory="connectionFactory" />
    
       <!-- 定义C系统需要监听的队列,自动声明 -->
       <rabbit:queue name="q_topic_testC" auto-declare="true"/>
    
       <!-- 队列监听 -->
       <rabbit:listener-container connection-factory="connectionFactory">
          <rabbit:listener ref="myMQlistener" method="listen" queue-names="q_topic_testC" />
       </rabbit:listener-container>
    
       <bean id="myMQlistener" class="com.zpc.myrabbit.listener.Listener" />
    </beans>
    

    7.3.2.处理业务逻辑

    public class Listener {
    
        //具体执行业务的方法
        public void listen(String msg) {
            System.out.println("\n消费者C开始处理消息: " + msg + "\n");
        }
    }
    

    7.3.3.在管理工具中绑定队列和交换机
    见7.2.4

    7.3.4.测试
    分别启动B,C两个web应用,然后运行A的MsgSender主方法发送消息,分别测试fanout、direct、topic三种类型

    8.Springboot集成RabbitMQ

    8.1.简单队列

    1、配置pom文件,主要是添加spring-boot-starter-amqp的支持

    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    2、配置application.properties文件
    配置rabbitmq的安装地址、端口以及账户信息

    spring.application.name=spirng-boot-rabbitmq
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=admin
    

    3、配置rabbitmq队列

    package com.zpc.rabbitmq;
    
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitConfig {
        @Bean
        public Queue queue() {
            return new Queue("q_hello");
        }
    }
    

    4、rabbitmq发送者

    package com.zpc.rabbitmq;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    @Component
    public class HelloSender {
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void send() {
            String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
            String context = "hello " + date;
            System.out.println("Sender : " + context);
            //简单对列的情况下routingKey即为Q名
            this.rabbitTemplate.convertAndSend("q_hello", context);
        }
    }
    

    5、rabbitmq接收者

    package com.zpc.rabbitmq;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "q_hello")
    public class HelloReceiver {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver  : " + hello);
        }
    }
    

    6、测试

    package com.zpc.rabbitmq;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitMqHelloTest {
    
        @Autowired
        private HelloSender helloSender;
    
        @Test
        public void hello() throws Exception {
            helloSender.send();
        }
    }
    

    8.2.多对多使用(Work模式)

    注册两个Receiver:

    package com.zpc.rabbitmq;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "q_hello")
    public class HelloReceiver2 {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver2  : " + hello);
        }
    
    }
    
    @Test
    public void oneToMany() throws Exception {
        for (int i=0;i<100;i++){
            helloSender.send(i);
            Thread.sleep(300);
        }
    }
    
    public void send(int i) {
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
        String context = "hello " + i + " " + date;
        System.out.println("Sender : " + context);
        //简单对列的情况下routingKey即为Q名
        this.rabbitTemplate.convertAndSend("q_hello", context);
    }
    

    8.3.Topic Exchange(主题模式)

    • topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列

    首先对topic规则配置,这里使用两个队列(消费者)来演示。
    1)配置队列,绑定交换机

    package com.zpc.rabbitmq.topic;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class TopicRabbitConfig {
    
        final static String message = "q_topic_message";
        final static String messages = "q_topic_messages";
    
        @Bean
        public Queue queueMessage() {
            return new Queue(TopicRabbitConfig.message);
        }
    
        @Bean
        public Queue queueMessages() {
            return new Queue(TopicRabbitConfig.messages);
        }
    
        /**
         * 声明一个Topic类型的交换机
         * @return
         */
        @Bean
        TopicExchange exchange() {
            return new TopicExchange("mybootexchange");
        }
    
        /**
         * 绑定Q到交换机,并且指定routingKey
         * @param queueMessage
         * @param exchange
         * @return
         */
        @Bean
        Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
        }
    
        @Bean
        Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
        }
    }
    

    2)创建2个消费者
    q_topic_message 和q_topic_messages

    package com.zpc.rabbitmq.topic;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "q_topic_message")
    public class Receiver1 {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver1  : " + hello);
        }
    }
    
    package com.zpc.rabbitmq.topic;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "q_topic_messages")
    public class Receiver2 {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver2 : " + hello);
        }
    }
    

    3)消息发送者(生产者)

    package com.zpc.rabbitmq.topic;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MsgSender {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void send1() {
            String context = "hi, i am message 1";
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("mybootexchange", "topic.message", context);
        }
    
    
        public void send2() {
            String context = "hi, i am messages 2";
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("mybootexchange", "topic.messages", context);
        }
    }
    

    send1方法会匹配到topic.#和topic.message,两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息。
    4)测试

    package com.zpc.rabbitmq.topic;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitTopicTest {
    
        @Autowired
        private MsgSender msgSender;
    
        @Test
        public void send1() throws Exception {
            msgSender.send1();
        }
    
        @Test
        public void send2() throws Exception {
            msgSender.send2();
        }
    }
    

    8.4.Fanout Exchange(订阅模式)

    • Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
      1)配置队列,绑定交换机
    package com.zpc.rabbitmq.fanout;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class FanoutRabbitConfig {
    
        @Bean
        public Queue aMessage() {
            return new Queue("q_fanout_A");
        }
    
        @Bean
        public Queue bMessage() {
            return new Queue("q_fanout_B");
        }
    
        @Bean
        public Queue cMessage() {
            return new Queue("q_fanout_C");
        }
    
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("mybootfanoutExchange");
        }
    
        @Bean
        Binding bindingExchangeA(Queue aMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(aMessage).to(fanoutExchange);
        }
    
        @Bean
        Binding bindingExchangeB(Queue bMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(bMessage).to(fanoutExchange);
        }
    
        @Bean
        Binding bindingExchangeC(Queue cMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(cMessage).to(fanoutExchange);
        }
    }
    

    2)创建3个消费者

    package com.zpc.rabbitmq.fanout;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "q_fanout_A")
    public class ReceiverA {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("AReceiver  : " + hello + "/n");
        }
    }
    
    package com.zpc.rabbitmq.fanout;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "q_fanout_B")
    public class ReceiverB {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("BReceiver  : " + hello + "/n");
        }
    }
    
    package com.zpc.rabbitmq.fanout;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "q_fanout_C")
    public class ReceiverC {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("CReceiver  : " + hello + "/n");
        }
    }
    

    3)生产者

    package com.zpc.rabbitmq.fanout;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MsgSenderFanout {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void send() {
            String context = "hi, fanout msg ";
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("mybootfanoutExchange","", context);
        }
    }
    

    4)测试

    package com.zpc.rabbitmq.fanout;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitFanoutTest {
    
        @Autowired
        private MsgSenderFanout msgSender;
    
        @Test
        public void send1() throws Exception {
            msgSender.send();
        }
    }
    

    结果如下,三个消费者都收到消息:
    AReceiver : hi, fanout msg
    CReceiver : hi, fanout msg
    BReceiver : hi, fanout msg

    9.总结

    推荐springCloud教程:
    https://blog.csdn.net/hellozpc/article/details/83692496

    推荐Springboot2.0教程:
    https://blog.csdn.net/hellozpc/article/details/82531834

    **欢迎关注公众号【程猿薇茑】**
    **微信扫一扫**
    展开全文
  • 消息中间件MQ与RabbitMQ面试题(2020最新版)

    万次阅读 多人点赞 2020-05-06 14:11:22
    文章目录为什么使用MQ?MQ的优点消息队列有什么优缺点?RabbitMQ有什么优缺点?你们公司生产环境用的是...rabbitmq 的使用场景RabbitMQ基本概念RabbitMQ的工作模式如何保证RabbitMQ消息的顺序性?消息如何分发?消...

    Java面试总结汇总,整理了包括Java基础知识,集合容器,并发编程,JVM,常用开源框架Spring,MyBatis,数据库,中间件等,包含了作为一个Java工程师在面试中需要用到或者可能用到的绝大部分知识。欢迎大家阅读,本人见识有限,写的博客难免有错误或者疏忽的地方,还望各位大佬指点,在此表示感激不尽。文章持续更新中…

    序号 内容 链接地址
    1 Java基础知识面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104390612
    2 Java集合容器面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104588551
    3 Java异常面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104390689
    4 并发编程面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104863992
    5 JVM面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104390752
    6 Spring面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104397516
    7 Spring MVC面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104397427
    8 Spring Boot面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104397299
    9 Spring Cloud面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104397367
    10 MyBatis面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/101292950
    11 Redis面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/103522351
    12 MySQL数据库面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104778621
    13 消息中间件MQ与RabbitMQ面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104588612
    14 Dubbo面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104390006
    15 Linux面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104588679
    16 Tomcat面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104397665
    17 ZooKeeper面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104397719
    18 Netty面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104391081
    19 架构设计&分布式&数据结构与算法面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/105870730

    为什么使用MQ?MQ的优点

    简答

    • 异步处理 - 相比于传统的串行、并行方式,提高了系统吞吐量。
    • 应用解耦 - 系统间通过消息通信,不用关心其他系统的处理。
    • 流量削锋 - 可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求。
    • 日志处理 - 解决大量日志传输。
    • 消息通讯 - 消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。

    详答

    主要是:解耦、异步、削峰。

    解耦:A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃…A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。

    就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦。

    异步:A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求。如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms。

    削峰:减少高峰时期对服务器压力。

    消息队列有什么优缺点?RabbitMQ有什么优缺点?

    优点上面已经说了,就是在特殊场景下有其对应的好处解耦异步削峰

    缺点有以下几个:

    系统可用性降低

    本来系统运行好好的,现在你非要加入个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性会降低;

    系统复杂度提高

    加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,复杂性增大。

    一致性问题

    A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。

    所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现,妈呀,系统复杂度提升了一个数量级,也许是复杂了 10 倍。但是关键时刻,用,还是得用的。

    你们公司生产环境用的是什么消息中间件?

    这个首先你可以说下你们公司选用的是什么消息中间件,比如用的是RabbitMQ,然后可以初步给一些你对不同MQ中间件技术的选型分析。

    举个例子:比如说ActiveMQ是老牌的消息中间件,国内很多公司过去运用的还是非常广泛的,功能很强大。

    但是问题在于没法确认ActiveMQ可以支撑互联网公司的高并发、高负载以及高吞吐的复杂场景,在国内互联网公司落地较少。而且使用较多的是一些传统企业,用ActiveMQ做异步调用和系统解耦。

    然后你可以说说RabbitMQ,他的好处在于可以支撑高并发、高吞吐、性能很高,同时有非常完善便捷的后台管理界面可以使用。

    另外,他还支持集群化、高可用部署架构、消息高可靠支持,功能较为完善。

    而且经过调研,国内各大互联网公司落地大规模RabbitMQ集群支撑自身业务的case较多,国内各种中小型互联网公司使用RabbitMQ的实践也比较多。

    除此之外,RabbitMQ的开源社区很活跃,较高频率的迭代版本,来修复发现的bug以及进行各种优化,因此综合考虑过后,公司采取了RabbitMQ。

    但是RabbitMQ也有一点缺陷,就是他自身是基于erlang语言开发的,所以导致较为难以分析里面的源码,也较难进行深层次的源码定制和改造,毕竟需要较为扎实的erlang语言功底才可以。

    然后可以聊聊RocketMQ,是阿里开源的,经过阿里的生产环境的超高并发、高吞吐的考验,性能卓越,同时还支持分布式事务等特殊场景。

    而且RocketMQ是基于Java语言开发的,适合深入阅读源码,有需要可以站在源码层面解决线上生产问题,包括源码的二次开发和改造。

    另外就是Kafka。Kafka提供的消息中间件的功能明显较少一些,相对上述几款MQ中间件要少很多。

    但是Kafka的优势在于专为超高吞吐量的实时日志采集、实时数据同步、实时数据计算等场景来设计。

    因此Kafka在大数据领域中配合实时计算技术(比如Spark Streaming、Storm、Flink)使用的较多。但是在传统的MQ中间件使用场景中较少采用。

    Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?

    ActiveMQ RabbitMQ RocketMQ Kafka ZeroMQ
    单机吞吐量 比RabbitMQ低 2.6w/s(消息做持久化) 11.6w/s 17.3w/s 29w/s
    开发语言 Java Erlang Java Scala/Java C
    主要维护者 Apache Mozilla/Spring Alibaba Apache iMatix,创始人已去世
    成熟度 成熟 成熟 开源版本不够成熟 比较成熟 只有C、PHP等版本成熟
    订阅形式 点对点(p2p)、广播(发布-订阅) 提供了4种:direct, topic ,Headers和fanout。fanout就是广播模式 基于topic/messageTag以及按照消息类型、属性进行正则匹配的发布订阅模式 基于topic以及按照topic进行正则匹配的发布订阅模式 点对点(p2p)
    持久化 支持少量堆积 支持少量堆积 支持大量堆积 支持大量堆积 不支持
    顺序消息 不支持 不支持 支持 支持 不支持
    性能稳定性 一般 较差 很好
    集群方式 支持简单集群模式,比如’主-备’,对高级集群模式支持不好。 支持简单集群,'复制’模式,对高级集群模式支持不好。 常用 多对’Master-Slave’ 模式,开源版本需手动切换Slave变成Master 天然的‘Leader-Slave’无状态集群,每台服务器既是Master也是Slave 不支持
    管理界面 一般 较好 一般

    综上,各种对比之后,有如下建议:

    一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了;

    后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;

    不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给 Apache,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。

    所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。

    如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

    MQ 有哪些常见问题?如何解决这些问题?

    MQ 的常见问题有:

    1. 消息的顺序问题
    2. 消息的重复问题

    消息的顺序问题

    消息有序指的是可以按照消息的发送顺序来消费。

    假如生产者产生了 2 条消息:M1、M2,假定 M1 发送到 S1,M2 发送到 S2,如果要保证 M1 先于 M2 被消费,怎么做?

    img

    解决方案:

    (1)保证生产者 - MQServer - 消费者是一对一对一的关系

    img

    缺陷:

    • 并行度就会成为消息系统的瓶颈(吞吐量不够)
    • 更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。 (2)通过合理的设计或者将问题分解来规避。
    • 不关注乱序的应用实际大量存在
    • 队列无序并不意味着消息无序 所以从业务层面来保证消息的顺序而不仅仅是依赖于消息系统,是一种更合理的方式。

    消息的重复问题

    造成消息重复的根本原因是:网络不可达。

    所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?

    消费端处理消息的业务逻辑保持幂等性。只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现。利用一张日志表来记录已经处理成功的消息的 ID,如果新到的消息 ID 已经在日志表中,那么就不再处理这条消息。

    什么是RabbitMQ?

    RabbitMQ是一款开源的,Erlang编写的,基于AMQP协议的消息中间件

    rabbitmq 的使用场景

    (1)服务间异步通信

    (2)顺序消费

    (3)定时任务

    (4)请求削峰

    RabbitMQ基本概念

    • Broker: 简单来说就是消息队列服务器实体
    • Exchange: 消息交换机,它指定消息按什么规则,路由到哪个队列
    • Queue: 消息队列载体,每个消息都会被投入到一个或多个队列
    • Binding: 绑定,它的作用就是把exchange和queue按照路由规则绑定起来
    • Routing Key: 路由关键字,exchange根据这个关键字进行消息投递
    • VHost: vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。
    • Producer: 消息生产者,就是投递消息的程序
    • Consumer: 消息消费者,就是接受消息的程序
    • Channel: 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

    由Exchange、Queue、RoutingKey三个才能决定一个从Exchange到Queue的唯一的线路。

    RabbitMQ的工作模式

    一.simple模式(即最简单的收发模式)

    img

    1.消息产生消息,将消息放入队列

    2.消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。

    二.work工作模式(资源的竞争)

    img

    1.消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用)。

    三.publish/subscribe发布订阅(共享资源)

    img

    1、每个消费者监听自己的队列;

    2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。

    四.routing路由模式

    img

    1.消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;

    2.根据业务功能定义路由字符串

    3.从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。

    4.业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;

    五.topic 主题模式(路由模式的一种)

    img

    1.星号井号代表通配符

    2.星号代表多个单词,井号代表一个单词

    3.路由功能添加模糊匹配

    4.消息产生者产生消息,把消息交给交换机

    5.交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

    (在我的理解看来就是routing查询的一种模糊匹配,就类似sql的模糊查询方式)

    如何保证RabbitMQ消息的顺序性?

    拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点;或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。

    消息如何分发?

    若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)。通过路由可实现多消费的功能

    消息怎么路由?

    消息提供方->路由->一至多个队列消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。通过队列路由键,可以把队列绑定到交换器上。消息到达交换器后,RabbitMQ 会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则);

    常用的交换器主要分为一下三种:

    fanout:如果交换器收到消息,将会广播到所有绑定的队列上

    direct:如果路由键完全匹配,消息就被投递到相应的队列

    topic:可以使来自不同源头的消息能够到达同一个队列。 使用 topic 交换器时,可以使用通配符

    消息基于什么传输?

    由于 TCP 连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ 使用信道的方式来传输数据。信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接上的信道数量没有限制。

    如何保证消息不被重复消费?或者说,如何保证消息消费时的幂等性?

    先说为什么会重复消费:正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除;

    但是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者。

    针对以上问题,一个解决思路是:保证消息的唯一性,就算是多次传输,不要让消息的多次消费带来影响;保证消息等幂性;

    比如:在写入消息队列的数据做唯一标示,消费消息时,根据唯一标识判断是否消费过;

    假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。

    如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?

    发送方确认模式

    将信道设置成 confirm 模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的 ID。

    一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一 ID)。

    如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack(notacknowledged,未确认)消息。

    发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。

    接收方确认机制

    消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ 才能安全地把消息从队列中删除。

    这里并没有用到超时机制,RabbitMQ 仅通过 Consumer 的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ 给了 Consumer 足够长的时间来处理消息。保证数据的最终一致性;

    下面罗列几种特殊情况

    • 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重)
    • 如果消费者接收到消息却没有确认消息,连接也未断开,则 RabbitMQ 认为该消费者繁忙,将不会给该消费者分发更多的消息。

    如何保证RabbitMQ消息的可靠传输?

    消息不可靠的情况可能是消息丢失,劫持等原因;

    丢失又分为:生产者丢失消息、消息列表丢失消息、消费者丢失消息;

    生产者丢失消息:从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息;

    transaction机制就是说:发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降;

    confirm模式用的居多:一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;

    rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;

    如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。

    消息队列丢数据:消息持久化。

    处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。

    这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。

    这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。

    那么如何持久化呢?

    这里顺便说一下吧,其实也很容易,就下面两步

    1. 将queue的持久化标识durable设置为true,则代表是一个持久的队列
    2. 发送消息的时候将deliveryMode=2

    这样设置以后,即使rabbitMQ挂了,重启后也能恢复数据

    消费者丢失消息:消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!

    消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;

    如果这时处理消息失败,就会丢失该消息;

    解决方案:处理消息成功后,手动回复确认消息。

    为什么不应该对所有的 message 都使用持久化机制?

    首先,必然导致性能的下降,因为写磁盘比写 RAM 慢的多,message 的吞吐量可能有 10 倍的差距。

    其次,message 的持久化机制用在 RabbitMQ 的内置 cluster 方案时会出现“坑爹”问题。矛盾点在于,若 message 设置了 persistent 属性,但 queue 未设置 durable 属性,那么当该 queue 的 owner node 出现异常后,在未重建该 queue 前,发往该 queue 的 message 将被 blackholed ;若 message 设置了 persistent 属性,同时 queue 也设置了 durable 属性,那么当 queue 的 owner node 异常且无法重启的情况下,则该 queue 无法在其他 node 上重建,只能等待其 owner node 重启后,才能恢复该 queue 的使用,而在这段时间内发送给该 queue 的 message 将被 blackholed 。

    所以,是否要对 message 进行持久化,需要综合考虑性能需要,以及可能遇到的问题。若想达到 100,000 条/秒以上的消息吞吐量(单 RabbitMQ 服务器),则要么使用其他的方式来确保 message 的可靠 delivery ,要么使用非常快速的存储系统以支持全持久化(例如使用 SSD)。另外一种处理原则是:仅对关键消息作持久化处理(根据业务重要程度),且应该保证关键消息的量不会导致性能瓶颈。

    如何保证高可用的?RabbitMQ 的集群

    RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。

    单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的?,没人生产用单机模式

    普通集群模式,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。

    镜像集群模式:这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的完整数据。

    如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?

    消息积压处理办法:临时紧急扩容:

    先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。
    新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
    然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
    接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
    等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
    MQ中消息失效:假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。

    mq消息队列块满了:如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。

    设计MQ思路

    比如说这个消息队列系统,我们从以下几个角度来考虑一下:

    首先这个 mq 得支持可伸缩性吧,就是需要的时候快速扩容,就可以增加吞吐量和容量,那怎么搞?设计个分布式的系统呗,参照一下 kafka 的设计理念,broker -> topic -> partition,每个 partition 放一个机器,就存一部分数据。如果现在资源不够了,简单啊,给 topic 增加 partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了?

    其次你得考虑一下这个 mq 的数据要不要落地磁盘吧?那肯定要了,落磁盘才能保证别进程挂了数据就丢了。那落磁盘的时候怎么落啊?顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是 kafka 的思路。

    其次你考虑一下你的 mq 的可用性啊?这个事儿,具体参考之前可用性那个环节讲解的 kafka 的高可用保障机制。多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。

    能不能支持数据 0 丢失啊?可以的,参考我们之前说的那个 kafka 数据零丢失方案。

    展开全文
  • RabbitMQ基础概念详细介绍

    万次阅读 多人点赞 2016-12-15 19:28:53
    引言 你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用、通讯的问题而苦恼、挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题。...RabbitMQ

    转至:http://www.ostest.cn/archives/497

    引言

    你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用、通讯的问题而苦恼、挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题。
    消息服务擅长于解决多系统、异构系统间的数据交换(消息通知/通讯)问题,你也可以把它用于系统间服务的相互调用(RPC)。本文将要介绍的RabbitMQ就是当前最主流的消息中间件之一。

    RabbitMQ简介

    AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
    AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
    RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
    下面将重点介绍RabbitMQ中的一些基础概念,了解了这些概念,是使用好RabbitMQ的基础。

    ConnectionFactory、Connection、Channel

    ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。
    Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

    Queue

    Queue(队列)是RabbitMQ的内部对象,用于存储消息,用下图表示。
    queue

    RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费。
    qq

    多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
    2014-2-21 9-46-43

    Message acknowledgment

    在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。
    这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的bug——Queue中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑…

    另外pub message是没有ack的。

    Message durability

    如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。由于这里仅为RabbitMQ的简单介绍,所以这里将不讲解RabbitMQ相关的事务。

    Prefetch count

    前面我们讲到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。
    2014-2-21 9-49-08

    Exchange

    在上一节我们看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。
    2014-2-21 9-51-03
    Exchange是按照什么逻辑将消息路由到Queue的?这个将在Binding一节介绍。
    RabbitMQ中的Exchange有四种类型,不同的类型有着不同的路由策略,这将在Exchange Types一节介绍。

    routing key

    生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。
    在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。
    RabbitMQ为routing key设定的长度限制为255 bytes。

    Binding

    RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。
    2014-2-21 9-52-46

    Binding key

    在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。这个将在Exchange Types章节会列举实际的例子加以说明。
    在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。
    binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。

    Exchange Types

    RabbitMQ常用的Exchange Type有fanout、direct、topic、headers这四种(AMQP规范里还提到两种Exchange Type,分别为system与自定义,这里不予以描述),下面分别进行介绍。

    fanout

    fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
    2014-2-21 9-54-26
    上图中,生产者(P)发送到Exchange(X)的所有消息都会路由到图中的两个Queue,并最终被两个消费者(C1与C2)消费。

    direct

    direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
    2014-2-21 9-55-20
    以上图的配置为例,我们以routingKey=”error”发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b…,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl…);如果我们以routingKey=”info”或routingKey=”warning”来发送消息,则消息只会路由到Queue2。如果我们以其他routingKey发送消息,则消息不会路由到这两个Queue中。

    topic

    前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:

    • routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
    • binding key与routing key一样也是句点号“. ”分隔的字符串
    • binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

    2014-2-21 9-57-37
    以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,routingKey=”lazy.orange.fox”的消息会路由到Q1与Q2,routingKey=”lazy.brown.fox”的消息会路由到Q2,routingKey=”lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey。

    headers

    headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
    在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
    该类型的Exchange没有用到过(不过也应该很有用武之地),所以不做介绍。

    RPC

    MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。
    但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。
    2014-2-21 9-59-04
    RabbitMQ中实现RPC的机制是:

    • 客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14中properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)
    • 服务器端收到消息并处理
    • 服务器端处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性
    • 客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理

    总结

    本文介绍了RabbitMQ中个人认为最重要的概念,充分利用RabbitMQ提供的这些功能就可以处理我们绝大部分的异步业务了。

    展开全文
  • RabbitMQ详解

    2020-07-26 23:02:44
    RabbitMQ 二、RabbitMQ核心概念及AMQP协议 互联网大厂为什么选择RabbitMQ 开源, 性能优秀, 稳定性保障 提供可靠的消息投递模式(confirm), 返回模式(return) SpringAMQP完美整合, API丰富 集群模式丰富, 表达式配置...
  • RabbitMQ的理解

    2020-08-21 18:09:55
    RabbitMQ是一个开源的消息代理软件。它接收生产者发布的消息并发送给消费者。它扮演中间商的角色,可以用来降低web服务器因发送消息带来的负载以及延时。 RabbitMQ里的几个重要概念。 生产者(Producer):发送消息...
  • rabbitmq的交换机Topic的*和#区别

    千次阅读 2019-09-05 15:58:35
    rabbitmq的通配符模式(Topic Exchange)的*和#区别 答 符号“#”匹配路由键的一个或多个词,符号“*”匹配路由键的一个词。 例如:http://www.yayihouse.com/yayishuwu/chapter/2130 ...
  • Topics和Routing的基本原理相同,即:生产者将消息发给交换机,交换机根据routingKey将消息转发给与routingKey匹配的队列。 不同之处是: routingKey的匹配方式, Routing模式是相等匹配, topics模式是统配符匹配。...
  • RabbitMQ的基础概念和设计模式

    万次阅读 2019-07-04 15:21:17
    一、基础概念篇
  • RabbitMQ

    千次阅读 2019-06-12 10:42:16
    内部消息分发机制 集群及高可用方案 持久化机制、内存及磁盘控制 消息可靠性及插件化机制 分布式场景实例解决方案(饿了么卖场)
  • 1、RabbitMQ的简单使用

    万次阅读 多人点赞 2018-06-28 16:46:52
    1、AMQP AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向...Erlang中的实现有 RabbitMQ等。 2、RabbitMQ 1、Erlang语言 ...
  • Windows环境下RabbitMQ的启动和停止命令

    万次阅读 多人点赞 2019-03-28 18:41:56
    首先windows下安装好了erlang和rabbitmq。 如下地址同时下载和安装: Erlang:http://www.erlang.org/download.html RabbitMQ :http://www.rabbitmq.com/download.html 安装RabbitMQ时注意如下图: RabbitMQ ...
  • 重启rabbitmq服务

    万次阅读 2018-05-22 15:03:11
    重启rabbitmq服务通过两个命令来实现: rabbitmqctl stop :停止rabbitmq rabbitmq-server restart : 重启rabbitmq 因为rabbitmqctl是没有restart命令的,所以重启rabbitmq服务需要这么两步。 另外,附上一些...
  • RabbitMQ 网页端控制台开启方式

    万次阅读 2014-12-16 11:14:52
    最近使用RabbitMQ发现只有命令行的方式使用RabbitMQ对队列进行管理
  • 启动/停止rabbitmq及日志查看

    万次阅读 2017-08-07 16:05:55
    [root@localhost ~]# service rabbitmq-server Usage: /etc/init.d/rabbitmq-server {start|stop|status|rotate-logs|restart|condrestart|try-restart|reload|force-reload} rabbitmq的日志在目录:/var/log/...
  • rabbitmq开启web管理后台

    万次阅读 2016-07-01 16:59:21
    rabbitmq自带管理后台,安装后需要配置开启 进入rabbitmq安装目录中的sbin目录执行 rabbitmq-plugins enable rabbitmq_management 重启rabbitmq服务生效 打开http://localhost:15672/即可看到管理后台 用户名...
  • rabbitmq开启webui界面管理

    万次阅读 2018-09-10 14:51:44
    通过rabbitmq-plugins list查看rabbitmq插件,发现rabbitmq_management这里并没有被开启。这里介绍如何开启rabbitmq基于http的webui界面管理。 如果需要通过浏览器访问rabbitmq管理界面,需要开启rabbitmq_...
  • rabbitmq管理界面开启

    万次阅读 2016-11-24 14:36:26
    在windows7上安装了rabbitmq server, 安装过程略。 下面要开启管理界面: 1、在cmd窗口下进入rabbitmq安装目录下的sbin目录,使用rabbitmq-plugins.bat list查看已安装的插件列表。 2、使用rabbitmq-plugins.bat ...
  • rabbitmq后台启动以及关闭

    万次阅读 2019-05-16 09:49:13
    安装时rabbitmq并未加入系统服务,所以启动时进入安装路径 ./rabbitmq-server -detached 或者 sudo rabbitmqctl start_app 此时如果rabbitmq-server stop会无法停止,报节点已经启动 需用 rabbitmqctl stop_app...
1 2 3 4 5 ... 20
收藏数 98,560
精华内容 39,424
关键字:

rabbitmq