精华内容
下载资源
问答
  • RabbitMQ实战

    2021-11-23 18:17:05
    其中课程的学习链接地址:RabbitMQ实战教程-@rabbitlistener,rabbitlistener-Java视频教程-编程语言-CSDN程序员研修院 RabbitMQ 官网拜读 首先,让我们先拜读 RabbitMQ 官网的技术开发手册以及相关的 Features,感...

    RabbitMQ 作为目前应用相当广泛的消息中间件,在企业级应用、微服务应用中充当着重要的角色。特别是在一些典型的应用场景以及业务模块中具有重要的作用,比如业务服务模块解耦、异步通信、高并发限流、超时业务、数据延迟处理等。

    其中课程的学习链接地址:RabbitMQ实战教程-@rabbitlistener,rabbitlistener-Java视频教程-编程语言-CSDN程序员研修院 

    RabbitMQ 官网拜读

    首先,让我们先拜读 RabbitMQ 官网的技术开发手册以及相关的 Features,感兴趣的朋友可以耐心的阅读其中的相关介绍,相信会有一定的收获,地址可见:

    RabbitMQ Tutorials — RabbitMQ

    阅读该手册过程中,我们可以得知 RabbitMQ 其实核心就是围绕 “消息模型” 来展开的,其中就包括了组成消息模型的相关组件:生产者,消费者,队列,交换机,路由,消息等!而我们在实战应用中,实际上也是紧紧围绕着 “消息模型” 来展开撸码的!

    下面,我就介绍一下这一消息模型的演变历程,当然,这一历程在 RabbitMQ 官网也是可以窥览得到的!

    enter image description here

    enter image description here

    enter image description here

    上面几个图就已经概述了几个要点,而且,这几个要点的含义可以说是字如其名!

    1. 生产者:发送消息的程序
    2. 消费者:监听接收消费消息的程序
    3. 消息:一串二进制数据流
    4. 队列:消息的暂存区/存储区
    5. 交换机:消息的中转站,用于接收分发消息。其中有 fanout、direct、topic、headers 四种
    6. 路由:相当于密钥/第三者,与交换机绑定即可路由消息到指定的队列!

    正如上图所展示的消息模型的演变,接下来我们将以代码的形式实战各种典型的业务场景!

    SpringBoot 整合 RabbitMQ 实战

    工欲善其事,必先利其器。我们首先需要借助 IDEA 的 Spring Initializr 用 Maven 构建一个 SpringBoot 的项目,并引入 RabbitMQ、Mybatis、Log4j 等第三方框架的依赖。搭建完成之后,可以简单的写个 RabbitMQController 测试一下项目是否搭建是否成功(可以暂时用单模块方式构建)

    紧接着,我们进入实战的核心阶段,在项目或者服务中使用 RabbitMQ,其实无非是有几个核心要点要牢牢把握住,这几个核心要点在撸码过程中需要“时刻的游荡在自己的脑海里”,其中包括:

    1. 我要发送的消息是什么
    2. 我应该需要创建什么样的消息模型:DirectExchange+RoutingKey?TopicExchange+RoutingKey?等
    3. 我要处理的消息是实时的还是需要延时/延迟的?
    4. 消息的生产者需要在哪里写,消息的监听消费者需要在哪里写,各自的处理逻辑是啥

    基于这样的几个要点,我们先小试牛刀一番,采用 RabbitMQ 实战异步写日志与异步发邮件。当然啦,在进行实战前,我们需要安装好 RabbitMQ 及其后端控制台应用,并在项目中配置一下 RabbitMQ 的相关参数以及相关 Bean 组件。

    RabbitMQ 安装完成后,打开后端控制台应用:http://localhost:15672  输入guest guest 登录,看到下图即表示安装成功

    enter image description here

    然后是项目配置文件层面的配置 application.properties

     
    
    1. spring.rabbitmq.host=127.0.0.1

    2. spring.rabbitmq.port=5672

    3. spring.rabbitmq.username=guest

    4. spring.rabbitmq.password=guest

    5. spring.rabbitmq.listener.concurrency=10

    6. spring.rabbitmq.listener.max-concurrency=20

    7. spring.rabbitmq.listener.prefetch=5

    其中,后面三个参数主要是用于“并发量的配置”,表示:并发消费者的初始化值,并发消费者的最大值,每个消费者每次监听时可拉取处理的消息数量。

    接下来,我们需要以 Configuration 的方式配置 RabbitMQ 并以 Bean 的方式显示注入 RabbitMQ 在发送接收处理消息时相关 Bean 组件配置其中典型的配置是 RabbitTemplate 以及 SimpleRabbitListenerContainerFactory,前者是充当消息的发送组件,后者是用于管理  RabbitMQ监听器listener 的容器工厂,其代码如下:

     
    
    1. @Configuration

    2. public class RabbitmqConfig {

    3. private static final Logger log= LoggerFactory.getLogger(RabbitmqConfig.class);

    4. @Autowired

    5. private Environment env;

    6. @Autowired

    7. private CachingConnectionFactory connectionFactory;

    8. @Autowired

    9. private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

    10. /**

    11. * 单一消费者

    12. * @return

    13. */

    14. @Bean(name = "singleListenerContainer")

    15. public SimpleRabbitListenerContainerFactory listenerContainer(){

    16. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

    17. factory.setConnectionFactory(connectionFactory);

    18. factory.setMessageConverter(new Jackson2JsonMessageConverter());

    19. factory.setConcurrentConsumers(1);

    20. factory.setMaxConcurrentConsumers(1);

    21. factory.setPrefetchCount(1);

    22. factory.setTxSize(1);

    23. factory.setAcknowledgeMode(AcknowledgeMode.AUTO);

    24. return factory;

    25. }

    26. /**

    27. * 多个消费者

    28. * @return

    29. */

    30. @Bean(name = "multiListenerContainer")

    31. public SimpleRabbitListenerContainerFactory multiListenerContainer(){

    32. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

    33. factoryConfigurer.configure(factory,connectionFactory);

    34. factory.setMessageConverter(new Jackson2JsonMessageConverter());

    35. factory.setAcknowledgeMode(AcknowledgeMode.NONE);

    36. factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency",int.class));

    37. factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency",int.class));

    38. factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch",int.class));

    39. return factory;

    40. }

    41. @Bean

    42. public RabbitTemplate rabbitTemplate(){

    43. connectionFactory.setPublisherConfirms(true);

    44. connectionFactory.setPublisherReturns(true);

    45. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

    46. rabbitTemplate.setMandatory(true);

    47. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

    48. @Override

    49. public void confirm(CorrelationData correlationData, boolean ack, String cause) {

    50. log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);

    51. }

    52. });

    53. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

    54. @Override

    55. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

    56. log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);

    57. }

    58. });

    59. return rabbitTemplate;

    60. }}

    RabbitMQ 实战:业务模块解耦以及异步通信

    在一些企业级系统中,我们经常可以见到一个执行 function 通常是由许多子模块组成的,这个 function 在执行过程中,需要 同步的将其代码从头开始执行到尾,即执行流程是 module_A -> module_B -> module_C -> module_D,典型的案例可以参见汇编或者 C 语言等面向过程语言开发的应用,现在的一些 JavaWeb 应用也存在着这样的写法。

    而我们知道,这个执行流程其实对于整个 function 来讲是有一定的弊端的,主要有几点:

    1. 整个 function 的执行响应时间将很久;
    2. 如果某个 module 发生异常而没有处理得当,可能会影响其他 module 甚至整个 function 的执行流程与结果;
    3. 整个 function 中代码可能会很冗长,模块与模块之间可能需要进行强通信以及数据的交互,出现问题时难以定位与维护,甚至会陷入 “改一处代码而动全身”的尴尬境地!

    故而,我们需要想办法进行优化,我们需要将强关联的业务模块解耦以及某些模块之间实行异步通信!下面就以两个场景来实战我们的优化措施!

    场景一:异步记录用户操作日志

    对于企业级应用系统或者微服务应用中,我们经常需要追溯跟踪记录用户的操作日志,而这部分的业务在某种程度上是不应该跟主业务模块耦合在一起的,故而我们需要将其单独抽出并以异步的方式与主模块进行异步通信交互数据。

    下面我们就用 RabbitMQ 的 DirectExchange+RoutingKey 消息模型也实现“用户登录成功记录日志”的场景。如前面所言,我们需要在脑海里回荡着几个要点:

    • 消息模型:DirectExchange+RoutingKey 消息模型
    • 消息:用户登录的实体信息,包括用户名,登录事件,来源的IP,所属日志模块等信息
    • 发送接收:在登录的 Controller 中实现发送,在某个 listener 中实现接收并将监听消费到的消息入数据表;实时发送接收

    首先我们需要在上面的 RabbitmqConfig 类中创建消息模型:包括 Queue、Exchange、RoutingKey 等的建立,代码如下:

    enter image description here

    上图中 env 获取的信息,我们需要在 application.properties 进行配置,其中 mq.env=local

    enter image description here

    此时,我们将整个项目/服务跑起来,并打开 RabbitMQ 后端控制台应用,即可看到队列以及交换机及其绑定已经建立好了,如下所示:

    enter image description here

    enter image description here

    接下来,我们需要在 Controller 中执行用户登录逻辑,记录用户登录日志,查询获取用户角色视野资源信息等,由于篇幅关系,在这里我们重点要实现的是用MQ实现 “异步记录用户登录日志” 的逻辑,即在这里 Controller 将充当“生产者”的角色,核心代码如下:

     
    
    1. @RestController

    2. public class UserController {

    3. private static final Logger log= LoggerFactory.getLogger(HelloWorldController.class);

    4. private static final String Prefix="user";

    5. @Autowired

    6. private ObjectMapper objectMapper;

    7. @Autowired

    8. private UserMapper userMapper;

    9. @Autowired

    10. private UserLogMapper userLogMapper;

    11. @Autowired

    12. private RabbitTemplate rabbitTemplate;

    13. @Autowired

    14. private Environment env;

    15. @RequestMapping(value = Prefix+"/login",method = RequestMethod.POST,consumes = MediaType.MULTIPART_FORM_DATA_VALUE)

    16. public BaseResponse login(@RequestParam("userName") String userName,@RequestParam("password") String password){

    17. BaseResponse response=new BaseResponse(StatusCode.Success);

    18. try {

    19. //TODO:执行登录逻辑

    20. User user=userMapper.selectByUserNamePassword(userName,password);

    21. if (user!=null){

    22. //TODO:异步写用户日志

    23. try {

    24. UserLog userLog=new UserLog(userName,"Login","login",objectMapper.writeValueAsString(user));

    25. userLog.setCreateTime(new Date());

    26. rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

    27. rabbitTemplate.setExchange(env.getProperty("log.user.exchange.name"));

    28. rabbitTemplate.setRoutingKey(env.getProperty("log.user.routing.key.name"));

    29. Message message=MessageBuilder.withBody(objectMapper.writeValueAsBytes(userLog)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();

    30. message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, MessageProperties.CONTENT_TYPE_JSON);

    31. rabbitTemplate.convertAndSend(message);

    32. }catch (Exception e){

    33. e.printStackTrace();

    34. }

    35. //TODO:塞权限数据-资源数据-视野数据

    36. }else{

    37. response=new BaseResponse(StatusCode.Fail);

    38. }

    39. }catch (Exception e){

    40. e.printStackTrace();

    41. }

    42. return response;

    43. }}

    在上面的“发送逻辑”代码中,其实也体现了我们最开始介绍的演进中的几种消息模型,比如我们是将消息发送到 Exchange 的而不是 Queue,消息是以二进制流的形式进行传输等等。当用 postman 请求到这个 controller 的方法时,我们可以在 RabbitMQ 的后端控制台应用看到一条未确认的消息,通过 GetMessage 即可看到其中的详情,如下:

    enter image description here

    最后,我们将开发消费端的业务代码,如下:

     
    
    1. @Component

    2. public class CommonMqListener {

    3. private static final Logger log= LoggerFactory.getLogger(CommonMqListener.class);

    4. @Autowired

    5. private ObjectMapper objectMapper;

    6. @Autowired

    7. private UserLogMapper userLogMapper;

    8. @Autowired

    9. private MailService mailService;

    10. /**

    11. * 监听消费用户日志

    12. * @param message

    13. */

    14. @RabbitListener(queues = "${log.user.queue.name}",containerFactory = "singleListenerContainer")

    15. public void consumeUserLogQueue(@Payload byte[] message){

    16. try {

    17. UserLog userLog=objectMapper.readValue(message, UserLog.class);

    18. log.info("监听消费用户日志 监听到消息: {} ",userLog);

    19. //TODO:记录日志入数据表

    20. userLogMapper.insertSelective(userLog);

    21. }catch (Exception e){

    22. e.printStackTrace();

    23. }

    24. }

    将服务跑起来之后,我们即可监听消费到上面 Queue 中的消息,即当前用户登录的信息,而且,我们也可以看到“记录用户登录日志”的逻辑是由一条异于主业务线程的异步线程去执行的:

    enter image description here

    “异步记录用户操作日志”的案例我想足以用于诠释上面所讲的相关理论知识点了,在后续篇章中,由于篇幅限制,我将重点介绍其核心的业务逻辑!

    场景二:异步发送邮件

    发送邮件的场景,其实也是比较常见的,比如用户注册需要邮箱验证,用户异地登录发送邮件通知等等,在这里我以 RabbitMQ 实现异步发送邮件。实现的步骤跟场景一几乎一致!

    1. 消息模型的创建

    enter image description here

    2. 配置信息的创建

    enter image description here

    3. 生产端

    enter image description here

    4. 消费端

    enter image description here

    RabbitMQ 实战:并发量配置与消息确认机制

    实战背景

    对于消息模型中的 listener 而言,默认情况下是“单消费实例”的配置,即“一个 listener 对应一个消费者”,这种配置对于上面所讲的“异步记录用户操作日志”、“异步发送邮件”等并发量不高的场景下是适用的。但是在对于秒杀系统、商城抢单等场景下可能会显得很吃力!

    我们都知道,秒杀系统跟商城抢单均有一个共同的明显的特征,即在某个时刻会有成百上千万的请求到达我们的接口,即瞬间这股巨大的流量将涌入我们的系统,我们可以采用下面一图来大致体现这一现象:

    enter image description here

    当到了“开始秒杀”、“开始抢单”的时刻,此时系统可能会出现这样的几种现象:

    • 应用系统配置承载不了这股瞬间流量,导致系统直接挂掉,即传说中的“宕机”现象;
    • 接口逻辑没有考虑并发情况,数据库读写锁发生冲突,导致最终处理结果跟理论上的结果数据不一致(如商品存库量只有 100,但是高并发情况下,实际表记录的抢到的用户记录数据量却远远大于 100);
    • 应用占据服务器的资源直接飙高,如 CPU、内存、宽带等瞬间直接飙升,导致同库同表甚至可能同 host 的其他服务或者系统出现卡顿或者挂掉的现象;

    于是乎,我们需要寻找解决方案!对于目前来讲,网上均有诸多比较不错的解决方案,在此我顺便提一下我们的应用系统采用的常用解决方案,包括:

    • 我们会将处理抢单的整体业务逻辑独立、服务化并做集群部署;
    • 我们会将那股巨大的流量拒在系统的上层,即将其转移至 MQ 而不直接涌入我们的接口,从而减少数据库读写锁冲突的发生以及由于接口逻辑的复杂出现线程堵塞而导致应用占据服务器资源飙升;
    • 我们会将抢单业务所在系统的其他同数据源甚至同表的业务拆分独立出去服务化,并基于某种 RPC 协议走 HTTP 通信进行数据交互、服务通信等等;
    • 采用分布式锁解决同一时间同个手机号、同一时间同个 IP 刷单的现象;

    下面,我们用 RabbitMQ 来实战上述的第二点!即我们会在“请求” -> "处理抢单业务的接口" 中间架一层消息中间件做“缓冲”、“缓压”处理,如下图所示:

    enter image description here

    并发量配置与消息确认机制

    正如上面所讲的,对于抢单、秒杀等高并发系统而言,如果我们需要用 RabbitMQ 在 “请求” - “接口” 之间充当限流缓压的角色,那便需要我们对 RabbitMQ 提出更高的要求,即支持高并发的配置,在这里我们需要明确一点,“并发消费者”的配置其实是针对 listener 而言,当配置成功后,我们可以在 MQ 的后端控制台应用看到 consumers 的数量,如下所示:

    enter image description here

    其中,这个 listener 在这里有 10 个 consumer 实例的配置,每个 consumer 可以预监听消费拉取的消息数量为 5 个(如果同一时间处理不完,会将其缓存在 mq 的客户端等待处理!)

    另外,对于某些消息而言,我们有时候需要严格的知道消息是否已经被 consumer 监听消费处理了,即我们有一种消息确认机制来保证我们的消息是否已经真正的被消费处理。在 RabbitMQ 中,消息确认处理机制有三种:Auto - 自动、Manual - 手动、None - 无需确认,而确认机制需要 listener 实现 ChannelAwareMessageListener 接口,并重写其中的确认消费逻辑。在这里我们将用 “手动确认” 的机制来实战用户商城抢单场景。

    1.在 RabbitMQConfig 中配置确认消费机制以及并发量的配置

    enter image description here

    2.消息模型的配置信息

    enter image description here

    3.RabbitMQ 后端控制台应用查看此队列的并发量配置

    enter image description here

    4.listener 确认消费处理逻辑:在这里我们需要开发抢单的业务逻辑,即“只有当该商品的库存 >0 时,抢单成功,扣减库存量,并将该抢单的用户信息记录入表,异步通知用户抢单成功!”

    enter image description here

    enter image description here

    紧接着我们采用 CountDownLatch 模拟产生高并发时的多线程请求(或者采用 jmeter 实施压测也可以!),每个请求将携带产生的随机数:充当手机号 -> 充当消息,最终入抢单队列!在这里,我模拟了 50000 个请求,相当于 50000 手机号同一时间发生抢单的请求,而设置的产品库存量为 100,这在 product 数据库表即可设置

    enter image description here

    6.将抢单请求的手机号信息压入队列,等待排队处理

    enter image description here

    7.在最后我们写个 Junit 或者写个 Controller,进行 initService.generateMultiThread(); 调用模拟产生高并发的抢单请求即可

     
    
    1. @RestController

    2. public class ConcurrencyController {

    3. private static final Logger log= LoggerFactory.getLogger(HelloWorldController.class);

    4. private static final String Prefix="concurrency";

    5. @Autowired

    6. private InitService initService;

    7. @RequestMapping(value = Prefix+"/robbing/thread",method = RequestMethod.GET)

    8. public BaseResponse robbingThread(){

    9. BaseResponse response=new BaseResponse(StatusCode.Success);

    10. initService.generateMultiThread();

    11. return response;

    12. }}

    8.最后,我们当然是跑起来,在控制台我们可以观察到系统不断的在产生新的请求(线程)– 相当于不断的有抢单的手机号涌入我们的系统,然后入队列,listener 监听到请求之后消费处理抢单逻辑!最后我们可以观察两张数据库表:商品库存表、商品成功抢单的用户记录表 - 只有当库存表中商品对应的库存量为 0、商品成功抢单的用户记录刚好 100 时 即表示我们的实战目的以及效果已经达到了!!

    enter image description here

    总结:如此一来,我们便将 request 转移到我们的 mq,在一定程度缓解了我们的应用以及接口的压力!当然,实际情况下,我们的配置可能远远不只代码层次上的配置,比如我们的 mq 可能会做集群配置、负载均衡、商品库存的更新可能会考虑分库分表、库存更新可能会考虑独立为库存 Dubbo 服务并通过 Rest Api 异步通信交互并独立部署等等。这些优化以及改进的目的其实无非是为了能限流、缓压、保证系统稳定、数据的一致等!而我们的 MQ,在其中可以起到不可磨灭的作用,其字如其名:“消息队列”,而队列具有 “先进先出” 的特点,故而所有进入 MQ 的消息都将 “乖巧” 的在 MQ 上排好队,先来先排队,先来先被处理消费,由此一来至少可以避免 “瞬间时刻一窝蜂的 request 涌入我们的接口” 的情况!

    展开全文
  • 本文摘录总结自《RabbitMQ实战指南》。 一、消息中间件 消息队列中间件(MessageQueueMiddleware,简称为MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。它...

    本文摘录总结自《RabbitMQ实战指南》。

    一、消息中间件

    消息队列中间件(MessageQueueMiddleware,简称为MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。它一般有两种传递模式:点对点(P2P,Point-to-Point)模式和发布/订阅(Pub/Sub)模式。

    消息中间件的作用如下:

    • 解耦:消息中间件在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,这允许你独立地扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束即可;

    • 冗余〈存储):有些情况下,处理数据的过程会失败。消息中间件可以把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险;

    • 扩展性:因为消息中间件解耦了应用的处理过程,所以提高消息入队和处理的效率是很容易的,只要另外增加处理过程即可,不需要改变代码,也不需要调节参数;

    • 削峰:访问量剧增的情况并不常见。如果以能处理这类峰值为标准而投入资源,无疑是巨大的浪费。使用消息中间件能够使关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩惯;

    • 可恢复性:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入消息中间件中的消息仍然可以在系统恢复后进行处理;

    • 顺序保证:在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持一定程度上的顺序性;

    • 缓冲:在任何重要的系统中,都会存在需要不同处理时间的元素。消息中间件通过一个缓冲层来帮助任务最高效率地执行,写入消息中间件的处理会尽可能快速。该缓冲层有助于控制和优化数据流经过系统的速度;

    • 异步通信:在很多时候应用不想也不需要立即处理消息。消息中间件提供了异步处理机制,允许应用把一些消息放入消息中间件中,但并不立即处理它,在之后需要的时候再慢慢处理。

    二、RabbitMQ

    RabbitMQ整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。

    1. 生产者和消费者

    (1) 生产者

    Producer:生产者,就是投递消息的一方。

    生产者创建消息,然后发布到RabbitMQ中。消息一般可以包含2个部分:消息体和标签(Label)。消息体也可以称之为payload,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个JSON字符串。消息的标签用来表述这条消息,比如一个交换器的名称和一个路由键。生产者把消息交由RabbitMQ,RabbitMQ之后会根据标签把消息发送给感兴趣的消费者(Consumer)。

    (2) 消费者

    Consumer:消费者,就是接收消息的一方。

    消费者连接到RabbitMQ服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体(payload)。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费到消息体,也就不知道消息的生产者是谁,当然消费者也不需要知道。

    2. Broker和队列

    (1) Broker

    Broker:消息中间件的服务节点。

    对于RabbitMQ来说,一个RabbitMQ Broker可以简单地看作一个RabbitMQ服务节点,或者RabbitMQ服务实例。大多数情况下也可以将一个RabbitMQ Broker看作一台RabbitMQ服务器。

    (2) 队列

    Queue:队列,是RabbitMQ的内部对象,用于存储消息。

    多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息井处理。

    3. 交换器、路由键和绑定键

    (1) 交换器

    Exchange:交换器。

    生产者将消息发送到Exchange(交换器,通常也可以用大写的"X"来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,要么会返回给生产者,要么直接丢弃。

    (2) 路由键

    RoutingKey:路由键。

    生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。

    (3) 绑定键

    BindingKey:绑定键。

    RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确地将消息路由到队列了。

    在使用绑定的时候,其中需要的路由键是BindingKey。在发送消息的时候,其中需要的路由键是RoutingKey。一般可以把两者认为是等价的。

    4. 交换器类型

    RabbitMQ常用的交换器类型有fanout、direct、topic、headers这四种。

    (1) fanout

    一般翻译为扇出或广播模式。它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。

    (2) direct

    direct类型的交换器路由规则也很简单,它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中。

    (3) topic

    topic类型的交换器在匹配规则上进行了扩展,它与direct类型的交换器相似,也是将消息路由到BindingKey和RoutingKey相匹配的队列中,但这里的匹配规则有些不同,它约定:

    • RoutingKey为一个点号.分隔的字符串(被点号.分隔开的每一段独立的字符串称为一个单词),如"com.rabbitmq.client";

    • 令BindingKey和RoutingKey一样也是点号.分隔的字符串;

    • BindingKey中可以存在两种特殊字符串#*,用于做模糊匹配,其中#用于匹配一个单词,而*用于匹配多规格单词(可以是零个)。

    上图中路由键为"com.rabbitmq.client"的消息会同时路由到Queuel和Queue2,路由键为"com.hidden.client"的消息只会路由到Queue2中。

    (4) headers

    headers类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在绑定队列和交换器时制定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。

    5. RabbitMQ运转流程

    (1) 生产者发送消息流程

    • 生产者连接到RabbitMQBroker,建立一个连接(Connection),开启一个信道(Channel);

    • 生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等;

    • 生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等;

    • 生产者通过路由键将交换器和队列绑定起来;

    • 生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息;

    • 相应的交换器根据接收到的路由键查找相匹配的队列;

    • 如果找到,则将从生产者发送过来的消息存入相应的队列中;

    • 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者;

    • 关闭信道;

    • 关闭连接。

    为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(messageacknowledgement)。消费者在订阅队列时,可以指定autoAck参数,当autoAck等于false时,RabbitMQ会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当autoAck等于true时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

    (2) 消费者接收消息流程

    • 消费者连接到RabbitMQBroker,建立一个连接(Connection),开启一个信道(Channel);

    • 消费者向RabbitMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作;

    • 等待RabbitMQ Broker回应并投递相应队列中的消息;

    • 消费者确认(ack)接收到的消息;

    • RabbitMQ从队列中删除相应己经被确认的消息;

    • 关闭信道;

    • 关闭连接。

    RabbitMQ的消费模式分两种:推(Push)模式和拉(Pull)模式。

    无论是生产者还是消费者,都需要和RabbitMQ Broker建立连接,这个连接就是一条TCP连接,也就是Connection。一旦TCP连接建立起来,客户端紧接着可以创建一个AMQP信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection之上的虚拟连接,RabbitMQ处理的每条AMQP指令都是通过信道完成的。选择TCP连接复用,不仅可以减少性能开销,同时也便于管理。

    6. AMQP协议

    AMQP的模型架构和RabbitMQ的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定。当生产者发送消息时所携带的RoutingKey与绑定时的BindingKey相匹配时,消息即被存入相应的队列之中。消费者可以订阅相应的队列来获取消息。

    三、RabbitMQ进阶

    1. 消息何去何从

    mandatory和immediate是channel.basicPublish方法中的两个参数,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。RabbitMQ提供的备份交换器(Altemate Exchange)可以将未能被交换器路由的消息(没有绑定队列或者没有匹配的绑定〉存储起来,而不用返回给客户端。

    (1) mandatory参数

    当mandatory参数设为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当mandatory参数设置为false时,出现上述情形,则消息直接被丢弃。生产者可以通过调用channel.addReturnListener来添加ReturnListener监昕器实现。

    (2) immediate参数

    当immediate参数设为true时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过Basic.Return返回至生产者。

    概括来说,mandatory参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。immediate参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。RabbitMQ 3.0版本去掉了immediate参数,因为会影响镜像队列的性能,增加代码复杂性。

    (3) 备份交换器

    备份交换器,英文名称为Altemate Exchange,简称AE,或者更直白地称之为"备胎交换器"。

    生产者在发送消息的时候如果不设置mandatory参数,那么消息在未被路由的情况下将会丢失;如果设置了mandatory参数,那么需要添加ReturnListener的编程逻辑,生产者的代码将变得复杂。如果既不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器,这样可以将未被路由的消息存储在RabbitMQ中,再在需要的时候去处理这些消息。

    2. 过期时间(TTL)

    TTL,Time to Live的简称,即过期时间。RabbitMQ可以对消息和队列设置TTL。

    目前有两种方法可以设置消息的TTL。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息本身进行单独设置,每条消息的TTL可以不同。如果两种方法一起使用,则消息的TTL以两者之间较小的那个数值为准。消息在队列中的生存时间一旦超过设置的TTL值时,就会变成"死信"(Dead Message),消费者将无法再收到该消息(这点不是绝对的)。

    对于第一种设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去,而在第二种方法中,即使消息过期,也不会马上从队列中抹去,因为需要扫描一遍队列才知道哪些消息是过期的,开销较大,采用当消息被发给消费者时判断是否到期并抹去的方法。

    3. 死信队列

    DLX,全称为Dead Letter Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。

    消息变成死信一般是由于以下几种情况:

    • 消息被拒绝(Basic.Reject/Basic.Nack),井且设置requeue参数为false;

    • 消息过期;

    • 队列达到最大长度。

    4. 延迟队列

    延迟队列存储的对象是对应的延迟消息,所谓"延迟消息"是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

    在AMQP协议中,或者RabbitMQ本身没有直接支持延迟队列的功能,但是可以通过前面所介绍的DLX和TTL模拟出延迟队列的功能。假设一个应用中需要将每条消息都设置为10秒的延迟,生产者通过exchange.normal这个交换器将发送的消息存储在queue.normal这个队列中。消费者订阅的并非是queue.normal这个队列,而是queue.dlx这个队列。当消息从queue.normal这个队列中过期之后被存入queue.dlx这个队列中,消费者就恰巧消费到了延迟10秒的这条消息。

    5. 优先级队列

    优先级队列,顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。

    如果在消费者的消费速度大于生产者的速度且Broker中没有消息堆积的情况下,对发送的消息设置优先级也就没有什么实际意义。因为生产者刚发送完一条消息就被消费者消费了,那么就相当于Broker中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。

    6. 持久化

    持久化可以提高RabbitMQ的可靠性,以防在异常情况(重启、关闭、右机等)下的数据丢失。

    如果交换器不设置持久化,那么在RabbitMQ服务重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器中了。对一个长期使用的交换器来说,建议将其置为持久化的。如果队列不设置持久化,那么在RabbitMQ服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。要确保消息不会丢失,需要将消息和队列都设置为持久化,但对消息设置持久化会严重影响RabbitMQ的性能。

    将交换器、队列、消息都设置了持久化之后也不能百分之百保证数据不丢失。

    7. 生产者确认

    当消息的生产者将消息发送出去之后,生产者如何得知消息到底有没有正确地到达服务器呢?RabbitMQ针对这个问题,提供了两种解决方式:

    • 通过事务机制实现;
    • 通过发送方确认(publisher confirm)机制实现。

    (1) 事务机制

    RabbitMQ客户端中与事务机制相关的方法有三个:channel.txSelect、channel.txCommit和channel.txRollback。channel.txSelect用于将当前的信道设置成事务模式,channel.txCommit用于提交事务,channel.txRollback用于事务回滚。

    (2) 发送方确认机制

    生产者将信道设置成confirm(确认)模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达了目的地了。

    (3) 两者的区别

    事务机制和publisher∞nfirm机制两者是互斥的,不能共存。事务机制在一条消息发送之后会使发送端阻塞,以等待RabbitMQ的回应,之后才能继续发送下一条消息。相比之下,发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用程序便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理该nack命令。

    publisherconfmn的优势在于并不一定需要同步确认。所以可以改进一下使用方式,总结有如下两种:

    • 批量confirm方法:每发送一批消息后,调用channel.waitForConfirms方法,等待服务器的确认返回;

    • 异步confirm方法:提供一个回调方法,服务端确认了一条或者多条消息后客户端会因调这个方法进行处理。

    批量confirm在出现返回Basic.Nack或者超时情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且当消息经常丢失时,批量confirm的性能应该是不升反降的。

    8. 消费者的要点

    对于RabbitMQ消费端来说,还有几点需要注意:

    • 消息分发;
    • 消息顺序性。

    (1) 消息的分发

    当RabbitMQ队列拥有多个消费者时,队列收到的消息将以轮询(round-robin)的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者。但没有考虑到每个生产者的性能以及当前任务量等。可以用channel.basicQos方法允许限制信道上的消费者所能保持的最大未确认消息的数量。

    (2) 消息的顺序性

    消息的顺序性是指消费者消费到的消息和发送者发布的消息的顺序是一致的。

    如果生产者使用了事务机制,在发送消息之后遇到异常进行了事务回滚,那么需要重新补偿发送这条消息,如果补偿发送是在另一个线程实现的,那么消息在生产者这个源头就出现了错序。

    展开全文
  • RabbitMQ实战指南.pdf

    2021-02-26 10:04:45
    JA ( 工业技术->自动化技术、计算机技术->计算技术、计算机技术->计算机软件 ) 内容提要:《RabbitMQ实战指南》从消息中间件的概念和RabbitMQ的历史切入,主要阐述RabbitMQ的安装、使用、配置、管理、运维、原理、...

    19c5bd5bf59e0c28fb9aef2bcb49e39f.png

    9480dab0d3767b916aed43b9cc86cc65.png

    作 者 :朱忠华

    出版发行 : 北京:电子工业出版社 , 2017.11

    ISBN号 :978-7-121-32991-3

    页 数 : 336

    原书定价 : 79.00

    开本 : 128开

    主题词 : JAVA语言-程序设计-指南

    中图法分类号 : TP312;JA ( 工业技术->自动化技术、计算机技术->计算技术、计算机技术->计算机软件 )

    内容提要:《RabbitMQ实战指南》从消息中间件的概念和RabbitMQ的历史切入,主要阐述RabbitMQ的安装、使用、配置、管理、运维、原理、扩展等方面的细节。《RabbitMQ实战指南》大致可以分为基础篇、进阶篇和高阶篇三个部分。基础篇首先介绍RabbitMQ的基本安装及使用方式,方便零基础的读者以最舒适的方式融入到RabbitMQ之中。其次介绍RabbitMQ的基本概念,包括生产者、消费者、交换器、队列、绑定等。之后通过Java语言讲述了客户端如何与RabbitMQ建立(关闭)连接、声明(删除)交换器、队列、绑定关系,以及如何发送和消费消息等。进阶篇讲述RabbitMQ的TTL、死信、延迟队列更多...《RabbitMQ实战指南》从消息中间件的概念和RabbitMQ的历史切入,主要阐述RabbitMQ的安装、使用、配置、管理、运维、原理、扩展等方面的细节。《RabbitMQ实战指南》大致可以分为基础篇、进阶篇和高阶篇三个部分。基础篇首先介绍RabbitMQ的基本安装及使用方式,方便零基础的读者以最舒适的方式融入到RabbitMQ之中。其次介绍RabbitMQ的基本概念,包括生产者、消费者、交换器、队列、绑定等。之后通过Java语言讲述了客户端如何与RabbitMQ建立(关闭)连接、声明(删除)交换器、队列、绑定关系,以及如何发送和消费消息等。进阶篇讲述RabbitMQ的TTL、死信、延迟队列、优先级队列、RPC、消息持久化、生产端和消费端的消息确认机制等内容,以期读者能够掌握RabbitMQ的使用精髓。《RabbitMQ实战指南》中间篇幅主要从RabbitMQ的管理、配置、运维这三个角度来为读者提供帮助文档及解决问题的思路。高阶篇主要阐述RabbitMQ的存储机制、流控及镜像队列的原理,深入地讲述RabbitMQ的一些实现细节,便于读者加深对RabbitMQ的理解。《RabbitMQ实战指南》还涉及网络分区的概念,此内容可称为魔鬼篇,需要掌握前面的所有内容才可理解其中的门道。《RabbitMQ实战指南》最后讲述的是RabbitMQ的一些扩展内容及附录,供读者参考之用。

    参考文献格式 : 朱忠华.RabbitMQ实战指南[M].北京:电子工业出版社,2017.11.

    展开全文
  • Spring Boot整合RabbitMQ实战 本篇文章将带你了解Rabbitmq,work模型,发布订阅模型,topic模型,生产者confirm消息确认机制,消费者确认机制,return消息机制,TTL队列,死信队列等相关操作 在springboot 中引入...

    Spring Boot整合RabbitMQ实战

    本篇文章将带你了解Rabbitmq,work模型,发布订阅模型,topic模型,生产者confirm消息确认机制,消费者确认机制,return消息机制,TTL队列,死信队列等相关操作

    在springboot 中引入Rabbitmq

    只需要在 pom.xml 中引入,版本跟随spingboot版本
    pom.xml

    <!--rabbitmq-->
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
                <version>${spring-boot.version}</version>
            </dependency>

    application.properties

    # rabbitmq
    # 配置虚拟机
    spring.rabbitmq.virtual-host=/
    # 开启消息确认机制 confirm 异步
    spring.rabbitmq.publisher-confirm-type=correlated
    # 之前的旧版本 开启消息确认机制的方式
    # spring.rabbitmq.publisher-confirms=true
    # 开启return机制
    spring.rabbitmq.publisher-returns=true
    # 消息开启手动确认
    spring.rabbitmq.listener.direct.acknowledge-mode=manual
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123456

    work模型

    RabbitmqConfig.java

    @SpringBootConfiguration
    public class RabbitmqConfig {
    
        // 配置一个工作模型队列
        @Bean
        public Queue queueWork1() {
            return new Queue("queue_work");
        }
    }

    不指定交换器和路由的话,会使用默认的交换器和路由

    image.png

    RabbitmqController.java

    @RestController
    public class RabbitmqController {
        @Resource
        private IRabbitmqService rabbitmqService;
    
        /**
         * 生产消费模式
         *
         * @return success
         */
        @GetMapping("/sendWork")
        public Object sendWork() {
            rabbitmqService.sendWork();
            return "发送成功...";
        }
    }

    IRabbitmqService.java

    public interface IRabbitmqService {
        void sendWork();
    }

    RabbitmqService.java

    @Service
    public class RabbitmqService implements IRabbitmqService {
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        @Override
        public void sendWork() {
            for (int i = 0; i < 10; i++) {
                rabbitTemplate.convertAndSend("queue_work", "测试work模型: " + i);
            }
        }
    }

    WorkReceiveListener.java

    @Component
    @Slf4j
    public class WorkReceiveListener {
        @RabbitListener(queues = "queue_work")
        public void receiveMessage(String msg, Channel channel, Message message) {
            // 只包含发送的消息
            log.info("1接收到消息:" + msg);
            // channel 通道信息
            // message 附加的参数信息
        }
    
        @RabbitListener(queues = "queue_work")
        public void receiveMessage2(Object obj, Channel channel, Message message) {
            // 包含所有的信息
            log.info("2接收到消息:{}", obj);
        }
    }

    测试工作模型http://127.0.0.1:8080/sendWork

    控制台打印

    image.png

    发布订阅模型

    RabbitmqConfig.java

    // 发布订阅模式
        // 声明两个队列
        @Bean
        public Queue queueFanout1() {
            return new Queue("queue_fanout1");
        }
    
        @Bean
        public Queue queueFanout2() {
            return new Queue("queue_fanout2");
        }
    
        // 准备一个交换机
        @Bean
        public FanoutExchange exchangeFanout() {
            return new FanoutExchange("exchange_fanout");
        }
    
        // 将交换机和队列进行绑定
        @Bean
        public Binding bindingExchange1() {
            return BindingBuilder.bind(queueFanout1()).to(exchangeFanout());
        }
    
        @Bean
        public Binding bindingExchange2() {
            return BindingBuilder.bind(queueFanout2()).to(exchangeFanout());
        }

    RabbitmqController.java

    /**
         * 发布订阅模式
         *
         * @return success
         */
        @RequestMapping("/sendPublish")
        public String sendPublish() {
            rabbitmqService.sendPublish();
            return "发送成功...";
        }

    IRabbitmqService.java

    void sendPublish();

    RabbitmqService.java

    @Override
        public void sendPublish() {
            for (int i = 0; i < 5; i++) {
                // 使用 convertSendAndReceive 方法时的结果:使用此方法,只有确定消费者接收到消息,才会发送下一条信息,每条消息之间会有间隔时间
                // rabbitTemplate.convertSendAndReceive("exchange_fanout", "", "测试发布订阅模型:" + i);
                //使用 convertAndSend 方法时的结果:输出时没有顺序,不需要等待,直接运行
                rabbitTemplate.convertAndSend("exchange_fanout", "", "测试发布订阅模型:" + i);
            }
        }

    PublishReceiveListener.java

    @Component
    @Slf4j
    public class PublishReceiveListener {
    
        @RabbitListener(queues = "queue_fanout1")
        public void receiveMsg1(String msg) {
            log.info("队列1接收到消息:{}" , msg);
        }
    
        @RabbitListener(queues = "queue_fanout2")
        public void receiveMsg2(String msg) {
            log.info("队列2接收到消息:{}" , msg);
        }
    }

    测试发布订阅模型http://localhost:8080/sendPublish

    控制台打印

    image.png

    topic 模型

    RabbitmqConfig.java

    // topic 模型
        @Bean
        public Queue queueTopic1() {
            return new Queue("queue_topic1");
        }
    
        @Bean
        public Queue queueTopic2() {
            return new Queue("queue_topic2");
        }
    
        @Bean
        public TopicExchange exchangeTopic() {
            return new TopicExchange("exchange_topic");
        }
    
        /**
         * *(星号):可以(只能)匹配一个单词
         * #(井号):可以匹配多个单词(或者零个)
         */
    
    
        @Bean
        public Binding bindingTopic1() {
            //# 匹配多个
            return BindingBuilder.bind(queueTopic1()).to(exchangeTopic()).with("topic.#");
        }
    
        @Bean
        public Binding bindingTopic2() {
            // * 匹配一个
            return BindingBuilder.bind(queueTopic2()).to(exchangeTopic()).with("topic.*");
        }

    RabbitmqController.java

    /**
         * topic 模式
         *
         * @return success
         */
        @RequestMapping("/sendTopic")
        public String sendTopic() {
            rabbitmqService.sendTopic();
            return "发送成功...";
        }

    IRabbitmqService.java

    void sendTopic();

    RabbitmqService.java

    @Override
        public void sendTopic() {
            for (int i = 0; i < 10; i++) {
                if (i % 2 == 0) {
                    rabbitTemplate.convertSendAndReceive("exchange_topic", "topic.km.topic", "测试发布订阅模型:" + i);
                } else {
                    rabbitTemplate.convertSendAndReceive("exchange_topic", "topic.km", "测试发布订阅模型:" + i);
                }
            }
        }

    TopicReceiveListener.java

    @Component
    @Slf4j
    public class TopicReceiveListener {
    
        @RabbitListener(queues = "queue_topic1")
        public void receiveMsg1(String msg) {
            log.info("消费者1接收到:{}" , msg);
        }
    
        @RabbitListener(queues = "queue_topic2")
        public void receiveMsg2(String msg) {
            log.info("消费者2接收到:{}" , msg);
        }
    }

    测试topic模式http://localhost:8080/sendTopic

    控制台打印

    image.png

    可以看到消费者1可以接受所有的信息,消费者2只能接受奇数的消息
    在绑定路由规则中
    -(星号)*:可以(只能)匹配一个单词
    -(井号)#:可以匹配多个单词(或者零个)

    发送方confirm机制

    application.properties

    # 开启消息确认机制 confirm 异步
    spring.rabbitmq.publisher-confirm-type=correlated
    # 之前的旧版本 开启消息确认机制的方式
    # spring.rabbitmq.publisher-confirms=true

    RabbitmqConfig.java

    // confirm 机制
        @Bean
        public Queue queueConfirm() {
            return new Queue("queue_confirm");
        }

    RabbitmqController.java

    /**
         * 确认机制
         *
         * @return success
         */
        @RequestMapping("/sendConfirm")
        public String sendConfirm() {
            rabbitmqService.sendConfirm();
            return "发送成功...";
        }

    IRabbitmqService.java

    void sendConfirm();

    RabbitmqService.java

    // 配置 confirm 机制
        private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            /**
             * @param correlationData 消息相关的数据,一般用于获取 唯一标识 id
             * @param b true 消息确认成功,false 失败
             * @param s 确认失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                if (b) {
                    log.info("confirm 消息确认成功...{}", correlationData.getId());
                } else {
                    log.info("confirm 消息确认失败...{} cause: {}", correlationData.getId(), s);
                }
            }
        };
    
        @Override
        public void sendConfirm() {
            rabbitTemplate.convertAndSend("queue_confirm", new User(1, "km", "km123", new Date()), new CorrelationData("" + System.currentTimeMillis()));
            rabbitTemplate.setConfirmCallback(confirmCallback);
        }

    ConfirmReceiveListener.java

    @Component
    @Slf4j
    public class ConfirmReceiveListener {
        @RabbitListener(queues = "queue_confirm")
        public void receiveMsg(User user) {
            log.info("接收到的消息为:{}", user);
        }
    }

    测试发送方Confirmhttp://localhost:8080/sendConfirm

    控制台打印

    image.png

    消费者ack接收以及拒绝

    application.properties

    # 消息开启手动确认
    spring.rabbitmq.listener.simple.acknowledge-mode=manual

    RabbitmqConfig.java

    // 测试消费者ack确认
        @Bean
        public Queue queueAck() {
            return new Queue("queue_ack");
        }
    
        @Bean
        public TopicExchange exchangeAck() {
            return new TopicExchange("exchange_ack");
        }
    
        @Bean
        public Binding bindingAck() {
            return BindingBuilder.bind(queueAck()).to(exchangeAck()).with("topic.*");
        }

    RabbitmqController.java

    /**
         * 测试消费者ack
         *
         * @return success
         */
        @RequestMapping("/sendNeedAck")
        public String sendNeedAck() {
            rabbitmqService.sendNeedAck();
            return "发送成功...";
        }

    IRabbitmqService.java

    void sendNeedAck();

    RabbitmqService.java

    @Override
        public void sendNeedAck() {
            rabbitTemplate.convertAndSend("exchange_ack","topic.ack","测试 消费者ack机制");
        }

    ConfirmAckListener.java

    @Component
    @Slf4j
    public class ConfirmAckListener {
    
    
        @RabbitListener(queues = "queue_ack")
        public void receiveMessage(Object obj, Channel channel, Message message) {
            // 包含所有的信息
            try {
                log.info("接收到消息:{}", obj);
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                // ack 不确认 requeue 是否重新放入队列 multiple 是否批量处理
    //            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
                //nack 与 reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到。
    //            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }

    测试消费者ack接收以及rejecthttp://127.0.0.1:8080/sendNeedAck

    控制台打印
    ack 确认

    image.png

    ack 不确认 及其 reject

    image.png

    nack 与 reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到。所以控制台中会一直打印

    return机制

    RabbitmqConfig.java

    // 测试return机制
        @Bean
        public Queue queueReturn() {
            return new Queue("queue_return");
        }
    
        @Bean
        public TopicExchange exchangeReturn() {
            return new TopicExchange("exchange_return");
        }
    
        @Bean
        public Binding bindingReturn() {
            return BindingBuilder.bind(queueReturn()).to(exchangeReturn()).with("return.*");
        }

    RabbitmqController.java

    /**
         * 返回机制
         *
         * @return success
         */
        @RequestMapping("/sendReturn")
        public String sendReturn() {
            rabbitmqService.sendReturn();
            return "发送成功...";
        }

    IRabbitmqService.java

    void sendReturn();

    RabbitmqService.java

    // 测试return机制
        @Override
        public void sendReturn() {
            rabbitTemplate.setReturnCallback(returnCallback);
    //        rabbitTemplate.convertAndSend("exchange_return", "return.km.km", "测试 return 机制");
            rabbitTemplate.convertAndSend("exchange_return", "return.km", "测试 return 机制");
        }

    ReturnReceiveListener.java

    @Component
    @Slf4j
    public class ReturnReceiveListener {
        @RabbitListener(queues = "queue_return")
        public void receiveMsg(String msg) {
            log.info("接收的消息为:{}" , msg);
        }
    }

    测试return方式
    sendReturnhttp://127.0.0.1:8080/sendReturn

    控制台打印
    正确使用路由

    image.png

    错误使用路由

    image.png

    死信队列/延时队列

    死信队列到期后,会将信息转到另外一个普通队列接收
    RabbitmqConfig.java

    // TTL 队列
        @Bean
        public Queue queueTTL() {
            Map<String, Object> map = new HashMap<>(1);
            map.put("x-message-ttl", 10000);
            return new Queue("queue_ttl", true, false, false, map);
        }
    
        // 产生死信的队列
        @Bean
        public Queue queueDLX() {
            Map<String, Object> map = new HashMap<>(4);
            // 5秒后,消息自动变为死信
            map.put("x-message-ttl", 5000);
            map.put("x-dead-letter-exchange", "exchange_receive");
            map.put("x-dead-letter-routing-key", "receive_key");
            return new Queue("queue_dlx", true, false, false, map);
        }
    
        // 死信交换机
        @Bean
        public DirectExchange exchangeDLX() {
            return new DirectExchange("exchange_dlx");
        }
    
        // 给死信队列绑定交换机
        @Bean
        public Binding bindingDLX() {
            return BindingBuilder.bind(queueDLX()).to(exchangeDLX()).with("receive_key");
        }
    
        // 死信接收交换机
        @Bean
        public DirectExchange exchangeReceive() {
            return new DirectExchange("exchange_receive");
        }
    
        // 接收死信的队列
        @Bean
        public Queue queueReceive() {
            return new Queue("queue_receive");
        }
    
        // 将交换机与队列绑定
        @Bean
        public Binding bindingReceive() {
            return BindingBuilder.bind(queueReceive()).to(exchangeReceive()).with("receive_key");
        }

    RabbitmqController.java

    /**
         * 测试死信队列
         *
         * @return success
         */
        @RequestMapping("/sendDead")
        public String sendDead() {
            rabbitmqService.sendDead();
            return "发送成功...";
        }

    IRabbitmqService.java

    void sendDead();

    RabbitmqService.java

    @Override
        public void sendDead() {
            rabbitTemplate.convertAndSend("exchange_dlx", "receive_key","测试死信队列");
        }

    DeadReceiveListener.java

    @Component
    @Slf4j
    public class DeadReceiveListener {
        @RabbitListener(queues = "queue_receive")
        public void receiveMsg(String msg) {
            log.info("队列接收到消息:{}" , msg);
        }
    
    }

    测试死信队列http://127.0.0.1:8080/sendDead

    控制台打印

    image.png

    可以看到控制台五秒后打印接收的消息

    博文不易,欢迎点赞关注!❤


    源码地址:rabbitmq: rabbitmq实战 (gitee.com)


    参考文档:

    展开全文
  • import java.util.concurrent.C 【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】 开源分享完整内容戳这里 ountDownLatch; import java.util.concurrent.ExecutorService; import ...
  • RabbitMQ 实战教程

    2021-08-03 19:13:11
    RabbitMQ 实战教程 1.MQ引言 1.1 什么是MQ MQ(Message Quene) : 翻译为 消息队列,通过典型的 生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,...
  • RabbitMQ
  • RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成∶ 当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上,RabbitMQ 就好比由邮局、...
  • RabbitMQ实战应用技巧

    2020-12-21 19:08:22
    1. RabbitMQ实战应用技巧1.1. 前言由于项目原因,之后会和RabbitMQ比较多的打交道,所以让我们来好好整理下RabbitMQ的应用实战技巧,尽量避免日后的采坑1.2. 概述RabbitMQ有几个重要的概念:虚拟主机,交换机,队列...
  • RabbitMQ简介 什么是消息中间件 消息: 消息队列中间件: 传递模式: 点对点:基于队列 发布/订阅:基于内容节点 消息中间件的...
  • RabbitMQ是Celery广泛使用的消息代理。在本这篇文章中,我将使用RabbitMQ来介绍Celery的基本概念,然后为一个小型演示项目设置Celery 。最后,设置一个Celery Web控制台来监视我的任务基本概念来!看图说话:Broker...
  • RabbitMQ实战指南之消息可靠性

    千次阅读 2021-07-26 14:10:32
    在通过 channel.txSelect 方法开启事务之后,我们便可以发布消息给 RabbitMQ 了,如果事务提交成功,则消息一定到达了RabbitMQ 中,如果在事务提交执行之前由于 RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们...
  • RabbitMQ实战指南

    2021-04-07 20:56:37
    RabbitMQ实战指南 ISBN: 978-7-121-32991-3 推荐指数: ★★★★★ 作者:朱忠华 阅读时间: 2021-04-07 页数: 335 从入门到深入, 简单易懂又直观, 实例很多,动手性很强, 值得推荐. 从安装,到基本知识讲解, 原理性...
  • RabbitMQ实战视频教程

    2021-03-04 08:05:35
    课程目标本课程将带领大家认识、理解并实战消息中间件RabbitMQ,在学习完本课程后,将能更好的理解消息中间件的...适用人群消息中间件学习者、RabbitMQ实战者以及SpringBoot整合RabbitMQ实战需求者课程简介"RabbitM...
  • RabbitMQ实战教程

    2021-04-06 21:36:51
    RabbitMQ实战教程1.MQ引言1.1.什么是MQ1.2 MQ有哪些1.3 不同MQ特点2 RabbitMQ 的引言2.1 RabbitMQ2.2 RabbitMQ 的安装2.2.1 下载2.2.2 下载安装包2.2.3 安装步骤3 RabiitMQ 配置3.1 RabbitMQ 管理命令行3.2 web管理...
  •   RabbitMQ java 客户端使用com.rabbitmq.client作为顶级包名,关键是 Class和Interface有...RabbitMQ相关的开发工作,基本是是围绕Connection和Channel这两个类展开的下面主要是连接,交换器,队列的创建与绑定
  • RabbitMQ 实战

    千次阅读 2021-11-23 09:55:28
    1:安装 RabbitMQ 这里 我会先同时安装三台机器,为以后的高可用集群做准备 1.1:安装RabbitMQ 的依赖环境 安装常用的环境和工具包 yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto ...
  • 4.2Multicast 4.3Redis 4.4Simple 三、 zookeeper ...=================================================================== ...Dubbo 是阿里巴巴公司开源的一个高性能优秀的服务框架,使得应用可通过高性能的RPC ...
  • 如果既不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器,这样可以将未被路由的消息存储在RabbitMQ中,再在需要的时候去处理这些消息。 可以通过在声明交换器(调用channel.exchangeDeclare...
  • websocket+rabbitmq实战

    2021-02-28 10:12:14
    1. websocket+rabbitmq实战1.1. 前言接到的需求是后台定向给指定web登录用户推送消息,且可能同一账号会登录多个客户端都要接收到消息1.2. 遇坑基于springboot环境搭建的websocket+rabbitmq,搭建完成后发现...
  • RabbitMQ实战 高效部署分布式消息队列》 本书是为那些熟悉Python、PHP、Java、.NET或者任何其他现代编程语言的开发者编写的。无须任何RabbitMQ经验。 本书对RabbitMQ做了全面、翔实的讲解,体现了两位专家的...
  • 一、实战前言 RabbitMQ 作为目前应用相当广泛的消息中间件,在企业级应用、微服务应用中充当着重要的角色。特别是在一些典型的应用场景以及业务模块中具有重要的作用,比如业务服务模块解耦、异步通信、高并发限流、...
  • 6-RabbitMQ实战

    2021-09-21 16:23:08
    文章目录6-RabbitMQ实战-1-RabbitMQ 简介和环境搭建什么是消息队列?为什么要使用消息队列?**1.1.搜索与商品服务的问题**1.2.消息队列(MQ)1.2.1.什么是消息队列1.2.2.AMQP和JMS1.2.3.常见MQ产品1.2.4.RabbitMQ2 ...
  • 设置配置文件 cd /etc/rabbitmq cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/ mv rabbitmq.config.example rabbitmq.config 3.3.7.开启用户远程访问 vi /etc/rabbitmq/rabbitmq...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 19,480
精华内容 7,792
关键字:

rabbitmq实战