精华内容
下载资源
问答
  • rabbitmq实战项目
    2022-09-01 15:40:32

    RabbitMQ简介

    RabbitMQ 中文官方文档 (RabbitMQ 中文官方文档)

    1. 消息队列是应用程序和应用程序之间的一种通信方法。

    2. RabbitMQ : erlang语言开发、 基于AMQP协议。

    3. 同类产品:ActiveMQ、 ZeroMQ、 RabbitMQ、 RocketMQ、 Kafka。

    4. 六种模式: 简单模式、 工作模式、 发布与订阅模式、 路由模式、通配符模式、 远程调用模式(基本不会用到)

    5. 关键词:{Broker: 服务器实体、 Exchange :消息交换机、 Queue: 消息队列载体、Binding: 绑定 、Routing Key: 路由关键字、 VHost: 虚拟主机、Producer: 消息生产者 、 Consumer: 消息消费者、Channel: 消息通道 }

    6. 关键概念:由Exchange、Queue、RoutingKey三个才能决定一个从Exchange到Queue的唯一的线路。

     RabbitMQ五大模式实战

    基于SpringBoot开发的RabbitMQ应用程序,利用SpringBoot的自动配置和起步依赖方便更快的构建项目。

    环境准备

    1. 准备一台RabbitMQ服务器 链接: RabbitMQ安装提取码: qhxi 
    2. 本次使用的是SpringBoot项目
    3. pom依赖
      <parent>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-parent</artifactId>
          <version>2.3.4.RELEASE</version>
          <relativePath/> <!-- lookup parent from repository -->
      </parent>
      <dependencies>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-amqp</artifactId>
          </dependency>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-web</artifactId>
          </dependency>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-configuration-processor</artifactId>
              <optional>true</optional>
          </dependency>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-test</artifactId>
              <scope>test</scope>
              <exclusions>
                  <exclusion>
                      <groupId>org.junit.vintage</groupId>
                      <artifactId>junit-vintage-engine</artifactId>
                  </exclusion>
              </exclusions>
          </dependency>
          <dependency>
              <groupId>org.springframework.amqp</groupId>
              <artifactId>spring-rabbit-test</artifactId>
              <scope>test</scope>
          </dependency>
      </dependencies>
      <build>
          <plugins>
              <plugin>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-maven-plugin</artifactId>
              </plugin>
          </plugins>
      </build>
      

    4. 配置application.yml文件
      spring:
        rabbitmq:
          host: 127.0.0.1
          port: 5672
          username: guest
          password: guest
      server:
        port: 8082
      

    5. 启动类和目录结构是SpringBoo正常设置,这里不再赘述。

    注意:启动类名设置为RabbitmqProducerApplication

    • 简单模式
    1. 简单模式配置文件
      @Configuration
      public class RabbitSimpleConfig {
          @Bean
          public Queue simpleQueue(){
              return new Queue("simpleQueue");
          }
      }
      

    2. 简单模式生产者
      @SpringBootTest(classes = RabbitmqProducerApplication.class)
      public class ProducerTest {
          @Autowired
          RabbitTemplate rabbitTemplate;	
          @Test
          public void simpleProduct(){
              for (int num = 0; num < 20; num++) {
                  rabbitTemplate.convertAndSend("simpleQueue", "简单模式"+num);
              }
          }
      }
      

    3. 简单模式消费者
      @Component
      public class MessageListener {
          @RabbitListener(queues = "simpleQueue")
          public void simpleListener(String message){
              System.out.println("简单模式监听器:"+message);
          }	
      }
      

    • 工作模式
    1. 工作模式配置文件
      @Bean
       public Queue workQueue(){
           return new Queue("workQueue");
       }
      

    2. 工作模式生产者
      @Test
      public void workProduct(){
          for (int num = 0; num < 20; num++) {
              rabbitTemplate.convertAndSend("workQueue", "工作模式"+num);
          }
      }
      

    3. 工作模式消费者
       @RabbitListener(queues = "workQueue")
       public void workListener1(String message) {
           System.out.println("工作模式监听器1:" + message);
       }
      
       @RabbitListener(queues = "workQueue")
       public void workListener2(String message) {
           System.out.println("工作模式监听器2:" + message);
       }
      

    • 发布订阅模式
    • 发布订阅模式配置文件
      //配置交换器
      @Bean
      public FanoutExchange fanoutExchange() {
          return new FanoutExchange("fanoutExchange");
      }
      //配置队列
      @Bean
      public Queue fanoutQueue1() {
          return new Queue("fanoutQueue1", true, false, false, null);
      }
      
      @Bean
      public Queue fanoutQueue2() {
          return new Queue("fanoutQueue2", true, false, false, null);
      }
      //配置绑定
      @Bean
      public Binding fanoutBinding1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {
              return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
      }
      
      @Bean
      public Binding fanoutBinding2(FanoutExchange fanoutExchange, Queue fanoutQueue2) {
          return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
      }
      

    • 发布订阅模式生产者
      @Test
      public void FanoutProduct(){
          for (int num = 0; num < 10; num++) {
              rabbitTemplate.convertAndSend("fanoutExchange","","发布订阅模式"+num);
          }
      }
      

    • 发布订阅模式消费者
      @RabbitListener(queues = "fanoutQueue1")
      public void fanoutListener1(String message) {
          System.out.println("发布订阅监听器1:" + message);
      }
      
      @RabbitListener(queues = "fanoutQueue2")
      public void fanoutListener2(String message) {
          System.out.println("发布订阅监听器2:" + message);
      }
      

    • 路由模式
    1. 路由模式配置文件
      //配置交换机
      @Bean
      public DirectExchange directExchange() {
          return new DirectExchange("directExchange");
      }
      
      //配置队列
      @Bean
      public Queue directQueue1() {
          return new Queue("directQueue1", true, false, false, null);
      }
      
      @Bean
      public Queue directQueue2() {
          return new Queue("directQueue2", true, false, false, null);
      }
      //配置绑定
      @Bean
      public Binding directBinding1(Queue directQueue1, DirectExchange directExchange) {
          return BindingBuilder.bind(directQueue1).to(directExchange).with("one");
      }
      
      @Bean
      public Binding directBinding2(Queue directQueue2, DirectExchange directExchange) {
          return BindingBuilder.bind(directQueue2).to(directExchange).with("two");
      }
      

    2. 路由模式生产者
      @Test
      public void directProduct1() {
          for (int num = 0; num < 5; num++) {
              rabbitTemplate.convertAndSend("directExchange","one", "发送到路由队列1消息"+num);
          }
      }
      @Test
      public void directProduct2() {
          for (int num = 0; num < 5; num++) {
              rabbitTemplate.convertAndSend("directExchange","two", "发送到路由队列2消息"+num);
          }
      }
      

    3. 路由模式消费者
      @RabbitListener(queues = "directQueue1")
      public void fanoutListener1(String message) {
          System.out.println("路由模式监听器1:" + message);
      }
      
      @RabbitListener(queues = "directQueue2")
      public void fanoutListener2(String message) {
          System.out.println("路由模式监听器2:" + message);
      }
      

    • 通配符模式
    1. 通配符模式配置文件
      //配置队列
      @Bean
      public Queue topicQueue1() {
         return new Queue("topicQueue1");
      }
      
      @Bean
      public Queue topicQueue2() {
         return new Queue("topicQueue2");
      }
      //配置交换器
      @Bean
      public TopicExchange topicExchange() {
         return new TopicExchange("topicExchange");
      }
      //配置绑定
      @Bean
      public Binding topicBinding1(Queue topicQueue1, TopicExchange topicExchange) {
         return BindingBuilder.bind(topicQueue1).to(topicExchange).with("topic.*");
      }
      
      @Bean
      public Binding topicBinding2(Queue topicQueue2, TopicExchange topicExchange) {
         return BindingBuilder.bind(topicQueue2).to(topicExchange).with("topic.#");
      }
      

    2. 通配符模式生产者
      /*
       * 通配符模式测试
       * */
      @Test
      public void topicProduct() {
          rabbitTemplate.convertAndSend("topicExchange","topic.one", "routkey为topic.one的消息");
          rabbitTemplate.convertAndSend("topicExchange","topic.one.two", "routkey为topic.one.two的消息");
      }
      

    3. 通配符模式消费者
      @RabbitListener(queues = "topicQueue1")
      public void fanoutListener1(String message) {
          System.out.println("通配符监听器1:" + message);
      }
      
      @RabbitListener(queues = "topicQueue2")
      public void fanoutListener2(String message) {
          System.out.println("通配符监听器2:" + message);
      }
      

      总结

    以上就是SpringBoot+RabbitMQ五大模式的简单使用实例,到目前为止RabbitMQ也是Sping AMQP的唯一实现。感谢支持,你的支持是我前进的动力!!!

    更多相关内容
  • rabbitMQ实战

    2018-10-10 14:56:44
    这是牧码人王老师讲的,里面的rabbitMQ下载、安装、六种消息类型都很全,代码我也都试过了,直接复制粘贴就可以使用,希望可以帮助到大家
  • rabbitmq实战项目

    2021-10-08 10:15:44
    rabbitmq

        刚学完rabbitmq不久,但对项目中如何对rabbitmq进行使用了解的不是很清楚,所有从网上找了项目熟悉一下rabbitmq的使用。rabbitmq和springboot的整合就不再详细介绍了。

    1、在pom.xml中导入依赖

     <dependency>
         <groupId>com.rabbitmq</groupId>
         <artifactId>amqp-client</artifactId>
         <version>3.4.1</version>
     </dependency>

    1、先编写一个消息发送的实体类。

     2、在配置类中定义队列、交换机以及将它们绑定。

    @Configuration
    public class RabbitmqTopicConfig {
    
        private static final String queueName="limitQueue";
        private static final String excahngeName="limitExchange";
    
        @Bean
        public Queue queue(){
            return new Queue(queueName);
        }
    
        @Bean
        public TopicExchange exchange(){
            return new TopicExchange(excahngeName);
        }
    
        @Bean
        public Binding binding(){
            return BindingBuilder.bind(queue()).to(exchange()).with("limit");
        }
    }

     3、新建消息发送类。

     4、发送消息(先注入消息发送类)

    5、新建消息处理类(对消息进行处理)

    展开全文
  • RabbitMQ实战视频.zip

    2020-03-19 10:23:02
    特别是在第二阶段的学习,将采用SpringBoot整合RabbitMQ的方式来实战实际的业务场景,并用来解决微服务项目以及分布式系统中常见的典型问题,比如服务模块异步解耦、延迟监听消费处理、秒杀系统高并发方案的解决、限...
  • springboot与rabbitmq结合的实战、实例项目,有助于帮助你了解springboot中怎么使用rabbitmq。 获取资源:关注我!给我留言或者私信发邮箱~
  • rabbitmq项目实战

    千次阅读 2021-02-18 10:26:35
    很久之前就想写了,消息队列不管是在分布式项目,还是单体项目中,都是使用率很高的消息中间件,先简单介绍一下市面上常用到的消息队列,然后说一下rabbitmq的使用 消息队列中间件是分布式系统中重要的组件,主要...

    很久之前就想写了,消息队列不管是在分布式项目,还是单体项目中,都是使用率很高的消息中间件,先简单介绍一下市面上常用到的消息队列,然后说一下rabbitmq的使用
    消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题
    常用的有:RabbitMQ,ActiveMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ,每一种mq都有自己的使用场景,一般都是在web应用或者安卓应用使用

    rabbitmq使用 框架使用spring cloud

    1 创建生产者并导入依赖

        <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    
    package com.pactera.business.rabbit;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.io.UnsupportedEncodingException;
    
    
    @Configuration
    public class RabbitConfig {
        /**
         * 所有的消息发送都会转换成JSON格式发到交换机
         *
         * @param connectionFactory
         * @return
         */
        @Bean
        public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
            final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    
            // 消息是否必须路由到一个队列中,配合Return使用
            rabbitTemplate.setMandatory(true);
    
            // 消息返回, yml需要配置 publisher-returns: true
            // 为RabbitTemplate设置ReturnCallback
            //主要是两个回调,一个是confirm回调,一个是return回调,这两个有什么不同呢?
            //经检验得知,如果推送去一个不存在交换机上,那么就会触发confirm回调;如果推送去一个存在的交换机,但对应的路由键(或者说队列)不存在,则会触发return回调。
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText,
                                            String exchange, String routingKey) {
                    try {
                        System.out.println("--------收到无法路由回发的消息--------");
                        System.out.println("ReturnCallback:     " + "消息:" + message);
                        System.out.println("ReturnCallback:     " + "回应码:" + replyCode);
                        System.out.println("ReturnCallback:     " + "回应信息:" + replyText);
                        System.out.println("ReturnCallback:     " + "交换机:" + exchange);
                        System.out.println("ReturnCallback:     " + "路由键:" + routingKey);
                        System.out.println("properties:" + message.getMessageProperties());
                        System.out.println("body:" + new String(message.getBody(), "UTF-8"));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            // Confirm异步确认,收到服务端的ACK以后会调用
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    System.out.println("--------收到服务端异步确认--------");
                    System.out.println("ConfirmCallback:     " + "相关数据:" + correlationData);
                    System.out.println("ConfirmCallback:     " + "确认情况:" + ack);
                    System.out.println("ConfirmCallback:     " + "原因:" + cause);
                }
            });
            return rabbitTemplate;
        }
    
    
    }
    
    
    
    package com.pactera.business.rabbit;
    
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    
    @Component
    @ConfigurationProperties(prefix = "mq")
    public class RabbitSender {
    
        @Value("${mq.directexchange}")
        private String directExchange;
    
    
        // 自定义的模板,所有的消息都会转换成JSON发送
        @Autowired
        AmqpTemplate amqpTemplate;
    
    
        public void send(Map<String, Object> map, String directRoutingKey) {
            // 发送JSON字符串
            ObjectMapper mapper = new ObjectMapper();
            String sycMsg = null;
            try {
                sycMsg = mapper.writeValueAsString(map);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            System.out.println(sycMsg);
            amqpTemplate.convertAndSend(directExchange, directRoutingKey, sycMsg);
        }
    
    
    }
    
    

    调用生产者消息队列产生一条队列发送给消费者

        rabbitSender.send(paramMap, app.getAppKey());
    

    2 配置rabbitmq文件

    //application.yml 配置

    mq:
      directexchange: PARK_CLOUD_TO_APP_EXCHANGE
      spacequeue: PARK_SPACE_QUEUE
      directroutingkey: park-space
    

    //application-dev.yml

    // 生产者
      rabbitmq:
        username: admin
        password: admin
        host: 192.168.9.58
        port: 5672
        virtual-host: park-dev
    // 消费者
     rabbitmq:
        username: admin
        password: admin
        host: 192.168.9.58
        port: 5672
        virtual-host: park-dev
        listener:
          simple:
            # 消费端最小并发数
            concurrency: 1
            # 消费端最大并发数
            max-concurrency: 5
            # 一次处理的消息数量
            prefetch: 2
            # 手动应答
            acknowledge-mode: manual
            retry:
              enabled: true
              max-attempts: 3
              initial-interval: 5000
          direct:
            acknowledge-mode: manual
    

    3 创建消费者

    package com.pactera.business.mq;
    
    import com.pactera.business.task.DataSynTask;
    import com.pactera.utlis.JsonUtils;
    import com.pactera.utlis.LoggerHelper;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.PropertySource;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.Map;
    
    @Component
    @PropertySource("classpath:application.yml")
    public class FirstConsumer {
        @Autowired
        private DataSynTask dataSynTask;
    
        @RabbitHandler
        @RabbitListener(queues = "${mq.energyqueue}", containerFactory =
                "rabbitListenerContainerFactory")
        public void process(String sycMsg, Channel channel, Message message) throws IOException {
            System.out.println("energy Queue received msg : " + sycMsg);
            try {
                Map<String, Object> paramMap = JsonUtils.JsonToMap2(sycMsg);
                // 消费生产者队列里的消息信息
                dataSynTask.sycUserOrg(paramMap, 2);
                System.out.println("energy Queue 消费消息--成功 : " + sycMsg);
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e) {
                if (message.getMessageProperties().getRedelivered()) {
                    LoggerHelper.info(this.getClass(), "消息已重复处理失败,拒绝再次接收...");
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
                } else {
                    LoggerHelper.info(this.getClass(), "消息即将再次返回队列处理...");
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                }
            }
        }
    
    }
    
    
    package com.pactera.business.mq;
    
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.PropertySource;
    
    import java.util.HashMap;
    
    @Configuration
    @PropertySource("classpath:application.yml")
    public class RabbitConfig {
    
        @Value("${mq.energyqueue}")
        private String energyqueue;
    
        @Value("${mq.directexchange}")
        private String directExchange;
    
        @Value("${mq.directroutingkey}")
        private String directroutingkey;
    
        // 创建队列
        @Bean("energyqueue")
        public Queue getFirstQueue() {
            //队列持久化
            return new Queue(energyqueue, true);
        }
    
    
        // 创建交换机
        @Bean("directExchange")
        public DirectExchange getDirectExchange() {
            //交换机持久化
            return new DirectExchange(directExchange, true, false, new HashMap<>());
        }
    
        // 定义绑定关系
        @Bean
        public Binding bindFirst(@Qualifier("energyqueue") Queue queue, @Qualifier(
                "directExchange") DirectExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(directroutingkey);
        }
    
    
        /**
         * 在消费端转换JSON消息
         * 监听类都要加上containerFactory属性
         *
         * @param connectionFactory
         * @return
         */
        @Bean
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            factory.setAutoStartup(true);
            return factory;
        }
    }
    
    

    关键词:
    Channel:是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

    Channel:的原理是一条线程一条通道,多条线程多条通道同用一条TCP链接。一条TCP链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。

    Publisher:向交换器发布消息的客户端应用程序,即消息的生产者

    Consumer:从消息队列中取得消息的客户端应用程序,即消息的消费者

    Queue(队列)是RabbitMQ的内部对象,用于存储消息,RabbitMQ中的消息都只能存储在Queue中,生产者生产消息并最终投递到Queue中,消费者可以从Queue中获取消息并消费。多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。(另外需要注意的是 生产者不是跟队列直接相连的,中间还有个exchange,这里省略了)

    routing key:生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。(Exchange type 就是 exchange的路由类型, binding key就是exchange与queue绑定时的一个key,相当于给这个绑定取个名字.这个名字是跟 routing key 相同或者包含的)

    Binding:RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。

    Binding key:在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。这个将在Exchange Types章节会列举实际的例子加以说明。 在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。

    Exchange:用来接收生产者发送的消息并将这些消息路由给服务器中的队列。(根据Exchange Type来决定 怎么分发到不同的Queue上)

    Exchange Types:RabbitMQ常用的Exchange Type有fanout、direct、topic、headers这四种(AMQP规范里还提到两种Exchange Type,分别为system与自定义,这里不予以描述),下面分别进行介绍。另模式其实还有更多,但是常用的就这几种

    fanout(广播模式):fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。即每个Queue都会收到相同的消息.

    direct(完全匹配模式):direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。

    RPC:MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。

    RabbitMQ中实现RPC的机制是:
    1:客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14中properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)
    2:服务器端收到消息并处理
    3:服务器端处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性
    4:客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理

    展开全文
  • RabbitMq项目实战 实战一:使用延迟队列和备份队列实现考试信息定时存储 需求分析: 在线考试系统: 考生考试——随机生成试卷——考生规定时间内答卷——提交试卷——计算分数——考生每道题目入库 分析: 假如考试...

    RabbitMq项目实战

    实战一:使用延迟队列和备份队列实现考试信息定时存储

    需求分析:

    在线考试系统:

    考生考试——随机生成试卷——考生规定时间内答卷——提交试卷——计算分数——考生每道题目入库

    分析:

    假如考试时间为2个小时,考生生成试卷后,如果遇到掉线或者其他情况未能在规定时间内交卷的,系统默认交卷并计算成绩和题目入库。并通过微信告诉学生,您的试卷已经自动提交。

    image-20210927145007058

    实现方式一(不推荐):通过死信队列实现延迟队列

    image-20210927225316272

    将过期消息入死信队列,消费死信队列。

    创建交换机和队列

    package marchsoft.modules.rabbitmq.examprocess;
    
    import lombok.Getter;
    
    @Getter
    public enum ExamProcessEnum {
        /**
         * 计算考试成绩队列(死信队列)
         */
        QUEUE_EXAM_PROCESS("exam.process.direct", "exam.process.computed", "exam.process.computed"),
        /**
         * 考级结果计算通知ttl队列
         */
        QUEUE_TTL_PROCESS_COMPUTED("exam.process.direct.ttl", "exam.process.computed.ttl", "exam.process.computed.ttl"),
        /**
         * 计算考试成绩队列(备份队列)
         */
        QUEUE_BACKUP_PROCESS("exam.process.direct.backup", "exam.process.computed.backup", ""),
        /**
         * 计算考试成绩队列(报警队列)
         */
        QUEUE_WARN_PROCESS("exam.process.direct.backup", "exam.process.computed.warn", "");
    
    
    
        /**
         * 交换名称
         */
        private String exchange;
        /**
         * 队列名称
         */
        private String name;
        /**
         * 路由键
         */
        private String routeKey;
    
       ExamProcessEnum(String exchange, String name, String routeKey) {
            this.exchange = exchange;
            this.name = name;
            this.routeKey = routeKey;
        }
    }
    
    
    @Configuration
    public class ExamProcessConfig {
    
    
        //死信交换机
        @Bean
        public DirectExchange examProcessDirect() {
            System.out.println("创建交换机");
            return (DirectExchange) ExchangeBuilder
                    .directExchange(ExamProcessEnum.QUEUE_EXAM_PROCESS.getExchange())
                    .durable(true)
                    .build();
        }
        //死信队列
        @Bean
        public Queue examProcessQueue() {
            return new Queue(ExamProcessEnum.QUEUE_EXAM_PROCESS.getName());
        }
    
    
        //死信队列与死信交换机进行绑定
        @Bean
        public Binding bindingExamProcessQueueToExchange(@Qualifier("examProcessQueue") Queue queue, @Qualifier("examProcessDirect") DirectExchange customExchange) {
            return BindingBuilder
                    .bind(queue)
                    .to(customExchange)
                    .with(ExamProcessEnum.QUEUE_EXAM_PROCESS.getRouteKey());
        }
    
    
        //备份交换机
        @Bean
        public FanoutExchange examProcessBackUpFanout() {
            return (FanoutExchange) ExchangeBuilder
                    .fanoutExchange(ExamProcessEnum.QUEUE_BACKUP_PROCESS.getExchange())
                    .durable(true)
                    .build();
        }
    
        //备份队列
        @Bean
        public Queue examBackUpProcessQueue() {
            return new Queue(ExamProcessEnum.QUEUE_BACKUP_PROCESS.getName());
        }
    
    
        //备份队列与备份交换机进行绑定
        @Bean
        public Binding bindingExamProcessBackUpQueueToExchange(@Qualifier("examBackUpProcessQueue") Queue queue, @Qualifier("examProcessBackUpFanout") FanoutExchange fanoutExchange) {
            return BindingBuilder
                    .bind(queue)
                    .to(fanoutExchange);
        }
    
        //警告队列
        @Bean
        public Queue examProcessWarnQueue() {
            return new Queue(ExamProcessEnum.QUEUE_WARN_PROCESS.getName());
        }
    
        //警告队列与备份交换机进行绑定
        @Bean
        public Binding bindingExamProcessWarnQueueToExchange(@Qualifier("examProcessWarnQueue") Queue queue, @Qualifier("examProcessBackUpFanout") FanoutExchange fanoutExchange) {
            return BindingBuilder
                    .bind(queue)
                    .to(fanoutExchange);
        }
    
        //普通队列与普通交换机进行绑定
        @Bean
        public Binding bindingExamProcessTtlQueueToExchange(@Qualifier("examProcessTtlQueue") Queue queue, @Qualifier("bindingExamProcessTtlExchangeToBackUpExchange") DirectExchange directExchange) {
            return BindingBuilder
                    .bind(queue)
                    .to(directExchange)
                    .with(ExamProcessEnum.QUEUE_TTL_PROCESS_COMPUTED.getRouteKey());
        }
    
    
        //普通队列绑定死信交换机
        @Bean
        public Queue examProcessTtlQueue() {
            Map<String, Object> args = new HashMap<>(2);
            //声明当前队列绑定的死信交换机
            args.put("x-dead-letter-exchange", ExamProcessEnum.QUEUE_EXAM_PROCESS.getExchange());
            //声明当前队列的死信路由 key
            args.put("x-dead-letter-routing-key", ExamProcessEnum.QUEUE_EXAM_PROCESS.getRouteKey());
            //druable 持久化  后面输入队列的名称
            return QueueBuilder.
                    durable(ExamProcessEnum.QUEUE_TTL_PROCESS_COMPUTED.getName())
                    .withArguments(args)
                    .build();
        }
    
    
        //普通交换机绑定备份交换机
        @Bean
        public DirectExchange bindingExamProcessTtlExchangeToBackUpExchange() {
            //普通交换机绑定备份交换机
            return (DirectExchange) ExchangeBuilder.directExchange(ExamProcessEnum.QUEUE_TTL_PROCESS_COMPUTED.getExchange())
                    .durable(true)
                    .withArgument("alternate-exchange", ExamProcessEnum.QUEUE_BACKUP_PROCESS.getExchange())
                    .build();
        }
    
    
    }
    

    生产者

    @Component
    @Slf4j
    public class ExamProcessSender {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
    
        public void sendMessage(Long recordId, long delayTimes) {
            //延长五分钟。
            long waitMinute = 1000 * 60 * 5;
            long delayTime = waitMinute + 1000 * delayTimes;
            //给延迟队列发送消息
            rabbitTemplate.convertAndSend(ExamProcessEnum.QUEUE_TTL_PROCESS_COMPUTED.getExchange(), ExamProcessEnum.QUEUE_TTL_PROCESS_COMPUTED.getRouteKey(), recordId, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //给消息设置延迟毫秒值
                    message.getMessageProperties().setExpiration(String.valueOf(delayTime));
                    return message;
                }
            }, new CorrelationData(UUID.randomUUID().toString()));
            log.info("操作人:{},向消息队列中发送了一场考试记录,id为:{},消息过期时间为:{}", SecurityUtils.getCurrentUserId(), recordId, delayTime);
        }
    
    }
    
    
    发送具体业务
            @Override
        @Transactional(rollbackFor = Exception.class)
        public void sureBeginExam(Long recordId) {
            //isSendQueue 如果确定考试,考试信息入队列。
            Map<String, Long> stringLongMap = judgeOperateStatus(recordId);
            Long examTimeLimit = stringLongMap.get("examTimeLimit");
            EExamRecord examRecord = new EExamRecord();
            examRecord.setId(recordId);
            examRecord.setBeginTime(LocalDateTime.now());
            examRecord.setExamStatus(ExamStatusEnum.IN_ANSWER.getCode());
            //清除考试记录缓存
            ExamCacheUtils.clearExamRecordCache(recordId);
            if (eExamRecordMapper.updateById(examRecord) <= 0) {
                BaseUtils.errorLog(ResultEnum.UPDATE_OPERATION_FAIL, "禁止考试,请登录重试", recordId);
            }
            //消息队列发送一条消息
            examProcessSender.sendMessage(recordId, examTimeLimit);
        }
    

    消费者

    @Component
    @Slf4j
    public class ExamProcessCustomer {
    
        @Autowired
        private ExamTaskService examTaskService;
    
    
        //死信队列,存储过期的消息
        @RabbitListener(queues = "exam.process.computed")
        public void handleExamProcessComputed(Long recordId) {
            log.info("进入消费队列, recordId:{}",recordId);
            examTaskService.completeExamByMq(recordId);
            log.info("exam process recordId:{}", recordId);
        }
    
    
        //备份队列
        @RabbitListener(queues = "exam.process.computed.backup")
        public void handleExamBackUpComputed(Long recordId) {
            log.info("进入备份队列, recordId:{}",recordId);
            //todo 问题:当我为消息设置过期时间时,如果消息进入备份队列,会忽略过期时间。
            //examTaskService.completeExamByMq(recordId);
    
            log.info("exam process recordId:{}", recordId);
        }
        //报警队列
        @RabbitListener(queues = "exam.process.computed.warn")
        public void handleExamWarnComputed(Long recordId) {
            log.info("进入报警队列, recordId:{}",recordId);
        }
    }
    
    消费具体业务
         @Override
        public void completeExamByMq(Long recordId) {
            //1.判断考试是否进行算分
            //  1.1 已算分
            //    1.1.1判断考试是否入库   入库:不做任何处理  未入库:入库
            //  1.2 未算分
            //    1.2.1进行算分   并提示用户已进行算分。
            log.info(StrUtil.format("【计算考试成绩(消息)】考试记录id:{}", recordId));
            EExamRecord examRecord = eExamRecordMapper.selectById(recordId);
            if (ObjectUtil.isNull(examRecord)) {
                BaseUtils.errorLog(ResultEnum.UPDATE_OPERATION_FAIL, "数据不存在", recordId);
                return;
            }
            if (examRecord.getExamStatus().equals(ExamStatusEnum.SOCER_ANSWER.getCode())) {
                log.info(StrUtil.format("【成绩已计算,无需重复计算】考试记录id:{}", recordId));
                if (!examRecord.getDetailEnter()) {
                    //考试信息入库
                    this.examDetailToDataBase(examRecord);
                    log.info(StrUtil.format("【考试详情入库(消息)】考试记录id:{}", recordId));
                } else {
                    return;
                }
            } else if (!examRecord.getExamStatus().equals(ExamStatusEnum.DATA_LOSE)) {
                //计算成绩
                this.computedUserExamScore(examRecord);
                //考试信息入库
                this.examDetailToDataBase(examRecord);
            }
        }
    
    

    消息/交换机未接收回调

    注意:因为我在这里配置了备份交换机,当交换机向队列发送消息出现问题时直接走的备份交换机,并没有走回调。但是当消息向交换机传递出问题时,此时无法转备份交换机,会走回调。

    @Component
    @Slf4j
    public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
    
        //将创建的消息接收的回调对象添加到rabbitTemplate中。
        @PostConstruct
        public void init() {
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setReturnCallback(this);
        }
    
        /**
         * 交换机确定是否收到消息的回调方法
         * 1.发消息  交换机成功接受到了  回调
         * 1.1CorrelationData保存回调消息的ID及相关信息
         * 1.2交换机收到消息   ack:true
         * 1.3cause 失败的原因  cause:null
         * 2.发消息  交换机没有成功接收   回调
         * 2.1CorrelationData保存回调消息的ID及相关信息
         * 2.2交换机收到消息   ack:false
         * 2.3 cause:失败的原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println(correlationData);
            String id = correlationData == null ? "" : correlationData.getId();
            if (ack) {
                log.info("交换机已经收到 id 为:{}的消息", id);
            } else {
                log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
            }
    
        }
    
        //当消息无法路由的时候的回调方法
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String
                exchange, String routingKey) {
            log.error(" 消 息 {}, 被 交 换 机 {} 退 回 , 退 回 原 因 :{}, 路 由 key:{}", new
                    String(message.getBody()), exchange, replyText, routingKey);
        }
    }
    

    回调

    @Component
    @Slf4j
    public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
    
        //将创建的消息接收的回调对象添加到rabbitTemplate中。
        @PostConstruct
        public void init() {
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setReturnCallback(this);
        }
    
        /**
         * 交换机确定是否收到消息的回调方法
         * 1.发消息  交换机成功接受到了  回调
         * 1.1CorrelationData保存回调消息的ID及相关信息
         * 1.2交换机收到消息   ack:true
         * 1.3cause 失败的原因  cause:null
         * 2.发消息  交换机没有成功接收   回调
         * 2.1CorrelationData保存回调消息的ID及相关信息
         * 2.2交换机收到消息   ack:false
         * 2.3 cause:失败的原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println(correlationData);
            String id = correlationData == null ? "" : correlationData.getId();
            if (ack) {
                log.info("交换机已经收到 id 为:{}的消息", id);
            } else {
                log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
            }
    
        }
    
        //当消息无法路由的时候的回调方法
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String
                exchange, String routingKey) {
            log.error(" 消 息 {}, 被 交 换 机 {} 退 回 , 退 回 原 因 :{}, 路 由 key:{}", new
                    String(message.getBody()), exchange, replyText, routingKey);
        }
    }
    

    问题:

    使用上述方式,我们实现了所需的需求,但是遇到了两个问题。

    问题1:消息过期时间不同造成消息阻塞

    我们将死信队列当作延迟队列使用,出现的问题是如果消息的过期时间不同时,假如第一条消息10分钟到期,第二条消息2分钟到期,此时会出现第一条消息没有过期而阻塞第二条消息消费的情况。

    如果每一场考试的时间相同还好,但是如果时间不同,那么就会造成消息阻塞的情况,因此使用此种发似实现并不是最优解。

    问题2:带有过期时间的消息入备份队列,过期时间不生效

    rabbitmq为一个普通交换机设置了备份交换机,我向普通交换机发送了一条5分钟后进行消费的消息,但是普通交换机出现问题走了备份交换机,但是备份交换机的消费者直接把信息消费了,并没有等5分钟,导致我这场考试直接被提交了。

    实现方式二(推荐):使用延迟插件

    使用方式一实现时,出现最明显的一个问题就是消息阻塞的问题。因此使用延迟插件,可以很好地解决这个问题。

    image-20210928210611834

    注意:延迟插件的使用需要安装。

    创建交换机和队列

    @Getter
    public enum UnCommitExamEnum {
        /**
         * 计算考试成绩队列(死信队列)
         */
        UNCOMMIT_EXAM_COMPUTED("uncommit.exam.computed.direct", "uncommit.exam.computed", "uncommit.exam.computed");
    
    
        /**
         * 交换名称
         */
        private String exchange;
        /**
         * 队列名称
         */
        private String name;
        /**
         * 路由键
         */
        private String routeKey;
    
        UnCommitExamEnum(String exchange,String name,String routeKey) {
            this.exchange = exchange;
            this.name = name;
            this.routeKey = routeKey;
        }
    }
    
    @Configuration
    public class UnCommitExamConfig {
    
        @Bean
        public CustomExchange examComputedExchange() {
            Map<String, Object> arguments = new HashMap<>();
            //设置自定义交换机的类型。
            arguments.put("x-delayed-type", "direct");
            //1.交换机名称
            //2.交换机的类型
            //3.是否需要持久化
            //4.是否需要自动删除
            //5.其他参数
            return new CustomExchange(UnCommitExamEnum.UNCOMMIT_EXAM_COMPUTED.getExchange(), "x-delayed-message", true, false, arguments);
        }
    
        @Bean
        public Queue examComputedQueue() {
            return QueueBuilder
                    .durable(UnCommitExamEnum.UNCOMMIT_EXAM_COMPUTED.getName())
                    .build();
        }
    
        @Bean
        public Binding bindingExamComputedQueueToExchange() {
            return BindingBuilder
                    .bind(examComputedQueue())
                    .to(examComputedExchange())
                    .with(UnCommitExamEnum.UNCOMMIT_EXAM_COMPUTED.getRouteKey())
                    .noargs();
        }
    
    }
    

    生产者

    @Component
    @Slf4j
    public class UnCommitExamSender {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
    
        public void sendMessage(Long recordId, long examTime) {
            int oneMinute = 1000 * 60;
            int waitMinute =oneMinute * 5;
            //过期时间 原有时间上推后五分钟。
            long delayTime =(long) 1000 * examTime + waitMinute;
            //延迟插件最大的时间限制,设置时间超过这个时间将没有延迟效果。
            long maxMinute =(long) oneMinute * 60 * 24 * 45;
            if (delayTime > maxMinute) {
                log.info("考试:{},时间较长,不提供消息支持。考试时间:{}", recordId, delayTime);
                return;
            }
            //过期时间
            int examComputedTime = (int)delayTime;
            //给延迟队列发送消息
            rabbitTemplate.convertAndSend(UnCommitExamEnum.UNCOMMIT_EXAM_COMPUTED.getExchange(), UnCommitExamEnum.UNCOMMIT_EXAM_COMPUTED.getRouteKey(), recordId, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //给消息设置延迟毫秒值
                    message.getMessageProperties().setDelay(examComputedTime);
                    return message;
                }
            }, new CorrelationData(UUID.randomUUID().toString()));
            log.info("操作人:{},向消息队列中发送了一场考试记录,id为:{},消息过期时间为:{}", SecurityUtils.getCurrentUserId(), recordId, examComputedTime);
        }
    
    
    }
    
    

    消费者

    @Component
    @Slf4j
    public class UnCommitExamCustomer {
        @Autowired
        private ExamTaskService examTaskService;
    
    
        //死信队列,存储过期的消息
        @RabbitListener(queues = "uncommit.exam.computed")
        public void handleExamProcessComputed(Long recordId) {
            log.info("进入考试消费队列, recordId:{}",recordId);
            examTaskService.completeExamByMq(recordId);
            log.info("exam process recordId:{}", recordId);
        }
    
    }
    

    遗留问题:

    问题1:延迟队列(插件)如何设置备份队列

    问题2:备份队列中消息直接被消费,未在消息过期后消费

    rabbitmq为一个普通交换机设置了备份交换机,我向普通交换机发送了一条5分钟后进行消费的消息,但是普通交换机出现问题走了备份交换机,但是备份交换机的消费者直接把信息消费了,并没有等5分钟

    问题3:为延迟队列(插件)设置未入队回调,出现提示未找到路由key,实际消息到期队列中的消息还是被消费了。

    发送消息时,通过日志发现出发了回调(此处回调和实现方式一的回调一摸一样,监听全局消息入队的回调)

    image-20210928211238087

    到时间后,消息正常被消费。

    image-20210928211334561

    思考:延迟插件之所以实现延迟效果,是因为消息由交换机入队列时,消息不直接进入队列,而是存储在mnesia(一个分布式数据系统)表中,当消息到期时,则自动入队列进行消费。
    ExamByMq(recordId);
    log.info(“exam process recordId:{}”, recordId);
    }

    }

    
    
    
    ### 遗留问题:
    
    #### 问题1:延迟队列(插件)如何设置备份队列
    
    #### 问题2:备份队列中消息直接被消费,未在消息过期后消费
    
    rabbitmq为一个普通交换机设置了备份交换机,我向普通交换机发送了一条5分钟后进行消费的消息,但是普通交换机出现问题走了备份交换机,但是备份交换机的消费者直接把信息消费了,并没有等5分钟
    
    #### 问题3:为延迟队列(插件)设置未入队回调,出现提示未找到路由key,实际消息到期队列中的消息还是被消费了。
    
    发送消息时,通过日志发现出发了回调(此处回调和实现方式一的回调一摸一样,监听全局消息入队的回调)
    
    [外链图片转存中...(img-XlcNf06U-1632990667944)]
    
    到时间后,消息正常被消费。
    
    [外链图片转存中...(img-Jwn5ozQA-1632990667947)]
    
    思考:延迟插件之所以实现延迟效果,是因为消息由交换机入队列时,消息不直接进入队列,而是存储在`mnesia`(一个分布式数据系统)表中,当消息到期时,则自动入队列进行消费。
    
    展开全文
  • 1. 用到技术:springboot+mybatis+shiro+rabbitmq+redis项目实战, 2. mybatisplus根据表自动生成dao,service,controler层的增删改成方法,极大提升开发效率 3. 包含功能:登录,用户管理,角色管理,菜单管理。用到...
  • RabbitMQ实战

    2021-07-10 23:28:54
    导入my-rabbitmq项目 项目下载地址: https://download.csdn.net/download/zpcandzhj/10585077 5.2.简单队列 5.2.1.图示 P:消息的生产者 C:消息的消费者 红色:队列 生产者将消息发送到队列,消费者从队列中获取...
  • RabbitMQ 实战

    千次阅读 2021-11-23 09:55:28
    1:安装 RabbitMQ 这里 我会先同时安装三台机器,为以后的高可用集群做准备 1.1:安装RabbitMQ 的依赖环境 安装常用的环境和工具包 yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto ...
  • 近期在做一个电商秒杀项目,在服务优化的阶段用到了 RabbitMQ 这个消息中间件,让秒杀请求不再瞬时冲击秒杀接口,而是利用消息中间件来让请求如队列般排队而来。 2 具体步骤 下面是最基础的一种 SpringBoot 集成 ...
  • springboot 集成 rabbitmq 实战应用
  • RabbitMQ实战经验分享

    2020-12-30 20:51:32
    下面分享下RabbitMQ实战经验,希望对大家有所帮助:一、生产消息关于RabbitMQ的基础使用,这里不再介绍了,项目中使用的是Exchange中的topic模式。先上发消息的代码private bool MarkErrorSend(string[] lstMsg){try...
  • RabbitMQ 整合 PD 商城实战项目流程总结 一、订单流量削峰(解耦) 简单模式,若多添加几个消费者则用工作模式。 导入商城项目 将 step5 课前资料里面 /elasticsearch/pd商城.zip 解压。 第二层目录下 pd-web ...
  • 主要为大家详细介绍了C#操作RabbitMQ的完整实例,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
  • RabbitMQ,代码示例,项目讲解
  • RabbitMQ实战应用技巧

    2020-12-21 19:08:22
    1. RabbitMQ实战应用技巧1.1. 前言由于项目原因,之后会和RabbitMQ比较多的打交道,所以让我们来好好整理下RabbitMQ的应用实战技巧,尽量避免日后的采坑1.2. 概述RabbitMQ有几个重要的概念:虚拟主机,交换机,队列...
  • RabbitMQ 实战教程

    2020-11-05 17:37:57
    RabbitMQ 实战教程 RabbitMQ(Rabbit Message queue)=消息中间件,是一种理念(规范),RabbitMQ是其中的一个落地产品 1.MQ引言 1.1 什么是MQ MQ(Message Quene) : 翻译为 消息队列,通过典型的 生产者和消费者模型,...
  • RabbitMQ详细实战教程

    2022-03-22 14:24:01
    RabbitMQ详细实战教程 CSDN创作 C位 CSDN创作 C位
  • RabbitMQ实战教程

    万人学习 2018-09-02 16:24:33
    本课程适用于RabbitMQ实战者以及SpringBoot整合RabbitMQ实战需求者。课程将首先带领大家拜读RabbitMQ官网的技术开发手册,之后会将学到的知识深入的实战到实际的各种应用场景中,从而加深知识点的理解, 其中包括...
  • 今天阿笨给大家分享的是通过RabbitMQ的优先级消息队列特性来解决我们业务中需要优先处理的任务。 1.1、本次分享课程适合人群如下: 1、有一定的NET开发基础并对RabbitMQ技术有一定了解和认识。 ...
  • 在本这篇文章中,我将使用RabbitMQ来介绍Celery的基本概念,然后为一个小型演示项目设置Celery 。最后,设置一个Celery Web控制台来监视我的任务 基本概念 来!看图说话: Broker Broker(RabbitMQ)负责创建任务...
  • RabbitMQ 实战指南》第一章 RabbitMQ 简介
  • SpringBoot RabbitMQ 实战

    2021-04-12 14:40:52
    RabbitMQ 七种队列模式 一、简单模式(Hello World) 做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B。 二、工作队列模式(Work queues) 在多个消费者之间分配任务...
  • RabbitMQ 实战教程 1.MQ引言 1.1 什么是MQ MQ(Message Quene) : 翻译为 消息队列,通过典型的 生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 12,710
精华内容 5,084
关键字:

rabbitmq实战项目