rabbitmq 订阅
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。 展开全文
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
信息
开发公司
Rabbit
简    称
MQ
构    成
以高性能、健壮以及可伸缩性出名的 Erlang 写成
释    义
一种程序对程序的通信方法
中文名
消息队列
外文名
Message Queue
rabbitmq简介
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
收起全文
精华内容
参与话题
问答
  • RabbitMQ

    万次阅读 多人点赞 2019-10-29 23:14:20
    文章目录RabbitMQ 使用场景服务解耦流量削峰异步调用rabbitmq 基本概念ExchangeMessage QueueBinding KeyRouting Keyrabbitmq安装安装erlang语言库rabbitmq官方精简的Erlang语言包下载和安装安装socat依赖socat依赖...

    Rabbitmq

    RabbitMQ 使用场景

    服务解耦

    假设有这样一个场景, 服务A产生数据, 而服务B,C,D需要这些数据, 那么我们可以在A服务中直接调用B,C,D服务,把数据传递到下游服务即可

    但是,随着我们的应用规模不断扩大,会有更多的服务需要A的数据,如果有几十甚至几百个下游服务,而且会不断变更,再加上还要考虑下游服务出错的情况,那么A服务中调用代码的维护会极为困难

    这是由于服务之间耦合度过于紧密

    耦合

    再来考虑用RabbitMQ解耦的情况

    A服务只需要向消息服务器发送消息,而不用考虑谁需要这些数据;下游服务如果需要数据,自行从消息服务器订阅消息,不再需要数据时则取消订阅即可

    解耦

    流量削峰

    假设我们有一个应用,平时访问量是每秒300请求,我们用一台服务器即可轻松应对

    低流量

    而在高峰期,访问量瞬间翻了十倍,达到每秒3000次请求,那么单台服务器肯定无法应对,这时我们可以考虑增加到10台服务器,来分散访问压力

    但如果这种瞬时高峰的情况每天只出现一次,每次只有半小时,那么我们10台服务器在多数时间都只分担每秒几十次请求,这样就有点浪费资源了

    流量峰值

    这种情况,我们就可以使用RabbitMQ来进行流量削峰,高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力

    这是消息队列服务器非常典型的应用场景

    流量销峰

    异步调用

    考虑定外卖支付成功的情况

    支付后要发送支付成功的通知,再寻找外卖小哥来进行配送,而寻找外卖小哥的过程非常耗时,尤其是高峰期,可能要等待几十秒甚至更长

    这样就造成整条调用链路响应非常缓慢

    阻塞

    而如果我们引入RabbitMQ消息队列,订单数据可以发送到消息队列服务器,那么调用链路也就可以到此结束,订单系统则可以立即得到响应,整条链路的响应时间只有200毫秒左右

    寻找外卖小哥的应用可以以异步的方式从消息队列接收订单消息,再执行耗时的寻找操作

    异步调用

    rabbitmq 基本概念

    RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。

    rabbitmq

    Exchange

    接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为。在RabbitMQ中,ExchangeType常用的有direct、Fanout和Topic三种。

    exchange

    Message Queue

    消息队列。我们发送给RabbitMQ的消息最后都会到达各种queue,并且存储在其中(如果路由找不到相应的queue则数据会丢失),等待消费者来取。

    Binding Key

    它表示的是Exchange与Message Queue是通过binding key进行联系的,这个关系是固定。

    Routing Key

    生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则。这个routing key需要与Exchange Type及binding key联合使用才能生,我们的生产者只需要通过指定routing key来决定消息流向哪里。

    rabbitmq安装

    离线安装

    下载离线安装包文件

    上传离线安装包

    • rabbitmq-install 目录上传到 /root

    切换到rabbitmq-install目录

    cd rabbitmq-install
    

    安装

    rpm -ivh *.rpm
    

    Yum在线安装

    以下内容来自 RabbitMQ 官方手册

    rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
    
    
    # centos7 用这个
    cat <<EOF > /etc/yum.repos.d/rabbitmq.repo
    [bintray-rabbitmq-server]
    name=bintray-rabbitmq-rpm
    baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/
    gpgcheck=0
    repo_gpgcheck=0
    enabled=1
    EOF
    
    
    # centos6 用这个
    cat <<EOF > /etc/yum.repos.d/rabbitmq.repo
    [bintray-rabbitmq-server]
    name=bintray-rabbitmq-rpm
    baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/6/
    gpgcheck=0
    repo_gpgcheck=0
    enabled=1
    EOF
    
    
    yum makecache
    
    yum install socat
    
    wget https://github.com/rabbitmq/erlang-rpm/releases/download/v21.3.8.12/erlang-21.3.8.12-1.el7.x86_64.rpm
    rpm -ivh erlang-21.3.8.12-1.el7.x86_64.rpm --force --nodeps
    
    yum install rabbitmq-server
    

    启动rabbitmq服务器

    # 设置服务,开机自动启动
    systemctl enable rabbitmq-server
    
    # 启动服务
    systemctl start rabbitmq-server
    

    rabbitmq管理界面

    启用管理界面

    # 开启管理界面插件
    rabbitmq-plugins enable rabbitmq_management
    
    # 防火墙打开 15672 管理端口
    firewall-cmd --zone=public --add-port=15672/tcp --permanent
    firewall-cmd --reload
    

    重启RabbitMQ服务

    systemctl restart rabbitmq-server
    

    访问

    访问服务器的15672端口,例如:

    http://192.168.64.140:15672

    添加用户

    添加用户

    # 添加用户
    rabbitmqctl add_user admin admin
    
    # 新用户设置用户为超级管理员
    rabbitmqctl set_user_tags admin administrator
    

    设置访问权限

    访问权限
    访问权限

    开放客户端连接端口

    # 打开客户端连接端口
    firewall-cmd --zone=public --add-port=5672/tcp --permanent
    firewall-cmd --reload
    
    • 主要端口介绍
      • 4369 – erlang发现口
      • 5672 – client端通信口
      • 15672 – 管理界面ui端口
      • 25672 – server间内部通信口

    rabbitmq六种工作模式

    简单模式

    简单

    RabbitMQ是一个消息中间件,你可以想象它是一个邮局。当你把信件放到邮箱里时,能够确信邮递员会正确地递送你的信件。RabbitMq就是一个邮箱、一个邮局和一个邮递员。

    • 发送消息的程序是生产者
    • 队列就代表一个邮箱。虽然消息会流经RbbitMQ和你的应用程序,但消息只能被存储在队列里。队列存储空间只受服务器内存和磁盘限制,它本质上是一个大的消息缓冲区。多个生产者可以向同一个队列发送消息,多个消费者也可以从同一个队列接收消息.
    • 消费者等待从队列接收消息

    简单模式

    pom.xml

    添加 slf4j 依赖, 和 rabbitmq amqp 依赖

    <project xmlns="http://maven.apache.org/POM/4.0.0"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>com.tedu</groupId>
    	<artifactId>rabbitmq</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<dependencies>
    		<dependency>
    			<groupId>com.rabbitmq</groupId>
    			<artifactId>amqp-client</artifactId>
    			<version>5.4.3</version>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-api</artifactId>
    			<version>1.8.0-alpha2</version>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-log4j12</artifactId>
    			<version>1.8.0-alpha2</version>
    		</dependency>
    	</dependencies>
    
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-compiler-plugin</artifactId>
    				<version>3.8.0</version>
    				<configuration>
    					<source>1.8</source>
    					<target>1.8</target>
    				</configuration>
    			</plugin>
    		</plugins>
    	</build>
    </project>
    

    生产者发送消息

    package rabbitmq.simple;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		//创建连接工厂,并设置连接信息
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);//可选,5672是默认端口
    		f.setUsername("admin");
    		f.setPassword("admin");
    
    		/*
    		 * 与rabbitmq服务器建立连接,
    		 * rabbitmq服务器端使用的是nio,会复用tcp连接,
    		 * 并开辟多个信道与客户端通信
    		 * 以减轻服务器端建立连接的开销
    		 */
    		Connection c = f.newConnection();
    		//建立信道
    		Channel ch = c.createChannel();
    
    		/*
    		 * 声明队列,会在rabbitmq中创建一个队列
    		 * 如果已经创建过该队列,就不能再使用其他参数来创建
    		 * 
    		 * 参数含义:
    		 *   -queue: 队列名称
    		 *   -durable: 队列持久化,true表示RabbitMQ重启后队列仍存在
    		 *   -exclusive: 排他,true表示限制仅当前连接可用
    		 *   -autoDelete: 当最后一个消费者断开后,是否删除队列
    		 *   -arguments: 其他参数
    		 */
    		ch.queueDeclare("helloworld", false,false,false,null);
    
    		/*
    		 * 发布消息
    		 * 这里把消息向默认交换机发送.
    		 * 默认交换机隐含与所有队列绑定,routing key即为队列名称
    		 * 
    		 * 参数含义:
    		 * 	-exchange: 交换机名称,空串表示默认交换机"(AMQP default)",不能用 null 
    		 * 	-routingKey: 对于默认交换机,路由键就是目标队列名称
    		 * 	-props: 其他参数,例如头信息
    		 * 	-body: 消息内容byte[]数组
    		 */
    		ch.basicPublish("", "helloworld", null, "Hello world!".getBytes());
    
    		System.out.println("消息已发送");
    		c.close();
    	}
    }
    

    消费者接收消息

    package rabbitmq.simple;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		//连接工厂
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		//建立连接
    		Connection c = f.newConnection();
    		//建立信道
    		Channel ch = c.createChannel();
    		//声明队列,如果该队列已经创建过,则不会重复创建
    		ch.queueDeclare("helloworld",false,false,false,null);
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume("helloworld", true, callback, cancel);
    	}
    }
    

    工作模式

    工作

    工作模式

    工作队列(即任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们将任务安排在稍后完成。

    我们将任务封装为消息并将其发送到队列。后台运行的工作进程将获取任务并最终执行任务。当运行多个消费者时,任务将在它们之间分发。

    使用任务队列的一个优点是能够轻松地并行工作。如果我们正在积压工作任务,我们可以添加更多工作进程,这样就可以轻松扩展。

    生产者发送消息

    这里模拟耗时任务,发送的消息中,每个点使工作进程暂停一秒钟,例如"Hello…"将花费3秒钟来处理

    package rabbitmq.workqueue;
    
    import java.util.Scanner;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		//参数:queue,durable,exclusive,autoDelete,arguments
    		ch.queueDeclare("helloworld", false,false,false,null);
    
    		while (true) {
    		    //控制台输入的消息发送到rabbitmq
    			System.out.print("输入消息: ");
    			String msg = new Scanner(System.in).nextLine();
    			//如果输入的是"exit"则结束生产者进程
    			if ("exit".equals(msg)) {
    				break;
    			}
    			//参数:exchage,routingKey,props,body
    			ch.basicPublish("", "helloworld", null, msg.getBytes());
    			System.out.println("消息已发送: "+msg);
    		}
    
    		c.close();
    	}
    }
    

    消费者接收消息

    package rabbitmq.workqueue;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		ch.queueDeclare("helloworld",false,false,false,null);
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    
    				//遍历字符串中的字符,每个点使进程暂停一秒
    				for (int i = 0; i < msg.length(); i++) {
    					if (msg.charAt(i)=='.') {
    						try {
    							Thread.sleep(1000);
    						} catch (InterruptedException e) {
    						}
    					}
    				}
    				System.out.println("处理结束");
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume("helloworld", true, callback, cancel);
    	}
    }
    

    运行测试

    运行:

    • 一个生产者
    • 两个消费者

    生产者发送多条消息,
    如: 1,2,3,4,5. 两个消费者分别收到:

    • 消费者一: 1,3,5
    • 消费者二: 2,4

    rabbitmq在所有消费者中轮询分发消息,把消息均匀地发送给所有消费者

    消息确认

    一个消费者接收消息后,在消息没有完全处理完时就挂掉了,那么这时会发生什么呢?

    就现在的代码来说,rabbitmq把消息发送给消费者后,会立即删除消息,那么消费者挂掉后,它没来得及处理的消息就会丢失

    如果生产者发送以下消息:

    1…

    2

    3

    4

    5

    两个消费者分别收到:

    • 消费者一: 1…, 3, 5
    • 消费者二: 2, 4

    当消费者一收到所有消息后,要话费7秒时间来处理第一条消息,这期间如果关闭该消费者,那么1未处理完成,3,5则没有被处理

    我们并不想丢失任何消息, 如果一个消费者挂掉,我们想把它的任务消息派发给其他消费者

    为了确保消息不会丢失,rabbitmq支持消息确认(回执)。当一个消息被消费者接收到并且执行完成后,消费者会发送一个ack (acknowledgment) 给rabbitmq服务器, 告诉他我已经执行完成了,你可以把这条消息删除了。

    如果一个消费者没有返回消息确认就挂掉了(信道关闭,连接关闭或者TCP链接丢失),rabbitmq就会明白,这个消息没有被处理完成,rebbitmq就会把这条消息重新放入队列,如果在这时有其他的消费者在线,那么rabbitmq就会迅速的把这条消息传递给其他的消费者,这样就确保了没有消息会丢失。

    这里不存在消息超时, rabbitmq只在消费者挂掉时重新分派消息, 即使消费者花非常久的时间来处理消息也可以

    手动消息确认默认是开启的,前面的例子我们通过autoAck=ture把它关闭了。我们现在要把它设置为false,然后工作进程处理完意向任务时,发送一个消息确认(回执)。

    package rabbitmq.workqueue;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		//连接工厂
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		//建立连接
    		Connection c = f.newConnection();
    		//建立信道
    		Channel ch = c.createChannel();
    		//声明队列
    		ch.queueDeclare("helloworld",false,false,false,null);
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    				for (int i = 0; i < msg.length(); i++) {
    					if (msg.charAt(i)=='.') {
    						try {
    							Thread.sleep(1000);
    						} catch (InterruptedException e) {
    						}
    					}
    				}
    				System.out.println("处理结束");
    				//发送回执
    				ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		//autoAck设置为false,则需要手动确认发送回执
    		ch.basicConsume("helloworld", false, callback, cancel);
    	}
    }
    
    

    使用以上代码,就算杀掉一个正在处理消息的工作进程也不会丢失任何消息,工作进程挂掉之后,没有确认的消息就会被自动重新传递。

    忘记确认(ack)是一个常见的错误, 这样后果是很严重的, 由于未确认的消息不会被释放, rabbitmq会吃掉越来越多的内存

    可以使用下面命令打印工作队列中未确认消息的数量

    rabbitmqctl list_queues name messages_ready messages_unacknowledged
    

    当处理消息时异常中断, 可以选择让消息重回队列重新发送.
    nack 操作可以是消息重回队列, 可以使用 basicNack() 方法:

    // requeue为true时重回队列, 反之消息被丢弃或被发送到死信队列
    c.basicNack(tag, multiple, requeue)
    

    合理地分发

    rabbitmq会一次把多个消息分发给消费者, 这样可能造成有的消费者非常繁忙, 而其它消费者空闲. 而rabbitmq对此一无所知, 仍然会均匀的分发消息

    我们可以使用 basicQos(1) 方法, 这告诉rabbitmq一次只向消费者发送一条消息, 在返回确认回执前, 不要向消费者发送新消息. 而是把消息发给下一个空闲的消费者

    合理分发

    消息持久化

    当rabbitmq关闭时, 我们队列中的消息仍然会丢失, 除非明确要求它不要丢失数据

    要求rabbitmq不丢失数据要做如下两点: 把队列和消息都设置为可持久化(durable)

    队列设置为可持久化, 可以在定义队列时指定参数durable为true

    //第二个参数是持久化参数durable
    ch.queueDeclare("helloworld", true, false, false, null);
    

    由于之前我们已经定义过队列"hello"是不可持久化的, 对已存在的队列, rabbitmq不允许对其定义不同的参数, 否则会出错, 所以这里我们定义一个不同名字的队列"task_queue"

    //定义一个新的队列,名为 task_queue
    //第二个参数是持久化参数 durable
    ch.queueDeclare("task_queue", true, false, false, null);
    

    生产者和消费者代码都要修改

    这样即使rabbitmq重新启动, 队列也不会丢失. 现在我们再设置队列中消息的持久化, 使用MessageProperties.PERSISTENT_TEXT_PLAIN参数

    //第三个参数设置消息持久化
    ch.basicPublish("", "task_queue",
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                msg.getBytes());
    

    下面是"工作模式"最终完成的生产者和消费者代码

    生产者代码

    package rabbitmq.workqueue;
    
    import java.util.Scanner;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    
    public class Test3 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//第二个参数设置队列持久化
    		ch.queueDeclare("task_queue", true,false,false,null);
    
    		while (true) {
    			System.out.print("输入消息: ");
    			String msg = new Scanner(System.in).nextLine();
    			if ("exit".equals(msg)) {
    				break;
    			}
    			
    			//第三个参数设置消息持久化
    			ch.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8"));
    			System.out.println("消息已发送: "+msg);
    		}
    
    		c.close();
    	}
    }
    

    消费者代码

    package rabbitmq.workqueue;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test4 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//第二个参数设置队列持久化
    		ch.queueDeclare("task_queue",true,false,false,null);
    		
    		System.out.println("等待接收数据");
    		
    		ch.basicQos(1); //一次只接收一条消息
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    				for (int i = 0; i < msg.length(); i++) {
    					if (msg.charAt(i)=='.') {
    						try {
    							Thread.sleep(1000);
    						} catch (InterruptedException e) {
    						}
    					}
    				}
    				System.out.println("处理结束");
    				//发送回执
    				ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		//autoAck设置为false,则需要手动确认发送回执
    		ch.basicConsume("task_queue", false, callback, cancel);
    	}
    }
    
    

    发布订阅模式

    发布订阅

    发布订阅

    在前面的例子中,我们任务消息只交付给一个工作进程。在这部分,我们将做一些完全不同的事情——我们将向多个消费者传递同一条消息。这种模式称为“发布/订阅”。

    为了说明该模式,我们将构建一个简单的日志系统。它将由两个程序组成——第一个程序将发出日志消息,第二个程序接收它们。

    在我们的日志系统中,接收程序的每个运行副本都将获得消息。这样,我们就可以运行一个消费者并将日志保存到磁盘; 同时我们可以运行另一个消费者在屏幕上打印日志。

    最终, 消息会被广播到所有消息接受者

    Exchanges 交换机

    RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何消息直接发送到队列。实际上,通常生产者甚至不知道消息是否会被传递到任何队列。

    相反,生产者只能向交换机(Exchange)发送消息。交换机是一个非常简单的东西。一边接收来自生产者的消息,另一边将消息推送到队列。交换器必须确切地知道如何处理它接收到的消息。它应该被添加到一个特定的队列中吗?它应该添加到多个队列中吗?或者它应该被丢弃。这些规则由exchange的类型定义。

    有几种可用的交换类型:direct、topic、header和fanout。我们将关注最后一个——fanout。让我们创建一个这种类型的交换机,并称之为 logs: ch.exchangeDeclare("logs", "fanout");

    fanout交换机非常简单。它只是将接收到的所有消息广播给它所知道的所有队列。这正是我们的日志系统所需要的。

    我们前面使用的队列具有特定的名称(还记得hello和task_queue吗?)能够为队列命名对我们来说至关重要——我们需要将工作进程指向同一个队列,在生产者和消费者之间共享队列。

    但日志记录案例不是这种情况。我们想要接收所有的日志消息,而不仅仅是其中的一部分。我们还只对当前的最新消息感兴趣,而不是旧消息。

    要解决这个问题,我们需要两件事。首先,每当我们连接到Rabbitmq时,我们需要一个新的空队列。为此,我们可以创建一个具有随机名称的队列,或者,更好的方法是让服务器为我们选择一个随机队列名称。其次,一旦断开与使用者的连接,队列就会自动删除。在Java客户端中,当我们不向queueDeclare()提供任何参数时,会创建一个具有生成名称的、非持久的、独占的、自动删除队列

    //自动生成队列名
    //非持久,独占,自动删除
    String queueName = ch.queueDeclare().getQueue();
    

    绑定 Bindings

    绑定

    我们已经创建了一个fanout交换机和一个队列。现在我们需要告诉exchange向指定队列发送消息。exchange和队列之间的关系称为绑定。

    //指定的队列,与指定的交换机关联起来
    //成为绑定 -- binding
    //第三个参数时 routingKey, 由于是fanout交换机, 这里忽略 routingKey
    ch.queueBind(queueName, "logs", "");
    

    现在, logs交换机将会向我们指定的队列添加消息

    列出绑定关系:

    rabbitmqctl list_bindings

    完成的代码

    完成代码

    生产者

    生产者发出日志消息,看起来与前一教程没有太大不同。最重要的更改是,我们现在希望将消息发布到logs交换机,而不是无名的日志交换机。我们需要在发送时提供一个routingKey,但是对于fanout交换机类型,该值会被忽略。

    package rabbitmq.publishsubscribe;
    
    import java.util.Scanner;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//定义名字为logs的交换机,交换机类型为fanout
    		//这一步是必须的,因为禁止发布到不存在的交换。
    		ch.exchangeDeclare("logs", "fanout");
    		
    		while (true) {
    			System.out.print("输入消息: ");
    			String msg = new Scanner(System.in).nextLine();
    			if ("exit".equals(msg)) {
    				break;
    			}
    			
    			//第一个参数,向指定的交换机发送消息
    			//第二个参数,不指定队列,由消费者向交换机绑定队列
    			//如果还没有队列绑定到交换器,消息就会丢失,
    			//但这对我们来说没有问题;即使没有消费者接收,我们也可以安全地丢弃这些信息。
    			ch.basicPublish("logs", "", null, msg.getBytes("UTF-8"));
    			System.out.println("消息已发送: "+msg);
    		}
    
    		c.close();
    	}
    }
    

    消费者

    如果还没有队列绑定到交换器,消息就会丢失,但这对我们来说没有问题;如果还没有消费者在听,我们可以安全地丢弃这些信息。
    ReceiveLogs.java代码:

    package rabbitmq.publishsubscribe;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//定义名字为 logs 的交换机, 它的类型是 fanout
    		ch.exchangeDeclare("logs", "fanout");
    		
    		//自动生成对列名,
    		//非持久,独占,自动删除
    		String queueName = ch.queueDeclare().getQueue();
    		
    		//把该队列,绑定到 logs 交换机
    		//对于 fanout 类型的交换机, routingKey会被忽略,不允许null值
    		ch.queueBind(queueName, "logs", "");
    		
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume(queueName, true, callback, cancel);
    	}
    }
    

    路由模式

    路由

    路由模式

    在上一小节,我们构建了一个简单的日志系统。我们能够向多个接收者广播日志消息。

    在这一节,我们将向其添加一个特性—我们将只订阅所有消息中的一部分。例如,我们只接收关键错误消息并保存到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

    绑定 Bindings

    在上一节,我们已经创建了队列与交换机的绑定。使用下面这样的代码:

    ch.queueBind(queueName, "logs", "");
    

    绑定是交换机和队列之间的关系。这可以简单地理解为:队列对来自此交换的消息感兴趣。

    绑定可以使用额外的routingKey参数。为了避免与basic_publish参数混淆,我们将其称为bindingKey。这是我们如何创建一个键绑定:

    ch.queueBind(queueName, EXCHANGE_NAME, "black");
    

    bindingKey的含义取决于交换机类型。我们前面使用的fanout交换机完全忽略它。

    直连交换机 Direct exchange

    上一节中的日志系统向所有消费者广播所有消息。我们希望扩展它,允许根据消息的严重性过滤消息。例如,我们希望将日志消息写入磁盘的程序只接收关键error,而不是在warning或info日志消息上浪费磁盘空间。

    前面我们使用的是fanout交换机,这并没有给我们太多的灵活性——它只能进行简单的广播。

    我们将用直连交换机(Direct exchange)代替。它背后的路由算法很简单——消息传递到bindingKey与routingKey完全匹配的队列。为了说明这一点,请考虑以下设置

    路由模式

    其中我们可以看到直连交换机X,它绑定了两个队列。第一个队列用绑定键orange绑定,第二个队列有两个绑定,一个绑定black,另一个绑定键green

    这样设置,使用路由键orange发布到交换器的消息将被路由到队列Q1。带有blackgreen路由键的消息将转到Q2。而所有其他消息都将被丢弃。

    多重绑定 Multiple bindings

    多重绑定

    使用相同的bindingKey绑定多个队列是完全允许的。如图所示,可以使用binding key black将X与Q1和Q2绑定。在这种情况下,直连交换机的行为类似于fanout,并将消息广播给所有匹配的队列。一条路由键为black的消息将同时发送到Q1和Q2。

    发送日志

    我们将在日志系统中使用这个模型。我们把消息发送到一个Direct交换机,而不是fanout。我们将提供日志级别作为routingKey。这样,接收程序将能够选择它希望接收的级别。让我们首先来看发出日志。

    和前面一样,我们首先需要创建一个exchange:

    //参数1: 交换机名
    //参数2: 交换机类型
    ch.exchangeDeclare("direct_logs", "direct");
    

    接着来看发送消息的代码

    //参数1: 交换机名
    //参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"
    //参数3: 其他配置属性
    //参数4: 发布的消息数据 
    ch.basicPublish("direct_logs", "error", null, message.getBytes());
    

    订阅

    接收消息的工作原理与前面章节一样,但有一个例外——我们将为感兴趣的每个日志级别创建一个新的绑定, 示例代码如下:

    ch.queueBind(queueName, "logs", "info");
    ch.queueBind(queueName, "logs", "warning");
    

    完整的代码

    完整代码

    生产者

    package rabbitmq.routing;
    
    import java.util.Random;
    import java.util.Scanner;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		String[] a = {"warning", "info", "error"};
    		
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//参数1: 交换机名
    		//参数2: 交换机类型
    		ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
    		
    		while (true) {
    			System.out.print("输入消息: ");
    			String msg = new Scanner(System.in).nextLine();
    			if ("exit".equals(msg)) {
    				break;
    			}
    			
    			//随机产生日志级别
    			String level = a[new Random().nextInt(a.length)];
    			
    			//参数1: 交换机名
    			//参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"
    			//参数3: 其他配置属性
    			//参数4: 发布的消息数据 
    			ch.basicPublish("direct_logs", level, null, msg.getBytes());
    			System.out.println("消息已发送: "+level+" - "+msg);
    			
    		}
    
    		c.close();
    	}
    }
    
    

    消费者

    package rabbitmq.routing;
    
    import java.io.IOException;
    import java.util.Scanner;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//定义名字为 direct_logs 的交换机, 它的类型是 "direct"
    		ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
    		
    		//自动生成对列名,
    		//非持久,独占,自动删除
    		String queueName = ch.queueDeclare().getQueue();
    		
    		System.out.println("输入接收的日志级别,用空格隔开:");
    		String[] a = new Scanner(System.in).nextLine().split("\\s");
    		
    		//把该队列,绑定到 direct_logs 交换机
    		//允许使用多个 bindingKey
    		for (String level : a) {
    			ch.queueBind(queueName, "direct_logs", level);
    		}
    		
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				String routingKey = message.getEnvelope().getRoutingKey();
    				System.out.println("收到: "+routingKey+" - "+msg);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume(queueName, true, callback, cancel);
    	}
    }
    

    主题模式

    主题

    在上一小节,我们改进了日志系统。我们没有使用只能进行广播的fanout交换机,而是使用Direct交换机,从而可以选择性接收日志。

    虽然使用Direct交换机改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。

    在我们的日志系统中,我们可能不仅希望根据级别订阅日志,还希望根据发出日志的源订阅日志。

    这将给我们带来很大的灵活性——我们可能只想接收来自“cron”的关键错误,但也要接收来自“kern”的所有日志。

    要在日志系统中实现这一点,我们需要了解更复杂的Topic交换机。

    主题交换机 Topic exchange

    发送到Topic交换机的消息,它的的routingKey,必须是由点分隔的多个单词。单词可以是任何东西,但通常是与消息相关的一些特性。几个有效的routingKey示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。routingKey可以有任意多的单词,最多255个字节。

    bindingKey也必须采用相同的形式。Topic交换机的逻辑与直连交换机类似——使用特定routingKey发送的消息将被传递到所有使用匹配bindingKey绑定的队列。bindingKey有两个重要的特殊点:

    • * 可以通配单个单词。
    • # 可以通配零个或多个单词。

    用一个例子来解释这个问题是最简单的

    主题

    在本例中,我们将发送描述动物的消息。这些消息将使用由三个单词(两个点)组成的routingKey发送。routingKey中的第一个单词表示速度,第二个是颜色,第三个是物种:“<速度>.<颜色>.<物种>”。

    我们创建三个绑定:Q1与bindingKey “*.orange.*” 绑定。和Q2是 “*.*.rabbit” 和 “lazy.#” 。

    这些绑定可概括为:

    • Q1对所有橙色的动物感兴趣。
    • Q2想接收关于兔子和慢速动物的所有消息。

    将routingKey设置为"quick.orange.rabbit"的消息将被发送到两个队列。消息 "lazy.orange.elephant“也发送到它们两个。另外”quick.orange.fox“只会发到第一个队列,”lazy.brown.fox“只发给第二个。”lazy.pink.rabbit“将只被传递到第二个队列一次,即使它匹配两个绑定。”quick.brown.fox"不匹配任何绑定,因此将被丢弃。

    如果我们违反约定,发送一个或四个单词的信息,比如"orange“或”quick.orange.male.rabbit",会发生什么?这些消息将不匹配任何绑定,并将丢失。

    另外,"lazy.orange.male.rabbit",即使它有四个单词,也将匹配最后一个绑定,并将被传递到第二个队列。

    完成的代码

    生产者

    package rabbitmq.topic;
    
    import java.util.Random;
    import java.util.Scanner;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//参数1: 交换机名
    		//参数2: 交换机类型
    		ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
    		
    		while (true) {
    			System.out.print("输入消息: ");
    			String msg = new Scanner(System.in).nextLine();
    			if ("exit".contentEquals(msg)) {
    				break;
    			}
    			System.out.print("输入routingKey: ");
    			String routingKey = new Scanner(System.in).nextLine();
    			
    			//参数1: 交换机名
    			//参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"
    			//参数3: 其他配置属性
    			//参数4: 发布的消息数据 
    			ch.basicPublish("topic_logs", routingKey, null, msg.getBytes());
    			
    			System.out.println("消息已发送: "+routingKey+" - "+msg);
    		}
    
    		c.close();
    	}
    }
    

    消费者

    package rabbitmq.topic;
    
    import java.io.IOException;
    import java.util.Scanner;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
    		
    		//自动生成对列名,
    		//非持久,独占,自动删除
    		String queueName = ch.queueDeclare().getQueue();
    		
    		System.out.println("输入bindingKey,用空格隔开:");
    		String[] a = new Scanner(System.in).nextLine().split("\\s");
    		
    		//把该队列,绑定到 topic_logs 交换机
    		//允许使用多个 bindingKey
    		for (String bindingKey : a) {
    			ch.queueBind(queueName, "topic_logs", bindingKey);
    		}
    		
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				String routingKey = message.getEnvelope().getRoutingKey();
    				System.out.println("收到: "+routingKey+" - "+msg);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume(queueName, true, callback, cancel);
    	}
    }
    

    RPC模式

    RPC

    如果我们需要在远程电脑上运行一个方法,并且还要等待一个返回结果该怎么办?这和前面的例子不太一样, 这种模式我们通常称为远程过程调用,即RPC.

    在本节中,我们将会学习使用RabbitMQ去搭建一个RPC系统:一个客户端和一个可以升级(扩展)的RPC服务器。为了模拟一个耗时任务,我们将创建一个返回斐波那契数列的虚拟的RPC服务。

    客户端

    在客户端定义一个RPCClient类,并定义一个call()方法,这个方法发送一个RPC请求,并等待接收响应结果

    RPCClient client = new RPCClient();
    String result = client.call("4");
    System.out.println( "第四个斐波那契数是: " + result);
    

    回调队列 Callback Queue

    使用RabbitMQ去实现RPC很容易。一个客户端发送请求信息,并得到一个服务器端回复的响应信息。为了得到响应信息,我们需要在请求的时候发送一个“回调”队列地址。我们可以使用默认队列。下面是示例代码:

    //定义回调队列,
    //自动生成对列名,非持久,独占,自动删除
    callbackQueueName = ch.queueDeclare().getQueue();
    
    //用来设置回调队列的参数对象
    BasicProperties props = new BasicProperties
                                .Builder()
                                .replyTo(callbackQueueName)
                                .build();
    //发送调用消息
    ch.basicPublish("", "rpc_queue", props, message.getBytes());
    

    消息属性 Message Properties

    AMQP 0-9-1协议定义了消息的14个属性。大部分属性很少使用,下面是比较常用的4个:

    deliveryMode:将消息标记为持久化(值为2)或非持久化(任何其他值)。

    contentType:用于描述mime类型。例如,对于经常使用的JSON格式,将此属性设置为:application/json

    replyTo:通常用于指定回调队列。

    correlationId:将RPC响应与请求关联起来非常有用。

    关联id (correlationId):

    在上面的代码中,我们会为每个RPC请求创建一个回调队列。 这是非常低效的,这里还有一个更好的方法:让我们为每个客户端创建一个回调队列。

    这就提出了一个新的问题,在队列中得到一个响应时,我们不清楚这个响应所对应的是哪一条请求。这时候就需要使用关联id(correlationId)。我们将为每一条请求设置唯一的的id值。稍后,当我们在回调队列里收到一条消息的时候,我们将查看它的id属性,这样我们就可以匹配对应的请求和响应。如果我们发现了一个未知的id值,我们可以安全的丢弃这条消息,因为它不属于我们的请求。

    小结

    rpc

    RPC的工作方式是这样的:

    • 对于RPC请求,客户端发送一条带有两个属性的消息:replyTo,设置为仅为请求创建的匿名独占队列,和correlationId,设置为每个请求的惟一id值。
    • 请求被发送到rpc_queue队列。
    • RPC工作进程(即:服务器)在队列上等待请求。当一个请求出现时,它执行任务,并使用replyTo字段中的队列将结果发回客户机。
    • 客户机在回应消息队列上等待数据。当消息出现时,它检查correlationId属性。如果匹配请求中的值,则向程序返回该响应数据。

    完成的代码

    服务器端

    package rabbitmq.rpc;
    
    import java.io.IOException;
    import java.util.Random;
    import java.util.Scanner;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class RPCServer {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		/*
    		 * 定义队列 rpc_queue, 将从它接收请求信息
    		 * 
    		 * 参数:
    		 * 1. queue, 对列名
    		 * 2. durable, 持久化
    		 * 3. exclusive, 排他
    		 * 4. autoDelete, 自动删除
    		 * 5. arguments, 其他参数属性
    		 */
    		ch.queueDeclare("rpc_queue",false,false,false,null);
    		ch.queuePurge("rpc_queue");//清除队列中的内容
    		
    		ch.basicQos(1);//一次只接收一条消息
    		
    		
    		//收到请求消息后的回调对象
    		DeliverCallback deliverCallback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				//处理收到的数据(要求第几个斐波那契数)
    				String msg = new String(message.getBody(), "UTF-8");
    				int n = Integer.parseInt(msg);
    				//求出第n个斐波那契数
    				int r = fbnq(n);
    				String response = String.valueOf(r);
    				
    				//设置发回响应的id, 与请求id一致, 这样客户端可以把该响应与它的请求进行对应
    				BasicProperties replyProps = new BasicProperties.Builder()
    						.correlationId(message.getProperties().getCorrelationId())
    						.build();
    				/*
    				 * 发送响应消息
    				 * 1. 默认交换机
    				 * 2. 由客户端指定的,用来传递响应消息的队列名
    				 * 3. 参数(关联id)
    				 * 4. 发回的响应消息
    				 */
    				ch.basicPublish("",message.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
    				//发送确认消息
    				ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
    			}
    		};
    		
    		//
    		CancelCallback cancelCallback = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		//消费者开始接收消息, 等待从 rpc_queue接收请求消息, 不自动确认
    		ch.basicConsume("rpc_queue", false, deliverCallback, cancelCallback);
    	}
    
    	protected static int fbnq(int n) {
    		if(n == 1 || n == 2) return 1;
    		
    		return fbnq(n-1)+fbnq(n-2);
    	}
    }
    
    

    客户端

    package rabbitmq.rpc;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.UUID;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class RPCClient {
    	Connection con;
    	Channel ch;
    	
    	public RPCClient() throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		con = f.newConnection();
    		ch = con.createChannel();
    	}
    	
    	public String call(String msg) throws Exception {
    		//自动生成对列名,非持久,独占,自动删除
    		String replyQueueName = ch.queueDeclare().getQueue();
    		//生成关联id
    		String corrId = UUID.randomUUID().toString();
    		
    		//设置两个参数:
    		//1. 请求和响应的关联id
    		//2. 传递响应数据的queue
    		BasicProperties props = new BasicProperties.Builder()
    				.correlationId(corrId)
    				.replyTo(replyQueueName)
    				.build();
    		//向 rpc_queue 队列发送请求数据, 请求第n个斐波那契数
    		ch.basicPublish("", "rpc_queue", props, msg.getBytes("UTF-8"));
    		
    		//用来保存结果的阻塞集合,取数据时,没有数据会暂停等待
    		BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
    		
    		//接收响应数据的回调对象
    		DeliverCallback deliverCallback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				//如果响应消息的关联id,与请求的关联id相同,我们来处理这个响应数据
    				if (message.getProperties().getCorrelationId().contentEquals(corrId)) {
    					//把收到的响应数据,放入阻塞集合
    					response.offer(new String(message.getBody(), "UTF-8"));
    				}
    			}
    		};
    
    		CancelCallback cancelCallback = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		//开始从队列接收响应数据
    		ch.basicConsume(replyQueueName, true, deliverCallback, cancelCallback);
    		//返回保存在集合中的响应数据
    		return response.take();
    	}
    	
    	public static void main(String[] args) throws Exception {
    		RPCClient client = new RPCClient();
    		while (true) {
    			System.out.print("求第几个斐波那契数:");
    			int n = new Scanner(System.in).nextInt();
    			String r = client.call(""+n);
    			System.out.println(r);
    		}
    	}
    }
    
    

    virtual host

    在RabbitMQ中叫做虚拟消息服务器VirtualHost,每个VirtualHost相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通

    创建virtual host: /pd

    • 进入虚拟机管理界面

    虚拟主机

    • 添加新的虚拟机’/pd’,名称必须以"/"开头

    虚拟主机

    • 查看添加的结果

    结果

    设置虚拟机的用户访问权限

    点击 /pd 虚拟主机, 设置用户 admin 对它的访问权限

    权限

    拼多商城整合 rabbitmq

    当用户下订单时,我们的业务系统直接与数据库通信,把订单保存到数据库中

    当系统流量突然激增,大量的订单压力,会拖慢业务系统和数据库系统

    我们需要应对流量峰值,让流量曲线变得平缓,如下图

    削峰

    订单存储的解耦

    为了进行流量削峰,我们引入 rabbitmq 消息队列,当购物系统产生订单后,可以把订单数据发送到消息队列;而订单消费者应用从消息队列接收订单消息,并把订单保存到数据库

    订单

    这样,当流量激增时,大量订单会暂存在rabbitmq中,而订单消费者可以从容地从消息队列慢慢接收订单,向数据库保存

    生产者-发送订单

    pom.xml 添加依赖

    spring提供了更方便的消息队列访问接口,它对RabbitMQ的客户端API进行了封装,使用起来更加方便

    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    application.yml

    添加RabbitMQ的连接信息

    spring:
      rabbitmq:
        host: 192.168.64.140
        port: 5672
        virtualHost: /pd
        username: admin
        password: admin
    
    

    修改主程序 RunPdAPP

    在主程序中添加下面的方法创建Queue实例

    当创建RabbitMQ连接和信道后,Spring的RabbitMQ工具会自动在服务器中创建队列,代码在RabbitAdmin.declareQueues()方法中

    	@Bean
    	public Queue getQueue() {
    		Queue q = new Queue("orderQueue", true);
    		return q;
    	}
    

    修改 OrderServiceImpl

    	//RabbitAutoConfiguration中创建了AmpqTemplate实例
    	@Autowired
    	AmqpTemplate amqpTemplate;
    
        //saveOrder原来的数据库访问代码全部注释,添加rabbitmq消息发送代码
    	public String saveOrder(PdOrder pdOrder) throws Exception {
    		String orderId = generateId();
    		pdOrder.setOrderId(orderId);
    
    		amqpTemplate.convertAndSend("orderQueue", pdOrder);
    		return orderId;
    
    		
    		//		String orderId = generateId();
    		//		pdOrder.setOrderId(orderId);
    		//
    		//		
    		//		PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId());
    		//		pdOrder.setShippingName(pdShipping.getReceiverName());
    		//		pdOrder.setShippingCode(pdShipping.getReceiverAddress());
    		//		pdOrder.setStatus(1);// 
    		//		pdOrder.setPaymentType(1);
    		//		pdOrder.setPostFee(10D);
    		//		pdOrder.setCreateTime(new Date());
    		//
    		//		double payment = 0;
    		//		List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList());
    		//		for (ItemVO itemVO : itemVOs) {
    		//			PdOrderItem pdOrderItem = new PdOrderItem();
    		//			String id = generateId();
    		//			//String id="2";
    		//			pdOrderItem.setId(id);
    		//			pdOrderItem.setOrderId(orderId);
    		//			pdOrderItem.setItemId("" + itemVO.getPdItem().getId());
    		//			pdOrderItem.setTitle(itemVO.getPdItem().getTitle());
    		//			pdOrderItem.setPrice(itemVO.getPdItem().getPrice());
    		//			pdOrderItem.setNum(itemVO.getPdCartItem().getNum());
    		//
    		//			payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice();
    		//			pdOrderItemMapper.insert(pdOrderItem);
    		//		}
    		//		pdOrder.setPayment(payment);
    		//		pdOrderMapper.insert(pdOrder);
    		//		return orderId;
    	}
    
    

    消费者-接收订单,并保存到数据库

    pd-web项目复制为pd-order-consumer

    复制项目

    修改 application.yml

    把端口修改成 81

    server:
      port: 81
    
    spring:
      datasource:
        type: com.alibaba.druid.pool.DruidDataSource
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://localhost:3306/pd_store?useUnicode=true&characterEncoding=utf8
        username: root
        password: root
    
      rabbitmq:
        host: 192.168.64.140
        port: 5672
        virtualHost: /pd
        username: admin
        password: admin
    
    mybatis:
      #typeAliasesPackage: cn.tedu.ssm.pojo
      mapperLocations: classpath:com.pd.mapper/*.xml
    
    logging:
      level: 
        cn.tedu.ssm.mapper: debug
    

    删除无关代码

    pd-order-consumer项目只需要从 RabbitMQ 接收订单数据, 再保存到数据库即可, 所以项目中只需要保留这部分代码

    • 删除 com.pd.controller 包
    • 删除 com.pd.payment.utils 包
    • 删除无关的 Service,只保留 OrderService 和 OrderServiceImpl

    新建 OrderConsumer

    package com.pd;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import com.pd.pojo.PdOrder;
    import com.pd.service.OrderService;
    
    @Component
    public class OrderConsumer {
        //收到订单数据后,会调用订单的业务代码,把订单保存到数据库
    	@Autowired
    	private OrderService orderService;
    
        //添加该注解后,会从指定的orderQueue接收消息,
        //并把数据转为 PdOrder 实例传递到此方法
    	@RabbitListener(queues="orderQueue")
    	public void save(PdOrder pdOrder)
    	{
    		System.out.println("消费者");
    		System.out.println(pdOrder.toString());
    		try {
    			orderService.saveOrder(pdOrder);
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    
    }
    

    修改 OrderServiceImpl 的 saveOrder() 方法

    	public String saveOrder(PdOrder pdOrder) throws Exception {
    		//		String orderId = generateId();
    		//		pdOrder.setOrderId(orderId);
    		//
    		//		amqpTemplate.convertAndSend("orderQueue", pdOrder);
    		//		return orderId;
    		//
    		//		
    		//		
    		//		String orderId = generateId();
    		//		pdOrder.setOrderId(orderId);
    		
    		//从RabbitMQ接收的订单数据,
    		//已经在上游订单业务中生成过id,这里不再重新生成id
    		//直接获取该订单的id
    		String orderId = pdOrder.getOrderId();
    
    		
    		PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId());
    		pdOrder.setShippingName(pdShipping.getReceiverName());
    		pdOrder.setShippingCode(pdShipping.getReceiverAddress());
    		pdOrder.setStatus(1);// 
    		pdOrder.setPaymentType(1);
    		pdOrder.setPostFee(10D);
    		pdOrder.setCreateTime(new Date());
    
    		double payment = 0;
    		List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList());
    		for (ItemVO itemVO : itemVOs) {
    			PdOrderItem pdOrderItem = new PdOrderItem();
    			String id = generateId();
    			//String id="2";
    			pdOrderItem.setId(id);
    			pdOrderItem.setOrderId(orderId);
    			pdOrderItem.setItemId("" + itemVO.getPdItem().getId());
    			pdOrderItem.setTitle(itemVO.getPdItem().getTitle());
    			pdOrderItem.setPrice(itemVO.getPdItem().getPrice());
    			pdOrderItem.setNum(itemVO.getPdCartItem().getNum());
    
    			payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice();
    			pdOrderItemMapper.insert(pdOrderItem);
    		}
    		pdOrder.setPayment(payment);
    		pdOrderMapper.insert(pdOrder);
    		return orderId;
    	}   
    

    手动确认

    application.yml

    spring:
      rabbitmq:
        listener:
          simple:
            acknowledge-mode: manual
    

    OrderConsumer

    package com.pd;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import com.pd.pojo.PdOrder;
    import com.pd.service.OrderService;
    import com.rabbitmq.client.Channel;
    
    @Component
    public class OrderConsumer {
        //收到订单数据后,会调用订单的业务代码,把订单保存到数据库
    	@Autowired
    	private OrderService orderService;
    
        //添加该注解后,会从指定的orderQueue接收消息,
        //并把数据转为 PdOrder 实例传递到此方法
    	@RabbitListener(queues="orderQueue")
    	public void save(PdOrder pdOrder, Channel channel, Message message)
    	{
    		System.out.println("消费者");
    		System.out.println(pdOrder.toString());
    		try {
    			orderService.saveOrder(pdOrder);
    			channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    		} catch (Exception e) {
    			e.printStackTrace();
    		} 
    	}
    }
    
    展开全文
  • 消息中间件MQ与RabbitMQ面试题(2020最新版)

    万次阅读 多人点赞 2020-03-01 11:11:21
    文章目录为什么使用MQ?MQ的优点消息队列有什么优缺点?RabbitMQ有什么优缺点?你们公司生产环境用的是...rabbitmq 的使用场景RabbitMQ基本概念RabbitMQ的工作模式如何保证RabbitMQ消息的顺序性?消息如何分发?消...

    大家好,我是CSDN的博主ThinkWon,“2020博客之星年度总评选"开始啦,希望大家帮我投票,每天都可以投多票哦,点击下方链接,然后点击"最大”,再点击"投TA一票"就可以啦!
    投票链接:https://bss.csdn.net/m/topic/blog_star2020/detail?username=thinkwon
    在技术的世界里,ThinkWon将一路与你相伴!创作出更多更高质量的文章!2020为努力奋斗的你点赞👍,️新的一年,祝各位大牛牛气冲天,牛年大吉!😊😊

    Java面试总结汇总,整理了包括Java基础知识,集合容器,并发编程,JVM,常用开源框架Spring,MyBatis,数据库,中间件等,包含了作为一个Java工程师在面试中需要用到或者可能用到的绝大部分知识。欢迎大家阅读,本人见识有限,写的博客难免有错误或者疏忽的地方,还望各位大佬指点,在此表示感激不尽。文章持续更新中…

    序号 内容 链接地址
    1 Java基础知识面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104390612
    2 Java集合容器面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104588551
    3 Java异常面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104390689
    4 并发编程面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104863992
    5 JVM面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104390752
    6 Spring面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104397516
    7 Spring MVC面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104397427
    8 Spring Boot面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104397299
    9 Spring Cloud面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104397367
    10 MyBatis面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/101292950
    11 Redis面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/103522351
    12 MySQL数据库面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104778621
    13 消息中间件MQ与RabbitMQ面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104588612
    14 Dubbo面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104390006
    15 Linux面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104588679
    16 Tomcat面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104397665
    17 ZooKeeper面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104397719
    18 Netty面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104391081
    19 架构设计&分布式&数据结构与算法面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/105870730

    为什么使用MQ?MQ的优点

    简答

    • 异步处理 - 相比于传统的串行、并行方式,提高了系统吞吐量。
    • 应用解耦 - 系统间通过消息通信,不用关心其他系统的处理。
    • 流量削锋 - 可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求。
    • 日志处理 - 解决大量日志传输。
    • 消息通讯 - 消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。

    详答

    主要是:解耦、异步、削峰。

    解耦:A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃…A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。

    就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦。

    异步:A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求。如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms。

    削峰:减少高峰时期对服务器压力。

    消息队列有什么优缺点?RabbitMQ有什么优缺点?

    优点上面已经说了,就是在特殊场景下有其对应的好处解耦异步削峰

    缺点有以下几个:

    系统可用性降低

    本来系统运行好好的,现在你非要加入个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性会降低;

    系统复杂度提高

    加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,复杂性增大。

    一致性问题

    A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。

    所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现,妈呀,系统复杂度提升了一个数量级,也许是复杂了 10 倍。但是关键时刻,用,还是得用的。

    你们公司生产环境用的是什么消息中间件?

    这个首先你可以说下你们公司选用的是什么消息中间件,比如用的是RabbitMQ,然后可以初步给一些你对不同MQ中间件技术的选型分析。

    举个例子:比如说ActiveMQ是老牌的消息中间件,国内很多公司过去运用的还是非常广泛的,功能很强大。

    但是问题在于没法确认ActiveMQ可以支撑互联网公司的高并发、高负载以及高吞吐的复杂场景,在国内互联网公司落地较少。而且使用较多的是一些传统企业,用ActiveMQ做异步调用和系统解耦。

    然后你可以说说RabbitMQ,他的好处在于可以支撑高并发、高吞吐、性能很高,同时有非常完善便捷的后台管理界面可以使用。

    另外,他还支持集群化、高可用部署架构、消息高可靠支持,功能较为完善。

    而且经过调研,国内各大互联网公司落地大规模RabbitMQ集群支撑自身业务的case较多,国内各种中小型互联网公司使用RabbitMQ的实践也比较多。

    除此之外,RabbitMQ的开源社区很活跃,较高频率的迭代版本,来修复发现的bug以及进行各种优化,因此综合考虑过后,公司采取了RabbitMQ。

    但是RabbitMQ也有一点缺陷,就是他自身是基于erlang语言开发的,所以导致较为难以分析里面的源码,也较难进行深层次的源码定制和改造,毕竟需要较为扎实的erlang语言功底才可以。

    然后可以聊聊RocketMQ,是阿里开源的,经过阿里的生产环境的超高并发、高吞吐的考验,性能卓越,同时还支持分布式事务等特殊场景。

    而且RocketMQ是基于Java语言开发的,适合深入阅读源码,有需要可以站在源码层面解决线上生产问题,包括源码的二次开发和改造。

    另外就是Kafka。Kafka提供的消息中间件的功能明显较少一些,相对上述几款MQ中间件要少很多。

    但是Kafka的优势在于专为超高吞吐量的实时日志采集、实时数据同步、实时数据计算等场景来设计。

    因此Kafka在大数据领域中配合实时计算技术(比如Spark Streaming、Storm、Flink)使用的较多。但是在传统的MQ中间件使用场景中较少采用。

    Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?

    ActiveMQ RabbitMQ RocketMQ Kafka ZeroMQ
    单机吞吐量 比RabbitMQ低 2.6w/s(消息做持久化) 11.6w/s 17.3w/s 29w/s
    开发语言 Java Erlang Java Scala/Java C
    主要维护者 Apache Mozilla/Spring Alibaba Apache iMatix,创始人已去世
    成熟度 成熟 成熟 开源版本不够成熟 比较成熟 只有C、PHP等版本成熟
    订阅形式 点对点(p2p)、广播(发布-订阅) 提供了4种:direct, topic ,Headers和fanout。fanout就是广播模式 基于topic/messageTag以及按照消息类型、属性进行正则匹配的发布订阅模式 基于topic以及按照topic进行正则匹配的发布订阅模式 点对点(p2p)
    持久化 支持少量堆积 支持少量堆积 支持大量堆积 支持大量堆积 不支持
    顺序消息 不支持 不支持 支持 支持 不支持
    性能稳定性 一般 较差 很好
    集群方式 支持简单集群模式,比如’主-备’,对高级集群模式支持不好。 支持简单集群,'复制’模式,对高级集群模式支持不好。 常用 多对’Master-Slave’ 模式,开源版本需手动切换Slave变成Master 天然的‘Leader-Slave’无状态集群,每台服务器既是Master也是Slave 不支持
    管理界面 一般 较好 一般

    综上,各种对比之后,有如下建议:

    一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了;

    后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;

    不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给 Apache,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。

    所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。

    如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

    MQ 有哪些常见问题?如何解决这些问题?

    MQ 的常见问题有:

    1. 消息的顺序问题
    2. 消息的重复问题

    消息的顺序问题

    消息有序指的是可以按照消息的发送顺序来消费。

    假如生产者产生了 2 条消息:M1、M2,假定 M1 发送到 S1,M2 发送到 S2,如果要保证 M1 先于 M2 被消费,怎么做?

    img

    解决方案:

    (1)保证生产者 - MQServer - 消费者是一对一对一的关系

    img

    缺陷:

    • 并行度就会成为消息系统的瓶颈(吞吐量不够)
    • 更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。 (2)通过合理的设计或者将问题分解来规避。
    • 不关注乱序的应用实际大量存在
    • 队列无序并不意味着消息无序 所以从业务层面来保证消息的顺序而不仅仅是依赖于消息系统,是一种更合理的方式。

    消息的重复问题

    造成消息重复的根本原因是:网络不可达。

    所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?

    消费端处理消息的业务逻辑保持幂等性。只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现。利用一张日志表来记录已经处理成功的消息的 ID,如果新到的消息 ID 已经在日志表中,那么就不再处理这条消息。

    什么是RabbitMQ?

    RabbitMQ是一款开源的,Erlang编写的,基于AMQP协议的消息中间件

    rabbitmq 的使用场景

    (1)服务间异步通信

    (2)顺序消费

    (3)定时任务

    (4)请求削峰

    RabbitMQ基本概念

    • Broker: 简单来说就是消息队列服务器实体
    • Exchange: 消息交换机,它指定消息按什么规则,路由到哪个队列
    • Queue: 消息队列载体,每个消息都会被投入到一个或多个队列
    • Binding: 绑定,它的作用就是把exchange和queue按照路由规则绑定起来
    • Routing Key: 路由关键字,exchange根据这个关键字进行消息投递
    • VHost: vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。
    • Producer: 消息生产者,就是投递消息的程序
    • Consumer: 消息消费者,就是接受消息的程序
    • Channel: 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

    由Exchange、Queue、RoutingKey三个才能决定一个从Exchange到Queue的唯一的线路。

    RabbitMQ的工作模式

    一.simple模式(即最简单的收发模式)

    img

    1.消息产生消息,将消息放入队列

    2.消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。

    二.work工作模式(资源的竞争)

    img

    1.消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用)。

    三.publish/subscribe发布订阅(共享资源)

    img

    1、每个消费者监听自己的队列;

    2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。

    四.routing路由模式

    img

    1.消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;

    2.根据业务功能定义路由字符串

    3.从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。

    4.业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;

    五.topic 主题模式(路由模式的一种)

    img

    1.星号井号代表通配符

    2.星号代表多个单词,井号代表一个单词

    3.路由功能添加模糊匹配

    4.消息产生者产生消息,把消息交给交换机

    5.交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

    (在我的理解看来就是routing查询的一种模糊匹配,就类似sql的模糊查询方式)

    如何保证RabbitMQ消息的顺序性?

    拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点;或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。

    消息如何分发?

    若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)。通过路由可实现多消费的功能

    消息怎么路由?

    消息提供方->路由->一至多个队列消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。通过队列路由键,可以把队列绑定到交换器上。消息到达交换器后,RabbitMQ 会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则);

    常用的交换器主要分为一下三种:

    fanout:如果交换器收到消息,将会广播到所有绑定的队列上

    direct:如果路由键完全匹配,消息就被投递到相应的队列

    topic:可以使来自不同源头的消息能够到达同一个队列。 使用 topic 交换器时,可以使用通配符

    消息基于什么传输?

    由于 TCP 连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ 使用信道的方式来传输数据。信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接上的信道数量没有限制。

    如何保证消息不被重复消费?或者说,如何保证消息消费时的幂等性?

    先说为什么会重复消费:正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除;

    但是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者。

    针对以上问题,一个解决思路是:保证消息的唯一性,就算是多次传输,不要让消息的多次消费带来影响;保证消息等幂性;

    比如:在写入消息队列的数据做唯一标示,消费消息时,根据唯一标识判断是否消费过;

    假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。

    如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?

    发送方确认模式

    将信道设置成 confirm 模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的 ID。

    一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一 ID)。

    如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack(notacknowledged,未确认)消息。

    发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。

    接收方确认机制

    消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ 才能安全地把消息从队列中删除。

    这里并没有用到超时机制,RabbitMQ 仅通过 Consumer 的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ 给了 Consumer 足够长的时间来处理消息。保证数据的最终一致性;

    下面罗列几种特殊情况

    • 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重)
    • 如果消费者接收到消息却没有确认消息,连接也未断开,则 RabbitMQ 认为该消费者繁忙,将不会给该消费者分发更多的消息。

    如何保证RabbitMQ消息的可靠传输?

    消息不可靠的情况可能是消息丢失,劫持等原因;

    丢失又分为:生产者丢失消息、消息列表丢失消息、消费者丢失消息;

    生产者丢失消息:从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息;

    transaction机制就是说:发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降;

    confirm模式用的居多:一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;

    rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;

    如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。

    消息队列丢数据:消息持久化。

    处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。

    这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。

    这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。

    那么如何持久化呢?

    这里顺便说一下吧,其实也很容易,就下面两步

    1. 将queue的持久化标识durable设置为true,则代表是一个持久的队列
    2. 发送消息的时候将deliveryMode=2

    这样设置以后,即使rabbitMQ挂了,重启后也能恢复数据

    消费者丢失消息:消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!

    消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;

    如果这时处理消息失败,就会丢失该消息;

    解决方案:处理消息成功后,手动回复确认消息。

    为什么不应该对所有的 message 都使用持久化机制?

    首先,必然导致性能的下降,因为写磁盘比写 RAM 慢的多,message 的吞吐量可能有 10 倍的差距。

    其次,message 的持久化机制用在 RabbitMQ 的内置 cluster 方案时会出现“坑爹”问题。矛盾点在于,若 message 设置了 persistent 属性,但 queue 未设置 durable 属性,那么当该 queue 的 owner node 出现异常后,在未重建该 queue 前,发往该 queue 的 message 将被 blackholed ;若 message 设置了 persistent 属性,同时 queue 也设置了 durable 属性,那么当 queue 的 owner node 异常且无法重启的情况下,则该 queue 无法在其他 node 上重建,只能等待其 owner node 重启后,才能恢复该 queue 的使用,而在这段时间内发送给该 queue 的 message 将被 blackholed 。

    所以,是否要对 message 进行持久化,需要综合考虑性能需要,以及可能遇到的问题。若想达到 100,000 条/秒以上的消息吞吐量(单 RabbitMQ 服务器),则要么使用其他的方式来确保 message 的可靠 delivery ,要么使用非常快速的存储系统以支持全持久化(例如使用 SSD)。另外一种处理原则是:仅对关键消息作持久化处理(根据业务重要程度),且应该保证关键消息的量不会导致性能瓶颈。

    如何保证高可用的?RabbitMQ 的集群

    RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。

    单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的?,没人生产用单机模式

    普通集群模式,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。

    镜像集群模式:这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的完整数据。

    如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?

    消息积压处理办法:临时紧急扩容:

    先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。
    新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
    然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
    接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
    等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
    MQ中消息失效:假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。

    mq消息队列块满了:如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。

    设计MQ思路

    比如说这个消息队列系统,我们从以下几个角度来考虑一下:

    首先这个 mq 得支持可伸缩性吧,就是需要的时候快速扩容,就可以增加吞吐量和容量,那怎么搞?设计个分布式的系统呗,参照一下 kafka 的设计理念,broker -> topic -> partition,每个 partition 放一个机器,就存一部分数据。如果现在资源不够了,简单啊,给 topic 增加 partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了?

    其次你得考虑一下这个 mq 的数据要不要落地磁盘吧?那肯定要了,落磁盘才能保证别进程挂了数据就丢了。那落磁盘的时候怎么落啊?顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是 kafka 的思路。

    其次你考虑一下你的 mq 的可用性啊?这个事儿,具体参考之前可用性那个环节讲解的 kafka 的高可用保障机制。多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。

    能不能支持数据 0 丢失啊?可以的,参考我们之前说的那个 kafka 数据零丢失方案。

    展开全文
  • RabbitMQ教程

    万次阅读 2019-07-30 16:29:43
    文章目录RabbitMQ实战教程1.什么是MQ2.RabbitMQ2.1.RabbitMQ的简介2.2.官网2.3.MQ的其他产品2.4.学习5种队列2.5.安装文档3.搭建RabbitMQ环境3.1.下载3.2.windows下安装3.3.Linux下安装3.4.安装的注意事项3.5.安装...

    RabbitMQ实战教程

    1.什么是MQ

    • 消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。
      其主要用途:不同进程Process/线程Thread之间通信。

    为什么会产生消息队列?有几个原因:

    • 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;

    • 不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;

    • 关于消息队列的详细介绍请参阅:
      《Java帝国之消息队列》
      《一个故事告诉你什么是消息队列》
      《到底什么时候该使用MQ》

    • MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。本文主要介绍RabbitMq。

    • 本教程pdf及代码下载地址
      代码:https://download.csdn.net/download/zpcandzhj/10585077
      教程:https://download.csdn.net/download/zpcandzhj/10585092

    2.RabbitMQ

    2.1.RabbitMQ的简介

    这里写图片描述
    开发语言:Erlang – 面向并发的编程语言。

    这里写图片描述

    2.1.1.AMQP
    AMQP是消息队列的一个协议。

    这里写图片描述

    2.2.官网

    这里写图片描述

    2.3.MQ的其他产品

    这里写图片描述

    2.4.学习5种队列

    这里写图片描述

    2.5.安装文档

    这里写图片描述

    3.搭建RabbitMQ环境

    3.1.下载

    下载地址:http://www.rabbitmq.com/download.html

    3.2.windows下安装

    3.2.1.安装Erlang
    下载:http://www.erlang.org/download/otp_win64_17.3.exe
    安装:
    这里写图片描述
    这里写图片描述
    这里写图片描述
    这里写图片描述
    这里写图片描述
    安装完成。

    3.2.2.安装RabbitMQ
    这里写图片描述
    这里写图片描述
    这里写图片描述
    安装完成。

    开始菜单里出现如下选项:
    这里写图片描述

    启动、停止、重新安装等。

    3.2.3.启用管理工具
    1、双击这里写图片描述
    2、进入C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.4.1\sbin输入命令:
    rabbitmq-plugins enable rabbitmq_management
    这里写图片描述

    这样就启动了管理工具,可以试一下命令:
    停止:net stop RabbitMQ
    启动:net start RabbitMQ

    3、在浏览器中输入地址查看:http://127.0.0.1:15672/
    这里写图片描述
    4、使用默认账号登录:guest/ guest

    3.3.Linux下安装

    3.3.1.安装Erlang
    3.3.2.添加yum支持
    cd /usr/local/src/
    mkdir rabbitmq
    cd rabbitmq

    wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
    rpm -Uvh erlang-solutions-1.0-1.noarch.rpm

    rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc

    使用yum安装:
    sudo yum install erlang
    这里写图片描述

    3.3.3.安装RabbitMQ
    上传rabbitmq-server-3.4.1-1.noarch.rpm文件到/usr/local/src/rabbitmq/
    安装:
    rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm

    3.3.4.启动、停止
    service rabbitmq-server start
    service rabbitmq-server stop
    service rabbitmq-server restart
    3.3.5.设置开机启动
    chkconfig rabbitmq-server on
    3.3.6.设置配置文件
    cd /etc/rabbitmq
    cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/

    mv rabbitmq.config.example rabbitmq.config
    3.3.7.开启用户远程访问
    vi /etc/rabbitmq/rabbitmq.config
    这里写图片描述
    注意要去掉后面的逗号。
    3.3.8.开启web界面管理工具
    rabbitmq-plugins enable rabbitmq_management
    service rabbitmq-server restart
    3.3.9.防火墙开放15672端口
    /sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
    /etc/rc.d/init.d/iptables save

    3.4.安装的注意事项

    1、推荐使用默认的安装路径
    2、系统用户名必须是英文
    Win10改名字非常麻烦,具体方法百度
    这里写图片描述
    3、计算机名必须是英文
    这里写图片描述
    4、系统的用户必须是管理员

    如果安装失败应该如何解决:
    1、重装系统 – 不推荐
    2、将RabbitMQ安装到linux虚拟机中
    a)推荐
    3、使用别人安装好的RabbitMQ服务
    a)只要给你开通一个账户即可。
    b)使用公用的RabbitMQ服务,在192.168.50.22
    c)推荐

    常见错误:
    这里写图片描述

    3.5.安装完成后操作

    1、系统服务中有RabbitMQ服务,停止、启动、重启
    这里写图片描述
    2、打开命令行工具
    这里写图片描述
    如果找不到命令行工具,直接cd到相应目录:
    这里写图片描述
    输入命令rabbitmq-plugins enable rabbitmq_management启用管理插件
    这里写图片描述
    查看管理页面
    这里写图片描述
    通过默认账户 guest/guest 登录
    如果能够登录,说明安装成功。
    这里写图片描述

    4.添加用户

    4.1.添加admin用户

    这里写图片描述

    4.2.用户角色

    1、超级管理员(administrator)
    可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
    2、监控者(monitoring)
    可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
    3、策略制定者(policymaker)
    可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
    4、普通管理者(management)
    仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
    5、其他
    无法登陆管理控制台,通常就是普通的生产者和消费者。

    4.3.创建Virtual Hosts

    这里写图片描述

    选中Admin用户,设置权限:
    这里写图片描述
    看到权限已加:
    这里写图片描述

    4.4.管理界面中的功能

    这里写图片描述

    这里写图片描述

    5.学习五种队列

    这里写图片描述

    5.1.导入my-rabbitmq项目

    项目下载地址:
    https://download.csdn.net/download/zpcandzhj/10585077
    这里写图片描述

    5.2.简单队列

    5.2.1.图示
    这里写图片描述

    P:消息的生产者
    C:消息的消费者
    红色:队列

    生产者将消息发送到队列,消费者从队列中获取消息。

    5.2.2.导入RabbitMQ的客户端依赖

    <dependency>
       <groupId>com.rabbitmq</groupId>
       <artifactId>amqp-client</artifactId>
       <version>3.4.1</version>
    </dependency>
    

    5.2.3.获取MQ的连接

    package com.zpc.rabbitmq.util;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection
    public class ConnectionUtil 
        public static Connection getConnection() throws Exception {
            //定义连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置服务地址
            factory.setHost("localhost");
            //端口
            factory.setPort(5672);
            //设置账号信息,用户名、密码、vhost
            factory.setVirtualHost("testhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            // 通过工程获取连接
            Connection connection = factory.newConnection();
            return connection;
        }
    }
    

    5.2.4.生产者发送消息到队列

    package com.zpc.rabbitmq.simple
    import com.zpc.rabbitmq.util.ConnectionUtil
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection
    public class Send 
        private final static String QUEUE_NAME = "q_test_01"
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            // 从连接中创建通道
            Channel channel = connection.createChannel()
            // 声明(创建)队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null)
            // 消息内容
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            //关闭通道和连接
            channel.close();
            connection.close();
        }
    }
    

    5.2.5.管理工具中查看消息
    这里写图片描述

    点击上面的队列名称,查询具体的队列中的信息:
    这里写图片描述

    5.2.6.消费者从队列中获取消息

    package com.zpc.rabbitmq.simple
    import com.zpc.rabbitmq.util.ConnectionUtil
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer
    public class Recv 
        private final static String QUEUE_NAME = "q_test_01"
        public static void main(String[] argv) throws Exception 
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            // 从连接中创建通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null)
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel)
            // 监听队列
            channel.basicConsume(QUEUE_NAME, true, consumer)
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [x] Received '" + message + "'");
            }
        }
    }
    

    5.3.Work模式

    这里写图片描述

    5.3.1.图示
    这里写图片描述

    一个生产者、2个消费者。

    一个消息只能被一个消费者获取。
    5.3.2.消费者1

    package com.zpc.rabbitmq.work
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.zpc.rabbitmq.util.ConnectionUtil
    public class Recv 
        private final static String QUEUE_NAME = "test_queue_work"
        public static void main(String[] argv) throws Exception 
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel()
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null)
            // 同一时刻服务器只会发一条消息给消费者
            //channel.basicQos(1)
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,false表示手动返回完成状态,true表示自动
            channel.basicConsume(QUEUE_NAME, true, consumer)
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [y] Received '" + message + "'");
                //休眠
                Thread.sleep(10);
                // 返回确认状态,注释掉表示使用自动确认模式
                //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.3.3.消费者2

    package com.zpc.rabbitmq.work
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.zpc.rabbitmq.util.ConnectionUtil
    public class Recv2 
        private final static String QUEUE_NAME = "test_queue_work"
        public static void main(String[] argv) throws Exception 
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel()
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null)
            // 同一时刻服务器只会发一条消息给消费者
            //channel.basicQos(1)
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,false表示手动返回完成状态,true表示自动
            channel.basicConsume(QUEUE_NAME, true, consumer)
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [x] Received '" + message + "'");
                // 休眠1秒
                Thread.sleep(1000);
                //下面这行注释掉表示使用自动确认模式
                //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.3.4.生产者
    向队列中发送100条消息。

    package com.zpc.rabbitmq.work
    import com.zpc.rabbitmq.util.ConnectionUtil
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection
    public class Send 
        private final static String QUEUE_NAME = "test_queue_work"
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel()
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null)
            for (int i = 0; i < 100; i++) {
                // 消息内容
                String message = "" + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'")
                Thread.sleep(i * 10);
            }
            channel.close();
            connection.close();
        }
    }
    

    5.3.5.测试
    测试结果:
    1、消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
    2、消费者1和消费者2获取到的消息的数量是相同的,一个是消费奇数号消息,一个是偶数。

    • 其实,这样是不合理的,因为消费者1线程停顿的时间短。应该是消费者1要比消费者2获取到的消息多才对。
      RabbitMQ 默认将消息顺序发送给下一个消费者,这样,每个消费者会得到相同数量的消息。即轮询(round-robin)分发消息。

    • 怎样才能做到按照每个消费者的能力分配消息呢?联合使用 Qos 和 Acknowledge 就可以做到。
      basicQos 方法设置了当前信道最大预获取(prefetch)消息数量为1。消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列,从队列的视角去看,总是会有一批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 参数就是用来限制这批未确认消息数量的。设为1时,队列只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息。prefetchCount 的默认值为0,即没有限制,队列会将所有消息尽快发给消费者。

    • 2个概念

    • 轮询分发 :使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。

    • 公平分发 :虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。

    为了解决这个问题,我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
    还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。

    5.4.Work模式的“能者多劳”

    打开上述代码的注释:

    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    
    //开启这行 表示使用手动确认模式
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    

    同时改为手动确认:

    // 监听队列,false表示手动返回完成状态,true表示自动
    channel.basicConsume(QUEUE_NAME, false, consumer);
    

    测试:
    消费者1比消费者2获取的消息更多。

    5.5.消息的确认模式

    消费者从队列中获取消息,服务端如何知道消息已经被消费呢?

    模式1:自动确认
    只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。
    模式2:手动确认
    消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。

    手动模式:
    这里写图片描述

    自动模式:
    这里写图片描述

    5.6.订阅模式

    这里写图片描述
    5.6.1.图示
    这里写图片描述

    解读:
    1、1个生产者,多个消费者
    2、每一个消费者都有自己的一个队列
    3、生产者没有将消息直接发送到队列,而是发送到了交换机
    4、每个队列都要绑定到交换机
    5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
    注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费
    这里写图片描述

    5.6.2.消息的生产者(看作是后台系统)
    向交换机中发送消息。

    package com.zpc.rabbitmq.subscribe
    import com.zpc.rabbitmq.util.ConnectionUtil
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection
    public class Send 
        private final static String EXCHANGE_NAME = "test_exchange_fanout"
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel()
            // 声明exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout")
            // 消息内容
            String message = "Hello World!";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'")
            channel.close();
            connection.close();
        }
    }
    

    注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。
    5.6.3.消费者1(看作是前台系统)

    package com.zpc.rabbitmq.subscribe
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer
    import com.zpc.rabbitmq.util.ConnectionUtil
    public class Recv 
        private final static String QUEUE_NAME = "test_queue_work1"
        private final static String EXCHANGE_NAME = "test_exchange_fanout"
        public static void main(String[] argv) throws Exception 
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel()
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null)
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "")
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1)
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer)
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [Recv] Received '" + message + "'");
                Thread.sleep(10)
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.6.4.消费者2(看作是搜索系统)

    package com.zpc.rabbitmq.subscribe
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer
    import com.zpc.rabbitmq.util.ConnectionUtil
    public class Recv2 
        private final static String QUEUE_NAME = "test_queue_work2"
        private final static String EXCHANGE_NAME = "test_exchange_fanout"
        public static void main(String[] argv) throws Exception 
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel()
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null)
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "")
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1)
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer)
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [Recv2] Received '" + message + "'");
                Thread.sleep(10)
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.6.5.测试
    测试结果:
    同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。

    在管理工具中查看队列和交换机的绑定关系:

    这里写图片描述

    5.7.路由模式

    这里写图片描述
    5.7.1.图示
    这里写图片描述

    5.7.2.生产者
    这里写图片描述
    5.7.3.消费者1(假设是前台系统)
    这里写图片描述
    5.7.4.消费2(假设是搜索系统)
    这里写图片描述

    5.8.主题模式(通配符模式)

    这里写图片描述

    这里写图片描述

    5.8.1.图示
    这里写图片描述
    同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。

    5.8.2.生产者

    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection
    import com.zpc.rabbitmq.util.ConnectionUtil
    public class Send 
        private final static String EXCHANGE_NAME = "test_exchange_topic"
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel()
            // 声明exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic")
            // 消息内容
            String message = "Hello World!!";
            channel.basicPublish(EXCHANGE_NAME, "routekey.1", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'")
            channel.close();
            connection.close();
        }
    }
    

    5.8.3.消费者1(前台系统)

    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer
    import com.zpc.rabbitmq.util.ConnectionUtil
    public class Recv 
        private final static String QUEUE_NAME = "test_queue_topic_work_1"
        private final static String EXCHANGE_NAME = "test_exchange_topic"
        public static void main(String[] argv) throws Exception 
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel()
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null)
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*")
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1)
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer)
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [Recv_x] Received '" + message + "'");
                Thread.sleep(10)
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.8.4.消费者2(搜索系统)

    
    import com.zpc.rabbitmq.util.ConnectionUtil
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer
    public class Recv2 
        private final static String QUEUE_NAME = "test_queue_topic_work_2"
        private final static String EXCHANGE_NAME = "test_exchange_topic"
        public static void main(String[] argv) throws Exception 
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel()
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null)
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*")
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1)
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer)
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [Recv2_x] Received '" + message + "'");
                Thread.sleep(10)
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    6.Spring-Rabbit

    6.1.Spring项目

    http://spring.io/projects

    这里写图片描述

    6.2.简介

    这里写图片描述
    这里写图片描述

    6.3.使用

    6.3.1.消费者

    package com.zpc.rabbitmq.spring
    /**
     * 消费者
     *
     * @author Evan
     */
    public class Foo 
        //具体执行业务的方法
        public void listen(String foo) {
            System.out.println("\n消费者: " + foo + "\n");
        }
    }
    

    6.3.2.生产者

    package com.zpc.rabbitmq.spring
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.support.AbstractApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext
    public class SpringMain {
        public static void main(final String... args) throws Exception {
            AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(
                    "classpath:spring/rabbitmq-context.xml");
            //RabbitMQ模板
            RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
            //发送消息
            template.convertAndSend("Hello, 鸟鹏!");
            Thread.sleep(1000);// 休眠1秒
            ctx.destroy(); //容器销毁
        }
    }
    

    6.3.3.配置文件
    1、定义连接工厂

    <!-- 定义RabbitMQ的连接工厂 -->
    <rabbit:connection-factory id="connectionFactory"
       host="127.0.0.1" port="5672" username="admin" password="admin"
       virtual-host="testhost" />
    

    2、定义模板(可以指定交换机或队列)

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
    

    3、定义队列、交换机、以及完成队列和交换机的绑定

    <!-- 定义队列,自动声明 -->
    <rabbit:queue name="zpcQueue" auto-declare="true"/>
    <!-- 定义交换器,把Q绑定到交换机,自动声明 -->
    <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
       <rabbit:bindings>
          <rabbit:binding queue="zpcQueue"/>
       </rabbit:bindings>
    </rabbit:fanout-exchange>
    

    4、定义监听

    <rabbit:listener-container connection-factory="connectionFactory">
       <rabbit:listener ref="foo" method="listen" queue-names="zpcQueue" />
    </rabbit:listener-container>
    <bean id="foo" class="com.zpc.rabbitmq.spring.Foo" />
    

    5、定义管理,用于管理队列、交换机等:

    <!-- MQ的管理,包括队列、交换器等 -->
    <rabbit:admin connection-factory="connectionFactory" />
    

    完整配置文件rabbitmq-context.xml

    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
       <!-- 定义RabbitMQ的连接工厂 -->
       <rabbit:connection-factory id="connectionFactory"
          host="127.0.0.1" port="5672" username="admin" password="admin"
          virtual-host="testhost" />
       <!-- 定义Rabbit模板,指定连接工厂以及定义exchange -->
       <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
       <!-- <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
          exchange="fanoutExchange" routing-key="foo.bar" /> -->
       <!-- MQ的管理,包括队列、交换器等 -->
       <rabbit:admin connection-factory="connectionFactory" />
       <!-- 定义队列,自动声明 -->
       <rabbit:queue name="zpcQueue" auto-declare="true"/>
       <!-- 定义交换器,把Q绑定到交换机,自动声明 -->
       <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
          <rabbit:bindings>
             <rabbit:binding queue="zpcQueue"/>
          </rabbit:bindings>
       </rabbit:fanout-exchange>
    <!--   <rabbit:topic-exchange name="myExchange">
          <rabbit:bindings>
             <rabbit:binding queue="myQueue" pattern="foo.*" />
          </rabbit:bindings>
       </rabbit:topic-exchange> -->
       <!-- 队列监听 -->
       <rabbit:listener-container connection-factory="connectionFactory">
          <rabbit:listener ref="foo" method="listen" queue-names="zpcQueue" />
       </rabbit:listener-container>
       <bean id="foo" class="com.zpc.rabbitmq.spring.Foo" />
    </beans>
    

    6.4.持久化交换机和队列

    这里写图片描述

    持久化:将交换机或队列的数据保存到磁盘,服务器宕机或重启之后依然存在。
    非持久化:将交换机或队列的数据保存到内存,服务器宕机或重启之后将不存在。

    非持久化的性能高于持久化。

    如何选择持久化?非持久化? – 看需求。

    7.Spring集成RabbitMQ一个完整案例

    创建三个系统A,B,C
    A作为生产者,B、C作为消费者(B,C作为web项目启动)
    项目下载地址:https://download.csdn.net/download/zpcandzhj/10585077

    7.1.在A系统中发送消息到交换机

    7.1.1.导入依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
       <groupId>com.zpc</groupId>
       <artifactId>myrabbitA</artifactId>
       <version>0.0.1-SNAPSHOT</version>
       <packaging>jar</packaging>
       <name>myrabbit</name>
       <dependencies>
          <dependency>
             <groupId>org.springframework.amqp</groupId>
             <artifactId>spring-rabbit</artifactId>
             <version>1.4.0.RELEASE</version>
          </dependency>
          <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
             <version>1.2.47</version>
          </dependency>
       </dependencies>
    </project>
    

    7.1.2.队列和交换机的绑定关系
    实现:
    1、在配置文件中将队列和交换机完成绑定
    2、可以在管理界面中完成绑定
    a)绑定关系如果发生变化,需要修改配置文件,并且服务需要重启
    b)管理更加灵活
    c)更容易对绑定关系的权限管理,流程管理
    本例选择第2种方式
    7.1.3.配置
    rabbitmq-context.xml

    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
       <!-- 定义RabbitMQ的连接工厂 -->
       <rabbit:connection-factory id="connectionFactory"
          host="127.0.0.1" port="5672" username="admin" password="admin"
          virtual-host="testhost" />
       <!-- MQ的管理,包括队列、交换器等 -->
       <rabbit:admin connection-factory="connectionFactory" />
       <!-- 定义交换器,暂时不把Q绑定到交换机,在管理界面去绑定 -->
       <!--<rabbit:topic-exchange name="topicExchange" auto-declare="true" ></rabbit:topic-exchange>-->
       <rabbit:direct-exchange name="directExchange" auto-declare="true" ></rabbit:direct-exchange>
       <!--<rabbit:fanout-exchange name="fanoutExchange" auto-declare="true" ></rabbit:fanout-exchange>-->
       <!-- 定义Rabbit模板,指定连接工厂以及定义exchange(exchange要和上面的一致) -->
       <!--<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="topicExchange" />-->
       <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="directExchange" />
       <!--<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />-->
    </beans>
    

    7.1.4.消息内容
    方案:
    1、消息内容使用对象做json序列化发送
    a)数据大
    b)有些数据其他人是可能用不到的
    2、发送特定的业务字段,如id、操作类型

    7.1.5.实现
    生产者MsgSender.java:

    package com.zpc.myrabbit
    import com.alibaba.fastjson.JSON;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.support.AbstractApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map
    /**
     * 消息生产者
     */
    public class MsgSender {
        public static void main(String[] args) throws Exception {
            AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(
                    "classpath:spring/rabbitmq-context.xml");
            //RabbitMQ模板
            RabbitTemplate template = ctx.getBean(RabbitTemplate.class)
            String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
            //发送消息
            Map<String, Object> msg = new HashMap<String, Object>();
            msg.put("type", "1");
            msg.put("date", date);
            template.convertAndSend("type2", JSON.toJSONString(msg));
            Thread.sleep(1000);// 休眠1秒
            ctx.destroy(); //容器销毁
        }
    }
    

    7.2.在B系统接收消息

    7.2.1.导入依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.zpc</groupId>
        <artifactId>myrabbitB</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>war</packaging>
        <name>myrabbit</name>
        <properties>
            <spring.version>4.1.3.RELEASE</spring.version>
            <fastjson.version>1.2.46</fastjson.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.4.1</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>1.4.0.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-webmvc</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.47</version>
            </dependency>
        </dependencies>
        <build>
            <finalName>${project.artifactId}</finalName>
            <plugins>
                <!-- web层需要配置Tomcat插件 -->
                <plugin>
                    <groupId>org.apache.tomcat.maven</groupId>
                    <artifactId>tomcat7-maven-plugin</artifactId>
                    <configuration>
                        <path>/testRabbit</path>
                        <uriEncoding>UTF-8</uriEncoding>
                        <port>8081</port>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    

    7.2.2.配置

    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
       <!-- 定义RabbitMQ的连接工厂 -->
       <rabbit:connection-factory id="connectionFactory"
          host="127.0.0.1" port="5672" username="admin" password="admin"
          virtual-host="testhost" />
       <!-- MQ的管理,包括队列、交换器等 -->
       <rabbit:admin connection-factory="connectionFactory" />
       <!-- 定义B系统需要监听的队列,自动声明 -->
       <rabbit:queue name="q_topic_testB" auto-declare="true"/>
       <!-- 队列监听 -->
       <rabbit:listener-container connection-factory="connectionFactory">
          <rabbit:listener ref="myMQlistener" method="listen" queue-names="q_topic_testB" />
       </rabbit:listener-container>
       <bean id="myMQlistener" class="com.zpc.myrabbit.listener.Listener" />
    </beans>
    

    7.2.3.具体处理逻辑

    public class Listener {
        //具体执行业务的方法
        public void listen(String msg) {
            System.out.println("\n消费者B开始处理消息: " + msg + "\n");
        }
    }
    

    7.2.4.在界面管理工具中完成绑定关系
    选中定义好的交换机(exchange)
    这里写图片描述
    1)direct
    这里写图片描述
    2)fanout
    这里写图片描述
    3)topic
    这里写图片描述

    7.3.在C系统中接收消息

    (和B系统配置差不多,无非是Q名和Q对应的处理逻辑变了)

    7.3.1.配置

    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
       <!-- 定义RabbitMQ的连接工厂 -->
       <rabbit:connection-factory id="connectionFactory"
          host="127.0.0.1" port="5672" username="admin" password="admin"
          virtual-host="testhost" />
       <!-- MQ的管理,包括队列、交换器等 -->
       <rabbit:admin connection-factory="connectionFactory" />
       <!-- 定义C系统需要监听的队列,自动声明 -->
       <rabbit:queue name="q_topic_testC" auto-declare="true"/>
       <!-- 队列监听 -->
       <rabbit:listener-container connection-factory="connectionFactory">
          <rabbit:listener ref="myMQlistener" method="listen" queue-names="q_topic_testC" />
       </rabbit:listener-container>
       <bean id="myMQlistener" class="com.zpc.myrabbit.listener.Listener" />
    </beans>
    

    7.3.2.处理业务逻辑

    public class Listener 
        //具体执行业务的方法
        public void listen(String msg) {
            System.out.println("\n消费者C开始处理消息: " + msg + "\n");
        }
    }
    

    7.3.3.在管理工具中绑定队列和交换机
    见7.2.4

    7.3.4.测试
    分别启动B,C两个web应用,然后运行A的MsgSender主方法发送消息,分别测试fanout、direct、topic三种类型

    8.Springboot集成RabbitMQ

    8.1.简单队列

    1、配置pom文件,主要是添加spring-boot-starter-amqp的支持

    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    2、配置application.properties文件
    配置rabbitmq的安装地址、端口以及账户信息

    spring.application.name=spirng-boot-rabbitmq
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=admin
    

    3、配置队列

    package com.zpc.rabbitmq
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration
    @Configuration
    public class RabbitConfig {
        @Bean
        public Queue queue() {
            return new Queue("q_hello");
        }
    }
    

    4、发送者

    package com.zpc.rabbitmq
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component
    import java.text.SimpleDateFormat;
    import java.util.Date
    @Component
    public class HelloSender {
        @Autowired
        private AmqpTemplate rabbitTemplate
        public void send() {
            String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
            String context = "hello " + date;
            System.out.println("Sender : " + context);
            //简单对列的情况下routingKey即为Q名
            this.rabbitTemplate.convertAndSend("q_hello", context);
        }
    }
    

    5、接收者

    package com.zpc.rabbitmq
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component
    @Component
    @RabbitListener(queues = "q_hello")
    public class HelloReceiver 
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver  : " + hello);
        }
    }
    

    6、测试

    package com.zpc.rabbitmq
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitMqHelloTest 
        @Autowired
        private HelloSender helloSender
        @Test
        public void hello() throws Exception {
            helloSender.send();
        }
    }
    

    8.2.多对多使用(Work模式)

    注册两个Receiver:

    package com.zpc.rabbitmq
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component
    @Component
    @RabbitListener(queues = "q_hello")
    public class HelloReceiver2 
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver2  : " + hello);
        }
    

    }

    @Test
    public void oneToMany() throws Exception {
        for (int i=0;i<100;i++){
            helloSender.send(i);
            Thread.sleep(300);
        }
    }
    
    public void send(int i) {
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
        String context = "hello " + i + " " + date;
        System.out.println("Sender : " + context);
        //简单对列的情况下routingKey即为Q名
        this.rabbitTemplate.convertAndSend("q_hello", context);
    }
    

    8.3.Topic Exchange(主题模式)

    • topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列

    首先对topic规则配置,这里使用两个队列(消费者)来演示。
    1)配置队列,绑定交换机

    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration
    @Configuration
    public class TopicRabbitConfig 
        final static String message = "q_topic_message";
        final static String messages = "q_topic_messages"
        @Bean
        public Queue queueMessage() {
            return new Queue(TopicRabbitConfig.message);
        }
    
    @Bean
    public Queue queueMessages() {
        return new Queue(TopicRabbitConfig.messages);
    }
    
    /**
     * 声明一个Topic类型的交换机
     * @return
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("mybootexchange");
    }
    
    /**
     * 绑定Q到交换机,并且指定routingKey
     * @param queueMessage
     * @param exchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }
    
    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
    

    }

    2)创建2个消费者
    q_topic_message 和q_topic_messages

    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component
    @Component
    @RabbitListener(queues = "q_topic_message")
    public class Receiver1 
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver1  : " + hello);
        }
    }
    
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component
    @Component
    @RabbitListener(queues = "q_topic_messages")
    public class Receiver2 
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver2 : " + hello);
        }
    }
    

    3)消息发送者(生产者)

    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component
    @Component
    public class MsgSender 
        @Autowired
        private AmqpTemplate rabbitTemplate
        public void send1() {
            String context = "hi, i am message 1";
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("mybootexchange", "topic.message", context);
        }
    
    public void send2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("mybootexchange", "topic.messages", context);
    }
    

    }

    send1方法会匹配到topic.#和topic.message,两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息。
    4)测试

    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitTopicTest 
        @Autowired
        private MsgSender msgSender
        @Test
        public void send1() throws Exception {
            msgSender.send1();
        }
    
    @Test
    public void send2() throws Exception {
        msgSender.send2();
    }
    

    }

    8.4.Fanout Exchange(订阅模式)

    • Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
      1)配置队列,绑定交换机
    package com.zpc.rabbitmq.fanout
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration
    @Configuration
    public class FanoutRabbitConfig 
        @Bean
        public Queue aMessage() {
            return new Queue("q_fanout_A");
        }
    
    @Bean
    public Queue bMessage() {
        return new Queue("q_fanout_B");
    }
    
    @Bean
    public Queue cMessage() {
        return new Queue("q_fanout_C");
    }
    
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("mybootfanoutExchange");
    }
    
    @Bean
    Binding bindingExchangeA(Queue aMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(aMessage).to(fanoutExchange);
    }
    
    @Bean
    Binding bindingExchangeB(Queue bMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(bMessage).to(fanoutExchange);
    }
    
    @Bean
    Binding bindingExchangeC(Queue cMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(cMessage).to(fanoutExchange);
    }
    

    }

    2)创建3个消费者

    package com.zpc.rabbitmq.fanout
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component
    @Component
    @RabbitListener(queues = "q_fanout_A")
    public class ReceiverA 
        @RabbitHandler
        public void process(String hello) {
            System.out.println("AReceiver  : " + hello + "/n");
        }
    }
    
    package com.zpc.rabbitmq.fanout
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component
    @Component
    @RabbitListener(queues = "q_fanout_B")
    public class ReceiverB 
        @RabbitHandler
        public void process(String hello) {
            System.out.println("BReceiver  : " + hello + "/n");
        }
    }
    
    package com.zpc.rabbitmq.fanout
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component
    @Component
    @RabbitListener(queues = "q_fanout_C")
    public class ReceiverC 
        @RabbitHandler
        public void process(String hello) {
            System.out.println("CReceiver  : " + hello + "/n");
        }
    }
    

    3)生产者

    package com.zpc.rabbitmq.fanout
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component
    @Component
    public class MsgSenderFanout 
        @Autowired
        private AmqpTemplate rabbitTemplate
        public void send() {
            String context = "hi, fanout msg ";
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("mybootfanoutExchange","", context);
        }
    }
    

    4)测试

    package com.zpc.rabbitmq.fanout
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitFanoutTest 
        @Autowired
        private MsgSenderFanout msgSender
        @Test
        public void send1() throws Exception {
            msgSender.send();
        }
    }
    

    结果如下,三个消费者都收到消息:
    AReceiver : hi, fanout msg
    CReceiver : hi, fanout msg
    BReceiver : hi, fanout msg

    9.总结

    展开全文
  • RabbitMQ(万字教程)

    万次阅读 多人点赞 2018-08-05 23:14:56
    RabbitMQ万字教程。RabbitMQ详解。消息队列(MQ),本质是个队列,队列中存放的内容是message。MQ用于不同进程Process/线程Thread之间通信。本文介绍RabbitMQ的使用。RabbitMQ实战教程。

    仅需一次订阅RabbitMQ,作者所有专栏都能看

    推荐【Flinkhttps://blog.csdn.net/hellozpc/article/details/109413465
    推荐【SpringBoothttps://blog.csdn.net/hellozpc/article/details/107095951
    推荐【SpringCloudhttps://blog.csdn.net/hellozpc/article/details/83692496
    推荐【Mybatishttps://blog.csdn.net/hellozpc/article/details/80878563
    推荐【SnowFlakehttps://blog.csdn.net/hellozpc/article/details/108248227
    推荐【并发限流https://blog.csdn.net/hellozpc/article/details/107582771

    **欢迎关注**
    **扫一扫**

    RabbitMQ实战教程

    1.什么是MQ

    • 消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。
      其主要用途:不同进程Process/线程Thread之间通信。

    为什么会产生消息队列?有几个原因:

    • 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;

    • 不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;

    • 关于消息队列的详细介绍请参阅:
      《Java帝国之消息队列》
      《一个故事告诉你什么是消息队列》
      《到底什么时候该使用MQ》

    • MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。本文主要介绍RabbitMq

    • 本教程pdf及代码下载地址
      代码:https://download.csdn.net/download/zpcandzhj/10585077
      教程:https://download.csdn.net/download/zpcandzhj/10585092

    2.RabbitMQ

    2.1.RabbitMQ的简介

    这里写图片描述
    开发语言:Erlang – 面向并发的编程语言。

    这里写图片描述

    2.1.1.AMQP
    AMQP是消息队列的一个协议。

    这里写图片描述

    2.2.官网

    这里写图片描述

    2.3.MQ的其他产品

    这里写图片描述

    2.4.学习5种队列

    这里写图片描述

    2.5.安装文档

    这里写图片描述

    3.搭建RabbitMQ环境

    3.1.下载

    下载地址:http://www.rabbitmq.com/download.html

    3.2.windows下安装

    3.2.1.安装Erlang
    下载:http://www.erlang.org/download/otp_win64_17.3.exe
    安装:
    这里写图片描述
    这里写图片描述
    这里写图片描述
    这里写图片描述
    这里写图片描述
    安装完成。

    3.2.2.安装RabbitMQ
    这里写图片描述
    这里写图片描述
    这里写图片描述
    安装完成。

    开始菜单里出现如下选项:
    这里写图片描述

    启动、停止、重新安装等。

    3.2.3.启用管理工具
    1、双击这里写图片描述
    2、进入C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.4.1\sbin输入命令:
    rabbitmq-plugins enable rabbitmq_management
    这里写图片描述

    这样就启动了管理工具,可以试一下命令:
    停止:net stop RabbitMQ
    启动:net start RabbitMQ

    3、在浏览器中输入地址查看:http://127.0.0.1:15672/
    这里写图片描述
    4、使用默认账号登录:guest/ guest

    3.3.Linux下安装

    3.3.1.安装Erlang
    3.3.2.添加yum支持
    cd /usr/local/src/
    mkdir rabbitmq
    cd rabbitmq

    wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
    rpm -Uvh erlang-solutions-1.0-1.noarch.rpm

    rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc

    使用yum安装:
    sudo yum install erlang
    这里写图片描述

    3.3.3.安装RabbitMQ
    上传rabbitmq-server-3.4.1-1.noarch.rpm文件到/usr/local/src/rabbitmq/
    安装:
    rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm

    3.3.4.启动、停止
    service rabbitmq-server start
    service rabbitmq-server stop
    service rabbitmq-server restart
    3.3.5.设置开机启动
    chkconfig rabbitmq-server on
    3.3.6.设置配置文件
    cd /etc/rabbitmq
    cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/

    mv rabbitmq.config.example rabbitmq.config
    3.3.7.开启用户远程访问
    vi /etc/rabbitmq/rabbitmq.config
    这里写图片描述
    注意要去掉后面的逗号。
    3.3.8.开启web界面管理工具
    rabbitmq-plugins enable rabbitmq_management
    service rabbitmq-server restart
    3.3.9.防火墙开放15672端口
    /sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
    /etc/rc.d/init.d/iptables save

    3.4.安装的注意事项

    1、推荐使用默认的安装路径
    2、系统用户名必须是英文
    Win10改名字非常麻烦,具体方法百度
    这里写图片描述
    3、计算机名必须是英文
    这里写图片描述
    4、系统的用户必须是管理员

    如果安装失败应该如何解决:
    1、重装系统 – 不推荐
    2、将RabbitMQ安装到linux虚拟机中
    a)推荐
    3、使用别人安装好的RabbitMQ服务
    a)只要给你开通一个账户即可。
    b)使用公用的RabbitMQ服务,在192.168.50.22
    c)推荐

    常见错误:
    这里写图片描述

    3.5.安装完成后操作

    1、系统服务中有RabbitMQ服务,停止、启动、重启
    这里写图片描述
    2、打开命令行工具
    这里写图片描述
    如果找不到命令行工具,直接cd到相应目录:
    这里写图片描述
    输入命令rabbitmq-plugins enable rabbitmq_management启用管理插件
    这里写图片描述
    查看管理页面
    这里写图片描述
    通过默认账户 guest/guest 登录
    如果能够登录,说明安装成功。
    这里写图片描述

    4.添加用户

    4.1.添加admin用户

    这里写图片描述

    4.2.用户角色

    1、超级管理员(administrator)
    可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
    2、监控者(monitoring)
    可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
    3、策略制定者(policymaker)
    可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
    4、普通管理者(management)
    仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
    5、其他
    无法登陆管理控制台,通常就是普通的生产者和消费者。

    4.3.创建Virtual Hosts

    这里写图片描述

    选中Admin用户,设置权限:
    这里写图片描述
    看到权限已加:
    这里写图片描述

    4.4.管理界面中的功能

    这里写图片描述

    这里写图片描述

    5.学习五种队列

    这里写图片描述

    5.1.导入my-rabbitmq项目

    项目下载地址:
    https://download.csdn.net/download/zpcandzhj/10585077
    这里写图片描述

    5.2.简单队列

    5.2.1.图示
    这里写图片描述

    P:消息的生产者
    C:消息的消费者
    红色:队列

    生产者将消息发送到队列,消费者从队列中获取消息。
    5.2.2.导入RabbitMQ的客户端依赖

    <dependency>
       <groupId>com.rabbitmq</groupId>
       <artifactId>amqp-client</artifactId>
       <version>3.4.1</version>
    </dependency>
    

    5.2.3.获取MQ的连接

    package com.zpc.rabbitmq.util;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    
    public class ConnectionUtil {
    
        public static Connection getConnection() throws Exception {
            //定义连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置服务地址
            factory.setHost("localhost");
            //端口
            factory.setPort(5672);
            //设置账号信息,用户名、密码、vhost
            factory.setVirtualHost("testhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            // 通过工程获取连接
            Connection connection = factory.newConnection();
            return connection;
        }
    }
    

    5.2.4.生产者发送消息到队列

    package com.zpc.rabbitmq.simple;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Send {
    
        private final static String QUEUE_NAME = "q_test_01";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            // 从连接中创建通道
            Channel channel = connection.createChannel();
    
            // 声明(创建)队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 消息内容
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            //关闭通道和连接
            channel.close();
            connection.close();
        }
    }
    

    5.2.5.管理工具中查看消息
    这里写图片描述

    点击上面的队列名称,查询具体的队列中的信息:
    这里写图片描述
    5.2.6.消费者从队列中获取消息

    package com.zpc.rabbitmq.simple;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class Recv {
    
        private final static String QUEUE_NAME = "q_test_01";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            // 从连接中创建通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
    
            // 监听队列
            channel.basicConsume(QUEUE_NAME, true, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [x] Received '" + message + "'");
            }
        }
    }
    

    5.3.Work模式

    这里写图片描述

    5.3.1.图示
    这里写图片描述

    一个生产者、2个消费者。

    一个消息只能被一个消费者获取。
    5.3.2.消费者1

    package com.zpc.rabbitmq.work;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    public class Recv {
    
        private final static String QUEUE_NAME = "test_queue_work";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 同一时刻服务器只会发一条消息给消费者
            //channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,false表示手动返回完成状态,true表示自动
            channel.basicConsume(QUEUE_NAME, true, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [y] Received '" + message + "'");
                //休眠
                Thread.sleep(10);
                // 返回确认状态,注释掉表示使用自动确认模式
                //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.3.3.消费者2

    package com.zpc.rabbitmq.work;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    public class Recv2 {
    
        private final static String QUEUE_NAME = "test_queue_work";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 同一时刻服务器只会发一条消息给消费者
            //channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,false表示手动返回完成状态,true表示自动
            channel.basicConsume(QUEUE_NAME, true, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [x] Received '" + message + "'");
                // 休眠1秒
                Thread.sleep(1000);
                //下面这行注释掉表示使用自动确认模式
                //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.3.4.生产者
    向队列中发送100条消息。

    package com.zpc.rabbitmq.work;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Send {
    
        private final static String QUEUE_NAME = "test_queue_work";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            for (int i = 0; i < 100; i++) {
                // 消息内容
                String message = "" + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
    
                Thread.sleep(i * 10);
            }
    
            channel.close();
            connection.close();
        }
    }
    

    5.3.5.测试
    测试结果:
    1、消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
    2、消费者1和消费者2获取到的消息的数量是相同的,一个是消费奇数号消息,一个是偶数。

    • 其实,这样是不合理的,因为消费者1线程停顿的时间短。应该是消费者1要比消费者2获取到的消息多才对。
      RabbitMQ 默认将消息顺序发送给下一个消费者,这样,每个消费者会得到相同数量的消息。即轮询(round-robin)分发消息。

    • 怎样才能做到按照每个消费者的能力分配消息呢?联合使用 Qos 和 Acknowledge 就可以做到。
      basicQos 方法设置了当前信道最大预获取(prefetch)消息数量为1。消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列,从队列的视角去看,总是会有一批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 参数就是用来限制这批未确认消息数量的。设为1时,队列只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息。prefetchCount 的默认值为0,即没有限制,队列会将所有消息尽快发给消费者。

    • 2个概念

    • 轮询分发 :使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。

    • 公平分发 :虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。

    为了解决这个问题,我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
    还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。

    5.4.Work模式的“能者多劳”

    打开上述代码的注释:

    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    
    //开启这行 表示使用手动确认模式
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    

    同时改为手动确认:

    // 监听队列,false表示手动返回完成状态,true表示自动
    channel.basicConsume(QUEUE_NAME, false, consumer);
    

    测试:
    消费者1比消费者2获取的消息更多。

    5.5.消息的确认模式

    消费者从队列中获取消息,服务端如何知道消息已经被消费呢?

    模式1:自动确认
    只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。
    模式2:手动确认
    消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。

    手动模式:
    这里写图片描述

    自动模式:
    这里写图片描述

    5.6.订阅模式

    这里写图片描述
    5.6.1.图示
    这里写图片描述

    解读:
    1、1个生产者,多个消费者
    2、每一个消费者都有自己的一个队列
    3、生产者没有将消息直接发送到队列,而是发送到了交换机
    4、每个队列都要绑定到交换机
    5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
    注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费
    这里写图片描述

    5.6.2.消息的生产者(看作是后台系统)
    向交换机中发送消息。

    package com.zpc.rabbitmq.subscribe;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Send {
    
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            // 消息内容
            String message = "Hello World!";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    }
    

    注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。
    5.6.3.消费者1(看作是前台系统)

    package com.zpc.rabbitmq.subscribe;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    public class Recv {
    
        private final static String QUEUE_NAME = "test_queue_work1";
    
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [Recv] Received '" + message + "'");
                Thread.sleep(10);
    
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.6.4.消费者2(看作是搜索系统)

    package com.zpc.rabbitmq.subscribe;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    public class Recv2 {
    
        private final static String QUEUE_NAME = "test_queue_work2";
    
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [Recv2] Received '" + message + "'");
                Thread.sleep(10);
    
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.6.5.测试
    测试结果:
    同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。

    在管理工具中查看队列和交换机的绑定关系:

    这里写图片描述

    5.7.路由模式

    这里写图片描述
    5.7.1.图示
    这里写图片描述

    5.7.2.生产者
    这里写图片描述
    5.7.3.消费者1(假设是前台系统)
    这里写图片描述
    5.7.4.消费2(假设是搜索系统)
    这里写图片描述

    5.8.主题模式(通配符模式)

    这里写图片描述

    这里写图片描述

    5.8.1.图示
    这里写图片描述
    同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。

    5.8.2.生产者

    package com.zpc.rabbitmq.topic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    public class Send {
    
        private final static String EXCHANGE_NAME = "test_exchange_topic";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
            // 消息内容
            String message = "Hello World!!";
            channel.basicPublish(EXCHANGE_NAME, "routekey.1", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    }
    

    5.8.3.消费者1(前台系统)

    package com.zpc.rabbitmq.topic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    public class Recv {
    
        private final static String QUEUE_NAME = "test_queue_topic_work_1";
    
        private final static String EXCHANGE_NAME = "test_exchange_topic";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*");
    
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [Recv_x] Received '" + message + "'");
                Thread.sleep(10);
    
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.8.4.消费者2(搜索系统)

    package com.zpc.rabbitmq.topic;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class Recv2 {
    
        private final static String QUEUE_NAME = "test_queue_topic_work_2";
    
        private final static String EXCHANGE_NAME = "test_exchange_topic";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
    
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [Recv2_x] Received '" + message + "'");
                Thread.sleep(10);
    
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    6.Spring-Rabbit

    6.1.Spring项目

    http://spring.io/projects

    这里写图片描述

    6.2.简介

    这里写图片描述
    这里写图片描述

    6.3.使用

    6.3.1.消费者

    package com.zpc.rabbitmq.spring;
    
    /**
     * 消费者
     *
     * @author Evan
     */
    public class Foo {
    
        //具体执行业务的方法
        public void listen(String foo) {
            System.out.println("\n消费者: " + foo + "\n");
        }
    }
    

    6.3.2.生产者

    package com.zpc.rabbitmq.spring;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.support.AbstractApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class SpringMain {
        public static void main(final String... args) throws Exception {
            AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(
                    "classpath:spring/rabbitmq-context.xml");
            //RabbitMQ模板
            RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
            //发送消息
            template.convertAndSend("Hello, 鸟鹏!");
            Thread.sleep(1000);// 休眠1秒
            ctx.destroy(); //容器销毁
        }
    }
    

    6.3.3.配置文件
    1、定义连接工厂

    <!-- 定义RabbitMQ的连接工厂 -->
    <rabbit:connection-factory id="connectionFactory"
       host="127.0.0.1" port="5672" username="admin" password="admin"
       virtual-host="testhost" />
    

    2、定义模板(可以指定交换机或队列)

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
    

    3、定义队列、交换机、以及完成队列和交换机的绑定

    <!-- 定义队列,自动声明 -->
    <rabbit:queue name="zpcQueue" auto-declare="true"/>
    
    <!-- 定义交换器,把Q绑定到交换机,自动声明 -->
    <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
       <rabbit:bindings>
          <rabbit:binding queue="zpcQueue"/>
       </rabbit:bindings>
    </rabbit:fanout-exchange>
    

    4、定义监听

    <rabbit:listener-container connection-factory="connectionFactory">
       <rabbit:listener ref="foo" method="listen" queue-names="zpcQueue" />
    </rabbit:listener-container>
    
    <bean id="foo" class="com.zpc.rabbitmq.spring.Foo" />
    

    5、定义管理,用于管理队列、交换机等:

    <!-- MQ的管理,包括队列、交换器等 -->
    <rabbit:admin connection-factory="connectionFactory" />
    

    完整配置文件rabbitmq-context.xml

    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
    
       <!-- 定义RabbitMQ的连接工厂 -->
       <rabbit:connection-factory id="connectionFactory"
          host="127.0.0.1" port="5672" username="admin" password="admin"
          virtual-host="testhost" />
    
       <!-- 定义Rabbit模板,指定连接工厂以及定义exchange -->
       <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
       <!-- <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
          exchange="fanoutExchange" routing-key="foo.bar" /> -->
    
       <!-- MQ的管理,包括队列、交换器等 -->
       <rabbit:admin connection-factory="connectionFactory" />
    
       <!-- 定义队列,自动声明 -->
       <rabbit:queue name="zpcQueue" auto-declare="true"/>
       
       <!-- 定义交换器,把Q绑定到交换机,自动声明 -->
       <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
          <rabbit:bindings>
             <rabbit:binding queue="zpcQueue"/>
          </rabbit:bindings>
       </rabbit:fanout-exchange>
       
    <!--   <rabbit:topic-exchange name="myExchange">
          <rabbit:bindings>
             <rabbit:binding queue="myQueue" pattern="foo.*" />
          </rabbit:bindings>
       </rabbit:topic-exchange> -->
    
       <!-- 队列监听 -->
       <rabbit:listener-container connection-factory="connectionFactory">
          <rabbit:listener ref="foo" method="listen" queue-names="zpcQueue" />
       </rabbit:listener-container>
    
       <bean id="foo" class="com.zpc.rabbitmq.spring.Foo" />
    
    </beans>
    

    6.4.持久化交换机和队列

    这里写图片描述

    持久化:将交换机或队列的数据保存到磁盘,服务器宕机或重启之后依然存在。
    非持久化:将交换机或队列的数据保存到内存,服务器宕机或重启之后将不存在。

    非持久化的性能高于持久化。

    如何选择持久化?非持久化? – 看需求。

    欢迎关注公众号「程猿薇茑」

    7.Spring集成RabbitMQ一个完整案例

    创建三个系统A,B,C
    A作为生产者,B、C作为消费者(B,C作为web项目启动)
    项目下载地址:https://download.csdn.net/download/zpcandzhj/10585077

    7.1.在A系统中发送消息到交换机

    7.1.1.导入依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
    
       <groupId>com.zpc</groupId>
       <artifactId>myrabbitA</artifactId>
       <version>0.0.1-SNAPSHOT</version>
       <packaging>jar</packaging>
       <name>myrabbit</name>
    
       <dependencies>
          <dependency>
             <groupId>org.springframework.amqp</groupId>
             <artifactId>spring-rabbit</artifactId>
             <version>1.4.0.RELEASE</version>
          </dependency>
    
          <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
             <version>1.2.47</version>
          </dependency>
       </dependencies>
    </project>
    

    7.1.2.队列和交换机的绑定关系
    实现:
    1、在配置文件中将队列和交换机完成绑定
    2、可以在管理界面中完成绑定
    a)绑定关系如果发生变化,需要修改配置文件,并且服务需要重启
    b)管理更加灵活
    c)更容易对绑定关系的权限管理,流程管理
    本例选择第2种方式
    7.1.3.配置
    rabbitmq-context.xml

    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
    
       <!-- 定义RabbitMQ的连接工厂 -->
       <rabbit:connection-factory id="connectionFactory"
          host="127.0.0.1" port="5672" username="admin" password="admin"
          virtual-host="testhost" />
    
       <!-- MQ的管理,包括队列、交换器等 -->
       <rabbit:admin connection-factory="connectionFactory" />
    
       <!-- 定义交换器,暂时不把Q绑定到交换机,在管理界面去绑定 -->
       <!--<rabbit:topic-exchange name="topicExchange" auto-declare="true" ></rabbit:topic-exchange>-->
       <rabbit:direct-exchange name="directExchange" auto-declare="true" ></rabbit:direct-exchange>
       <!--<rabbit:fanout-exchange name="fanoutExchange" auto-declare="true" ></rabbit:fanout-exchange>-->
    
       <!-- 定义Rabbit模板,指定连接工厂以及定义exchange(exchange要和上面的一致) -->
       <!--<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="topicExchange" />-->
       <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="directExchange" />
       <!--<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />-->
    </beans>
    

    7.1.4.消息内容
    方案:
    1、消息内容使用对象做json序列化发送
    a)数据大
    b)有些数据其他人是可能用不到的
    2、发送特定的业务字段,如id、操作类型

    7.1.5.实现
    生产者MsgSender.java:

    package com.zpc.myrabbit;
    
    import com.alibaba.fastjson.JSON;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.support.AbstractApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    
    
    /**
     * 消息生产者
     */
    public class MsgSender {
        public static void main(String[] args) throws Exception {
            AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(
                    "classpath:spring/rabbitmq-context.xml");
            //RabbitMQ模板
            RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
    
            String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
            //发送消息
            Map<String, Object> msg = new HashMap<String, Object>();
            msg.put("type", "1");
            msg.put("date", date);
            template.convertAndSend("type2", JSON.toJSONString(msg));
            Thread.sleep(1000);// 休眠1秒
            ctx.destroy(); //容器销毁
        }
    }
    

    7.2.在B系统接收消息

    7.2.1.导入依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.zpc</groupId>
        <artifactId>myrabbitB</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>war</packaging>
    
        <name>myrabbit</name>
        <properties>
            <spring.version>4.1.3.RELEASE</spring.version>
            <fastjson.version>1.2.46</fastjson.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.4.1</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>1.4.0.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-webmvc</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.47</version>
            </dependency>
        </dependencies>
    
        <build>
            <finalName>${project.artifactId}</finalName>
            <plugins>
                <!-- web层需要配置Tomcat插件 -->
                <plugin>
                    <groupId>org.apache.tomcat.maven</groupId>
                    <artifactId>tomcat7-maven-plugin</artifactId>
                    <configuration>
                        <path>/testRabbit</path>
                        <uriEncoding>UTF-8</uriEncoding>
                        <port>8081</port>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    

    7.2.2.配置

    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
    
       <!-- 定义RabbitMQ的连接工厂 -->
       <rabbit:connection-factory id="connectionFactory"
          host="127.0.0.1" port="5672" username="admin" password="admin"
          virtual-host="testhost" />
    
       <!-- MQ的管理,包括队列、交换器等 -->
       <rabbit:admin connection-factory="connectionFactory" />
    
       <!-- 定义B系统需要监听的队列,自动声明 -->
       <rabbit:queue name="q_topic_testB" auto-declare="true"/>
    
       <!-- 队列监听 -->
       <rabbit:listener-container connection-factory="connectionFactory">
          <rabbit:listener ref="myMQlistener" method="listen" queue-names="q_topic_testB" />
       </rabbit:listener-container>
    
       <bean id="myMQlistener" class="com.zpc.myrabbit.listener.Listener" />
    </beans>
    

    7.2.3.具体处理逻辑

    public class Listener {
        //具体执行业务的方法
        public void listen(String msg) {
            System.out.println("\n消费者B开始处理消息: " + msg + "\n");
        }
    }
    

    7.2.4.在界面管理工具中完成绑定关系
    选中定义好的交换机(exchange)
    这里写图片描述
    1)direct
    这里写图片描述
    2)fanout
    这里写图片描述
    3)topic
    这里写图片描述

    7.3.在C系统中接收消息

    (和B系统配置差不多,无非是Q名和Q对应的处理逻辑变了)

    7.3.1.配置

    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
    
       <!-- 定义RabbitMQ的连接工厂 -->
       <rabbit:connection-factory id="connectionFactory"
          host="127.0.0.1" port="5672" username="admin" password="admin"
          virtual-host="testhost" />
    
       <!-- MQ的管理,包括队列、交换器等 -->
       <rabbit:admin connection-factory="connectionFactory" />
    
       <!-- 定义C系统需要监听的队列,自动声明 -->
       <rabbit:queue name="q_topic_testC" auto-declare="true"/>
    
       <!-- 队列监听 -->
       <rabbit:listener-container connection-factory="connectionFactory">
          <rabbit:listener ref="myMQlistener" method="listen" queue-names="q_topic_testC" />
       </rabbit:listener-container>
    
       <bean id="myMQlistener" class="com.zpc.myrabbit.listener.Listener" />
    </beans>
    

    7.3.2.处理业务逻辑

    public class Listener {
    
        //具体执行业务的方法
        public void listen(String msg) {
            System.out.println("\n消费者C开始处理消息: " + msg + "\n");
        }
    }
    

    7.3.3.在管理工具中绑定队列和交换机
    见7.2.4

    7.3.4.测试
    分别启动B,C两个web应用,然后运行A的MsgSender主方法发送消息,分别测试fanout、direct、topic三种类型

    8.Springboot集成RabbitMQ

    8.1.简单队列

    1、配置pom文件,主要是添加spring-boot-starter-amqp的支持

    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    2、配置application.properties文件
    配置rabbitmq的安装地址、端口以及账户信息

    spring.application.name=spirng-boot-rabbitmq
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=admin
    

    3、配置rabbitmq队列

    package com.zpc.rabbitmq;
    
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitConfig {
        @Bean
        public Queue queue() {
            return new Queue("q_hello");
        }
    }
    

    4、rabbitmq发送者

    package com.zpc.rabbitmq;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    @Component
    public class HelloSender {
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void send() {
            String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
            String context = "hello " + date;
            System.out.println("Sender : " + context);
            //简单对列的情况下routingKey即为Q名
            this.rabbitTemplate.convertAndSend("q_hello", context);
        }
    }
    

    5、rabbitmq接收者

    package com.zpc.rabbitmq;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "q_hello")
    public class HelloReceiver {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver  : " + hello);
        }
    }
    

    6、测试

    package com.zpc.rabbitmq;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitMqHelloTest {
    
        @Autowired
        private HelloSender helloSender;
    
        @Test
        public void hello() throws Exception {
            helloSender.send();
        }
    }
    

    8.2.多对多使用(Work模式)

    注册两个Receiver:

    package com.zpc.rabbitmq;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "q_hello")
    public class HelloReceiver2 {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver2  : " + hello);
        }
    
    }
    
    @Test
    public void oneToMany() throws Exception {
        for (int i=0;i<100;i++){
            helloSender.send(i);
            Thread.sleep(300);
        }
    }
    
    public void send(int i) {
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
        String context = "hello " + i + " " + date;
        System.out.println("Sender : " + context);
        //简单对列的情况下routingKey即为Q名
        this.rabbitTemplate.convertAndSend("q_hello", context);
    }
    

    8.3.Topic Exchange(主题模式)

    • topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列

    首先对topic规则配置,这里使用两个队列(消费者)来演示。
    1)配置队列,绑定交换机

    package com.zpc.rabbitmq.topic;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class TopicRabbitConfig {
    
        final static String message = "q_topic_message";
        final static String messages = "q_topic_messages";
    
        @Bean
        public Queue queueMessage() {
            return new Queue(TopicRabbitConfig.message);
        }
    
        @Bean
        public Queue queueMessages() {
            return new Queue(TopicRabbitConfig.messages);
        }
    
        /**
         * 声明一个Topic类型的交换机
         * @return
         */
        @Bean
        TopicExchange exchange() {
            return new TopicExchange("mybootexchange");
        }
    
        /**
         * 绑定Q到交换机,并且指定routingKey
         * @param queueMessage
         * @param exchange
         * @return
         */
        @Bean
        Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
        }
    
        @Bean
        Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
        }
    }
    

    2)创建2个消费者
    q_topic_message 和q_topic_messages

    package com.zpc.rabbitmq.topic;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "q_topic_message")
    public class Receiver1 {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver1  : " + hello);
        }
    }
    
    package com.zpc.rabbitmq.topic;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "q_topic_messages")
    public class Receiver2 {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver2 : " + hello);
        }
    }
    

    3)消息发送者(生产者)

    package com.zpc.rabbitmq.topic;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MsgSender {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void send1() {
            String context = "hi, i am message 1";
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("mybootexchange", "topic.message", context);
        }
    
    
        public void send2() {
            String context = "hi, i am messages 2";
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("mybootexchange", "topic.messages", context);
        }
    }
    

    send1方法会匹配到topic.#和topic.message,两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息。
    4)测试

    package com.zpc.rabbitmq.topic;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitTopicTest {
    
        @Autowired
        private MsgSender msgSender;
    
        @Test
        public void send1() throws Exception {
            msgSender.send1();
        }
    
        @Test
        public void send2() throws Exception {
            msgSender.send2();
        }
    }
    

    8.4.Fanout Exchange(订阅模式)

    • Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
      1)配置队列,绑定交换机
    package com.zpc.rabbitmq.fanout;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class FanoutRabbitConfig {
    
        @Bean
        public Queue aMessage() {
            return new Queue("q_fanout_A");
        }
    
        @Bean
        public Queue bMessage() {
            return new Queue("q_fanout_B");
        }
    
        @Bean
        public Queue cMessage() {
            return new Queue("q_fanout_C");
        }
    
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("mybootfanoutExchange");
        }
    
        @Bean
        Binding bindingExchangeA(Queue aMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(aMessage).to(fanoutExchange);
        }
    
        @Bean
        Binding bindingExchangeB(Queue bMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(bMessage).to(fanoutExchange);
        }
    
        @Bean
        Binding bindingExchangeC(Queue cMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(cMessage).to(fanoutExchange);
        }
    }
    

    2)创建3个消费者

    package com.zpc.rabbitmq.fanout;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "q_fanout_A")
    public class ReceiverA {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("AReceiver  : " + hello + "/n");
        }
    }
    
    package com.zpc.rabbitmq.fanout;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "q_fanout_B")
    public class ReceiverB {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("BReceiver  : " + hello + "/n");
        }
    }
    
    package com.zpc.rabbitmq.fanout;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "q_fanout_C")
    public class ReceiverC {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("CReceiver  : " + hello + "/n");
        }
    }
    

    3)生产者

    package com.zpc.rabbitmq.fanout;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MsgSenderFanout {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void send() {
            String context = "hi, fanout msg ";
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("mybootfanoutExchange","", context);
        }
    }
    

    4)测试

    package com.zpc.rabbitmq.fanout;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitFanoutTest {
    
        @Autowired
        private MsgSenderFanout msgSender;
    
        @Test
        public void send1() throws Exception {
            msgSender.send();
        }
    }
    

    结果如下,三个消费者都收到消息:
    AReceiver : hi, fanout msg
    CReceiver : hi, fanout msg
    BReceiver : hi, fanout msg

    9.总结

    推荐springCloud教程:
    https://blog.csdn.net/hellozpc/article/details/83692496

    推荐Springboot2.0教程:
    https://blog.csdn.net/hellozpc/article/details/82531834

    **欢迎关注公众号【程猿薇茑】**
    **微信扫一扫**
    展开全文
  • docker 安装rabbitMQ

    万次阅读 2020-11-24 18:42:53
    搜索rabbitMq,进入官方的镜像,可以看到以下几种类型的镜像;我们选择带有“mangement”的版本(包含web管理页面); 拉取镜像 docker pull rabbitmq 查看所有镜像 docker images 安装和web界面启动 ...
  • Rabbitmq

    千次阅读 2019-12-10 11:41:52
    1.应用场景 用于在分布式系统中存储转发消息,在易用性、扩展性... RabbitMQ is a message broker[消息代理]: it accepts and forwards[输出,转发] messages. You can think about it as a post office [邮局...
  • Linux安装RabbitMq Erlang使用

    万次阅读 2019-11-19 15:37:16
    Linux安装RabbitMq 配置安装环境 1. 安装Erlang环境 yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel 2)安装ncurses yum -y install ncurses-devel 3)安装erlang环境 ```...
  • Linux安装RabbitMq步骤流程

    万次阅读 2020-01-09 17:47:38
    1:下载RabbitMq依赖的erlang语言安装包 wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm 最新的22版本 2:若缺少epel-release 依赖 yum install epel-release 3:安装erlang软件包 ...
  • 代码地址:https://github.com/ArvinHu/rabbit
  • RabbitMQ的安装

    万次阅读 2020-06-02 20:14:09
    一、安装erlang环境 ... 这个文件其实不是gz格式的,使用file otp_src_20.1.tar.gz可以查看它的真实数据格式 解压tar -xvf otp_src_20.1.tar.gz 解压后,先安装依赖,这2个必须要安装 yum install ncurses-devel ...
  • rabbitMQ笔记

    万次阅读 2020-09-04 09:36:50
    【1】windows上安装RabbitMQ 【2】命令 【3】ubuntu16.04中安装RabbitMQ 【4】CentOS 6.5 安装 RabbitMQ 3.6.1 【5】远程访问配置 【1】windows上安装RabbitMQ 1.安装erlang环境(otp_win64_20.0.exe),并检查系统...
  • Springboot整合RabbitMQ详细讲解

    万次阅读 2020-08-26 16:47:28
    搭建RabbitMQ环境 Springboot整合RabbitMQ 1、添加整合依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </...
  • SpringBoot整合RabbitMQ之 典型应用场景实战一

    万次阅读 多人点赞 2018-10-09 15:08:04
    RabbitMQ 作为目前应用相当广泛的消息中间件,在企业级应用、微服务应用中充当着重要的角色。特别是在一些典型的应用场景以及业务模块中具有重要的作用,比如业务服务模块解耦、异步通信、高并发限流、超时业务、...
  • docker安装RabbitMQ

    万次阅读 2019-12-27 15:39:16
    想玩一下RabbitMQ,在网上查找了linux安装,感觉特别麻烦,本人买的阿里服务器被我安装了docker环境,所以用docker安装更为简洁、快速。 1、查找rabbitmq镜像 注意:如果docker pull rabbitmq 后面不带management...

空空如也

1 2 3 4 5 ... 20
收藏数 94,336
精华内容 37,734
关键字:

rabbitmq