rabbitmq 订阅
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。 展开全文
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
信息
开发公司
Rabbit
简    称
MQ
构    成
以高性能、健壮以及可伸缩性出名的 Erlang 写成
释    义
一种程序对程序的通信方法
中文名
消息队列
外文名
Message Queue
rabbitmq简介
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
收起全文
精华内容
下载资源
问答
  • RabbitMQ

    万次阅读 多人点赞 2019-10-29 23:14:20
    文章目录RabbitMQ 使用场景服务解耦流量削峰异步调用rabbitmq 基本概念ExchangeMessage QueueBinding KeyRouting Keyrabbitmq安装安装erlang语言库rabbitmq官方精简的Erlang语言包下载和安装安装socat依赖socat依赖...

    Rabbitmq

    RabbitMQ 使用场景

    服务解耦

    假设有这样一个场景, 服务A产生数据, 而服务B,C,D需要这些数据, 那么我们可以在A服务中直接调用B,C,D服务,把数据传递到下游服务即可

    但是,随着我们的应用规模不断扩大,会有更多的服务需要A的数据,如果有几十甚至几百个下游服务,而且会不断变更,再加上还要考虑下游服务出错的情况,那么A服务中调用代码的维护会极为困难

    这是由于服务之间耦合度过于紧密

    耦合

    再来考虑用RabbitMQ解耦的情况

    A服务只需要向消息服务器发送消息,而不用考虑谁需要这些数据;下游服务如果需要数据,自行从消息服务器订阅消息,不再需要数据时则取消订阅即可

    解耦

    流量削峰

    假设我们有一个应用,平时访问量是每秒300请求,我们用一台服务器即可轻松应对

    低流量

    而在高峰期,访问量瞬间翻了十倍,达到每秒3000次请求,那么单台服务器肯定无法应对,这时我们可以考虑增加到10台服务器,来分散访问压力

    但如果这种瞬时高峰的情况每天只出现一次,每次只有半小时,那么我们10台服务器在多数时间都只分担每秒几十次请求,这样就有点浪费资源了

    流量峰值

    这种情况,我们就可以使用RabbitMQ来进行流量削峰,高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力

    这是消息队列服务器非常典型的应用场景

    流量销峰

    异步调用

    考虑定外卖支付成功的情况

    支付后要发送支付成功的通知,再寻找外卖小哥来进行配送,而寻找外卖小哥的过程非常耗时,尤其是高峰期,可能要等待几十秒甚至更长

    这样就造成整条调用链路响应非常缓慢

    阻塞

    而如果我们引入RabbitMQ消息队列,订单数据可以发送到消息队列服务器,那么调用链路也就可以到此结束,订单系统则可以立即得到响应,整条链路的响应时间只有200毫秒左右

    寻找外卖小哥的应用可以以异步的方式从消息队列接收订单消息,再执行耗时的寻找操作

    异步调用

    rabbitmq 基本概念

    RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。

    rabbitmq

    Exchange

    接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为。在RabbitMQ中,ExchangeType常用的有direct、Fanout和Topic三种。

    exchange

    Message Queue

    消息队列。我们发送给RabbitMQ的消息最后都会到达各种queue,并且存储在其中(如果路由找不到相应的queue则数据会丢失),等待消费者来取。

    Binding Key

    它表示的是Exchange与Message Queue是通过binding key进行联系的,这个关系是固定。

    Routing Key

    生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则。这个routing key需要与Exchange Type及binding key联合使用才能生,我们的生产者只需要通过指定routing key来决定消息流向哪里。

    rabbitmq安装

    离线安装

    下载离线安装包文件

    上传离线安装包

    • rabbitmq-install 目录上传到 /root

    切换到rabbitmq-install目录

    cd rabbitmq-install
    

    安装

    rpm -ivh *.rpm
    

    Yum在线安装

    以下内容来自 RabbitMQ 官方手册

    rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
    
    
    # centos7 用这个
    cat <<EOF > /etc/yum.repos.d/rabbitmq.repo
    [bintray-rabbitmq-server]
    name=bintray-rabbitmq-rpm
    baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/
    gpgcheck=0
    repo_gpgcheck=0
    enabled=1
    EOF
    
    
    # centos6 用这个
    cat <<EOF > /etc/yum.repos.d/rabbitmq.repo
    [bintray-rabbitmq-server]
    name=bintray-rabbitmq-rpm
    baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/6/
    gpgcheck=0
    repo_gpgcheck=0
    enabled=1
    EOF
    
    
    yum makecache
    
    yum install socat
    
    wget https://github.com/rabbitmq/erlang-rpm/releases/download/v21.3.8.12/erlang-21.3.8.12-1.el7.x86_64.rpm
    rpm -ivh erlang-21.3.8.12-1.el7.x86_64.rpm --force --nodeps
    
    yum install rabbitmq-server
    

    启动rabbitmq服务器

    # 设置服务,开机自动启动
    systemctl enable rabbitmq-server
    
    # 启动服务
    systemctl start rabbitmq-server
    

    rabbitmq管理界面

    启用管理界面

    # 开启管理界面插件
    rabbitmq-plugins enable rabbitmq_management
    
    # 防火墙打开 15672 管理端口
    firewall-cmd --zone=public --add-port=15672/tcp --permanent
    firewall-cmd --reload
    

    重启RabbitMQ服务

    systemctl restart rabbitmq-server
    

    访问

    访问服务器的15672端口,例如:

    http://192.168.64.140:15672

    添加用户

    添加用户

    # 添加用户
    rabbitmqctl add_user admin admin
    
    # 新用户设置用户为超级管理员
    rabbitmqctl set_user_tags admin administrator
    

    设置访问权限

    访问权限
    访问权限

    开放客户端连接端口

    # 打开客户端连接端口
    firewall-cmd --zone=public --add-port=5672/tcp --permanent
    firewall-cmd --reload
    
    • 主要端口介绍
      • 4369 – erlang发现口
      • 5672 – client端通信口
      • 15672 – 管理界面ui端口
      • 25672 – server间内部通信口

    rabbitmq六种工作模式

    简单模式

    简单

    RabbitMQ是一个消息中间件,你可以想象它是一个邮局。当你把信件放到邮箱里时,能够确信邮递员会正确地递送你的信件。RabbitMq就是一个邮箱、一个邮局和一个邮递员。

    • 发送消息的程序是生产者
    • 队列就代表一个邮箱。虽然消息会流经RbbitMQ和你的应用程序,但消息只能被存储在队列里。队列存储空间只受服务器内存和磁盘限制,它本质上是一个大的消息缓冲区。多个生产者可以向同一个队列发送消息,多个消费者也可以从同一个队列接收消息.
    • 消费者等待从队列接收消息

    简单模式

    pom.xml

    添加 slf4j 依赖, 和 rabbitmq amqp 依赖

    <project xmlns="http://maven.apache.org/POM/4.0.0"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>com.tedu</groupId>
    	<artifactId>rabbitmq</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<dependencies>
    		<dependency>
    			<groupId>com.rabbitmq</groupId>
    			<artifactId>amqp-client</artifactId>
    			<version>5.4.3</version>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-api</artifactId>
    			<version>1.8.0-alpha2</version>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-log4j12</artifactId>
    			<version>1.8.0-alpha2</version>
    		</dependency>
    	</dependencies>
    
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-compiler-plugin</artifactId>
    				<version>3.8.0</version>
    				<configuration>
    					<source>1.8</source>
    					<target>1.8</target>
    				</configuration>
    			</plugin>
    		</plugins>
    	</build>
    </project>
    

    生产者发送消息

    package rabbitmq.simple;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		//创建连接工厂,并设置连接信息
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);//可选,5672是默认端口
    		f.setUsername("admin");
    		f.setPassword("admin");
    
    		/*
    		 * 与rabbitmq服务器建立连接,
    		 * rabbitmq服务器端使用的是nio,会复用tcp连接,
    		 * 并开辟多个信道与客户端通信
    		 * 以减轻服务器端建立连接的开销
    		 */
    		Connection c = f.newConnection();
    		//建立信道
    		Channel ch = c.createChannel();
    
    		/*
    		 * 声明队列,会在rabbitmq中创建一个队列
    		 * 如果已经创建过该队列,就不能再使用其他参数来创建
    		 * 
    		 * 参数含义:
    		 *   -queue: 队列名称
    		 *   -durable: 队列持久化,true表示RabbitMQ重启后队列仍存在
    		 *   -exclusive: 排他,true表示限制仅当前连接可用
    		 *   -autoDelete: 当最后一个消费者断开后,是否删除队列
    		 *   -arguments: 其他参数
    		 */
    		ch.queueDeclare("helloworld", false,false,false,null);
    
    		/*
    		 * 发布消息
    		 * 这里把消息向默认交换机发送.
    		 * 默认交换机隐含与所有队列绑定,routing key即为队列名称
    		 * 
    		 * 参数含义:
    		 * 	-exchange: 交换机名称,空串表示默认交换机"(AMQP default)",不能用 null 
    		 * 	-routingKey: 对于默认交换机,路由键就是目标队列名称
    		 * 	-props: 其他参数,例如头信息
    		 * 	-body: 消息内容byte[]数组
    		 */
    		ch.basicPublish("", "helloworld", null, "Hello world!".getBytes());
    
    		System.out.println("消息已发送");
    		c.close();
    	}
    }
    

    消费者接收消息

    package rabbitmq.simple;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		//连接工厂
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		//建立连接
    		Connection c = f.newConnection();
    		//建立信道
    		Channel ch = c.createChannel();
    		//声明队列,如果该队列已经创建过,则不会重复创建
    		ch.queueDeclare("helloworld",false,false,false,null);
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume("helloworld", true, callback, cancel);
    	}
    }
    

    工作模式

    工作

    工作模式

    工作队列(即任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们将任务安排在稍后完成。

    我们将任务封装为消息并将其发送到队列。后台运行的工作进程将获取任务并最终执行任务。当运行多个消费者时,任务将在它们之间分发。

    使用任务队列的一个优点是能够轻松地并行工作。如果我们正在积压工作任务,我们可以添加更多工作进程,这样就可以轻松扩展。

    生产者发送消息

    这里模拟耗时任务,发送的消息中,每个点使工作进程暂停一秒钟,例如"Hello…"将花费3秒钟来处理

    package rabbitmq.workqueue;
    
    import java.util.Scanner;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		//参数:queue,durable,exclusive,autoDelete,arguments
    		ch.queueDeclare("helloworld", false,false,false,null);
    
    		while (true) {
    		    //控制台输入的消息发送到rabbitmq
    			System.out.print("输入消息: ");
    			String msg = new Scanner(System.in).nextLine();
    			//如果输入的是"exit"则结束生产者进程
    			if ("exit".equals(msg)) {
    				break;
    			}
    			//参数:exchage,routingKey,props,body
    			ch.basicPublish("", "helloworld", null, msg.getBytes());
    			System.out.println("消息已发送: "+msg);
    		}
    
    		c.close();
    	}
    }
    

    消费者接收消息

    package rabbitmq.workqueue;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		ch.queueDeclare("helloworld",false,false,false,null);
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    
    				//遍历字符串中的字符,每个点使进程暂停一秒
    				for (int i = 0; i < msg.length(); i++) {
    					if (msg.charAt(i)=='.') {
    						try {
    							Thread.sleep(1000);
    						} catch (InterruptedException e) {
    						}
    					}
    				}
    				System.out.println("处理结束");
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume("helloworld", true, callback, cancel);
    	}
    }
    

    运行测试

    运行:

    • 一个生产者
    • 两个消费者

    生产者发送多条消息,
    如: 1,2,3,4,5. 两个消费者分别收到:

    • 消费者一: 1,3,5
    • 消费者二: 2,4

    rabbitmq在所有消费者中轮询分发消息,把消息均匀地发送给所有消费者

    消息确认

    一个消费者接收消息后,在消息没有完全处理完时就挂掉了,那么这时会发生什么呢?

    就现在的代码来说,rabbitmq把消息发送给消费者后,会立即删除消息,那么消费者挂掉后,它没来得及处理的消息就会丢失

    如果生产者发送以下消息:

    1…

    2

    3

    4

    5

    两个消费者分别收到:

    • 消费者一: 1…, 3, 5
    • 消费者二: 2, 4

    当消费者一收到所有消息后,要话费7秒时间来处理第一条消息,这期间如果关闭该消费者,那么1未处理完成,3,5则没有被处理

    我们并不想丢失任何消息, 如果一个消费者挂掉,我们想把它的任务消息派发给其他消费者

    为了确保消息不会丢失,rabbitmq支持消息确认(回执)。当一个消息被消费者接收到并且执行完成后,消费者会发送一个ack (acknowledgment) 给rabbitmq服务器, 告诉他我已经执行完成了,你可以把这条消息删除了。

    如果一个消费者没有返回消息确认就挂掉了(信道关闭,连接关闭或者TCP链接丢失),rabbitmq就会明白,这个消息没有被处理完成,rebbitmq就会把这条消息重新放入队列,如果在这时有其他的消费者在线,那么rabbitmq就会迅速的把这条消息传递给其他的消费者,这样就确保了没有消息会丢失。

    这里不存在消息超时, rabbitmq只在消费者挂掉时重新分派消息, 即使消费者花非常久的时间来处理消息也可以

    手动消息确认默认是开启的,前面的例子我们通过autoAck=ture把它关闭了。我们现在要把它设置为false,然后工作进程处理完意向任务时,发送一个消息确认(回执)。

    package rabbitmq.workqueue;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		//连接工厂
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		//建立连接
    		Connection c = f.newConnection();
    		//建立信道
    		Channel ch = c.createChannel();
    		//声明队列
    		ch.queueDeclare("helloworld",false,false,false,null);
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    				for (int i = 0; i < msg.length(); i++) {
    					if (msg.charAt(i)=='.') {
    						try {
    							Thread.sleep(1000);
    						} catch (InterruptedException e) {
    						}
    					}
    				}
    				System.out.println("处理结束");
    				//发送回执
    				ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		//autoAck设置为false,则需要手动确认发送回执
    		ch.basicConsume("helloworld", false, callback, cancel);
    	}
    }
    
    

    使用以上代码,就算杀掉一个正在处理消息的工作进程也不会丢失任何消息,工作进程挂掉之后,没有确认的消息就会被自动重新传递。

    忘记确认(ack)是一个常见的错误, 这样后果是很严重的, 由于未确认的消息不会被释放, rabbitmq会吃掉越来越多的内存

    可以使用下面命令打印工作队列中未确认消息的数量

    rabbitmqctl list_queues name messages_ready messages_unacknowledged
    

    当处理消息时异常中断, 可以选择让消息重回队列重新发送.
    nack 操作可以是消息重回队列, 可以使用 basicNack() 方法:

    // requeue为true时重回队列, 反之消息被丢弃或被发送到死信队列
    c.basicNack(tag, multiple, requeue)
    

    合理地分发

    rabbitmq会一次把多个消息分发给消费者, 这样可能造成有的消费者非常繁忙, 而其它消费者空闲. 而rabbitmq对此一无所知, 仍然会均匀的分发消息

    我们可以使用 basicQos(1) 方法, 这告诉rabbitmq一次只向消费者发送一条消息, 在返回确认回执前, 不要向消费者发送新消息. 而是把消息发给下一个空闲的消费者

    合理分发

    消息持久化

    当rabbitmq关闭时, 我们队列中的消息仍然会丢失, 除非明确要求它不要丢失数据

    要求rabbitmq不丢失数据要做如下两点: 把队列和消息都设置为可持久化(durable)

    队列设置为可持久化, 可以在定义队列时指定参数durable为true

    //第二个参数是持久化参数durable
    ch.queueDeclare("helloworld", true, false, false, null);
    

    由于之前我们已经定义过队列"hello"是不可持久化的, 对已存在的队列, rabbitmq不允许对其定义不同的参数, 否则会出错, 所以这里我们定义一个不同名字的队列"task_queue"

    //定义一个新的队列,名为 task_queue
    //第二个参数是持久化参数 durable
    ch.queueDeclare("task_queue", true, false, false, null);
    

    生产者和消费者代码都要修改

    这样即使rabbitmq重新启动, 队列也不会丢失. 现在我们再设置队列中消息的持久化, 使用MessageProperties.PERSISTENT_TEXT_PLAIN参数

    //第三个参数设置消息持久化
    ch.basicPublish("", "task_queue",
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                msg.getBytes());
    

    下面是"工作模式"最终完成的生产者和消费者代码

    生产者代码

    package rabbitmq.workqueue;
    
    import java.util.Scanner;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    
    public class Test3 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//第二个参数设置队列持久化
    		ch.queueDeclare("task_queue", true,false,false,null);
    
    		while (true) {
    			System.out.print("输入消息: ");
    			String msg = new Scanner(System.in).nextLine();
    			if ("exit".equals(msg)) {
    				break;
    			}
    			
    			//第三个参数设置消息持久化
    			ch.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8"));
    			System.out.println("消息已发送: "+msg);
    		}
    
    		c.close();
    	}
    }
    

    消费者代码

    package rabbitmq.workqueue;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test4 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//第二个参数设置队列持久化
    		ch.queueDeclare("task_queue",true,false,false,null);
    		
    		System.out.println("等待接收数据");
    		
    		ch.basicQos(1); //一次只接收一条消息
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    				for (int i = 0; i < msg.length(); i++) {
    					if (msg.charAt(i)=='.') {
    						try {
    							Thread.sleep(1000);
    						} catch (InterruptedException e) {
    						}
    					}
    				}
    				System.out.println("处理结束");
    				//发送回执
    				ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		//autoAck设置为false,则需要手动确认发送回执
    		ch.basicConsume("task_queue", false, callback, cancel);
    	}
    }
    
    

    发布订阅模式

    发布订阅

    发布订阅

    在前面的例子中,我们任务消息只交付给一个工作进程。在这部分,我们将做一些完全不同的事情——我们将向多个消费者传递同一条消息。这种模式称为“发布/订阅”。

    为了说明该模式,我们将构建一个简单的日志系统。它将由两个程序组成——第一个程序将发出日志消息,第二个程序接收它们。

    在我们的日志系统中,接收程序的每个运行副本都将获得消息。这样,我们就可以运行一个消费者并将日志保存到磁盘; 同时我们可以运行另一个消费者在屏幕上打印日志。

    最终, 消息会被广播到所有消息接受者

    Exchanges 交换机

    RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何消息直接发送到队列。实际上,通常生产者甚至不知道消息是否会被传递到任何队列。

    相反,生产者只能向交换机(Exchange)发送消息。交换机是一个非常简单的东西。一边接收来自生产者的消息,另一边将消息推送到队列。交换器必须确切地知道如何处理它接收到的消息。它应该被添加到一个特定的队列中吗?它应该添加到多个队列中吗?或者它应该被丢弃。这些规则由exchange的类型定义。

    有几种可用的交换类型:direct、topic、header和fanout。我们将关注最后一个——fanout。让我们创建一个这种类型的交换机,并称之为 logs: ch.exchangeDeclare("logs", "fanout");

    fanout交换机非常简单。它只是将接收到的所有消息广播给它所知道的所有队列。这正是我们的日志系统所需要的。

    我们前面使用的队列具有特定的名称(还记得hello和task_queue吗?)能够为队列命名对我们来说至关重要——我们需要将工作进程指向同一个队列,在生产者和消费者之间共享队列。

    但日志记录案例不是这种情况。我们想要接收所有的日志消息,而不仅仅是其中的一部分。我们还只对当前的最新消息感兴趣,而不是旧消息。

    要解决这个问题,我们需要两件事。首先,每当我们连接到Rabbitmq时,我们需要一个新的空队列。为此,我们可以创建一个具有随机名称的队列,或者,更好的方法是让服务器为我们选择一个随机队列名称。其次,一旦断开与使用者的连接,队列就会自动删除。在Java客户端中,当我们不向queueDeclare()提供任何参数时,会创建一个具有生成名称的、非持久的、独占的、自动删除队列

    //自动生成队列名
    //非持久,独占,自动删除
    String queueName = ch.queueDeclare().getQueue();
    

    绑定 Bindings

    绑定

    我们已经创建了一个fanout交换机和一个队列。现在我们需要告诉exchange向指定队列发送消息。exchange和队列之间的关系称为绑定。

    //指定的队列,与指定的交换机关联起来
    //成为绑定 -- binding
    //第三个参数时 routingKey, 由于是fanout交换机, 这里忽略 routingKey
    ch.queueBind(queueName, "logs", "");
    

    现在, logs交换机将会向我们指定的队列添加消息

    列出绑定关系:

    rabbitmqctl list_bindings

    完成的代码

    完成代码

    生产者

    生产者发出日志消息,看起来与前一教程没有太大不同。最重要的更改是,我们现在希望将消息发布到logs交换机,而不是无名的日志交换机。我们需要在发送时提供一个routingKey,但是对于fanout交换机类型,该值会被忽略。

    package rabbitmq.publishsubscribe;
    
    import java.util.Scanner;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//定义名字为logs的交换机,交换机类型为fanout
    		//这一步是必须的,因为禁止发布到不存在的交换。
    		ch.exchangeDeclare("logs", "fanout");
    		
    		while (true) {
    			System.out.print("输入消息: ");
    			String msg = new Scanner(System.in).nextLine();
    			if ("exit".equals(msg)) {
    				break;
    			}
    			
    			//第一个参数,向指定的交换机发送消息
    			//第二个参数,不指定队列,由消费者向交换机绑定队列
    			//如果还没有队列绑定到交换器,消息就会丢失,
    			//但这对我们来说没有问题;即使没有消费者接收,我们也可以安全地丢弃这些信息。
    			ch.basicPublish("logs", "", null, msg.getBytes("UTF-8"));
    			System.out.println("消息已发送: "+msg);
    		}
    
    		c.close();
    	}
    }
    

    消费者

    如果还没有队列绑定到交换器,消息就会丢失,但这对我们来说没有问题;如果还没有消费者在听,我们可以安全地丢弃这些信息。
    ReceiveLogs.java代码:

    package rabbitmq.publishsubscribe;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//定义名字为 logs 的交换机, 它的类型是 fanout
    		ch.exchangeDeclare("logs", "fanout");
    		
    		//自动生成对列名,
    		//非持久,独占,自动删除
    		String queueName = ch.queueDeclare().getQueue();
    		
    		//把该队列,绑定到 logs 交换机
    		//对于 fanout 类型的交换机, routingKey会被忽略,不允许null值
    		ch.queueBind(queueName, "logs", "");
    		
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume(queueName, true, callback, cancel);
    	}
    }
    

    路由模式

    路由

    路由模式

    在上一小节,我们构建了一个简单的日志系统。我们能够向多个接收者广播日志消息。

    在这一节,我们将向其添加一个特性—我们将只订阅所有消息中的一部分。例如,我们只接收关键错误消息并保存到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

    绑定 Bindings

    在上一节,我们已经创建了队列与交换机的绑定。使用下面这样的代码:

    ch.queueBind(queueName, "logs", "");
    

    绑定是交换机和队列之间的关系。这可以简单地理解为:队列对来自此交换的消息感兴趣。

    绑定可以使用额外的routingKey参数。为了避免与basic_publish参数混淆,我们将其称为bindingKey。这是我们如何创建一个键绑定:

    ch.queueBind(queueName, EXCHANGE_NAME, "black");
    

    bindingKey的含义取决于交换机类型。我们前面使用的fanout交换机完全忽略它。

    直连交换机 Direct exchange

    上一节中的日志系统向所有消费者广播所有消息。我们希望扩展它,允许根据消息的严重性过滤消息。例如,我们希望将日志消息写入磁盘的程序只接收关键error,而不是在warning或info日志消息上浪费磁盘空间。

    前面我们使用的是fanout交换机,这并没有给我们太多的灵活性——它只能进行简单的广播。

    我们将用直连交换机(Direct exchange)代替。它背后的路由算法很简单——消息传递到bindingKey与routingKey完全匹配的队列。为了说明这一点,请考虑以下设置

    路由模式

    其中我们可以看到直连交换机X,它绑定了两个队列。第一个队列用绑定键orange绑定,第二个队列有两个绑定,一个绑定black,另一个绑定键green

    这样设置,使用路由键orange发布到交换器的消息将被路由到队列Q1。带有blackgreen路由键的消息将转到Q2。而所有其他消息都将被丢弃。

    多重绑定 Multiple bindings

    多重绑定

    使用相同的bindingKey绑定多个队列是完全允许的。如图所示,可以使用binding key black将X与Q1和Q2绑定。在这种情况下,直连交换机的行为类似于fanout,并将消息广播给所有匹配的队列。一条路由键为black的消息将同时发送到Q1和Q2。

    发送日志

    我们将在日志系统中使用这个模型。我们把消息发送到一个Direct交换机,而不是fanout。我们将提供日志级别作为routingKey。这样,接收程序将能够选择它希望接收的级别。让我们首先来看发出日志。

    和前面一样,我们首先需要创建一个exchange:

    //参数1: 交换机名
    //参数2: 交换机类型
    ch.exchangeDeclare("direct_logs", "direct");
    

    接着来看发送消息的代码

    //参数1: 交换机名
    //参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"
    //参数3: 其他配置属性
    //参数4: 发布的消息数据 
    ch.basicPublish("direct_logs", "error", null, message.getBytes());
    

    订阅

    接收消息的工作原理与前面章节一样,但有一个例外——我们将为感兴趣的每个日志级别创建一个新的绑定, 示例代码如下:

    ch.queueBind(queueName, "logs", "info");
    ch.queueBind(queueName, "logs", "warning");
    

    完整的代码

    完整代码

    生产者

    package rabbitmq.routing;
    
    import java.util.Random;
    import java.util.Scanner;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		String[] a = {"warning", "info", "error"};
    		
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//参数1: 交换机名
    		//参数2: 交换机类型
    		ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
    		
    		while (true) {
    			System.out.print("输入消息: ");
    			String msg = new Scanner(System.in).nextLine();
    			if ("exit".equals(msg)) {
    				break;
    			}
    			
    			//随机产生日志级别
    			String level = a[new Random().nextInt(a.length)];
    			
    			//参数1: 交换机名
    			//参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"
    			//参数3: 其他配置属性
    			//参数4: 发布的消息数据 
    			ch.basicPublish("direct_logs", level, null, msg.getBytes());
    			System.out.println("消息已发送: "+level+" - "+msg);
    			
    		}
    
    		c.close();
    	}
    }
    
    

    消费者

    package rabbitmq.routing;
    
    import java.io.IOException;
    import java.util.Scanner;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//定义名字为 direct_logs 的交换机, 它的类型是 "direct"
    		ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
    		
    		//自动生成对列名,
    		//非持久,独占,自动删除
    		String queueName = ch.queueDeclare().getQueue();
    		
    		System.out.println("输入接收的日志级别,用空格隔开:");
    		String[] a = new Scanner(System.in).nextLine().split("\\s");
    		
    		//把该队列,绑定到 direct_logs 交换机
    		//允许使用多个 bindingKey
    		for (String level : a) {
    			ch.queueBind(queueName, "direct_logs", level);
    		}
    		
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				String routingKey = message.getEnvelope().getRoutingKey();
    				System.out.println("收到: "+routingKey+" - "+msg);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume(queueName, true, callback, cancel);
    	}
    }
    

    主题模式

    主题

    在上一小节,我们改进了日志系统。我们没有使用只能进行广播的fanout交换机,而是使用Direct交换机,从而可以选择性接收日志。

    虽然使用Direct交换机改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。

    在我们的日志系统中,我们可能不仅希望根据级别订阅日志,还希望根据发出日志的源订阅日志。

    这将给我们带来很大的灵活性——我们可能只想接收来自“cron”的关键错误,但也要接收来自“kern”的所有日志。

    要在日志系统中实现这一点,我们需要了解更复杂的Topic交换机。

    主题交换机 Topic exchange

    发送到Topic交换机的消息,它的的routingKey,必须是由点分隔的多个单词。单词可以是任何东西,但通常是与消息相关的一些特性。几个有效的routingKey示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。routingKey可以有任意多的单词,最多255个字节。

    bindingKey也必须采用相同的形式。Topic交换机的逻辑与直连交换机类似——使用特定routingKey发送的消息将被传递到所有使用匹配bindingKey绑定的队列。bindingKey有两个重要的特殊点:

    • * 可以通配单个单词。
    • # 可以通配零个或多个单词。

    用一个例子来解释这个问题是最简单的

    主题

    在本例中,我们将发送描述动物的消息。这些消息将使用由三个单词(两个点)组成的routingKey发送。routingKey中的第一个单词表示速度,第二个是颜色,第三个是物种:“<速度>.<颜色>.<物种>”。

    我们创建三个绑定:Q1与bindingKey “*.orange.*” 绑定。和Q2是 “*.*.rabbit” 和 “lazy.#” 。

    这些绑定可概括为:

    • Q1对所有橙色的动物感兴趣。
    • Q2想接收关于兔子和慢速动物的所有消息。

    将routingKey设置为"quick.orange.rabbit"的消息将被发送到两个队列。消息 "lazy.orange.elephant“也发送到它们两个。另外”quick.orange.fox“只会发到第一个队列,”lazy.brown.fox“只发给第二个。”lazy.pink.rabbit“将只被传递到第二个队列一次,即使它匹配两个绑定。”quick.brown.fox"不匹配任何绑定,因此将被丢弃。

    如果我们违反约定,发送一个或四个单词的信息,比如"orange“或”quick.orange.male.rabbit",会发生什么?这些消息将不匹配任何绑定,并将丢失。

    另外,"lazy.orange.male.rabbit",即使它有四个单词,也将匹配最后一个绑定,并将被传递到第二个队列。

    完成的代码

    生产者

    package rabbitmq.topic;
    
    import java.util.Random;
    import java.util.Scanner;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//参数1: 交换机名
    		//参数2: 交换机类型
    		ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
    		
    		while (true) {
    			System.out.print("输入消息: ");
    			String msg = new Scanner(System.in).nextLine();
    			if ("exit".contentEquals(msg)) {
    				break;
    			}
    			System.out.print("输入routingKey: ");
    			String routingKey = new Scanner(System.in).nextLine();
    			
    			//参数1: 交换机名
    			//参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"
    			//参数3: 其他配置属性
    			//参数4: 发布的消息数据 
    			ch.basicPublish("topic_logs", routingKey, null, msg.getBytes());
    			
    			System.out.println("消息已发送: "+routingKey+" - "+msg);
    		}
    
    		c.close();
    	}
    }
    

    消费者

    package rabbitmq.topic;
    
    import java.io.IOException;
    import java.util.Scanner;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
    		
    		//自动生成对列名,
    		//非持久,独占,自动删除
    		String queueName = ch.queueDeclare().getQueue();
    		
    		System.out.println("输入bindingKey,用空格隔开:");
    		String[] a = new Scanner(System.in).nextLine().split("\\s");
    		
    		//把该队列,绑定到 topic_logs 交换机
    		//允许使用多个 bindingKey
    		for (String bindingKey : a) {
    			ch.queueBind(queueName, "topic_logs", bindingKey);
    		}
    		
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				String routingKey = message.getEnvelope().getRoutingKey();
    				System.out.println("收到: "+routingKey+" - "+msg);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume(queueName, true, callback, cancel);
    	}
    }
    

    RPC模式

    RPC

    如果我们需要在远程电脑上运行一个方法,并且还要等待一个返回结果该怎么办?这和前面的例子不太一样, 这种模式我们通常称为远程过程调用,即RPC.

    在本节中,我们将会学习使用RabbitMQ去搭建一个RPC系统:一个客户端和一个可以升级(扩展)的RPC服务器。为了模拟一个耗时任务,我们将创建一个返回斐波那契数列的虚拟的RPC服务。

    客户端

    在客户端定义一个RPCClient类,并定义一个call()方法,这个方法发送一个RPC请求,并等待接收响应结果

    RPCClient client = new RPCClient();
    String result = client.call("4");
    System.out.println( "第四个斐波那契数是: " + result);
    

    回调队列 Callback Queue

    使用RabbitMQ去实现RPC很容易。一个客户端发送请求信息,并得到一个服务器端回复的响应信息。为了得到响应信息,我们需要在请求的时候发送一个“回调”队列地址。我们可以使用默认队列。下面是示例代码:

    //定义回调队列,
    //自动生成对列名,非持久,独占,自动删除
    callbackQueueName = ch.queueDeclare().getQueue();
    
    //用来设置回调队列的参数对象
    BasicProperties props = new BasicProperties
                                .Builder()
                                .replyTo(callbackQueueName)
                                .build();
    //发送调用消息
    ch.basicPublish("", "rpc_queue", props, message.getBytes());
    

    消息属性 Message Properties

    AMQP 0-9-1协议定义了消息的14个属性。大部分属性很少使用,下面是比较常用的4个:

    deliveryMode:将消息标记为持久化(值为2)或非持久化(任何其他值)。

    contentType:用于描述mime类型。例如,对于经常使用的JSON格式,将此属性设置为:application/json

    replyTo:通常用于指定回调队列。

    correlationId:将RPC响应与请求关联起来非常有用。

    关联id (correlationId):

    在上面的代码中,我们会为每个RPC请求创建一个回调队列。 这是非常低效的,这里还有一个更好的方法:让我们为每个客户端创建一个回调队列。

    这就提出了一个新的问题,在队列中得到一个响应时,我们不清楚这个响应所对应的是哪一条请求。这时候就需要使用关联id(correlationId)。我们将为每一条请求设置唯一的的id值。稍后,当我们在回调队列里收到一条消息的时候,我们将查看它的id属性,这样我们就可以匹配对应的请求和响应。如果我们发现了一个未知的id值,我们可以安全的丢弃这条消息,因为它不属于我们的请求。

    小结

    rpc

    RPC的工作方式是这样的:

    • 对于RPC请求,客户端发送一条带有两个属性的消息:replyTo,设置为仅为请求创建的匿名独占队列,和correlationId,设置为每个请求的惟一id值。
    • 请求被发送到rpc_queue队列。
    • RPC工作进程(即:服务器)在队列上等待请求。当一个请求出现时,它执行任务,并使用replyTo字段中的队列将结果发回客户机。
    • 客户机在回应消息队列上等待数据。当消息出现时,它检查correlationId属性。如果匹配请求中的值,则向程序返回该响应数据。

    完成的代码

    服务器端

    package rabbitmq.rpc;
    
    import java.io.IOException;
    import java.util.Random;
    import java.util.Scanner;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class RPCServer {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		/*
    		 * 定义队列 rpc_queue, 将从它接收请求信息
    		 * 
    		 * 参数:
    		 * 1. queue, 对列名
    		 * 2. durable, 持久化
    		 * 3. exclusive, 排他
    		 * 4. autoDelete, 自动删除
    		 * 5. arguments, 其他参数属性
    		 */
    		ch.queueDeclare("rpc_queue",false,false,false,null);
    		ch.queuePurge("rpc_queue");//清除队列中的内容
    		
    		ch.basicQos(1);//一次只接收一条消息
    		
    		
    		//收到请求消息后的回调对象
    		DeliverCallback deliverCallback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				//处理收到的数据(要求第几个斐波那契数)
    				String msg = new String(message.getBody(), "UTF-8");
    				int n = Integer.parseInt(msg);
    				//求出第n个斐波那契数
    				int r = fbnq(n);
    				String response = String.valueOf(r);
    				
    				//设置发回响应的id, 与请求id一致, 这样客户端可以把该响应与它的请求进行对应
    				BasicProperties replyProps = new BasicProperties.Builder()
    						.correlationId(message.getProperties().getCorrelationId())
    						.build();
    				/*
    				 * 发送响应消息
    				 * 1. 默认交换机
    				 * 2. 由客户端指定的,用来传递响应消息的队列名
    				 * 3. 参数(关联id)
    				 * 4. 发回的响应消息
    				 */
    				ch.basicPublish("",message.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
    				//发送确认消息
    				ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
    			}
    		};
    		
    		//
    		CancelCallback cancelCallback = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		//消费者开始接收消息, 等待从 rpc_queue接收请求消息, 不自动确认
    		ch.basicConsume("rpc_queue", false, deliverCallback, cancelCallback);
    	}
    
    	protected static int fbnq(int n) {
    		if(n == 1 || n == 2) return 1;
    		
    		return fbnq(n-1)+fbnq(n-2);
    	}
    }
    
    

    客户端

    package rabbitmq.rpc;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.UUID;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class RPCClient {
    	Connection con;
    	Channel ch;
    	
    	public RPCClient() throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		con = f.newConnection();
    		ch = con.createChannel();
    	}
    	
    	public String call(String msg) throws Exception {
    		//自动生成对列名,非持久,独占,自动删除
    		String replyQueueName = ch.queueDeclare().getQueue();
    		//生成关联id
    		String corrId = UUID.randomUUID().toString();
    		
    		//设置两个参数:
    		//1. 请求和响应的关联id
    		//2. 传递响应数据的queue
    		BasicProperties props = new BasicProperties.Builder()
    				.correlationId(corrId)
    				.replyTo(replyQueueName)
    				.build();
    		//向 rpc_queue 队列发送请求数据, 请求第n个斐波那契数
    		ch.basicPublish("", "rpc_queue", props, msg.getBytes("UTF-8"));
    		
    		//用来保存结果的阻塞集合,取数据时,没有数据会暂停等待
    		BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
    		
    		//接收响应数据的回调对象
    		DeliverCallback deliverCallback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				//如果响应消息的关联id,与请求的关联id相同,我们来处理这个响应数据
    				if (message.getProperties().getCorrelationId().contentEquals(corrId)) {
    					//把收到的响应数据,放入阻塞集合
    					response.offer(new String(message.getBody(), "UTF-8"));
    				}
    			}
    		};
    
    		CancelCallback cancelCallback = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		//开始从队列接收响应数据
    		ch.basicConsume(replyQueueName, true, deliverCallback, cancelCallback);
    		//返回保存在集合中的响应数据
    		return response.take();
    	}
    	
    	public static void main(String[] args) throws Exception {
    		RPCClient client = new RPCClient();
    		while (true) {
    			System.out.print("求第几个斐波那契数:");
    			int n = new Scanner(System.in).nextInt();
    			String r = client.call(""+n);
    			System.out.println(r);
    		}
    	}
    }
    
    

    virtual host

    在RabbitMQ中叫做虚拟消息服务器VirtualHost,每个VirtualHost相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通

    创建virtual host: /pd

    • 进入虚拟机管理界面

    虚拟主机

    • 添加新的虚拟机’/pd’,名称必须以"/"开头

    虚拟主机

    • 查看添加的结果

    结果

    设置虚拟机的用户访问权限

    点击 /pd 虚拟主机, 设置用户 admin 对它的访问权限

    权限

    拼多商城整合 rabbitmq

    当用户下订单时,我们的业务系统直接与数据库通信,把订单保存到数据库中

    当系统流量突然激增,大量的订单压力,会拖慢业务系统和数据库系统

    我们需要应对流量峰值,让流量曲线变得平缓,如下图

    削峰

    订单存储的解耦

    为了进行流量削峰,我们引入 rabbitmq 消息队列,当购物系统产生订单后,可以把订单数据发送到消息队列;而订单消费者应用从消息队列接收订单消息,并把订单保存到数据库

    订单

    这样,当流量激增时,大量订单会暂存在rabbitmq中,而订单消费者可以从容地从消息队列慢慢接收订单,向数据库保存

    生产者-发送订单

    pom.xml 添加依赖

    spring提供了更方便的消息队列访问接口,它对RabbitMQ的客户端API进行了封装,使用起来更加方便

    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    application.yml

    添加RabbitMQ的连接信息

    spring:
      rabbitmq:
        host: 192.168.64.140
        port: 5672
        virtualHost: /pd
        username: admin
        password: admin
    
    

    修改主程序 RunPdAPP

    在主程序中添加下面的方法创建Queue实例

    当创建RabbitMQ连接和信道后,Spring的RabbitMQ工具会自动在服务器中创建队列,代码在RabbitAdmin.declareQueues()方法中

    	@Bean
    	public Queue getQueue() {
    		Queue q = new Queue("orderQueue", true);
    		return q;
    	}
    

    修改 OrderServiceImpl

    	//RabbitAutoConfiguration中创建了AmpqTemplate实例
    	@Autowired
    	AmqpTemplate amqpTemplate;
    
        //saveOrder原来的数据库访问代码全部注释,添加rabbitmq消息发送代码
    	public String saveOrder(PdOrder pdOrder) throws Exception {
    		String orderId = generateId();
    		pdOrder.setOrderId(orderId);
    
    		amqpTemplate.convertAndSend("orderQueue", pdOrder);
    		return orderId;
    
    		
    		//		String orderId = generateId();
    		//		pdOrder.setOrderId(orderId);
    		//
    		//		
    		//		PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId());
    		//		pdOrder.setShippingName(pdShipping.getReceiverName());
    		//		pdOrder.setShippingCode(pdShipping.getReceiverAddress());
    		//		pdOrder.setStatus(1);// 
    		//		pdOrder.setPaymentType(1);
    		//		pdOrder.setPostFee(10D);
    		//		pdOrder.setCreateTime(new Date());
    		//
    		//		double payment = 0;
    		//		List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList());
    		//		for (ItemVO itemVO : itemVOs) {
    		//			PdOrderItem pdOrderItem = new PdOrderItem();
    		//			String id = generateId();
    		//			//String id="2";
    		//			pdOrderItem.setId(id);
    		//			pdOrderItem.setOrderId(orderId);
    		//			pdOrderItem.setItemId("" + itemVO.getPdItem().getId());
    		//			pdOrderItem.setTitle(itemVO.getPdItem().getTitle());
    		//			pdOrderItem.setPrice(itemVO.getPdItem().getPrice());
    		//			pdOrderItem.setNum(itemVO.getPdCartItem().getNum());
    		//
    		//			payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice();
    		//			pdOrderItemMapper.insert(pdOrderItem);
    		//		}
    		//		pdOrder.setPayment(payment);
    		//		pdOrderMapper.insert(pdOrder);
    		//		return orderId;
    	}
    
    

    消费者-接收订单,并保存到数据库

    pd-web项目复制为pd-order-consumer

    复制项目

    修改 application.yml

    把端口修改成 81

    server:
      port: 81
    
    spring:
      datasource:
        type: com.alibaba.druid.pool.DruidDataSource
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://localhost:3306/pd_store?useUnicode=true&characterEncoding=utf8
        username: root
        password: root
    
      rabbitmq:
        host: 192.168.64.140
        port: 5672
        virtualHost: /pd
        username: admin
        password: admin
    
    mybatis:
      #typeAliasesPackage: cn.tedu.ssm.pojo
      mapperLocations: classpath:com.pd.mapper/*.xml
    
    logging:
      level: 
        cn.tedu.ssm.mapper: debug
    

    删除无关代码

    pd-order-consumer项目只需要从 RabbitMQ 接收订单数据, 再保存到数据库即可, 所以项目中只需要保留这部分代码

    • 删除 com.pd.controller 包
    • 删除 com.pd.payment.utils 包
    • 删除无关的 Service,只保留 OrderService 和 OrderServiceImpl

    新建 OrderConsumer

    package com.pd;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import com.pd.pojo.PdOrder;
    import com.pd.service.OrderService;
    
    @Component
    public class OrderConsumer {
        //收到订单数据后,会调用订单的业务代码,把订单保存到数据库
    	@Autowired
    	private OrderService orderService;
    
        //添加该注解后,会从指定的orderQueue接收消息,
        //并把数据转为 PdOrder 实例传递到此方法
    	@RabbitListener(queues="orderQueue")
    	public void save(PdOrder pdOrder)
    	{
    		System.out.println("消费者");
    		System.out.println(pdOrder.toString());
    		try {
    			orderService.saveOrder(pdOrder);
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    
    }
    

    修改 OrderServiceImpl 的 saveOrder() 方法

    	public String saveOrder(PdOrder pdOrder) throws Exception {
    		//		String orderId = generateId();
    		//		pdOrder.setOrderId(orderId);
    		//
    		//		amqpTemplate.convertAndSend("orderQueue", pdOrder);
    		//		return orderId;
    		//
    		//		
    		//		
    		//		String orderId = generateId();
    		//		pdOrder.setOrderId(orderId);
    		
    		//从RabbitMQ接收的订单数据,
    		//已经在上游订单业务中生成过id,这里不再重新生成id
    		//直接获取该订单的id
    		String orderId = pdOrder.getOrderId();
    
    		
    		PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId());
    		pdOrder.setShippingName(pdShipping.getReceiverName());
    		pdOrder.setShippingCode(pdShipping.getReceiverAddress());
    		pdOrder.setStatus(1);// 
    		pdOrder.setPaymentType(1);
    		pdOrder.setPostFee(10D);
    		pdOrder.setCreateTime(new Date());
    
    		double payment = 0;
    		List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList());
    		for (ItemVO itemVO : itemVOs) {
    			PdOrderItem pdOrderItem = new PdOrderItem();
    			String id = generateId();
    			//String id="2";
    			pdOrderItem.setId(id);
    			pdOrderItem.setOrderId(orderId);
    			pdOrderItem.setItemId("" + itemVO.getPdItem().getId());
    			pdOrderItem.setTitle(itemVO.getPdItem().getTitle());
    			pdOrderItem.setPrice(itemVO.getPdItem().getPrice());
    			pdOrderItem.setNum(itemVO.getPdCartItem().getNum());
    
    			payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice();
    			pdOrderItemMapper.insert(pdOrderItem);
    		}
    		pdOrder.setPayment(payment);
    		pdOrderMapper.insert(pdOrder);
    		return orderId;
    	}   
    

    手动确认

    application.yml

    spring:
      rabbitmq:
        listener:
          simple:
            acknowledge-mode: manual
    

    OrderConsumer

    package com.pd;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import com.pd.pojo.PdOrder;
    import com.pd.service.OrderService;
    import com.rabbitmq.client.Channel;
    
    @Component
    public class OrderConsumer {
        //收到订单数据后,会调用订单的业务代码,把订单保存到数据库
    	@Autowired
    	private OrderService orderService;
    
        //添加该注解后,会从指定的orderQueue接收消息,
        //并把数据转为 PdOrder 实例传递到此方法
    	@RabbitListener(queues="orderQueue")
    	public void save(PdOrder pdOrder, Channel channel, Message message)
    	{
    		System.out.println("消费者");
    		System.out.println(pdOrder.toString());
    		try {
    			orderService.saveOrder(pdOrder);
    			channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    		} catch (Exception e) {
    			e.printStackTrace();
    		} 
    	}
    }
    
    展开全文
  • RabbitMQ教程

    万次阅读 2019-07-30 16:29:43
    文章目录RabbitMQ实战教程1.什么是MQ2.RabbitMQ2.1.RabbitMQ的简介2.2.官网2.3.MQ的其他产品2.4.学习5种队列2.5.安装文档3.搭建RabbitMQ环境3.1.下载3.2.windows下安装3.3.Linux下安装3.4.安装的注意事项3.5.安装...

    RabbitMQ实战教程

    1.什么是MQ

    • 消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。
      其主要用途:不同进程Process/线程Thread之间通信。

    为什么会产生消息队列?有几个原因:

    • 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;

    • 不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;

    • 关于消息队列的详细介绍请参阅:
      《Java帝国之消息队列》
      《一个故事告诉你什么是消息队列》
      《到底什么时候该使用MQ》

    • MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。本文主要介绍RabbitMq。

    • 本教程pdf及代码下载地址
      代码:https://download.csdn.net/download/zpcandzhj/10585077
      教程:https://download.csdn.net/download/zpcandzhj/10585092

    2.RabbitMQ

    2.1.RabbitMQ的简介

    这里写图片描述
    开发语言:Erlang – 面向并发的编程语言。

    这里写图片描述

    2.1.1.AMQP
    AMQP是消息队列的一个协议。

    这里写图片描述

    2.2.官网

    这里写图片描述

    2.3.MQ的其他产品

    这里写图片描述

    2.4.学习5种队列

    这里写图片描述

    2.5.安装文档

    这里写图片描述

    3.搭建RabbitMQ环境

    3.1.下载

    下载地址:http://www.rabbitmq.com/download.html

    3.2.windows下安装

    3.2.1.安装Erlang
    下载:http://www.erlang.org/download/otp_win64_17.3.exe
    安装:
    这里写图片描述
    这里写图片描述
    这里写图片描述
    这里写图片描述
    这里写图片描述
    安装完成。

    3.2.2.安装RabbitMQ
    这里写图片描述
    这里写图片描述
    这里写图片描述
    安装完成。

    开始菜单里出现如下选项:
    这里写图片描述

    启动、停止、重新安装等。

    3.2.3.启用管理工具
    1、双击这里写图片描述
    2、进入C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.4.1\sbin输入命令:
    rabbitmq-plugins enable rabbitmq_management
    这里写图片描述

    这样就启动了管理工具,可以试一下命令:
    停止:net stop RabbitMQ
    启动:net start RabbitMQ

    3、在浏览器中输入地址查看:http://127.0.0.1:15672/
    这里写图片描述
    4、使用默认账号登录:guest/ guest

    3.3.Linux下安装

    3.3.1.安装Erlang
    3.3.2.添加yum支持
    cd /usr/local/src/
    mkdir rabbitmq
    cd rabbitmq

    wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
    rpm -Uvh erlang-solutions-1.0-1.noarch.rpm

    rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc

    使用yum安装:
    sudo yum install erlang
    这里写图片描述

    3.3.3.安装RabbitMQ
    上传rabbitmq-server-3.4.1-1.noarch.rpm文件到/usr/local/src/rabbitmq/
    安装:
    rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm

    3.3.4.启动、停止
    service rabbitmq-server start
    service rabbitmq-server stop
    service rabbitmq-server restart
    3.3.5.设置开机启动
    chkconfig rabbitmq-server on
    3.3.6.设置配置文件
    cd /etc/rabbitmq
    cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/

    mv rabbitmq.config.example rabbitmq.config
    3.3.7.开启用户远程访问
    vi /etc/rabbitmq/rabbitmq.config
    这里写图片描述
    注意要去掉后面的逗号。
    3.3.8.开启web界面管理工具
    rabbitmq-plugins enable rabbitmq_management
    service rabbitmq-server restart
    3.3.9.防火墙开放15672端口
    /sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
    /etc/rc.d/init.d/iptables save

    3.4.安装的注意事项

    1、推荐使用默认的安装路径
    2、系统用户名必须是英文
    Win10改名字非常麻烦,具体方法百度
    这里写图片描述
    3、计算机名必须是英文
    这里写图片描述
    4、系统的用户必须是管理员

    如果安装失败应该如何解决:
    1、重装系统 – 不推荐
    2、将RabbitMQ安装到linux虚拟机中
    a)推荐
    3、使用别人安装好的RabbitMQ服务
    a)只要给你开通一个账户即可。
    b)使用公用的RabbitMQ服务,在192.168.50.22
    c)推荐

    常见错误:
    这里写图片描述

    3.5.安装完成后操作

    1、系统服务中有RabbitMQ服务,停止、启动、重启
    这里写图片描述
    2、打开命令行工具
    这里写图片描述
    如果找不到命令行工具,直接cd到相应目录:
    这里写图片描述
    输入命令rabbitmq-plugins enable rabbitmq_management启用管理插件
    这里写图片描述
    查看管理页面
    这里写图片描述
    通过默认账户 guest/guest 登录
    如果能够登录,说明安装成功。
    这里写图片描述

    4.添加用户

    4.1.添加admin用户

    这里写图片描述

    4.2.用户角色

    1、超级管理员(administrator)
    可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
    2、监控者(monitoring)
    可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
    3、策略制定者(policymaker)
    可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
    4、普通管理者(management)
    仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
    5、其他
    无法登陆管理控制台,通常就是普通的生产者和消费者。

    4.3.创建Virtual Hosts

    这里写图片描述

    选中Admin用户,设置权限:
    这里写图片描述
    看到权限已加:
    这里写图片描述

    4.4.管理界面中的功能

    这里写图片描述

    这里写图片描述

    5.学习五种队列

    这里写图片描述

    5.1.导入my-rabbitmq项目

    项目下载地址:
    https://download.csdn.net/download/zpcandzhj/10585077
    这里写图片描述

    5.2.简单队列

    5.2.1.图示
    这里写图片描述

    P:消息的生产者
    C:消息的消费者
    红色:队列

    生产者将消息发送到队列,消费者从队列中获取消息。

    5.2.2.导入RabbitMQ的客户端依赖

    <dependency>
       <groupId>com.rabbitmq</groupId>
       <artifactId>amqp-client</artifactId>
       <version>3.4.1</version>
    </dependency>
    

    5.2.3.获取MQ的连接

    package com.zpc.rabbitmq.util;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection
    public class ConnectionUtil 
        public static Connection getConnection() throws Exception {
            //定义连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置服务地址
            factory.setHost("localhost");
            //端口
            factory.setPort(5672);
            //设置账号信息,用户名、密码、vhost
            factory.setVirtualHost("testhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            // 通过工程获取连接
            Connection connection = factory.newConnection();
            return connection;
        }
    }
    

    5.2.4.生产者发送消息到队列

    package com.zpc.rabbitmq.simple
    import com.zpc.rabbitmq.util.ConnectionUtil
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection
    public class Send 
        private final static String QUEUE_NAME = "q_test_01"
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            // 从连接中创建通道
            Channel channel = connection.createChannel()
            // 声明(创建)队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null)
            // 消息内容
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            //关闭通道和连接
            channel.close();
            connection.close();
        }
    }
    

    5.2.5.管理工具中查看消息
    这里写图片描述

    点击上面的队列名称,查询具体的队列中的信息:
    这里写图片描述

    5.2.6.消费者从队列中获取消息

    package com.zpc.rabbitmq.simple
    import com.zpc.rabbitmq.util.ConnectionUtil
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer
    public class Recv 
        private final static String QUEUE_NAME = "q_test_01"
        public static void main(String[] argv) throws Exception 
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            // 从连接中创建通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null)
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel)
            // 监听队列
            channel.basicConsume(QUEUE_NAME, true, consumer)
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [x] Received '" + message + "'");
            }
        }
    }
    

    5.3.Work模式

    这里写图片描述

    5.3.1.图示
    这里写图片描述

    一个生产者、2个消费者。

    一个消息只能被一个消费者获取。
    5.3.2.消费者1

    package com.zpc.rabbitmq.work
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.zpc.rabbitmq.util.ConnectionUtil
    public class Recv 
        private final static String QUEUE_NAME = "test_queue_work"
        public static void main(String[] argv) throws Exception 
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel()
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null)
            // 同一时刻服务器只会发一条消息给消费者
            //channel.basicQos(1)
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,false表示手动返回完成状态,true表示自动
            channel.basicConsume(QUEUE_NAME, true, consumer)
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [y] Received '" + message + "'");
                //休眠
                Thread.sleep(10);
                // 返回确认状态,注释掉表示使用自动确认模式
                //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.3.3.消费者2

    package com.zpc.rabbitmq.work
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.zpc.rabbitmq.util.ConnectionUtil
    public class Recv2 
        private final static String QUEUE_NAME = "test_queue_work"
        public static void main(String[] argv) throws Exception 
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel()
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null)
            // 同一时刻服务器只会发一条消息给消费者
            //channel.basicQos(1)
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,false表示手动返回完成状态,true表示自动
            channel.basicConsume(QUEUE_NAME, true, consumer)
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [x] Received '" + message + "'");
                // 休眠1秒
                Thread.sleep(1000);
                //下面这行注释掉表示使用自动确认模式
                //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.3.4.生产者
    向队列中发送100条消息。

    package com.zpc.rabbitmq.work
    import com.zpc.rabbitmq.util.ConnectionUtil
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection
    public class Send 
        private final static String QUEUE_NAME = "test_queue_work"
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel()
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null)
            for (int i = 0; i < 100; i++) {
                // 消息内容
                String message = "" + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'")
                Thread.sleep(i * 10);
            }
            channel.close();
            connection.close();
        }
    }
    

    5.3.5.测试
    测试结果:
    1、消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
    2、消费者1和消费者2获取到的消息的数量是相同的,一个是消费奇数号消息,一个是偶数。

    • 其实,这样是不合理的,因为消费者1线程停顿的时间短。应该是消费者1要比消费者2获取到的消息多才对。
      RabbitMQ 默认将消息顺序发送给下一个消费者,这样,每个消费者会得到相同数量的消息。即轮询(round-robin)分发消息。

    • 怎样才能做到按照每个消费者的能力分配消息呢?联合使用 Qos 和 Acknowledge 就可以做到。
      basicQos 方法设置了当前信道最大预获取(prefetch)消息数量为1。消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列,从队列的视角去看,总是会有一批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 参数就是用来限制这批未确认消息数量的。设为1时,队列只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息。prefetchCount 的默认值为0,即没有限制,队列会将所有消息尽快发给消费者。

    • 2个概念

    • 轮询分发 :使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。

    • 公平分发 :虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。

    为了解决这个问题,我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
    还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。

    5.4.Work模式的“能者多劳”

    打开上述代码的注释:

    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    
    //开启这行 表示使用手动确认模式
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    

    同时改为手动确认:

    // 监听队列,false表示手动返回完成状态,true表示自动
    channel.basicConsume(QUEUE_NAME, false, consumer);
    

    测试:
    消费者1比消费者2获取的消息更多。

    5.5.消息的确认模式

    消费者从队列中获取消息,服务端如何知道消息已经被消费呢?

    模式1:自动确认
    只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。
    模式2:手动确认
    消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。

    手动模式:
    这里写图片描述

    自动模式:
    这里写图片描述

    5.6.订阅模式

    这里写图片描述
    5.6.1.图示
    这里写图片描述

    解读:
    1、1个生产者,多个消费者
    2、每一个消费者都有自己的一个队列
    3、生产者没有将消息直接发送到队列,而是发送到了交换机
    4、每个队列都要绑定到交换机
    5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
    注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费
    这里写图片描述

    5.6.2.消息的生产者(看作是后台系统)
    向交换机中发送消息。

    package com.zpc.rabbitmq.subscribe
    import com.zpc.rabbitmq.util.ConnectionUtil
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection
    public class Send 
        private final static String EXCHANGE_NAME = "test_exchange_fanout"
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel()
            // 声明exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout")
            // 消息内容
            String message = "Hello World!";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'")
            channel.close();
            connection.close();
        }
    }
    

    注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。
    5.6.3.消费者1(看作是前台系统)

    package com.zpc.rabbitmq.subscribe
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer
    import com.zpc.rabbitmq.util.ConnectionUtil
    public class Recv 
        private final static String QUEUE_NAME = "test_queue_work1"
        private final static String EXCHANGE_NAME = "test_exchange_fanout"
        public static void main(String[] argv) throws Exception 
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel()
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null)
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "")
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1)
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer)
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [Recv] Received '" + message + "'");
                Thread.sleep(10)
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.6.4.消费者2(看作是搜索系统)

    package com.zpc.rabbitmq.subscribe
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer
    import com.zpc.rabbitmq.util.ConnectionUtil
    public class Recv2 
        private final static String QUEUE_NAME = "test_queue_work2"
        private final static String EXCHANGE_NAME = "test_exchange_fanout"
        public static void main(String[] argv) throws Exception 
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel()
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null)
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "")
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1)
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer)
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [Recv2] Received '" + message + "'");
                Thread.sleep(10)
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.6.5.测试
    测试结果:
    同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。

    在管理工具中查看队列和交换机的绑定关系:

    这里写图片描述

    5.7.路由模式

    这里写图片描述
    5.7.1.图示
    这里写图片描述

    5.7.2.生产者
    这里写图片描述
    5.7.3.消费者1(假设是前台系统)
    这里写图片描述
    5.7.4.消费2(假设是搜索系统)
    这里写图片描述

    5.8.主题模式(通配符模式)

    这里写图片描述

    这里写图片描述

    5.8.1.图示
    这里写图片描述
    同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。

    5.8.2.生产者

    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection
    import com.zpc.rabbitmq.util.ConnectionUtil
    public class Send 
        private final static String EXCHANGE_NAME = "test_exchange_topic"
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel()
            // 声明exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic")
            // 消息内容
            String message = "Hello World!!";
            channel.basicPublish(EXCHANGE_NAME, "routekey.1", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'")
            channel.close();
            connection.close();
        }
    }
    

    5.8.3.消费者1(前台系统)

    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer
    import com.zpc.rabbitmq.util.ConnectionUtil
    public class Recv 
        private final static String QUEUE_NAME = "test_queue_topic_work_1"
        private final static String EXCHANGE_NAME = "test_exchange_topic"
        public static void main(String[] argv) throws Exception 
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel()
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null)
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*")
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1)
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer)
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [Recv_x] Received '" + message + "'");
                Thread.sleep(10)
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.8.4.消费者2(搜索系统)

    
    import com.zpc.rabbitmq.util.ConnectionUtil
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer
    public class Recv2 
        private final static String QUEUE_NAME = "test_queue_topic_work_2"
        private final static String EXCHANGE_NAME = "test_exchange_topic"
        public static void main(String[] argv) throws Exception 
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel()
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null)
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*")
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1)
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer)
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [Recv2_x] Received '" + message + "'");
                Thread.sleep(10)
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    6.Spring-Rabbit

    6.1.Spring项目

    http://spring.io/projects

    这里写图片描述

    6.2.简介

    这里写图片描述
    这里写图片描述

    6.3.使用

    6.3.1.消费者

    package com.zpc.rabbitmq.spring
    /**
     * 消费者
     *
     * @author Evan
     */
    public class Foo 
        //具体执行业务的方法
        public void listen(String foo) {
            System.out.println("\n消费者: " + foo + "\n");
        }
    }
    

    6.3.2.生产者

    package com.zpc.rabbitmq.spring
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.support.AbstractApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext
    public class SpringMain {
        public static void main(final String... args) throws Exception {
            AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(
                    "classpath:spring/rabbitmq-context.xml");
            //RabbitMQ模板
            RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
            //发送消息
            template.convertAndSend("Hello, 鸟鹏!");
            Thread.sleep(1000);// 休眠1秒
            ctx.destroy(); //容器销毁
        }
    }
    

    6.3.3.配置文件
    1、定义连接工厂

    <!-- 定义RabbitMQ的连接工厂 -->
    <rabbit:connection-factory id="connectionFactory"
       host="127.0.0.1" port="5672" username="admin" password="admin"
       virtual-host="testhost" />
    

    2、定义模板(可以指定交换机或队列)

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
    

    3、定义队列、交换机、以及完成队列和交换机的绑定

    <!-- 定义队列,自动声明 -->
    <rabbit:queue name="zpcQueue" auto-declare="true"/>
    <!-- 定义交换器,把Q绑定到交换机,自动声明 -->
    <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
       <rabbit:bindings>
          <rabbit:binding queue="zpcQueue"/>
       </rabbit:bindings>
    </rabbit:fanout-exchange>
    

    4、定义监听

    <rabbit:listener-container connection-factory="connectionFactory">
       <rabbit:listener ref="foo" method="listen" queue-names="zpcQueue" />
    </rabbit:listener-container>
    <bean id="foo" class="com.zpc.rabbitmq.spring.Foo" />
    

    5、定义管理,用于管理队列、交换机等:

    <!-- MQ的管理,包括队列、交换器等 -->
    <rabbit:admin connection-factory="connectionFactory" />
    

    完整配置文件rabbitmq-context.xml

    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
       <!-- 定义RabbitMQ的连接工厂 -->
       <rabbit:connection-factory id="connectionFactory"
          host="127.0.0.1" port="5672" username="admin" password="admin"
          virtual-host="testhost" />
       <!-- 定义Rabbit模板,指定连接工厂以及定义exchange -->
       <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
       <!-- <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
          exchange="fanoutExchange" routing-key="foo.bar" /> -->
       <!-- MQ的管理,包括队列、交换器等 -->
       <rabbit:admin connection-factory="connectionFactory" />
       <!-- 定义队列,自动声明 -->
       <rabbit:queue name="zpcQueue" auto-declare="true"/>
       <!-- 定义交换器,把Q绑定到交换机,自动声明 -->
       <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
          <rabbit:bindings>
             <rabbit:binding queue="zpcQueue"/>
          </rabbit:bindings>
       </rabbit:fanout-exchange>
    <!--   <rabbit:topic-exchange name="myExchange">
          <rabbit:bindings>
             <rabbit:binding queue="myQueue" pattern="foo.*" />
          </rabbit:bindings>
       </rabbit:topic-exchange> -->
       <!-- 队列监听 -->
       <rabbit:listener-container connection-factory="connectionFactory">
          <rabbit:listener ref="foo" method="listen" queue-names="zpcQueue" />
       </rabbit:listener-container>
       <bean id="foo" class="com.zpc.rabbitmq.spring.Foo" />
    </beans>
    

    6.4.持久化交换机和队列

    这里写图片描述

    持久化:将交换机或队列的数据保存到磁盘,服务器宕机或重启之后依然存在。
    非持久化:将交换机或队列的数据保存到内存,服务器宕机或重启之后将不存在。

    非持久化的性能高于持久化。

    如何选择持久化?非持久化? – 看需求。

    7.Spring集成RabbitMQ一个完整案例

    创建三个系统A,B,C
    A作为生产者,B、C作为消费者(B,C作为web项目启动)
    项目下载地址:https://download.csdn.net/download/zpcandzhj/10585077

    7.1.在A系统中发送消息到交换机

    7.1.1.导入依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
       <groupId>com.zpc</groupId>
       <artifactId>myrabbitA</artifactId>
       <version>0.0.1-SNAPSHOT</version>
       <packaging>jar</packaging>
       <name>myrabbit</name>
       <dependencies>
          <dependency>
             <groupId>org.springframework.amqp</groupId>
             <artifactId>spring-rabbit</artifactId>
             <version>1.4.0.RELEASE</version>
          </dependency>
          <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
             <version>1.2.47</version>
          </dependency>
       </dependencies>
    </project>
    

    7.1.2.队列和交换机的绑定关系
    实现:
    1、在配置文件中将队列和交换机完成绑定
    2、可以在管理界面中完成绑定
    a)绑定关系如果发生变化,需要修改配置文件,并且服务需要重启
    b)管理更加灵活
    c)更容易对绑定关系的权限管理,流程管理
    本例选择第2种方式
    7.1.3.配置
    rabbitmq-context.xml

    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
       <!-- 定义RabbitMQ的连接工厂 -->
       <rabbit:connection-factory id="connectionFactory"
          host="127.0.0.1" port="5672" username="admin" password="admin"
          virtual-host="testhost" />
       <!-- MQ的管理,包括队列、交换器等 -->
       <rabbit:admin connection-factory="connectionFactory" />
       <!-- 定义交换器,暂时不把Q绑定到交换机,在管理界面去绑定 -->
       <!--<rabbit:topic-exchange name="topicExchange" auto-declare="true" ></rabbit:topic-exchange>-->
       <rabbit:direct-exchange name="directExchange" auto-declare="true" ></rabbit:direct-exchange>
       <!--<rabbit:fanout-exchange name="fanoutExchange" auto-declare="true" ></rabbit:fanout-exchange>-->
       <!-- 定义Rabbit模板,指定连接工厂以及定义exchange(exchange要和上面的一致) -->
       <!--<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="topicExchange" />-->
       <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="directExchange" />
       <!--<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />-->
    </beans>
    

    7.1.4.消息内容
    方案:
    1、消息内容使用对象做json序列化发送
    a)数据大
    b)有些数据其他人是可能用不到的
    2、发送特定的业务字段,如id、操作类型

    7.1.5.实现
    生产者MsgSender.java:

    package com.zpc.myrabbit
    import com.alibaba.fastjson.JSON;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.support.AbstractApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map
    /**
     * 消息生产者
     */
    public class MsgSender {
        public static void main(String[] args) throws Exception {
            AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(
                    "classpath:spring/rabbitmq-context.xml");
            //RabbitMQ模板
            RabbitTemplate template = ctx.getBean(RabbitTemplate.class)
            String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
            //发送消息
            Map<String, Object> msg = new HashMap<String, Object>();
            msg.put("type", "1");
            msg.put("date", date);
            template.convertAndSend("type2", JSON.toJSONString(msg));
            Thread.sleep(1000);// 休眠1秒
            ctx.destroy(); //容器销毁
        }
    }
    

    7.2.在B系统接收消息

    7.2.1.导入依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.zpc</groupId>
        <artifactId>myrabbitB</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>war</packaging>
        <name>myrabbit</name>
        <properties>
            <spring.version>4.1.3.RELEASE</spring.version>
            <fastjson.version>1.2.46</fastjson.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.4.1</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>1.4.0.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-webmvc</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.47</version>
            </dependency>
        </dependencies>
        <build>
            <finalName>${project.artifactId}</finalName>
            <plugins>
                <!-- web层需要配置Tomcat插件 -->
                <plugin>
                    <groupId>org.apache.tomcat.maven</groupId>
                    <artifactId>tomcat7-maven-plugin</artifactId>
                    <configuration>
                        <path>/testRabbit</path>
                        <uriEncoding>UTF-8</uriEncoding>
                        <port>8081</port>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    

    7.2.2.配置

    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
       <!-- 定义RabbitMQ的连接工厂 -->
       <rabbit:connection-factory id="connectionFactory"
          host="127.0.0.1" port="5672" username="admin" password="admin"
          virtual-host="testhost" />
       <!-- MQ的管理,包括队列、交换器等 -->
       <rabbit:admin connection-factory="connectionFactory" />
       <!-- 定义B系统需要监听的队列,自动声明 -->
       <rabbit:queue name="q_topic_testB" auto-declare="true"/>
       <!-- 队列监听 -->
       <rabbit:listener-container connection-factory="connectionFactory">
          <rabbit:listener ref="myMQlistener" method="listen" queue-names="q_topic_testB" />
       </rabbit:listener-container>
       <bean id="myMQlistener" class="com.zpc.myrabbit.listener.Listener" />
    </beans>
    

    7.2.3.具体处理逻辑

    public class Listener {
        //具体执行业务的方法
        public void listen(String msg) {
            System.out.println("\n消费者B开始处理消息: " + msg + "\n");
        }
    }
    

    7.2.4.在界面管理工具中完成绑定关系
    选中定义好的交换机(exchange)
    这里写图片描述
    1)direct
    这里写图片描述
    2)fanout
    这里写图片描述
    3)topic
    这里写图片描述

    7.3.在C系统中接收消息

    (和B系统配置差不多,无非是Q名和Q对应的处理逻辑变了)

    7.3.1.配置

    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
       <!-- 定义RabbitMQ的连接工厂 -->
       <rabbit:connection-factory id="connectionFactory"
          host="127.0.0.1" port="5672" username="admin" password="admin"
          virtual-host="testhost" />
       <!-- MQ的管理,包括队列、交换器等 -->
       <rabbit:admin connection-factory="connectionFactory" />
       <!-- 定义C系统需要监听的队列,自动声明 -->
       <rabbit:queue name="q_topic_testC" auto-declare="true"/>
       <!-- 队列监听 -->
       <rabbit:listener-container connection-factory="connectionFactory">
          <rabbit:listener ref="myMQlistener" method="listen" queue-names="q_topic_testC" />
       </rabbit:listener-container>
       <bean id="myMQlistener" class="com.zpc.myrabbit.listener.Listener" />
    </beans>
    

    7.3.2.处理业务逻辑

    public class Listener 
        //具体执行业务的方法
        public void listen(String msg) {
            System.out.println("\n消费者C开始处理消息: " + msg + "\n");
        }
    }
    

    7.3.3.在管理工具中绑定队列和交换机
    见7.2.4

    7.3.4.测试
    分别启动B,C两个web应用,然后运行A的MsgSender主方法发送消息,分别测试fanout、direct、topic三种类型

    8.Springboot集成RabbitMQ

    8.1.简单队列

    1、配置pom文件,主要是添加spring-boot-starter-amqp的支持

    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    2、配置application.properties文件
    配置rabbitmq的安装地址、端口以及账户信息

    spring.application.name=spirng-boot-rabbitmq
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=admin
    

    3、配置队列

    package com.zpc.rabbitmq
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration
    @Configuration
    public class RabbitConfig {
        @Bean
        public Queue queue() {
            return new Queue("q_hello");
        }
    }
    

    4、发送者

    package com.zpc.rabbitmq
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component
    import java.text.SimpleDateFormat;
    import java.util.Date
    @Component
    public class HelloSender {
        @Autowired
        private AmqpTemplate rabbitTemplate
        public void send() {
            String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
            String context = "hello " + date;
            System.out.println("Sender : " + context);
            //简单对列的情况下routingKey即为Q名
            this.rabbitTemplate.convertAndSend("q_hello", context);
        }
    }
    

    5、接收者

    package com.zpc.rabbitmq
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component
    @Component
    @RabbitListener(queues = "q_hello")
    public class HelloReceiver 
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver  : " + hello);
        }
    }
    

    6、测试

    package com.zpc.rabbitmq
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitMqHelloTest 
        @Autowired
        private HelloSender helloSender
        @Test
        public void hello() throws Exception {
            helloSender.send();
        }
    }
    

    8.2.多对多使用(Work模式)

    注册两个Receiver:

    package com.zpc.rabbitmq
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component
    @Component
    @RabbitListener(queues = "q_hello")
    public class HelloReceiver2 
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver2  : " + hello);
        }
    

    }

    @Test
    public void oneToMany() throws Exception {
        for (int i=0;i<100;i++){
            helloSender.send(i);
            Thread.sleep(300);
        }
    }
    
    public void send(int i) {
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
        String context = "hello " + i + " " + date;
        System.out.println("Sender : " + context);
        //简单对列的情况下routingKey即为Q名
        this.rabbitTemplate.convertAndSend("q_hello", context);
    }
    

    8.3.Topic Exchange(主题模式)

    • topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列

    首先对topic规则配置,这里使用两个队列(消费者)来演示。
    1)配置队列,绑定交换机

    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration
    @Configuration
    public class TopicRabbitConfig 
        final static String message = "q_topic_message";
        final static String messages = "q_topic_messages"
        @Bean
        public Queue queueMessage() {
            return new Queue(TopicRabbitConfig.message);
        }
    
    @Bean
    public Queue queueMessages() {
        return new Queue(TopicRabbitConfig.messages);
    }
    
    /**
     * 声明一个Topic类型的交换机
     * @return
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("mybootexchange");
    }
    
    /**
     * 绑定Q到交换机,并且指定routingKey
     * @param queueMessage
     * @param exchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }
    
    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
    

    }

    2)创建2个消费者
    q_topic_message 和q_topic_messages

    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component
    @Component
    @RabbitListener(queues = "q_topic_message")
    public class Receiver1 
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver1  : " + hello);
        }
    }
    
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component
    @Component
    @RabbitListener(queues = "q_topic_messages")
    public class Receiver2 
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver2 : " + hello);
        }
    }
    

    3)消息发送者(生产者)

    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component
    @Component
    public class MsgSender 
        @Autowired
        private AmqpTemplate rabbitTemplate
        public void send1() {
            String context = "hi, i am message 1";
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("mybootexchange", "topic.message", context);
        }
    
    public void send2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("mybootexchange", "topic.messages", context);
    }
    

    }

    send1方法会匹配到topic.#和topic.message,两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息。
    4)测试

    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitTopicTest 
        @Autowired
        private MsgSender msgSender
        @Test
        public void send1() throws Exception {
            msgSender.send1();
        }
    
    @Test
    public void send2() throws Exception {
        msgSender.send2();
    }
    

    }

    8.4.Fanout Exchange(订阅模式)

    • Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
      1)配置队列,绑定交换机
    package com.zpc.rabbitmq.fanout
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration
    @Configuration
    public class FanoutRabbitConfig 
        @Bean
        public Queue aMessage() {
            return new Queue("q_fanout_A");
        }
    
    @Bean
    public Queue bMessage() {
        return new Queue("q_fanout_B");
    }
    
    @Bean
    public Queue cMessage() {
        return new Queue("q_fanout_C");
    }
    
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("mybootfanoutExchange");
    }
    
    @Bean
    Binding bindingExchangeA(Queue aMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(aMessage).to(fanoutExchange);
    }
    
    @Bean
    Binding bindingExchangeB(Queue bMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(bMessage).to(fanoutExchange);
    }
    
    @Bean
    Binding bindingExchangeC(Queue cMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(cMessage).to(fanoutExchange);
    }
    

    }

    2)创建3个消费者

    package com.zpc.rabbitmq.fanout
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component
    @Component
    @RabbitListener(queues = "q_fanout_A")
    public class ReceiverA 
        @RabbitHandler
        public void process(String hello) {
            System.out.println("AReceiver  : " + hello + "/n");
        }
    }
    
    package com.zpc.rabbitmq.fanout
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component
    @Component
    @RabbitListener(queues = "q_fanout_B")
    public class ReceiverB 
        @RabbitHandler
        public void process(String hello) {
            System.out.println("BReceiver  : " + hello + "/n");
        }
    }
    
    package com.zpc.rabbitmq.fanout
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component
    @Component
    @RabbitListener(queues = "q_fanout_C")
    public class ReceiverC 
        @RabbitHandler
        public void process(String hello) {
            System.out.println("CReceiver  : " + hello + "/n");
        }
    }
    

    3)生产者

    package com.zpc.rabbitmq.fanout
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component
    @Component
    public class MsgSenderFanout 
        @Autowired
        private AmqpTemplate rabbitTemplate
        public void send() {
            String context = "hi, fanout msg ";
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("mybootfanoutExchange","", context);
        }
    }
    

    4)测试

    package com.zpc.rabbitmq.fanout
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitFanoutTest 
        @Autowired
        private MsgSenderFanout msgSender
        @Test
        public void send1() throws Exception {
            msgSender.send();
        }
    }
    

    结果如下,三个消费者都收到消息:
    AReceiver : hi, fanout msg
    CReceiver : hi, fanout msg
    BReceiver : hi, fanout msg

    9.总结

    展开全文
  • RabbitMQ(万字教程)

    万次阅读 多人点赞 2018-08-05 23:14:56
    RabbitMQ万字教程。RabbitMQ详解。消息队列(MQ),本质是个队列,队列中存放的内容是message。MQ用于不同进程Process/线程Thread之间通信。本文介绍RabbitMQ的使用。RabbitMQ实战教程。

    仅需一次订阅RabbitMQ,作者所有专栏都能看

    推荐【Flinkhttps://blog.csdn.net/hellozpc/article/details/109413465
    推荐【SpringBoothttps://blog.csdn.net/hellozpc/article/details/107095951
    推荐【SpringCloudhttps://blog.csdn.net/hellozpc/article/details/83692496
    推荐【Mybatishttps://blog.csdn.net/hellozpc/article/details/80878563
    推荐【SnowFlakehttps://blog.csdn.net/hellozpc/article/details/108248227
    推荐【并发限流https://blog.csdn.net/hellozpc/article/details/107582771

    **欢迎关注**
    **扫一扫**

    RabbitMQ实战教程

    1.什么是MQ

    • 消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。
      其主要用途:不同进程Process/线程Thread之间通信。

    为什么会产生消息队列?有几个原因:

    • 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;

    • 不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;

    • 关于消息队列的详细介绍请参阅:
      《Java帝国之消息队列》
      《一个故事告诉你什么是消息队列》
      《到底什么时候该使用MQ》

    • MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。本文主要介绍RabbitMq

    • 本教程pdf及代码下载地址
      代码:https://download.csdn.net/download/zpcandzhj/10585077
      教程:https://download.csdn.net/download/zpcandzhj/10585092

    2.RabbitMQ

    2.1.RabbitMQ的简介

    这里写图片描述
    开发语言:Erlang – 面向并发的编程语言。

    这里写图片描述

    2.1.1.AMQP
    AMQP是消息队列的一个协议。

    这里写图片描述

    2.2.官网

    这里写图片描述

    2.3.MQ的其他产品

    这里写图片描述

    2.4.学习5种队列

    这里写图片描述

    2.5.安装文档

    这里写图片描述

    3.搭建RabbitMQ环境

    3.1.下载

    下载地址:http://www.rabbitmq.com/download.html

    3.2.windows下安装

    3.2.1.安装Erlang
    下载:http://www.erlang.org/download/otp_win64_17.3.exe
    安装:
    这里写图片描述
    这里写图片描述
    这里写图片描述
    这里写图片描述
    这里写图片描述
    安装完成。

    3.2.2.安装RabbitMQ
    这里写图片描述
    这里写图片描述
    这里写图片描述
    安装完成。

    开始菜单里出现如下选项:
    这里写图片描述

    启动、停止、重新安装等。

    3.2.3.启用管理工具
    1、双击这里写图片描述
    2、进入C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.4.1\sbin输入命令:
    rabbitmq-plugins enable rabbitmq_management
    这里写图片描述

    这样就启动了管理工具,可以试一下命令:
    停止:net stop RabbitMQ
    启动:net start RabbitMQ

    3、在浏览器中输入地址查看:http://127.0.0.1:15672/
    这里写图片描述
    4、使用默认账号登录:guest/ guest

    3.3.Linux下安装

    3.3.1.安装Erlang
    3.3.2.添加yum支持
    cd /usr/local/src/
    mkdir rabbitmq
    cd rabbitmq

    wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
    rpm -Uvh erlang-solutions-1.0-1.noarch.rpm

    rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc

    使用yum安装:
    sudo yum install erlang
    这里写图片描述

    3.3.3.安装RabbitMQ
    上传rabbitmq-server-3.4.1-1.noarch.rpm文件到/usr/local/src/rabbitmq/
    安装:
    rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm

    3.3.4.启动、停止
    service rabbitmq-server start
    service rabbitmq-server stop
    service rabbitmq-server restart
    3.3.5.设置开机启动
    chkconfig rabbitmq-server on
    3.3.6.设置配置文件
    cd /etc/rabbitmq
    cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/

    mv rabbitmq.config.example rabbitmq.config
    3.3.7.开启用户远程访问
    vi /etc/rabbitmq/rabbitmq.config
    这里写图片描述
    注意要去掉后面的逗号。
    3.3.8.开启web界面管理工具
    rabbitmq-plugins enable rabbitmq_management
    service rabbitmq-server restart
    3.3.9.防火墙开放15672端口
    /sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
    /etc/rc.d/init.d/iptables save

    3.4.安装的注意事项

    1、推荐使用默认的安装路径
    2、系统用户名必须是英文
    Win10改名字非常麻烦,具体方法百度
    这里写图片描述
    3、计算机名必须是英文
    这里写图片描述
    4、系统的用户必须是管理员

    如果安装失败应该如何解决:
    1、重装系统 – 不推荐
    2、将RabbitMQ安装到linux虚拟机中
    a)推荐
    3、使用别人安装好的RabbitMQ服务
    a)只要给你开通一个账户即可。
    b)使用公用的RabbitMQ服务,在192.168.50.22
    c)推荐

    常见错误:
    这里写图片描述

    3.5.安装完成后操作

    1、系统服务中有RabbitMQ服务,停止、启动、重启
    这里写图片描述
    2、打开命令行工具
    这里写图片描述
    如果找不到命令行工具,直接cd到相应目录:
    这里写图片描述
    输入命令rabbitmq-plugins enable rabbitmq_management启用管理插件
    这里写图片描述
    查看管理页面
    这里写图片描述
    通过默认账户 guest/guest 登录
    如果能够登录,说明安装成功。
    这里写图片描述

    4.添加用户

    4.1.添加admin用户

    这里写图片描述

    4.2.用户角色

    1、超级管理员(administrator)
    可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
    2、监控者(monitoring)
    可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
    3、策略制定者(policymaker)
    可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
    4、普通管理者(management)
    仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
    5、其他
    无法登陆管理控制台,通常就是普通的生产者和消费者。

    4.3.创建Virtual Hosts

    这里写图片描述

    选中Admin用户,设置权限:
    这里写图片描述
    看到权限已加:
    这里写图片描述

    4.4.管理界面中的功能

    这里写图片描述

    这里写图片描述

    5.学习五种队列

    这里写图片描述

    5.1.导入my-rabbitmq项目

    项目下载地址:
    https://download.csdn.net/download/zpcandzhj/10585077
    这里写图片描述

    5.2.简单队列

    5.2.1.图示
    这里写图片描述

    P:消息的生产者
    C:消息的消费者
    红色:队列

    生产者将消息发送到队列,消费者从队列中获取消息。
    5.2.2.导入RabbitMQ的客户端依赖

    <dependency>
       <groupId>com.rabbitmq</groupId>
       <artifactId>amqp-client</artifactId>
       <version>3.4.1</version>
    </dependency>
    

    5.2.3.获取MQ的连接

    package com.zpc.rabbitmq.util;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    
    public class ConnectionUtil {
    
        public static Connection getConnection() throws Exception {
            //定义连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置服务地址
            factory.setHost("localhost");
            //端口
            factory.setPort(5672);
            //设置账号信息,用户名、密码、vhost
            factory.setVirtualHost("testhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            // 通过工程获取连接
            Connection connection = factory.newConnection();
            return connection;
        }
    }
    

    5.2.4.生产者发送消息到队列

    package com.zpc.rabbitmq.simple;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Send {
    
        private final static String QUEUE_NAME = "q_test_01";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            // 从连接中创建通道
            Channel channel = connection.createChannel();
    
            // 声明(创建)队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 消息内容
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            //关闭通道和连接
            channel.close();
            connection.close();
        }
    }
    

    5.2.5.管理工具中查看消息
    这里写图片描述

    点击上面的队列名称,查询具体的队列中的信息:
    这里写图片描述
    5.2.6.消费者从队列中获取消息

    package com.zpc.rabbitmq.simple;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class Recv {
    
        private final static String QUEUE_NAME = "q_test_01";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            // 从连接中创建通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
    
            // 监听队列
            channel.basicConsume(QUEUE_NAME, true, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [x] Received '" + message + "'");
            }
        }
    }
    

    5.3.Work模式

    这里写图片描述

    5.3.1.图示
    这里写图片描述

    一个生产者、2个消费者。

    一个消息只能被一个消费者获取。
    5.3.2.消费者1

    package com.zpc.rabbitmq.work;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    public class Recv {
    
        private final static String QUEUE_NAME = "test_queue_work";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 同一时刻服务器只会发一条消息给消费者
            //channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,false表示手动返回完成状态,true表示自动
            channel.basicConsume(QUEUE_NAME, true, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [y] Received '" + message + "'");
                //休眠
                Thread.sleep(10);
                // 返回确认状态,注释掉表示使用自动确认模式
                //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.3.3.消费者2

    package com.zpc.rabbitmq.work;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    public class Recv2 {
    
        private final static String QUEUE_NAME = "test_queue_work";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 同一时刻服务器只会发一条消息给消费者
            //channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,false表示手动返回完成状态,true表示自动
            channel.basicConsume(QUEUE_NAME, true, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [x] Received '" + message + "'");
                // 休眠1秒
                Thread.sleep(1000);
                //下面这行注释掉表示使用自动确认模式
                //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.3.4.生产者
    向队列中发送100条消息。

    package com.zpc.rabbitmq.work;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Send {
    
        private final static String QUEUE_NAME = "test_queue_work";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            for (int i = 0; i < 100; i++) {
                // 消息内容
                String message = "" + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
    
                Thread.sleep(i * 10);
            }
    
            channel.close();
            connection.close();
        }
    }
    

    5.3.5.测试
    测试结果:
    1、消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
    2、消费者1和消费者2获取到的消息的数量是相同的,一个是消费奇数号消息,一个是偶数。

    • 其实,这样是不合理的,因为消费者1线程停顿的时间短。应该是消费者1要比消费者2获取到的消息多才对。
      RabbitMQ 默认将消息顺序发送给下一个消费者,这样,每个消费者会得到相同数量的消息。即轮询(round-robin)分发消息。

    • 怎样才能做到按照每个消费者的能力分配消息呢?联合使用 Qos 和 Acknowledge 就可以做到。
      basicQos 方法设置了当前信道最大预获取(prefetch)消息数量为1。消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列,从队列的视角去看,总是会有一批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 参数就是用来限制这批未确认消息数量的。设为1时,队列只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息。prefetchCount 的默认值为0,即没有限制,队列会将所有消息尽快发给消费者。

    • 2个概念

    • 轮询分发 :使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。

    • 公平分发 :虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。

    为了解决这个问题,我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
    还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。

    5.4.Work模式的“能者多劳”

    打开上述代码的注释:

    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    
    //开启这行 表示使用手动确认模式
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    

    同时改为手动确认:

    // 监听队列,false表示手动返回完成状态,true表示自动
    channel.basicConsume(QUEUE_NAME, false, consumer);
    

    测试:
    消费者1比消费者2获取的消息更多。

    5.5.消息的确认模式

    消费者从队列中获取消息,服务端如何知道消息已经被消费呢?

    模式1:自动确认
    只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。
    模式2:手动确认
    消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。

    手动模式:
    这里写图片描述

    自动模式:
    这里写图片描述

    5.6.订阅模式

    这里写图片描述
    5.6.1.图示
    这里写图片描述

    解读:
    1、1个生产者,多个消费者
    2、每一个消费者都有自己的一个队列
    3、生产者没有将消息直接发送到队列,而是发送到了交换机
    4、每个队列都要绑定到交换机
    5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
    注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费
    这里写图片描述

    5.6.2.消息的生产者(看作是后台系统)
    向交换机中发送消息。

    package com.zpc.rabbitmq.subscribe;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Send {
    
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            // 消息内容
            String message = "Hello World!";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    }
    

    注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。
    5.6.3.消费者1(看作是前台系统)

    package com.zpc.rabbitmq.subscribe;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    public class Recv {
    
        private final static String QUEUE_NAME = "test_queue_work1";
    
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [Recv] Received '" + message + "'");
                Thread.sleep(10);
    
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.6.4.消费者2(看作是搜索系统)

    package com.zpc.rabbitmq.subscribe;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    public class Recv2 {
    
        private final static String QUEUE_NAME = "test_queue_work2";
    
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [Recv2] Received '" + message + "'");
                Thread.sleep(10);
    
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.6.5.测试
    测试结果:
    同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。

    在管理工具中查看队列和交换机的绑定关系:

    这里写图片描述

    5.7.路由模式

    这里写图片描述
    5.7.1.图示
    这里写图片描述

    5.7.2.生产者
    这里写图片描述
    5.7.3.消费者1(假设是前台系统)
    这里写图片描述
    5.7.4.消费2(假设是搜索系统)
    这里写图片描述

    5.8.主题模式(通配符模式)

    这里写图片描述

    这里写图片描述

    5.8.1.图示
    这里写图片描述
    同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。

    5.8.2.生产者

    package com.zpc.rabbitmq.topic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    public class Send {
    
        private final static String EXCHANGE_NAME = "test_exchange_topic";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
            // 消息内容
            String message = "Hello World!!";
            channel.basicPublish(EXCHANGE_NAME, "routekey.1", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    }
    

    5.8.3.消费者1(前台系统)

    package com.zpc.rabbitmq.topic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    public class Recv {
    
        private final static String QUEUE_NAME = "test_queue_topic_work_1";
    
        private final static String EXCHANGE_NAME = "test_exchange_topic";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*");
    
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [Recv_x] Received '" + message + "'");
                Thread.sleep(10);
    
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    5.8.4.消费者2(搜索系统)

    package com.zpc.rabbitmq.topic;
    
    import com.zpc.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class Recv2 {
    
        private final static String QUEUE_NAME = "test_queue_topic_work_2";
    
        private final static String EXCHANGE_NAME = "test_exchange_topic";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
    
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [Recv2_x] Received '" + message + "'");
                Thread.sleep(10);
    
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    6.Spring-Rabbit

    6.1.Spring项目

    http://spring.io/projects

    这里写图片描述

    6.2.简介

    这里写图片描述
    这里写图片描述

    6.3.使用

    6.3.1.消费者

    package com.zpc.rabbitmq.spring;
    
    /**
     * 消费者
     *
     * @author Evan
     */
    public class Foo {
    
        //具体执行业务的方法
        public void listen(String foo) {
            System.out.println("\n消费者: " + foo + "\n");
        }
    }
    

    6.3.2.生产者

    package com.zpc.rabbitmq.spring;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.support.AbstractApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class SpringMain {
        public static void main(final String... args) throws Exception {
            AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(
                    "classpath:spring/rabbitmq-context.xml");
            //RabbitMQ模板
            RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
            //发送消息
            template.convertAndSend("Hello, 鸟鹏!");
            Thread.sleep(1000);// 休眠1秒
            ctx.destroy(); //容器销毁
        }
    }
    

    6.3.3.配置文件
    1、定义连接工厂

    <!-- 定义RabbitMQ的连接工厂 -->
    <rabbit:connection-factory id="connectionFactory"
       host="127.0.0.1" port="5672" username="admin" password="admin"
       virtual-host="testhost" />
    

    2、定义模板(可以指定交换机或队列)

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
    

    3、定义队列、交换机、以及完成队列和交换机的绑定

    <!-- 定义队列,自动声明 -->
    <rabbit:queue name="zpcQueue" auto-declare="true"/>
    
    <!-- 定义交换器,把Q绑定到交换机,自动声明 -->
    <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
       <rabbit:bindings>
          <rabbit:binding queue="zpcQueue"/>
       </rabbit:bindings>
    </rabbit:fanout-exchange>
    

    4、定义监听

    <rabbit:listener-container connection-factory="connectionFactory">
       <rabbit:listener ref="foo" method="listen" queue-names="zpcQueue" />
    </rabbit:listener-container>
    
    <bean id="foo" class="com.zpc.rabbitmq.spring.Foo" />
    

    5、定义管理,用于管理队列、交换机等:

    <!-- MQ的管理,包括队列、交换器等 -->
    <rabbit:admin connection-factory="connectionFactory" />
    

    完整配置文件rabbitmq-context.xml

    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
    
       <!-- 定义RabbitMQ的连接工厂 -->
       <rabbit:connection-factory id="connectionFactory"
          host="127.0.0.1" port="5672" username="admin" password="admin"
          virtual-host="testhost" />
    
       <!-- 定义Rabbit模板,指定连接工厂以及定义exchange -->
       <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
       <!-- <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
          exchange="fanoutExchange" routing-key="foo.bar" /> -->
    
       <!-- MQ的管理,包括队列、交换器等 -->
       <rabbit:admin connection-factory="connectionFactory" />
    
       <!-- 定义队列,自动声明 -->
       <rabbit:queue name="zpcQueue" auto-declare="true"/>
       
       <!-- 定义交换器,把Q绑定到交换机,自动声明 -->
       <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
          <rabbit:bindings>
             <rabbit:binding queue="zpcQueue"/>
          </rabbit:bindings>
       </rabbit:fanout-exchange>
       
    <!--   <rabbit:topic-exchange name="myExchange">
          <rabbit:bindings>
             <rabbit:binding queue="myQueue" pattern="foo.*" />
          </rabbit:bindings>
       </rabbit:topic-exchange> -->
    
       <!-- 队列监听 -->
       <rabbit:listener-container connection-factory="connectionFactory">
          <rabbit:listener ref="foo" method="listen" queue-names="zpcQueue" />
       </rabbit:listener-container>
    
       <bean id="foo" class="com.zpc.rabbitmq.spring.Foo" />
    
    </beans>
    

    6.4.持久化交换机和队列

    这里写图片描述

    持久化:将交换机或队列的数据保存到磁盘,服务器宕机或重启之后依然存在。
    非持久化:将交换机或队列的数据保存到内存,服务器宕机或重启之后将不存在。

    非持久化的性能高于持久化。

    如何选择持久化?非持久化? – 看需求。

    欢迎关注公众号「程猿薇茑」

    7.Spring集成RabbitMQ一个完整案例

    创建三个系统A,B,C
    A作为生产者,B、C作为消费者(B,C作为web项目启动)
    项目下载地址:https://download.csdn.net/download/zpcandzhj/10585077

    7.1.在A系统中发送消息到交换机

    7.1.1.导入依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
    
       <groupId>com.zpc</groupId>
       <artifactId>myrabbitA</artifactId>
       <version>0.0.1-SNAPSHOT</version>
       <packaging>jar</packaging>
       <name>myrabbit</name>
    
       <dependencies>
          <dependency>
             <groupId>org.springframework.amqp</groupId>
             <artifactId>spring-rabbit</artifactId>
             <version>1.4.0.RELEASE</version>
          </dependency>
    
          <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
             <version>1.2.47</version>
          </dependency>
       </dependencies>
    </project>
    

    7.1.2.队列和交换机的绑定关系
    实现:
    1、在配置文件中将队列和交换机完成绑定
    2、可以在管理界面中完成绑定
    a)绑定关系如果发生变化,需要修改配置文件,并且服务需要重启
    b)管理更加灵活
    c)更容易对绑定关系的权限管理,流程管理
    本例选择第2种方式
    7.1.3.配置
    rabbitmq-context.xml

    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
    
       <!-- 定义RabbitMQ的连接工厂 -->
       <rabbit:connection-factory id="connectionFactory"
          host="127.0.0.1" port="5672" username="admin" password="admin"
          virtual-host="testhost" />
    
       <!-- MQ的管理,包括队列、交换器等 -->
       <rabbit:admin connection-factory="connectionFactory" />
    
       <!-- 定义交换器,暂时不把Q绑定到交换机,在管理界面去绑定 -->
       <!--<rabbit:topic-exchange name="topicExchange" auto-declare="true" ></rabbit:topic-exchange>-->
       <rabbit:direct-exchange name="directExchange" auto-declare="true" ></rabbit:direct-exchange>
       <!--<rabbit:fanout-exchange name="fanoutExchange" auto-declare="true" ></rabbit:fanout-exchange>-->
    
       <!-- 定义Rabbit模板,指定连接工厂以及定义exchange(exchange要和上面的一致) -->
       <!--<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="topicExchange" />-->
       <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="directExchange" />
       <!--<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />-->
    </beans>
    

    7.1.4.消息内容
    方案:
    1、消息内容使用对象做json序列化发送
    a)数据大
    b)有些数据其他人是可能用不到的
    2、发送特定的业务字段,如id、操作类型

    7.1.5.实现
    生产者MsgSender.java:

    package com.zpc.myrabbit;
    
    import com.alibaba.fastjson.JSON;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.support.AbstractApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    
    
    /**
     * 消息生产者
     */
    public class MsgSender {
        public static void main(String[] args) throws Exception {
            AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(
                    "classpath:spring/rabbitmq-context.xml");
            //RabbitMQ模板
            RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
    
            String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
            //发送消息
            Map<String, Object> msg = new HashMap<String, Object>();
            msg.put("type", "1");
            msg.put("date", date);
            template.convertAndSend("type2", JSON.toJSONString(msg));
            Thread.sleep(1000);// 休眠1秒
            ctx.destroy(); //容器销毁
        }
    }
    

    7.2.在B系统接收消息

    7.2.1.导入依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.zpc</groupId>
        <artifactId>myrabbitB</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>war</packaging>
    
        <name>myrabbit</name>
        <properties>
            <spring.version>4.1.3.RELEASE</spring.version>
            <fastjson.version>1.2.46</fastjson.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.4.1</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>1.4.0.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-webmvc</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.47</version>
            </dependency>
        </dependencies>
    
        <build>
            <finalName>${project.artifactId}</finalName>
            <plugins>
                <!-- web层需要配置Tomcat插件 -->
                <plugin>
                    <groupId>org.apache.tomcat.maven</groupId>
                    <artifactId>tomcat7-maven-plugin</artifactId>
                    <configuration>
                        <path>/testRabbit</path>
                        <uriEncoding>UTF-8</uriEncoding>
                        <port>8081</port>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    

    7.2.2.配置

    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
    
       <!-- 定义RabbitMQ的连接工厂 -->
       <rabbit:connection-factory id="connectionFactory"
          host="127.0.0.1" port="5672" username="admin" password="admin"
          virtual-host="testhost" />
    
       <!-- MQ的管理,包括队列、交换器等 -->
       <rabbit:admin connection-factory="connectionFactory" />
    
       <!-- 定义B系统需要监听的队列,自动声明 -->
       <rabbit:queue name="q_topic_testB" auto-declare="true"/>
    
       <!-- 队列监听 -->
       <rabbit:listener-container connection-factory="connectionFactory">
          <rabbit:listener ref="myMQlistener" method="listen" queue-names="q_topic_testB" />
       </rabbit:listener-container>
    
       <bean id="myMQlistener" class="com.zpc.myrabbit.listener.Listener" />
    </beans>
    

    7.2.3.具体处理逻辑

    public class Listener {
        //具体执行业务的方法
        public void listen(String msg) {
            System.out.println("\n消费者B开始处理消息: " + msg + "\n");
        }
    }
    

    7.2.4.在界面管理工具中完成绑定关系
    选中定义好的交换机(exchange)
    这里写图片描述
    1)direct
    这里写图片描述
    2)fanout
    这里写图片描述
    3)topic
    这里写图片描述

    7.3.在C系统中接收消息

    (和B系统配置差不多,无非是Q名和Q对应的处理逻辑变了)

    7.3.1.配置

    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
    
       <!-- 定义RabbitMQ的连接工厂 -->
       <rabbit:connection-factory id="connectionFactory"
          host="127.0.0.1" port="5672" username="admin" password="admin"
          virtual-host="testhost" />
    
       <!-- MQ的管理,包括队列、交换器等 -->
       <rabbit:admin connection-factory="connectionFactory" />
    
       <!-- 定义C系统需要监听的队列,自动声明 -->
       <rabbit:queue name="q_topic_testC" auto-declare="true"/>
    
       <!-- 队列监听 -->
       <rabbit:listener-container connection-factory="connectionFactory">
          <rabbit:listener ref="myMQlistener" method="listen" queue-names="q_topic_testC" />
       </rabbit:listener-container>
    
       <bean id="myMQlistener" class="com.zpc.myrabbit.listener.Listener" />
    </beans>
    

    7.3.2.处理业务逻辑

    public class Listener {
    
        //具体执行业务的方法
        public void listen(String msg) {
            System.out.println("\n消费者C开始处理消息: " + msg + "\n");
        }
    }
    

    7.3.3.在管理工具中绑定队列和交换机
    见7.2.4

    7.3.4.测试
    分别启动B,C两个web应用,然后运行A的MsgSender主方法发送消息,分别测试fanout、direct、topic三种类型

    8.Springboot集成RabbitMQ

    8.1.简单队列

    1、配置pom文件,主要是添加spring-boot-starter-amqp的支持

    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    2、配置application.properties文件
    配置rabbitmq的安装地址、端口以及账户信息

    spring.application.name=spirng-boot-rabbitmq
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=admin
    

    3、配置rabbitmq队列

    package com.zpc.rabbitmq;
    
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitConfig {
        @Bean
        public Queue queue() {
            return new Queue("q_hello");
        }
    }
    

    4、rabbitmq发送者

    package com.zpc.rabbitmq;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    @Component
    public class HelloSender {
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void send() {
            String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
            String context = "hello " + date;
            System.out.println("Sender : " + context);
            //简单对列的情况下routingKey即为Q名
            this.rabbitTemplate.convertAndSend("q_hello", context);
        }
    }
    

    5、rabbitmq接收者

    package com.zpc.rabbitmq;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "q_hello")
    public class HelloReceiver {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver  : " + hello);
        }
    }
    

    6、测试

    package com.zpc.rabbitmq;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitMqHelloTest {
    
        @Autowired
        private HelloSender helloSender;
    
        @Test
        public void hello() throws Exception {
            helloSender.send();
        }
    }
    

    8.2.多对多使用(Work模式)

    注册两个Receiver:

    package com.zpc.rabbitmq;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "q_hello")
    public class HelloReceiver2 {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver2  : " + hello);
        }
    
    }
    
    @Test
    public void oneToMany() throws Exception {
        for (int i=0;i<100;i++){
            helloSender.send(i);
            Thread.sleep(300);
        }
    }
    
    public void send(int i) {
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
        String context = "hello " + i + " " + date;
        System.out.println("Sender : " + context);
        //简单对列的情况下routingKey即为Q名
        this.rabbitTemplate.convertAndSend("q_hello", context);
    }
    

    8.3.Topic Exchange(主题模式)

    • topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列

    首先对topic规则配置,这里使用两个队列(消费者)来演示。
    1)配置队列,绑定交换机

    package com.zpc.rabbitmq.topic;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class TopicRabbitConfig {
    
        final static String message = "q_topic_message";
        final static String messages = "q_topic_messages";
    
        @Bean
        public Queue queueMessage() {
            return new Queue(TopicRabbitConfig.message);
        }
    
        @Bean
        public Queue queueMessages() {
            return new Queue(TopicRabbitConfig.messages);
        }
    
        /**
         * 声明一个Topic类型的交换机
         * @return
         */
        @Bean
        TopicExchange exchange() {
            return new TopicExchange("mybootexchange");
        }
    
        /**
         * 绑定Q到交换机,并且指定routingKey
         * @param queueMessage
         * @param exchange
         * @return
         */
        @Bean
        Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
        }
    
        @Bean
        Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
        }
    }
    

    2)创建2个消费者
    q_topic_message 和q_topic_messages

    package com.zpc.rabbitmq.topic;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "q_topic_message")
    public class Receiver1 {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver1  : " + hello);
        }
    }
    
    package com.zpc.rabbitmq.topic;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "q_topic_messages")
    public class Receiver2 {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver2 : " + hello);
        }
    }
    

    3)消息发送者(生产者)

    package com.zpc.rabbitmq.topic;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MsgSender {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void send1() {
            String context = "hi, i am message 1";
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("mybootexchange", "topic.message", context);
        }
    
    
        public void send2() {
            String context = "hi, i am messages 2";
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("mybootexchange", "topic.messages", context);
        }
    }
    

    send1方法会匹配到topic.#和topic.message,两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息。
    4)测试

    package com.zpc.rabbitmq.topic;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitTopicTest {
    
        @Autowired
        private MsgSender msgSender;
    
        @Test
        public void send1() throws Exception {
            msgSender.send1();
        }
    
        @Test
        public void send2() throws Exception {
            msgSender.send2();
        }
    }
    

    8.4.Fanout Exchange(订阅模式)

    • Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
      1)配置队列,绑定交换机
    package com.zpc.rabbitmq.fanout;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class FanoutRabbitConfig {
    
        @Bean
        public Queue aMessage() {
            return new Queue("q_fanout_A");
        }
    
        @Bean
        public Queue bMessage() {
            return new Queue("q_fanout_B");
        }
    
        @Bean
        public Queue cMessage() {
            return new Queue("q_fanout_C");
        }
    
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("mybootfanoutExchange");
        }
    
        @Bean
        Binding bindingExchangeA(Queue aMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(aMessage).to(fanoutExchange);
        }
    
        @Bean
        Binding bindingExchangeB(Queue bMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(bMessage).to(fanoutExchange);
        }
    
        @Bean
        Binding bindingExchangeC(Queue cMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(cMessage).to(fanoutExchange);
        }
    }
    

    2)创建3个消费者

    package com.zpc.rabbitmq.fanout;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "q_fanout_A")
    public class ReceiverA {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("AReceiver  : " + hello + "/n");
        }
    }
    
    package com.zpc.rabbitmq.fanout;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "q_fanout_B")
    public class ReceiverB {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("BReceiver  : " + hello + "/n");
        }
    }
    
    package com.zpc.rabbitmq.fanout;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "q_fanout_C")
    public class ReceiverC {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("CReceiver  : " + hello + "/n");
        }
    }
    

    3)生产者

    package com.zpc.rabbitmq.fanout;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MsgSenderFanout {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void send() {
            String context = "hi, fanout msg ";
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("mybootfanoutExchange","", context);
        }
    }
    

    4)测试

    package com.zpc.rabbitmq.fanout;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitFanoutTest {
    
        @Autowired
        private MsgSenderFanout msgSender;
    
        @Test
        public void send1() throws Exception {
            msgSender.send();
        }
    }
    

    结果如下,三个消费者都收到消息:
    AReceiver : hi, fanout msg
    CReceiver : hi, fanout msg
    BReceiver : hi, fanout msg

    9.总结

    推荐springCloud教程:
    https://blog.csdn.net/hellozpc/article/details/83692496

    推荐Springboot2.0教程:
    https://blog.csdn.net/hellozpc/article/details/82531834

    **欢迎关注公众号【程猿薇茑】**
    **微信扫一扫**
    展开全文
  • docker 安装rabbitMQ

    万次阅读 2020-11-24 18:42:53
    搜索rabbitMq,进入官方的镜像,可以看到以下几种类型的镜像;我们选择带有“mangement”的版本(包含web管理页面); 拉取镜像 docker pull rabbitmq 查看所有镜像 docker images 安装和web界面启动 ...

    下载镜像

    进入docker hub镜像仓库地址:https://hub.docker.com/

    搜索rabbitMq,进入官方的镜像,可以看到以下几种类型的镜像;我们选择带有“mangement”的版本(包含web管理页面);

    • 拉取镜像

    docker pull rabbitmq
    • 查看所有镜像
    docker images

    安装和web界面启动

    • 镜像创建和启动容器
    docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:latest

    说明:

    • -d 后台运行容器;
    • --name 指定容器名;
    • -p 指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号);
    • -v 映射目录或文件;
    • --hostname 主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);
    • -e 指定环境变量;(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)
    • 查看正在运行容器
    docker ps
    •  启动rabbitmq_management       rabbitmq 是刚才--name 指定的容器名;
    docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management

    • 访问测试

    浏览器打开web管理端:http://ip:15672

    因为我是mac 我直接访问localhost  登陆账号密码 admin/admin

    • 无法访问

      开启防火墙15672端口

     firewall-cmd --zone=public --add-port=15672/tcp --permanent        
    
     firewall-cmd --reload 
    

    希望对你有所帮助

    展开全文
  • Linux安装RabbitMq Erlang使用

    万次阅读 2019-11-19 15:37:16
    Linux安装RabbitMq 配置安装环境 1. 安装Erlang环境 yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel 2)安装ncurses yum -y install ncurses-devel 3)安装erlang环境 ```...
  • RabbitMQ教程_2 RabbitMQ 安装

    万次阅读 2021-01-08 14:52:35
    文章目录2.RabbitMQ 的安装2.1 RabbitMQ简介应用场景AMQP协议常用交换器2.2 RabbitMQ 的安装2.2.1 下载2.2.2 下载的安装包2.2.3 安装步骤1.将rabbitmq安装包上传到linux系统中2.安装依赖包rpm安装方式yum 安装方式3....
  • Rabbitmq

    千次阅读 2019-12-10 11:41:52
    1.应用场景 用于在分布式系统中存储转发消息,在易用性、扩展性... RabbitMQ is a message broker[消息代理]: it accepts and forwards[输出,转发] messages. You can think about it as a post office [邮局...
  • Linux安装RabbitMq步骤流程

    万次阅读 2020-01-09 17:47:38
    1:下载RabbitMq依赖的erlang语言安装包 wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm 最新的22版本 2:若缺少epel-release 依赖 yum install epel-release 3:安装erlang软件包 ...
  • RabbitMQ的安装

    万次阅读 2020-06-02 20:14:09
    一、安装erlang环境 ... 这个文件其实不是gz格式的,使用file otp_src_20.1.tar.gz可以查看它的真实数据格式 解压tar -xvf otp_src_20.1.tar.gz 解压后,先安装依赖,这2个必须要安装 yum install ncurses-devel ...
  • rabbitMQ笔记

    万次阅读 2020-09-04 09:36:50
    【1】windows上安装RabbitMQ 【2】命令 【3】ubuntu16.04中安装RabbitMQ 【4】CentOS 6.5 安装 RabbitMQ 3.6.1 【5】远程访问配置 【1】windows上安装RabbitMQ 1.安装erlang环境(otp_win64_20.0.exe),并检查系统...
  • SpringBoot整合RabbitMQ之 典型应用场景实战一

    万次阅读 多人点赞 2018-10-09 15:08:04
    RabbitMQ 作为目前应用相当广泛的消息中间件,在企业级应用、微服务应用中充当着重要的角色。特别是在一些典型的应用场景以及业务模块中具有重要的作用,比如业务服务模块解耦、异步通信、高并发限流、超时业务、...
  • docker安装RabbitMQ

    万次阅读 2019-12-27 15:39:16
    想玩一下RabbitMQ,在网上查找了linux安装,感觉特别麻烦,本人买的阿里服务器被我安装了docker环境,所以用docker安装更为简洁、快速。 1、查找rabbitmq镜像 注意:如果docker pull rabbitmq 后面不带management...
  • rabbitmq面试题

    万次阅读 多人点赞 2018-12-12 09:53:59
    1.什么是rabbitmq 采用AMQP高级消息队列协议的一种消息队列技术,最大的特点就是消费并不需要确保提供方存在,实现了服务之间的高度解耦 2.为什么要使用rabbitmq 1.在分布式系统下具备异步,削峰,负载均衡等一系列高级...
  • RabbitMQ消息确认机制

    万次阅读 2020-06-03 19:10:43
    在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 95,906
精华内容 38,362
关键字:

rabbitmq