精华内容
下载资源
问答
  • RabbitMQ使用场景

    2021-10-05 21:50:09
    一. RabbitMQ 简介 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对...RabbitMQ使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。A

    一. RabbitMQ 简介

    MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

    RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、 安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

    二. RabbitMQ 使用场景

    1. 解耦(为面向服务的架构(SOA)提供基本的最终一致性实现)

    场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。

    图片

    传统模式的缺点:

    • 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败

    • 订单系统与库存系统耦合

    引入消息队列

    • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功

    • 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作

    • 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦

    • 为了保证库存肯定有,可以将队列大小设置成库存数量,或者采用其他方式解决。

    基于消息的模型,关心的是“通知”,而非“处理”。

    短信、邮件通知、缓存刷新等操作使用消息队列进行通知。

    消息队列和RPC的区别与比较:

    • RPC: 异步调用,及时获得调用结果,具有强一致性结果,关心业务调用处理结果。

    • 消息队列:两次异步RPC调用,将调用内容在队列中进行转储,并选择合适的时机进行投递(错峰流控)

    2. 异步提升效率

    场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式

    扩展:

    异步并发利器:实际项目中使用CompletionService提升系统性能的一次实践

    (1)串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端

    图片

    (2)并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间

    图片

    引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:

    图片

    3. 流量削峰

    流量削峰也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛

    应用场景:系统其他时间A系统每秒请求量就100个,系统可以稳定运行。系统每天晚间八点有秒杀活动,每秒并发请求量增至1万条,但是系统最大的处理能力只能每秒处理1000个请求,于是系统崩溃,服务器宕机。

    之前架构:大量用户(100万用户)通过浏览器在晚上八点高峰期同时参与秒杀活动。大量的请求涌入我们的系统中,高峰期达到每秒钟5000个请求,大量的请求打到MySQL上,每秒钟预计执行3000条SQL。但是一般的MySQL每秒钟扛住2000个请求就不错了,如果达到3000个请求的话可能MySQL直接就瘫痪了,从而系统无法被使用。但是高峰期过了之后,就成了低峰期,可能也就1万用户访问系统,每秒的请求数量也就50个左右,整个系统几乎没有任何压力。

    引入MQ:100万用户在高峰期的时候,每秒请求有5000个请求左右,将这5000请求写入MQ里面,系统A每秒最多只能处理2000请求,因为MySQL每秒只能处理2000个请求。系统A从MQ中慢慢拉取请求,每秒就拉取2000个请求,不要超过自己每秒能处理的请求数量即可。MQ,每秒5000个请求进来,结果只有2000个请求出去,所以在秒杀期间(将近一小时)可能会有几十万或者几百万的请求积压在MQ中。

    关于流量削峰:秒杀系统流量削峰这事儿应该怎么做?

    这个短暂的高峰期积压是没问题的,因为高峰期过了之后,每秒就只有50个请求进入MQ了,但是系统还是按照每秒2000个请求的速度在处理,所以说,只要高峰期一过,系统就会快速将积压的消息消费掉。我们在此计算一下,每秒在MQ积压3000条消息,1分钟会积压18万,1小时积压1000万条消息,高峰期过后,1个多小时就可以将积压的1000万消息消费掉。

    图片

    三. 引入消息队列的优缺点

    优点

    优点就是以上的那些场景应用,就是在特殊场景下有其对应的好处,解耦、异步、削峰。

    缺点

    • 系统的可用性降低

    系统引入的外部依赖越多,系统越容易挂掉,本来只是A系统调用BCD三个系统接口就好,ABCD四个系统不报错整个系统会正常运行。引入了MQ之后,虽然ABCD系统没出错,但MQ挂了以后,整个系统也会崩溃。

    • 系统的复杂性提高

    引入了MQ之后,需要考虑的问题也变得多了,如何保证消息没有重复消费?如何保证消息不丢失?怎么保证消息传递的顺序?

    • 一致性问题

    A系统发送完消息直接返回成功,但是BCD系统之中若有系统写库失败,则会产生数据不一致的问题。

    总结

    所以总结来说,消息队列是一种十分复杂的架构,引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避。引入MQ系统复杂度提升了一个数量级,但是在有些场景下,就是复杂十倍百倍,还是需要使用MQ。

    展开全文
  • RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。...RabbitMQ 使用场景 服务解耦 假设有

    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

    (MQ和redis在分布式系统中都是相当重要的角色)

    RabbitMQ是一个消息中间件,你可以想象它是一个邮局。当你把信件放到邮箱里时,能够确信邮递员会正确地递送你的信件。RabbitMq就是一个邮箱、一个邮局和一个邮递员。

    RabbitMQ 使用场景

    服务解耦

    假设有这样一个场景, 服务A产生数据, 而服务B,C,D需要这些数据, 那么我们可以在A服务中直接调用B,C,D服务,把数据传递到下游服务即可

    但是,随着我们的应用规模不断扩大,会有更多的服务需要A的数据,如果有几十甚至几百个下游服务,而且会不断变更,再加上还要考虑下游服务出错的情况,那么A服务中调用代码的维护会极为困难

    这是由于服务之间耦合度过于紧密
    在这里插入图片描述
    再来考虑用RabbitMQ解耦的情况

    A服务只需要向消息服务器发送消息,而不用考虑谁需要这些数据;下游服务如果需要数据,自行从消息服务器订阅消息,不再需要数据时则取消订阅即可
    在这里插入图片描述

    流量削峰

    假设我们有一个应用,平时访问量是每秒300请求,我们用一台服务器即可轻松应对
    在这里插入图片描述
    而在高峰期,访问量瞬间翻了十倍,达到每秒3000次请求,那么单台服务器肯定无法应对,这时我们可以考虑增加到10台服务器,来分散访问压力

    但如果这种瞬时高峰的情况每天只出现一次,每次只有半小时,那么我们10台服务器在多数时间都只分担每秒几十次请求,这样就有点浪费资源了
    在这里插入图片描述
    这种情况,我们就可以使用RabbitMQ来进行流量削峰,高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力

    这是消息队列服务器非常典型的应用场景
    在这里插入图片描述

    异步调用

    考虑定外卖支付成功的情况

    支付后要发送支付成功的通知,再寻找外卖小哥来进行配送,而寻找外卖小哥的过程非常耗时,尤其是高峰期,可能要等待几十秒甚至更长

    这样就造成整条调用链路响应非常缓慢
    在这里插入图片描述
    而如果我们引入RabbitMQ消息队列,订单数据可以发送到消息队列服务器,那么调用链路也就可以到此结束,订单系统则可以立即得到响应,整条链路的响应时间只有200毫秒左右

    寻找外卖小哥的应用可以以异步的方式从消息队列接收订单消息,再执行耗时的寻找操作

    在这里插入图片描述

    rabbitmq的六种工作模式

    简单模式

    在这里插入图片描述
    发送消息的程序是生产者
    队列就代表一个邮箱。虽然消息会流经RbbitMQ和你的应用程序,但消息只能被存储在队列里。队列存储空间只受服务器内存和磁盘限制,它本质上是一个大的消息缓冲区。多个生产者可以向同一个队列发送消息,多个消费者也可以从同一个队列接收消息.
    消费者等待从队列接收消息

    案例:

    pom.xml
    添加 slf4j 依赖, 和 rabbitmq amqp 依赖

    <project xmlns="http://maven.apache.org/POM/4.0.0"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>com.tedu</groupId>
    	<artifactId>rabbitmq</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<dependencies>
    		<dependency>
    			<groupId>com.rabbitmq</groupId>
    			<artifactId>amqp-client</artifactId>
    			<version>5.4.3</version>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-api</artifactId>
    			<version>1.8.0-alpha2</version>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-log4j12</artifactId>
    			<version>1.8.0-alpha2</version>
    		</dependency>
    	</dependencies>
    
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-compiler-plugin</artifactId>
    				<version>3.8.0</version>
    				<configuration>
    					<source>1.8</source>
    					<target>1.8</target>
    				</configuration>
    			</plugin>
    		</plugins>
    	</build>
    </project>
    

    生产者发送消息

    package rabbitmq.simple;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		//创建连接工厂,并设置连接信息
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);//可选,5672是默认端口
    		f.setUsername("admin");
    		f.setPassword("admin");
    
    		/*
    		 * 与rabbitmq服务器建立连接,
    		 * rabbitmq服务器端使用的是nio,会复用tcp连接,
    		 * 并开辟多个信道与客户端通信
    		 * 以减轻服务器端建立连接的开销
    		 */
    		Connection c = f.newConnection();
    		//建立信道
    		Channel ch = c.createChannel();
    
    		/*
    		 * 声明队列,会在rabbitmq中创建一个队列
    		 * 如果已经创建过该队列,就不能再使用其他参数来创建
    		 * 
    		 * 参数含义:
    		 *   -queue: 队列名称
    		 *   -durable: 队列持久化,true表示RabbitMQ重启后队列仍存在
    		 *   -exclusive: 排他,true表示限制仅当前连接可用
    		 *   -autoDelete: 当最后一个消费者断开后,是否删除队列
    		 *   -arguments: 其他参数
    		 */
    		ch.queueDeclare("helloworld", false,false,false,null);
    
    		/*
    		 * 发布消息
    		 * 这里把消息向默认交换机发送.
    		 * 默认交换机隐含与所有队列绑定,routing key即为队列名称
    		 * 
    		 * 参数含义:
    		 * 	-exchange: 交换机名称,空串表示默认交换机"(AMQP default)",不能用 null 
    		 * 	-routingKey: 对于默认交换机,路由键就是目标队列名称
    		 * 	-props: 其他参数,例如头信息
    		 * 	-body: 消息内容byte[]数组
    		 */
    		ch.basicPublish("", "helloworld", null, "Hello world!".getBytes());
    
    		System.out.println("消息已发送");
    		c.close();
    	}
    }
    
    

    消费者接收消息

    package rabbitmq.simple;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		//连接工厂
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		//建立连接
    		Connection c = f.newConnection();
    		//建立信道
    		Channel ch = c.createChannel();
    		//声明队列,如果该队列已经创建过,则不会重复创建
    		ch.queueDeclare("helloworld",false,false,false,null);
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume("helloworld", true, callback, cancel);
    	}
    }
    
    

    工作模式

    在这里插入图片描述
    工作队列(即任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们将任务安排在稍后完成。

    我们将任务封装为消息并将其发送到队列。后台运行的工作进程将获取任务并最终执行任务。当运行多个消费者时,任务将在它们之间分发。

    使用任务队列的一个优点是能够轻松地并行工作。如果我们正在积压工作任务,我们可以添加更多工作进程,这样就可以轻松扩展。

    生产者发送消息

    这里模拟耗时任务,发送的消息中,每个点使工作进程暂停一秒钟,例如"Hello…"将花费3秒钟来处理

    package rabbitmq.workqueue;
    
    import java.util.Scanner;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		//参数:queue,durable,exclusive,autoDelete,arguments
    		ch.queueDeclare("helloworld", false,false,false,null);
    
    		while (true) {
    		    //控制台输入的消息发送到rabbitmq
    			System.out.print("输入消息: ");
    			String msg = new Scanner(System.in).nextLine();
    			//如果输入的是"exit"则结束生产者进程
    			if ("exit".equals(msg)) {
    				break;
    			}
    			//参数:exchage,routingKey,props,body
    			ch.basicPublish("", "helloworld", null, msg.getBytes());
    			System.out.println("消息已发送: "+msg);
    		}
    
    		c.close();
    	}
    }
    
    

    消费者接收消息

    package rabbitmq.workqueue;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		ch.queueDeclare("helloworld",false,false,false,null);
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    
    				//遍历字符串中的字符,每个点使进程暂停一秒
    				for (int i = 0; i < msg.length(); i++) {
    					if (msg.charAt(i)=='.') {
    						try {
    							Thread.sleep(1000);
    						} catch (InterruptedException e) {
    						}
    					}
    				}
    				System.out.println("处理结束");
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume("helloworld", true, callback, cancel);
    	}
    }
    
    

    运行测试

    运行:
    一个生产者
    两个消费者
    生产者发送多条消息,
    如: 1,2,3,4,5. 两个消费者分别收到:

    	消费者一: 1,3,5
    	消费者二: 2,4
    

    rabbitmq在所有消费者中轮询分发消息,把消息均匀地发送给所有消费者

    消息确认

    一个消费者接收消息后,在消息没有完全处理完时就挂掉了,那么这时会发生什么呢?

    就现在的代码来说,rabbitmq把消息发送给消费者后,会立即删除消息,那么消费者挂掉后,它没来得及处理的消息就会丢失
    我们并不想丢失任何消息, 如果一个消费者挂掉,我们想把它的任务消息派发给其他消费者

    为了确保消息不会丢失,rabbitmq支持消息确认(回执)。当一个消息被消费者接收到并且执行完成后,消费者会发送一个ack (acknowledgment) 给rabbitmq服务器, 告诉他我已经执行完成了,你可以把这条消息删除了。

    如果一个消费者没有返回消息确认就挂掉了(信道关闭,连接关闭或者TCP链接丢失),rabbitmq就会明白,这个消息没有被处理完成,rebbitmq就会把这条消息重新放入队列,如果在这时有其他的消费者在线,那么rabbitmq就会迅速的把这条消息传递给其他的消费者,这样就确保了没有消息会丢失。

    这里不存在消息超时, rabbitmq只在消费者挂掉时重新分派消息, 即使消费者花非常久的时间来处理消息也可以

    手动消息确认默认是开启的,前面的例子我们通过autoAck=ture把它关闭了。我们现在要把它设置为false,然后工作进程处理完意向任务时,发送一个消息确认(回执)。

    package rabbitmq.workqueue;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		//连接工厂
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		//建立连接
    		Connection c = f.newConnection();
    		//建立信道
    		Channel ch = c.createChannel();
    		//声明队列
    		ch.queueDeclare("helloworld",false,false,false,null);
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    				for (int i = 0; i < msg.length(); i++) {
    					if (msg.charAt(i)=='.') {
    						try {
    							Thread.sleep(1000);
    						} catch (InterruptedException e) {
    						}
    					}
    				}
    				System.out.println("处理结束");
    				//发送回执
    				ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		//autoAck设置为false,则需要手动确认发送回执
    		ch.basicConsume("helloworld", false, callback, cancel);
    	}
    }
    
    
    

    使用以上代码,就算杀掉一个正在处理消息的工作进程也不会丢失任何消息,工作进程挂掉之后,没有确认的消息就会被自动重新传递。
    忘记确认(ack)是一个常见的错误, 这样后果是很严重的, 由于未确认的消息不会被释放, rabbitmq会吃掉越来越多的内存

    可以使用下面命令打印工作队列中未确认消息的数量

    rabbitmqctl list_queues name messages_ready messages_unacknowledged
    

    当处理消息时异常中断, 可以选择让消息重回队列重新发送.
    nack 操作可以是消息重回队列, 可以使用 basicNack() 方法:

    // requeue为true时重回队列, 反之消息被丢弃或被发送到死信队列
    c.basicNack(tag, multiple, requeue)
    

    合理地分发

    rabbitmq会一次把多个消息分发给消费者, 这样可能造成有的消费者非常繁忙, 而其它消费者空闲. 而rabbitmq对此一无所知, 仍然会均匀的分发消息

    我们可以使用 basicQos(1) 方法, 这告诉rabbitmq一次只向消费者发送一条消息, 在返回确认回执前, 不要向消费者发送新消息. 而是把消息发给下一个空闲的消费者
    在这里插入图片描述

    消息持久化

    当rabbitmq关闭时, 我们队列中的消息仍然会丢失, 除非明确要求它不要丢失数据

    要求rabbitmq不丢失数据要做如下两点: 把队列和消息都设置为可持久化(durable)

    队列设置为可持久化, 可以在定义队列时指定参数durable为true

    //第二个参数是持久化参数durable
    ch.queueDeclare("helloworld", true, false, false, null);
    

    由于之前我们已经定义过队列"hello"是不可持久化的, 对已存在的队列, rabbitmq不允许对其定义不同的参数, 否则会出错, 所以这里我们定义一个不同名字的队列"task_queue"

    //定义一个新的队列,名为 task_queue
    //第二个参数是持久化参数 durable
    ch.queueDeclare("task_queue", true, false, false, null);
    

    生产者和消费者代码都要修改

    这样即使rabbitmq重新启动, 队列也不会丢失. 现在我们再设置队列中消息的持久化, 使用MessageProperties.PERSISTENT_TEXT_PLAIN参数

    //第三个参数设置消息持久化
    ch.basicPublish("", "task_queue",
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                msg.getBytes());
    

    下面是"工作模式"最终完成的生产者和消费者代码

    生产者代码

    package rabbitmq.workqueue;
    
    import java.util.Scanner;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    
    public class Test3 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//第二个参数设置队列持久化
    		ch.queueDeclare("task_queue", true,false,false,null);
    
    		while (true) {
    			System.out.print("输入消息: ");
    			String msg = new Scanner(System.in).nextLine();
    			if ("exit".equals(msg)) {
    				break;
    			}
    			
    			//第三个参数设置消息持久化
    			ch.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8"));
    			System.out.println("消息已发送: "+msg);
    		}
    
    		c.close();
    	}
    }
    
    

    消费者代码

    package rabbitmq.workqueue;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test4 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//第二个参数设置队列持久化
    		ch.queueDeclare("task_queue",true,false,false,null);
    		
    		System.out.println("等待接收数据");
    		
    		ch.basicQos(1); //一次只接收一条消息
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    				for (int i = 0; i < msg.length(); i++) {
    					if (msg.charAt(i)=='.') {
    						try {
    							Thread.sleep(1000);
    						} catch (InterruptedException e) {
    						}
    					}
    				}
    				System.out.println("处理结束");
    				//发送回执
    				ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		//autoAck设置为false,则需要手动确认发送回执
    		ch.basicConsume("task_queue", false, callback, cancel);
    	}
    }
    

    剩下四种请看下篇

    展开全文
  • 我们没有使用只能进行广播的fanout交换机,而是使用Direct交换机,从而可以选择性接收日志。 虽然使用Direct交换机改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。 在我们的日志系统中,我们...

    主题模式

    在这里插入图片描述
    在上一小节,我们改进了日志系统。我们没有使用只能进行广播的fanout交换机,而是使用Direct交换机,从而可以选择性接收日志。

    虽然使用Direct交换机改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。

    在我们的日志系统中,我们可能不仅希望根据级别订阅日志,还希望根据发出日志的源订阅日志。

    这将给我们带来很大的灵活性——我们可能只想接收来自“cron”的关键错误,但也要接收来自“kern”的所有日志。

    要在日志系统中实现这一点,我们需要了解更复杂的Topic交换机。

    主题交换机 Topic exchange

    发送到Topic交换机的消息,它的的routingKey,必须是由点分隔的多个单词。单词可以是任何东西,但通常是与消息相关的一些特性。几个有效的routingKey示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。routingKey可以有任意多的单词,最多255个字节。

    bindingKey也必须采用相同的形式。Topic交换机的逻辑与直连交换机类似——使用特定routingKey发送的消息将被传递到所有使用匹配bindingKey绑定的队列。bindingKey有两个重要的特殊点:

     - * 可以通配单个单词。
     - # 可以通配零个或多个单词。
    

    用一个例子来解释这个问题是最简单的
    在这里插入图片描述
    在本例中,我们将发送描述动物的消息。这些消息将使用由三个单词(两个点)组成的routingKey发送。routingKey中的第一个单词表示速度,第二个是颜色,第三个是物种:“<速度>.<颜色>.<物种>”。

    我们创建三个绑定:Q1与bindingKey “.orange.” 绑定。和Q2是 “..rabbit” 和 “lazy.#” 。

    这些绑定可概括为:

    Q1对所有橙色的动物感兴趣。
    Q2想接收关于兔子和慢速动物的所有消息。
    将routingKey设置为"quick.orange.rabbit"的消息将被发送到两个队列。消息 "lazy.orange.elephant“也发送到它们两个。另外”quick.orange.fox“只会发到第一个队列,”lazy.brown.fox“只发给第二个。”lazy.pink.rabbit“将只被传递到第二个队列一次,即使它匹配两个绑定。”quick.brown.fox"不匹配任何绑定,因此将被丢弃。

    如果我们违反约定,发送一个或四个单词的信息,比如"orange“或”quick.orange.male.rabbit",会发生什么?这些消息将不匹配任何绑定,并将丢失。

    另外,“lazy.orange.male.rabbit”,即使它有四个单词,也将匹配最后一个绑定,并将被传递到第二个队列。

    完成代码

    生产者

    package rabbitmq.topic;
    
    import java.util.Random;
    import java.util.Scanner;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//参数1: 交换机名
    		//参数2: 交换机类型
    		ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
    		
    		while (true) {
    			System.out.print("输入消息: ");
    			String msg = new Scanner(System.in).nextLine();
    			if ("exit".contentEquals(msg)) {
    				break;
    			}
    			System.out.print("输入routingKey: ");
    			String routingKey = new Scanner(System.in).nextLine();
    			
    			//参数1: 交换机名
    			//参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"
    			//参数3: 其他配置属性
    			//参数4: 发布的消息数据 
    			ch.basicPublish("topic_logs", routingKey, null, msg.getBytes());
    			
    			System.out.println("消息已发送: "+routingKey+" - "+msg);
    		}
    
    		c.close();
    	}
    }
    

    消费者

    package rabbitmq.topic;
    
    import java.io.IOException;
    import java.util.Scanner;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
    		
    		//自动生成对列名,
    		//非持久,独占,自动删除
    		String queueName = ch.queueDeclare().getQueue();
    		
    		System.out.println("输入bindingKey,用空格隔开:");
    		String[] a = new Scanner(System.in).nextLine().split("\\s");
    		
    		//把该队列,绑定到 topic_logs 交换机
    		//允许使用多个 bindingKey
    		for (String bindingKey : a) {
    			ch.queueBind(queueName, "topic_logs", bindingKey);
    		}
    		
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				String routingKey = message.getEnvelope().getRoutingKey();
    				System.out.println("收到: "+routingKey+" - "+msg);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume(queueName, true, callback, cancel);
    	}
    }
    

    RPC模式

    在这里插入图片描述
    如果我们需要在远程电脑上运行一个方法,并且还要等待一个返回结果该怎么办?这和前面的例子不太一样, 这种模式我们通常称为远程过程调用,即RPC.

    在本节中,我们将会学习使用RabbitMQ去搭建一个RPC系统:一个客户端和一个可以升级(扩展)的RPC服务器。为了模拟一个耗时任务,我们将创建一个返回斐波那契数列的虚拟的RPC服务。

    客户端

    在客户端定义一个RPCClient类,并定义一个call()方法,这个方法发送一个RPC请求,并等待接收响应结果

    RPCClient client = new RPCClient();
    String result = client.call("4");
    System.out.println( "第四个斐波那契数是: " + result);
    

    回调队列 Callback Queue

    使用RabbitMQ去实现RPC很容易。一个客户端发送请求信息,并得到一个服务器端回复的响应信息。为了得到响应信息,我们需要在请求的时候发送一个“回调”队列地址。我们可以使用默认队列。下面是示例代码:

    //定义回调队列,
    //自动生成对列名,非持久,独占,自动删除
    callbackQueueName = ch.queueDeclare().getQueue();
    
    //用来设置回调队列的参数对象
    BasicProperties props = new BasicProperties
                                .Builder()
                                .replyTo(callbackQueueName)
                                .build();
    //发送调用消息
    ch.basicPublish("", "rpc_queue", props, message.getBytes());
    
    消息属性 Message Properties
    AMQP 0-9-1协议定义了消息的14个属性。大部分属性很少使用,下面是比较常用的4:
    deliveryMode:将消息标记为持久化(值为2)或非持久化(任何其他值)。
    contentType:用于描述mime类型。例如,对于经常使用的JSON格式,将此属性设置为:application/json。
    replyTo:通常用于指定回调队列。
    correlationId:将RPC响应与请求关联起来非常有用。
    

    关联id (correlationId):

    在上面的代码中,我们会为每个RPC请求创建一个回调队列。 这是非常低效的,这里还有一个更好的方法:让我们为每个客户端创建一个回调队列。

    这就提出了一个新的问题,在队列中得到一个响应时,我们不清楚这个响应所对应的是哪一条请求。这时候就需要使用关联id(correlationId)。我们将为每一条请求设置唯一的的id值。稍后,当我们在回调队列里收到一条消息的时候,我们将查看它的id属性,这样我们就可以匹配对应的请求和响应。如果我们发现了一个未知的id值,我们可以安全的丢弃这条消息,因为它不属于我们的请求。

    小结

    在这里插入图片描述
    RPC的工作方式是这样的:

    • 对于RPC请求,客户端发送一条带有两个属性的消息:replyTo,设置为仅为请求创建的匿名独占队列,和correlationId,设置为每个请求的惟一id值。
    • 请求被发送到rpc_queue队列。
    • RPC工作进程(即:服务器)在队列上等待请求。当一个请求出现时,它执行任务,并使用replyTo字段中的队列将结果发回客户机。
    • 客户机在回应消息队列上等待数据。当消息出现时,它检查correlationId属性。如果匹配请求中的值,则向程序返回该响应数据。

    服务器端:

    package rabbitmq.rpc;
    
    import java.io.IOException;
    import java.util.Random;
    import java.util.Scanner;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class RPCServer {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		/*
    		 * 定义队列 rpc_queue, 将从它接收请求信息
    		 * 
    		 * 参数:
    		 * 1. queue, 对列名
    		 * 2. durable, 持久化
    		 * 3. exclusive, 排他
    		 * 4. autoDelete, 自动删除
    		 * 5. arguments, 其他参数属性
    		 */
    		ch.queueDeclare("rpc_queue",false,false,false,null);
    		ch.queuePurge("rpc_queue");//清除队列中的内容
    		
    		ch.basicQos(1);//一次只接收一条消息
    		
    		
    		//收到请求消息后的回调对象
    		DeliverCallback deliverCallback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				//处理收到的数据(要求第几个斐波那契数)
    				String msg = new String(message.getBody(), "UTF-8");
    				int n = Integer.parseInt(msg);
    				//求出第n个斐波那契数
    				int r = fbnq(n);
    				String response = String.valueOf(r);
    				
    				//设置发回响应的id, 与请求id一致, 这样客户端可以把该响应与它的请求进行对应
    				BasicProperties replyProps = new BasicProperties.Builder()
    						.correlationId(message.getProperties().getCorrelationId())
    						.build();
    				/*
    				 * 发送响应消息
    				 * 1. 默认交换机
    				 * 2. 由客户端指定的,用来传递响应消息的队列名
    				 * 3. 参数(关联id)
    				 * 4. 发回的响应消息
    				 */
    				ch.basicPublish("",message.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
    				//发送确认消息
    				ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
    			}
    		};
    		
    		//
    		CancelCallback cancelCallback = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		//消费者开始接收消息, 等待从 rpc_queue接收请求消息, 不自动确认
    		ch.basicConsume("rpc_queue", false, deliverCallback, cancelCallback);
    	}
    
    	protected static int fbnq(int n) {
    		if(n == 1 || n == 2) return 1;
    		
    		return fbnq(n-1)+fbnq(n-2);
    	}
    }
    
    

    客户端

    package rabbitmq.rpc;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.UUID;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class RPCClient {
    	Connection con;
    	Channel ch;
    	
    	public RPCClient() throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		con = f.newConnection();
    		ch = con.createChannel();
    	}
    	
    	public String call(String msg) throws Exception {
    		//自动生成对列名,非持久,独占,自动删除
    		String replyQueueName = ch.queueDeclare().getQueue();
    		//生成关联id
    		String corrId = UUID.randomUUID().toString();
    		
    		//设置两个参数:
    		//1. 请求和响应的关联id
    		//2. 传递响应数据的queue
    		BasicProperties props = new BasicProperties.Builder()
    				.correlationId(corrId)
    				.replyTo(replyQueueName)
    				.build();
    		//向 rpc_queue 队列发送请求数据, 请求第n个斐波那契数
    		ch.basicPublish("", "rpc_queue", props, msg.getBytes("UTF-8"));
    		
    		//用来保存结果的阻塞集合,取数据时,没有数据会暂停等待
    		BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
    		
    		//接收响应数据的回调对象
    		DeliverCallback deliverCallback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				//如果响应消息的关联id,与请求的关联id相同,我们来处理这个响应数据
    				if (message.getProperties().getCorrelationId().contentEquals(corrId)) {
    					//把收到的响应数据,放入阻塞集合
    					response.offer(new String(message.getBody(), "UTF-8"));
    				}
    			}
    		};
    
    		CancelCallback cancelCallback = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		//开始从队列接收响应数据
    		ch.basicConsume(replyQueueName, true, deliverCallback, cancelCallback);
    		//返回保存在集合中的响应数据
    		return response.take();
    	}
    	
    	public static void main(String[] args) throws Exception {
    		RPCClient client = new RPCClient();
    		while (true) {
    			System.out.print("求第几个斐波那契数:");
    			int n = new Scanner(System.in).nextInt();
    			String r = client.call(""+n);
    			System.out.println(r);
    		}
    	}
    }
    
    
    展开全文
  • 来源:my.oschina.net/u/4115134/blog/32233711、RabbitMQ的基本概念RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送...

    来源:my.oschina.net/u/4115134/blog/3223371

    1、RabbitMQ的基本概念

    RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。

    Exchange

    接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为。在RabbitMQ中,ExchangeType常用的有direct、Fanout和Topic三种。

    Message Queue

    消息队列。我们发送给RabbitMQ的消息最后都会到达各种queue,并且存储在其中(如果路由找不到相应的queue则数据会丢失),等待消费者来取。

    Binding Key

    它表示的是Exchange与Message Queue是通过binding key进行联系的,这个关系是固定。

    Routing Key

    生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则。这个routing key需要与Exchange Type及binding key联合使用才能生,我们的生产者只需要通过指定routing key来决定消息流向哪里。

    2、RabbitMQ 使用场景

    服务解耦

    假设有这样一个场景, 服务A产生数据, 而服务B,C,D需要这些数据, 那么我们可以在A服务中直接调用B,C,D服务,把数据传递到下游服务即可。

    但是,随着我们的应用规模不断扩大,会有更多的服务需要A的数据,如果有几十甚至几百个下游服务,而且会不断变更,再加上还要考虑下游服务出错的情况,那么A服务中调用代码的维护会极为困难。

    这是由于服务之间耦合度过于紧密。

    再来考虑用RabbitMQ解耦的情况。

    A服务只需要向消息服务器发送消息,而不用考虑谁需要这些数据;下游服务如果需要数据,自行从消息服务器订阅消息,不再需要数据时则取消订阅即可。

    流量削峰

    假设我们有一个应用,平时访问量是每秒300请求,我们用一台服务器即可轻松应对。

    而在高峰期,访问量瞬间翻了十倍,达到每秒3000次请求,那么单台服务器肯定无法应对,这时我们可以考虑增加到10台服务器,来分散访问压力。

    但如果这种瞬时高峰的情况每天只出现一次,每次只有半小时,那么我们10台服务器在多数时间都只分担每秒几十次请求,这样就有点浪费资源了。

    这种情况,我们就可以使用RabbitMQ来进行流量削峰,高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力。

    这是消息队列服务器非常典型的应用场景。

    异步调用

    考虑定外卖支付成功的情况。

    支付后要发送支付成功的通知,再寻找外卖小哥来进行配送,而寻找外卖小哥的过程非常耗时,尤其是高峰期,可能要等待几十秒甚至更长。

    这样就造成整条调用链路响应非常缓慢。

    而如果我们引入RabbitMQ消息队列,订单数据可以发送到消息队列服务器,那么调用链路也就可以到此结束,订单系统则可以立即得到响应,整条链路的响应时间只有200毫秒左右。

    寻找外卖小哥的应用可以以异步的方式从消息队列接收订单消息,再执行耗时的寻找操作。

    推荐好文
    
    强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!
    
    分享一套基于SpringBoot和Vue的企业级中后台开源项目,代码很规范!
    能挣钱的,开源 SpringBoot 商城系统,功能超全,超漂亮!
    
    
    
    
    展开全文
  • 解决方案:rabbitmq使用场景-超时未支付订单处理 消息的TTL(Time To Live) 消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个...
  • kafka与RabbitMq使用业务场景区别 1.kafka与rabbitMq使用区别 架构模型 rabbitmq RabbitMQ遵循AMQP协议,RabbitMQ的broker由Exchange,Binding,queue组成,其中exchange和binding组成了消息的路由键;客户端Producer...
  • RabbitMQ使用场景与六种工作模式rabbitmq的六种工作模式简单模式使用场景工作模式使用场景发布订阅模式使用场景路由模式使用场景主题模式使用场景 rabbitmq的六种工作模式 简单模式 生产者将消息交给默认的交换机,...
  • 2.RabbitMQ应用场景 MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在...
  • 这是陈东景于2021年8月29日下午16点原创作品,转载请标明出处!!!! 在进行软件设计的过程中,如果软件设计业务上存在需要短时间内处理大批量的信息,又需要能保证软件能正常运行(保证...目前我们使用到的比...
  • 最终, 消息会被广播到所有消息接受者 Exchanges 交换机 RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何消息直接发送到队列。实际上,通常生产者甚至不知道消息是否会被传递到任何队列。 相反,生产者只能...
  • RabbitMQ是一个由erlang开发的...消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是...
  • RabbitMq是基于AMQP协议的开源消息代理软件,与spring的整合在上一篇中已经介绍了。如果想细致了解可以去spring官网上查看(https://spring.io/projects/spring-amqp#overview)。 如何保证消息投递成功 1.开启应答...
  • 1、核心场景 1.解耦(为面向服务的架构(SOA)提供基本的最终一致性实现)   场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。 传统模式的缺点:   1.假如库存...
  • 使用场景:简单的发送与接收,没有特别的处理。 一个P向queue发送一个message,一个C从该queue接收message并打印。producer,连接至RabbitMQ Server,声明队列,发送message,关闭连接,退出。 场景2:单发送多...
  • 特别是在一些典型的应用场景以及业务模块中具有重要的作用,比如业务服务模块解耦、异步通信、高并发限流、超时业务、数据延迟处理等。 二、RabbitMQ 官网拜读 首先,让我们先拜读 RabbitMQ 官网的技术开发手册以及...
  • RabbitMQ应用场景

    2021-04-18 22:21:38
    RabbitMQ应用场景 原文链接:https://blog.csdn.net/whoamiyang/article/details/54954780 1 异步处理 场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种1.串行的方式;2.并行的方式 (1)串行方式:将...
  • 2.RabbitMQ应用场景 MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在...
  • RabbitMQ 使用场景# 2. 异步提升效率# 3. 流量削峰# 优点# 缺点# 一. RabbitMQ 简介# MQ全称为Message Queue,消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用...
  • 在前面的博客中我们已经详细的介绍了RabbitMQ的5种模型,接下来让我们来看看在SpringBoot中如何使用RabbitMQRabbitMQ5种模型回顾 1.点对点模型:生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。...
  • 1. MQ的应用场景 1.1 异步处理 场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种 1.串行的方式 2.并行的方式 串行方式: 将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部...
  • RabbitMQ之应用场景

    2021-08-16 10:08:29
    该笔记大部分搬运B站编程不良人的RabbitMQ,顺便把图文合并记录,便于回顾,仅用于学习! 视频地址:https://www.bilibili.com/video/BV1dE411K7MG 作者真的非常好,别白嫖,记得三连 如有侵权,请联系删除! 1. ...
  • 配置就不写了,直接上SpringBoot使用rabbitmq,有人看在补配置 rabbitmq常用的6种工作模式 1.简单模式 2.工作模式 3.发布/订阅模式 4.路由模式 5.主题模式 6.RPC模式 rabbitmq使用的常量 队列,交换机,路由都用一个...
  • RabbitMQ的五种工作模式及实例使用场景 1. 简单模式 1)、做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B 应用场景:将发送的电子邮件放到消息队列,然后邮件服务在...
  • Rabbitmq的应用场景

    2021-03-20 21:11:09
    Rabbitmq的应用场景 一、异步处理 场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种 1.串行的方式 2.并行的方式 串行方式: 将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部...
  • rabbitmq使用mqtt协议

    千次阅读 2021-10-28 20:52:38
    三、使用步骤1.引入库2.读入数据总结 前言 在网上学习物联网,发现有人可以用 springboot + rabbitmq可以搭建物联网(IOT)平台,rabbitmq 不是消息队列吗,原来rabbitmq有两种协议,消息队列是用的AMQP协议,而用...
  • RabbitMQ遵循AMQP协议,RabbitMQ的broker由Exchange,Binding,queue组成,其中exchange和binding组成了消息的路由键;客户端Producer通过连接channel和server进行通信,Consumer从queue获取消息进行消费(长链接,...
  • rabbitmq使用详解

    千次阅读 2021-12-17 10:48:36
    rabbitmq使用详解
  • docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management #开启防火墙 firewall-cmd --zone=public --add-port=15672/tcp --per
  • RabbitMQ RabbitMQ的五种工作模式 ...应用场景: 短信,聊天 代码实例: 发布者 // An highlighted block public static void main(String[] args) throws IOException, TimeoutException { //
  • RabbitMQ 02 使用场景

    2021-03-22 10:35:24
    RabbitMQ 使用场景服务解耦1.假设有这样一个场景, 服务A产生数据, 而服务B,C,D需要这些数据, 那么我们可以在A服务中直接调用B,C,D服务,把数据传递到下游服务即可但是,随着我们的应用规模不断扩大,会有更多的服务需要...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 48,423
精华内容 19,369
关键字:

rabbitmq使用场景