精华内容
下载资源
问答
  • rabbitmq实战
    2022-06-22 23:48:41

    springboot 集成 rabbitmq 实战应用今天分享:分为生产者和消费者两个角色

    一、生产者

    1、pom文件引入

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

    2、yml配置文件添加

    spring:
      rabbitmq:
        host: 194.11.33.120
        port: 5672
        username: admin
        password: admin
        publisher-confirms: true
        publisher-returns: true
        virtual-host: /
        listener:
          simple:
            acknowledge-mode: manual

    3、对象的初始化

    
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.List;
    
    /**
     *类说明:消息队列配置
     */
    @Slf4j
    @Configuration
    public class RabbitConfig {
    
        public final static String EXCHANGE_LOG = "order.log.producer.reply";
        public final static String KEY_LOG = "order.log.reply";
    
        @Value("${spring.rabbitmq.host}")
        private String addresses;
    
        @Value("${spring.rabbitmq.port}")
        private String port;
    
        @Value("${spring.rabbitmq.username}")
        private String username;
    
        @Value("${spring.rabbitmq.password}")
        private String password;
    
        @Value("${spring.rabbitmq.virtual-host}")
        private String virtualHost;
    
        @Value("${spring.rabbitmq.publisher-confirms}")
        private boolean publisherConfirms;
    
        @Value("${spring.rabbitmq.publisher-returns}")
        private boolean publisherReturns;
    
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses(addresses+":"+port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost(virtualHost);
            /** 如果要进行消息回调,则这里必须要设置为true */
    //        connectionFactory.setPublisherConfirms(publisherConfirms);
    //        connectionFactory.setPublisherReturns(publisherReturns);
            return connectionFactory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            return new RabbitAdmin(connectionFactory);
        }
    
    
        @Bean
        public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory, List<SimpleMessageListenerContainer> list) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setConcurrentConsumers(2);
    //抓取参数非常关键,一次抓取的消息多了,消费速度一慢,就会造成响应延迟,抓取少了又会导致并发量低,消息堵塞
            factory.setPrefetchCount(10);
    
            /*
             * AcknowledgeMode.NONE:自动确认
             * AcknowledgeMode.AUTO:根据情况确认
             * AcknowledgeMode.MANUAL:手动确认
             */
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    /*        factory.setDefaultRequeueRejected(false);
            factory.setAdviceChain(
                    RetryInterceptorBuilder
                            .stateless()
                            .recoverer(new RejectAndDontRequeueRecoverer())
                            .retryOperations(retryTemplate())
                            .build()
            );*/
            return factory;
        }
    
        @Bean
        public Queue queuelogMessage() {
            return new Queue("order.log.queue.reply");
        }
    
        @Bean
        public DirectExchange exchange() {
            return new DirectExchange(EXCHANGE_LOG);
        }
    
        @Bean
        public Binding bindingLogExchangeMessage() {
            return BindingBuilder
                    .bind(queuelogMessage())
                    .to(exchange())
                    .with(KEY_LOG);
        }
    }
    

    4、发送信息的工具类

     
    
    import cn.nan.config.RabbitConfig;
    import cn.nan.mall.model.MessageLog;
    import com.alibaba.fastjson.JSON;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * 类说明:
     */
    @Slf4j
    @Component
    public class OrderLogSender {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void send(String actionId) {
            log.info("TopicSender send the 1st : " + actionId);
            this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_LOG, RabbitConfig.KEY_LOG, actionId);
        }
    
        public void send(MessageLog messageLog, CorrelationData correlationData) {
            String msg1 = JSON.toJSONString(messageLog);
            log.info("TopicSender send the 1st : " + msg1);
            this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_LOG, RabbitConfig.KEY_LOG, msg1,correlationData);
        }
    }
    

    二、消费者

    1、pom和yml配置同生产者一致

    2、对象实例化:里面有消费者的配置,同时也有生产者的配置

    import cn.nan.mall.model.MessageLog;
    import cn.nan.mall.service.MessageLogService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.List;
    
    /**
     *类说明:消息队列配置
     */
    @Slf4j
    @Configuration
    public class RabbitConfig {
    
        public final static String EXCHANGE_LOG = "order.log.producer";
        public final static String KEY_LOG = "order.log";
    
        @Value("${spring.rabbitmq.host}")
        private String addresses;
    
        @Value("${spring.rabbitmq.port}")
        private String port;
    
        @Value("${spring.rabbitmq.username}")
        private String username;
    
        @Value("${spring.rabbitmq.password}")
        private String password;
    
        @Value("${spring.rabbitmq.virtual-host}")
        private String virtualHost;
    
        @Value("${spring.rabbitmq.publisher-confirms}")
        private boolean publisherConfirms;
    
        @Value("${spring.rabbitmq.publisher-returns}")
        private boolean publisherReturns;
    
        @Autowired
        private MessageLogService messageLogService;
    
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses(addresses+":"+port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost(virtualHost);
            /** 如果要进行消息回调,则这里必须要设置为true */
            connectionFactory.setPublisherConfirms(publisherConfirms);
            connectionFactory.setPublisherReturns(publisherReturns);
            return connectionFactory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            return new RabbitAdmin(connectionFactory);
        }
    
        @Bean
        public RabbitTemplate newRabbitTemplate() {
            RabbitTemplate template = new RabbitTemplate(connectionFactory());
            template.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                    log.info("消息成功发送到Exchange,messageId:" + correlationData.getId());
                    //修改日志表状态,状态改成 1,投递成功且未确认
                    if(updateMessageLog(1,Long.valueOf(correlationData.getId())) == 1) {
                        log.info("------modify status 1 ok--------");
                    }
                } else {
                    log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
                }
            });
            template.setMandatory(true);
            template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
            });
            //不使用临时队列
    //        template.setUseTemporaryReplyQueues(false);
    //        template.setReplyAddress("amq.rabbitmq.reply-to");
    //        template.setUserCorrelationId(true);
    //        template.setReplyTimeout(10000);
            return template;
        }
    
        private int updateMessageLog(int status,Long messageId) {
            MessageLog messageLog = new MessageLog();
            messageLog.setMessageId(messageId);
            messageLog.setStatus(status);
            messageLog.setTryCount(1);
            return messageLogService.updateMessageLog(messageLog);
        }
    
    
        @Bean
        public Queue queuelogMessage() {
            return new Queue("order.log.queue");
        }
    
        @Bean
        public DirectExchange exchange() {
            return new DirectExchange(EXCHANGE_LOG);
        }
    
        @Bean
        public Binding bindingLogExchangeMessage() {
            return BindingBuilder
                    .bind(queuelogMessage())
                    .to(exchange())
                    .with(KEY_LOG);
        }
    
        /**
         * 消费者配置
         * @param connectionFactory
         * @param list
         * @return
         */
        @Bean
        public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory, List<SimpleMessageListenerContainer> list) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setConcurrentConsumers(2);
    //抓取参数非常关键,一次抓取的消息多了,消费速度一慢,就会造成响应延迟,抓取少了又会导致并发量低,消息堵塞
            factory.setPrefetchCount(10);
    
            /*
             * AcknowledgeMode.NONE:自动确认
             * AcknowledgeMode.AUTO:根据情况确认
             * AcknowledgeMode.MANUAL:手动确认
             */
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    /*        factory.setDefaultRequeueRejected(false);
            factory.setAdviceChain(
                    RetryInterceptorBuilder
                            .stateless()
                            .recoverer(new RejectAndDontRequeueRecoverer())
                            .retryOperations(retryTemplate())
                            .build()
            );*/
            return factory;
        }
    
        //===============生产者发送确认==========
        @Bean
        public RabbitTemplate.ConfirmCallback confirmCallback(){
            return new RabbitTemplate.ConfirmCallback(){
    
                @Override
                public void confirm(CorrelationData correlationData,
                                    boolean ack, String cause) {
                    if (ack) {
                        log.info("发送者确认发送给mq成功");
                    } else {
                        //处理失败的消息
                        log.info("发送者发送给mq失败,考虑重发:"+cause);
                    }
                }
            };
        }
    
        @Bean
        public RabbitTemplate.ReturnCallback returnCallback(){
            return new RabbitTemplate.ReturnCallback(){
    
                @Override
                public void returnedMessage(Message message,
                                            int replyCode,
                                            String replyText,
                                            String exchange,
                                            String routingKey) {
                    log.info("无法路由的消息,需要考虑另外处理。");
                    log.info("Returned replyText:"+replyText);
                    log.info("Returned exchange:"+exchange);
                    log.info("Returned routingKey:"+routingKey);
                    String msgJson  = new String(message.getBody());
                    log.info("Returned Message:"+msgJson);
                }
            };
        }
    }
    

    3、监听消费

     
    
    import cn.nan.mall.model.MessageLog;
    import cn.nan.mall.service.MessageLogService;
    import com.alibaba.fastjson.JSON;
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    
    /**
     * 类说明:
     */
    @Slf4j
    @Component
    public class OrderLogReceiver {
    
        @Autowired
        private MessageLogService messageLogService;
    
        @RabbitListener(queues = "order.log.queue.reply")
        @RabbitHandler // 此注解加上之后可以接受对象型消息
        public void onMessage(Message message, Channel channel,@Headers Map<String, Object> headers) throws Exception {
            try {
                String msg = new String(message.getBody());
                log.info("OrderLogReceiver>>>>>>>message received:" + msg);
                MessageLog messageLog = JSON.parseObject(msg, MessageLog.class);
                try {
                    messageLog.setStatus(9);
                    messageLog.setTryCount(null);
                    messageLogService.updateMessageLog(messageLog);
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//手工确认,可接下一条
                } catch (Exception e) {
                    e.printStackTrace();
                    log.info(e.getMessage());
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);//失败,则直接忽略此订单
                    log.info("OrderLogReceiver>>>>>>message nack");
                    throw e;
                }
            } catch (Exception e) {
                log.info(e.getMessage());
            }
        }
    }
    
    

        deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,Rab bitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 
          它代表了 RabbitMQ 向该 Channel   投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
       multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息 

    到此、springboot 集成 rabbitmq 实战应用分享完毕,下篇会分享一些细节,顺序、重复消费、消息丢失等等,敬请期待!

    更多相关内容
  • RabbitMQ 实战

    千次阅读 2021-11-23 09:55:28
    1:安装 RabbitMQ 这里 我会先同时安装三台机器,为以后的高可用集群做准备 1.1:安装RabbitMQ 的依赖环境 安装常用的环境和工具包 yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto ...

    1:安装 RabbitMQ 

    这里 我会先同时安装三台机器,为以后的高可用集群做准备

    注意在进行以下操作之前可以先关闭防火墙 或者 开放防火墙端口

     开放防火墙端口
    //永久的添加该端口。去掉--permanent则表示临时。
    firewall-cmd --permanent --zone=public --add-port=5672/tcp
    firewall-cmd --permanent --zone=public --add-port=15672/tcp
    //重新加载配置,使得修改有效。
    firewall-cmd --reload 
    //查看开启的端口,出现5672/15672这开启正确
    firewall-cmd --permanent --zone=public --list-ports 

    (推荐使用)

    永久关闭防火墙

    首先查看防火墙的状态
    systemctl status firewalld.service

    然后执行命令进行关闭
    systemctl stop firewalld service      临时关闭防火墙

    systemctl disable firewalld.service   开机禁止防火墙服务器  永久关闭
    systemctl enable firewalld.service  开机启动防火墙服务器

    (自己学习的时候使用)

    1.1:安装RabbitMQ 的依赖环境

    安装常用的环境和工具包

    yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget gtk2-devel binutils-devel

    erlang官网下载你需要的erlang的版本:

    https://www.erlang.org/downloads

     将下载的tar.gz 上传到 Linux 虚拟机 解压 安装

    tar -xzvf otp_src_23.3.4.9.tar.gz -C /usr/local 

    cd /usr/local/otp_src_23.3.4.9

    mkdir -p /usr/local/erlang

    ./configure --prefix=/usr/local/erlang   编译配置

    直到打印如下日志

     使用make install 进行安装

    然后检查环境变量是否有erlang  

    如果没有则需要配置环境变量
    echo 'export PATH=$PATH:/usr/local/erlang/bin' >> /etc/profile

    刷新环境变量
    source /etc/profile

    检查是否安装成功

    ll /usr/local/erlang/bin

    输入 erl 并用 halt() . 函数退出

    到这里 erlang 环境安装完成

    1.2:安装RabbitMQ

    下载 RabbitMQ :

    https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.8.26

    注意官网上的这句话

    Erlang/OTP Compatibility Notes

    This release requires Erlang 23.2 and supports Erlang 24.

    Erlang 和 rabbitMQ 是有版本对应关系的,版本不对应安装不成功

    上传到 linux

    rpm -ivh --nodeps rabbitmq-server-3.8.26-1.el7.noarch.rpm

    使用 yum 安装时 会报错 ,因为yum 比较谨慎一般都安装比较旧的版本,

     rpm -ivh --nodeps rabbitmq-server-3.9.5-1.el7.noarch.rpm   添加参数 --nodeps  忽略依赖校验

    再次执行 可以发现安装成功了

    #启动rabbitmq,-detached代表后台守护进程方式启动

    # 启动服务:rabbitmq-server -detached
    # 查看状态:rabbitmqctl status
    # 关闭服务:rabbitmqctl stop
    # 列出角色:rabbitmqctl list_users

    查看 状态如下 说明 安装启动成功

    查看用户列表

    #启用管理插件
    rabbitmq-plugins enable rabbitmq_management

     # 端口 15672(网页管理) 5672 (AMQP端口):
    #在浏览器中输入服务器IP:15672 就可以看到RabbitMQ的WEB管理页面了,但是现在并不能登录

    我们需要新建一个用户

     此时还无法登录,我们需要给管理控制台添加用户 并授予权限

    rabbitmqctl add_user developer   添加用户 

     根据提示输入密码:  dev123456

    给用户添加角色 并查看用户信息

    rabbitmqctl set_user_tags developer dev123456 administrator

    rabbitmqctl list_users  查看用户列表

    删除原有的 guest 用户

    rabbitmqctl delete_user guest

     使用 新的用户名 developer 密码  dev123456 登录 后台管理界面 可以发现三个机器的管理后台都可以 正常进入了

     

     1.3:常用的用户管理的命令及界面操作

    # 启动服务:rabbitmq-server -detached
    # 查看状态:rabbitmqctl status
    # 关闭服务:rabbitmqctl stop
    # 列出角色:rabbitmqctl list_users

     添加用户 rabbitmqctl  add_user developer
    #根据提示 添加密码

     删除用户 :rabbitmqctl delete_user guest

     修改密码 : rabbitmqctl change_password developer dev123456

     

     RabbitMQ中主要有administrator,monitoring,policymaker,management,impersonator,none几种角色

    修改角色: rabbitmqctl set_user_tags developer administrator

     也可以给用户设置多个角色,如给用户developer设置administrator,monitoring

    rabbitmqctl set_user_tags developer administrator monitoring

     
    

    权限包含 读 写 配置

    权限赋值 rabbitmqctl set_permissions -p / developer ".*" ".*" ".*"

    查看(指定vhostpath)所有用户的权限
    rabbitmqctl list_permissions

     查看virtual host为/的所有用户权限:
    rabbitmqctl list_permissions -p /

     

     查看指定用户的权限
    rabbitmqctl list_user_permissions developer

     清除用户权限
    rabbitmqctl clear_permissions developer

     

    1.4:RabbitMQ用户角色及权限控制

    RabbitMQ的用户角色分为5类:
    none、management、policymaker、monitoring、administrator

    none
    不能访问 management plugin

    management
    拥有这种角色的用户 通过AMQP做的任何事外
    还可以 列出自己可以通过AMQP登入的virtual hosts 
    查看自己的virtual hosts中的queues, exchanges 和 bindings
    查看和关闭自己的channels 和 connections
    查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动

    policymaker 
    该角色可以拥有management可以做的任何事外加权限:
    查看、创建和删除自己的virtual hosts所属的policies和parameters

    monitoring  
    management可以做的任何事外加:
    列出所有virtual hosts,包括他们不能登录的virtual hosts
    查看其他用户的connections和channels
    查看节点级别的数据如clustering和memory使用情况
    查看真正的关于所有virtual hosts的全局的统计信息

    administrator   最高权限
    policymaker和monitoring可以做的任何事外加:
    创建和删除virtual hosts
    查看、创建和删除users
    查看创建和删除permissions
    关闭其他用户的connections

    创建用户并设置角色:
    可以创建管理员用户,负责整个MQ的运维,例如:
    $sudo rabbitmqctl add_user  admin(用户名)  admin(密码)
    赋予其administrator角色:
    $sudo rabbitmqctl set_user_tags admin administrator  添加 administrator 角色

    创建和赋角色完成后查看并确认:
    $sudo rabbitmqctl list_users

    添加权限

    对何种资源具有配置、写、读的权限通过正则表达式来匹配,具体命令如下:
    set_permissions [-p <vhostpath>] <user> <conf> <write> <read>

    其中,<conf> <write> <read>的位置分别用正则表达式来匹配特定的资源,如'^(amq\.gen.*|amq\.default)$'可以匹配server生成的和默认的exchange,'^$'不匹配任何资源
    需要注意的是RabbitMQ会缓存每个connection或channel的权限验证结果、因此权限发生变化后需要重连才能生效。
    为用户赋权:
    $sudo rabbitmqctl  set_permissions -p /TEST  admin '.*' '.*' '.*'
    该命令使用户admin具有/TEST这个virtual host中所有资源的配置、写、读权限以便管理其中的资源
     

    2: Java 操作RabbitMQ

    2.1: 第一个Java RabbitMQ 程序

    创建一个 maven 工程  ,添加如下依赖和配置

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>org.rb</groupId>
      <artifactId>rb</artifactId>
      <version>1.0-SNAPSHOT</version>
    
      <name>rb</name>
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.11</version>
          <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>5.10.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
          <version>1.7.30</version>
          <scope>test</scope>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
        <dependency>
          <groupId>commons-io</groupId>
          <artifactId>commons-io</artifactId>
          <version>2.4</version>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
        <dependency>
          <groupId>org.apache.commons</groupId>
          <artifactId>commons-lang3</artifactId>
          <version>3.7</version>
        </dependency>
      </dependencies>
    
      <build>
        <plugins>
          <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
          </plugin>
        </plugins>
      </build>
    </project>

    编写 一个 消息生产者

    package org.rb;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * Hello world!
     *
     */
    
    public class MsgProducer
    {
        private static final String QUEEN_NAME="hello";
        public static void main( String[] args )throws Exception {
            String msg = "Hello World";
            //创建链接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置工厂配置信息
            factory.setHost("192.168.217.128");
            factory.setPassword("dev123456");
            factory.setUsername("developer");
            //创建链接
            Connection connection = factory.newConnection();
            //创建信道
            Channel channel = connection.createChannel();
            //创建队列
            //参数说明
            /***
            1:队列名称
            2:是否持久化持久化会保存到磁盘,默认是保存到内存
            3:是否多个消费者共享消费
            4:是否自动删除
            5:其他参数,延迟或者死信队列等
            ***/
    
            channel.queueDeclare(QUEEN_NAME,false,false, false,null);
            /**
             * 1: 发送到那个交换机
             * 2:路由的key值
             * 3: 其他参数信息
             * 4:参数内容
             * **/
            channel.basicPublish("",QUEEN_NAME,null,msg.getBytes());
            System.out.println("消息發送完畢");
        }
    }

    执行生产者代码 发送信息,并在 rabbitmq 的管理控制台检查对列中的消息是否发送成功,看到如下结果 说明信息发送成功

     创建消费者

    package org.rb;
    
    import com.rabbitmq.client.*;
    
    public class MsgConsumer {
        private static final String QUEEN_NAME="hello";
        public static void main(String[] args) throws Exception{
            //创建链接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置工厂配置信息
            factory.setHost("192.168.217.128");
            factory.setPassword("dev123456");
            factory.setUsername("developer");
            //创建链接
            Connection connection = factory.newConnection();
            //创建信道
            Channel channel = connection.createChannel();
            /**
             * 1:被消费队列名
             * 2:是否自动应答 true 是自动应答 false 手动应答
             * 3:消费者未成功消费的回调函数
             * 4: 消费者去掉消费的回调
             * */
            //正常获取消息
            DeliverCallback deliverCallback = (consumerTag,message)->{
                System.out.println(new String(message.getBody()));
            };
            //消费消息被中断
            CancelCallback cancelCallback=(consumerTag)->{
                System.out.println("消费消息被中断");
            };
    
            channel.basicConsume(QUEEN_NAME,true,deliverCallback,cancelCallback);
            System.out.println("消息接受完毕");
        }
    }
    
    

     可以看见消息被消费了,消息个数变成0 了

    2.2: work queen 模式

    一个生产者发送消息

    可以有多个消费者 ,但是只有一个消费者可以获取到消息,使用轮询方式来处理消息,消息不可以重复被消费

    编写 work01 工作线程,并在idea 中 设置使这个class 可以多个线程执行

     通过修改这里来表示 多个线程收到信息

    DeliverCallback deliverCallback = (consumerTag, message)->{
        System.out.println("C02接受到的消息:"+ new String(message.getBody()));
    };

    package org.rb.day01;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import org.rb.util.RabbitMqUtils;
    //消费类
    public class Worker01 {
        private static final String QUEEN_NAME="hello";
    
        public static void main(String[] args) throws Exception{
           Channel channel =  RabbitMqUtils.getChannel();
            /**
             * 1:被消费队列名
             * 2:是否自动应答 true 是自动应答 false 手动应答
             * 3:消费者未成功消费的回调函数
             * 4: 消费者去掉消费的回调
             * */
            DeliverCallback deliverCallback = (consumerTag, message)->{
                System.out.println("接受到的消息:"+ new String(message.getBody()));
            };
    
            CancelCallback cancelCallback = (consumerTag)->{
                System.out.println(consumerTag+"消息被取消回调");
            };
            channel.basicConsume(QUEEN_NAME,true,deliverCallback,cancelCallback);
        }
    }

    编写生产者代码

    package org.rb.day01;
    
    import com.rabbitmq.client.Channel;
    import org.apache.commons.lang3.StringUtils;
    import org.rb.util.RabbitMqUtils;
    
    import java.util.Scanner;
    
    public class Producer01 {
        private static final String QUEEN_NAME="hello";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            /***
             1:队列名称  queue
             2:是否持久化持久化会保存到磁盘,默认是保存到内存 durable
             3:是否多个消费者共享消费 exclusive
             4:是否自动删除 autoDelete
             5:其他参数,延迟或者死信队列等  arguments
             ***/
            channel.queueDeclare(QUEEN_NAME,false,false,false,null);
            //为了发消息比较明显,使用控制台发送
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String message = scanner.next();
                channel.basicPublish("",QUEEN_NAME,null,message.getBytes());
                System.out.println("消息"+message+"发送完成");
            }
        }
    }
    
    

    执行producer代码发送多个消息,会发现 消费端 会轮流的打印 发送的消息,并且不会重复消费,是轮询的的消费消息的 

     2.3:消息应答机制

    在消费者消费完之后 进行应答,只有在获取到消费端消费完的应到之后才删除队列里的消息

    避免消息丢失,自动应答并不靠谱 ,特别是在接受到大量消息的时候 ,如果后续处理消息的过程中 发生了异常,可能会导致大量消息丢失

    消息应答的方式:

    Channel.basicAck()   用于肯定确认,消息已经肯定处理成功了

    Channel.basicNack()   用于否定确认,不能确定消息已经肯定处理成功了

    Channel.basicReject()  用于拒绝确认,不能确定消息已经肯定处理成功了 比这个 Channel.basicNack() 对一个是否批量处理的参数mutilple

    手动应答的好处 可以批量应答,你比那个且可以减少网络阻塞,如下图所示当批量应答时 只要channel 中的第一个被应答 ,信道中的其他消息就会一并被应答

    而不批量应答只能一个一个的应答

     消息自动重新入队,开启手动应答

    当有多个消费端时 ,如果一个消息已经发送,并且被一个消费端获取了,但是队列并未收到ack ,这时队列并不会删除消息,而是将消息从新入队并将消息发送个另外一个可达的 消费端消费

     消息手动应答的代码

    package org.rb.day01;
    
    import com.rabbitmq.client.Channel;
    import org.rb.util.RabbitMqUtils;
    
    import java.util.Scanner;
    
    public class NotAutoAckProducer {
        private static final String NOTAUTOACK_QUEEN_NAME="ack_queen";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            channel.queueDeclare(NOTAUTOACK_QUEEN_NAME,false,false,false,null);
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String message = scanner.next();
                channel.basicPublish("",NOTAUTOACK_QUEEN_NAME,null,message.getBytes("UTF-8"));
                System.out.println("消息"+message+"发送完成");
            }
        }
    }
    
    package org.rb.day01;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import org.rb.util.RabbitMqUtils;
    
    //手动应答,消息不丢失,返回队列个可达的消费者消费
    public class NotAutoAckConsumer {
        private static final String NOTAUTOACK_QUEEN_NAME="ack_queen";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag, message)->{
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                /**
                 * 手动应答 获取消息标签
                 * 和不批量应答
                 * **/
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                System.out.println("NotAutoAckConsumer1接受到的消息:"+ new String(message.getBody()));
            };
    
            CancelCallback cancelCallback = (consumerTag)->{
                System.out.println(consumerTag+"消息被取消回调");
            };
    
            boolean autoAck = false;
            channel.basicConsume(NOTAUTOACK_QUEEN_NAME,autoAck,deliverCallback,cancelCallback);
        }
    }
    
    package org.rb.day01;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import org.rb.util.RabbitMqUtils;
    
    //手动应答,消息不丢失,返回队列个可达的消费者消费
    public class NotAutoAckConsumer2 {
        private static final String NOTAUTOACK_QUEEN_NAME="ack_queen";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag, message)->{
                try {
                    Thread.sleep(1000*20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                System.out.println("NotAutoAckConsumer2接受到的消息:"+ new String(message.getBody()));
            };
    
            CancelCallback cancelCallback = (consumerTag)->{
                System.out.println(consumerTag+"消息被取消回调");
            };
    
            boolean autoAck = false;
            channel.basicConsume(NOTAUTOACK_QUEEN_NAME,autoAck,deliverCallback,cancelCallback);
        }
    }

    队列持久化:

    在消息发送的时候,将durable 设置为true 表示 开启持久化 

     channel.queueDeclare(NOTAUTOACK_QUEEN_NAME,true,false,false,null);

    package org.rb.day01;
    
    import com.rabbitmq.client.Channel;
    import org.rb.util.RabbitMqUtils;
    
    import java.util.Scanner;
    //持久化队列
    public class PersistProducer {
        private static final String NOTAUTOACK_QUEEN_NAME="persist_queen";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            // durable true 表示 开启持久化
            channel.queueDeclare(NOTAUTOACK_QUEEN_NAME,true,false,false,null);
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String message = scanner.next();
                channel.basicPublish("",NOTAUTOACK_QUEEN_NAME,null,message.getBytes("UTF-8"));
                System.out.println("消息"+message+"发送完成");
            }
        }
    }

    执行代码 去控制台 可以发现 queen 里面的 features 里面的只有个大写的D ,表示持久化了,此时关闭rabbbitmq 服务再重启,发现队列不会消失了

    消息持久化:

    只需要修改生产者在发送消息的时候 将第三个参数修改为

    //开启消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN
    channel.basicPublish("",NOTAUTOACK_QUEEN_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
    

    这里并不能保证绝对的消息不丢失 ,可能会在发送的某个时间点还没完全 处理完 但是 对服务挂了,也不能持久化

    package org.rb.day01;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.MessageProperties;
    import org.rb.util.RabbitMqUtils;
    
    import java.util.Scanner;
    
    //持久化队列
    public class PersistProducer2 {
        private static final String NOTAUTOACK_QUEEN_NAME="persist_queen";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            // durable true 表示 开启持队列久化
            channel.queueDeclare(NOTAUTOACK_QUEEN_NAME,true,false,false,null);
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String message = scanner.next();
                //开启消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN
                channel.basicPublish("",NOTAUTOACK_QUEEN_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
                System.out.println("消息"+message+"发送完成");
            }
        }
    }

    此时执行代码,关闭服务再次进入 控制台检查 发现 消息不会丢失了

    不公平分发

    在多个消费端的情况下 ,添加如下代码

    channel.basicQos(1);  
    // 设置不公平分发原则,体现一种能者多劳的方式 哪个线程处理的快,就会多处理消息而处理的慢的就会少处理

    预取值

    在上面的不公平分发的时候 传递给channel.basicQos(1)这个的参数为1 ,多个客户端里面的都是1 ,当 有一种情况我们需要发的数据的消息总数是已知的 ,这时候我们就可以通过改变这个参数来指定不同的客户端 消费 的消息数量, 比如 总共1000条消息,就可以指定 某个消费端 消费 200 条channel.basicQos(200) ,其他消费端消费 800 条 channel.basicQos(1), 这样就不管客户端 消费能力的问题,哪怕 消费端1 的消费能力很强 但是 它也只消费 200条消息, 消费端2 消费能力很差 但是 它也得消费800 条  ,这就是没有了能者多劳特性了

    2.4:发布确认

    在之前的讲解中,当生产者 发送消息给队列之后

    虽然已经通过 durable 将队列持久化 和 MessageProperties.PERSISTENT_TEXT_PLAIN 来持久化 消息 ,但是 生产者是不知道 消息是否真的持久化了的,这就需要 RabbitMQ 的应答机制来处理这个场景

    RabbitMQ 的应答机制主要有三种

    单个应答 : 一个一个应答 ,准确性最高 消息不回丢失 但是效率比较低,同步操作

    批量应答 : 效率比较高 ,但是 当消费端 批量消费的过程中 ,如果还没处理完就出现异常了,那么被取出的数据中 还未处理完的那部分消息就会丢失,同步操作

    异步批量应答 :采用多线程的方式 使用监听器,在发送消息的同时也在监控没有发送成功的数据

    package org.rb.day02;
    
    //发布确认
    //1: 单个确认
    //2:批量确认
    //3:异步确认
    
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmCallback;
    import org.rb.util.RabbitMqUtils;
    
    import java.util.UUID;
    
    public class ConfirmProducer {
        public static int MESSAGE_COUNT = 800;
        public static void main(String[] args) throws Exception{
            //ConfirmProducer.publishMsgSingle(); //发布800个消息耗时1575ms
            //ConfirmProducer.publishMsgBatch();  //发布800个消息耗时242ms ,当出现消息未确认时无法知道那个消息没有被确认
            ConfirmProducer.publishMsgSynchBatch(); //发布800个消息耗时197ms  异步的时间
        }
    
        //1: 单个确认
        public static void publishMsgSingle() throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            String queenName = UUID.randomUUID().toString();
            channel.queueDeclare(queenName,true,false,false,null);
            channel.confirmSelect(); // 开启发布确认
            long start = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "msg"+i;
                channel.basicPublish("",queenName,null,message.getBytes());
                boolean flag = channel.waitForConfirms();  //等候确认
                if(flag){
                  System.out.println("第"+i+"个消息发送成功");
                }
            }
            long end = System.currentTimeMillis();
            System.out.println("发布"+MESSAGE_COUNT+"个消息耗时"+(end-start)+"ms");
        }
    
        //2:批量确认
        public static void publishMsgBatch() throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            String queenName = UUID.randomUUID().toString();
            channel.queueDeclare(queenName,true,false,false,null);
            channel.confirmSelect(); // 开启发布确认
            //批量确认的大小 假设 200条
            int batchSize = 200;
            long start = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "msg"+i;
                channel.basicPublish("",queenName,null,message.getBytes());
                if(i%batchSize == 0 ){
                    channel.waitForConfirms();
                }
            }
            long end = System.currentTimeMillis();
            System.out.println("发布"+MESSAGE_COUNT+"个消息耗时"+(end-start)+"ms");
        }
    
        //3:异步确认
        public static void publishMsgSynchBatch() throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            String queenName = UUID.randomUUID().toString();
            channel.queueDeclare(queenName,true,false,false,null);
            channel.confirmSelect(); // 开启发布确认
    
            long start = System.currentTimeMillis();
            //消息成功发送回调函数  deliveryTag  消息标记  multiple 是否批量确认
            ConfirmCallback ackCallback = (deliveryTag, multiple)->{
                  System.out.println("正确确认的消息:"+deliveryTag);
            };
    
            //消息失败回调函数  deliveryTag  消息标记  multiple 是否批量确认
            ConfirmCallback nackCallback = (deliveryTag, multiple)->{
                //这里获取到未处理成功的消息
                System.out.println("未确认的消息:"+deliveryTag);
    
            };
    
            //创建监听器在发送消息之前,监听消息的成功和失败
            channel.addConfirmListener(ackCallback,nackCallback);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "msg"+i;
                channel.basicPublish("",queenName,null,message.getBytes());
            }
            long end = System.currentTimeMillis();
            System.out.println("发布"+MESSAGE_COUNT+"个消息耗时"+(end-start)+"ms");
        }
    
    
    }
    

    采用异步方式的时候 怎样去处理未被及时确认的信息呢 ?

    最常用的一种方式是将为被确认的消息放到一个基于内存的可以被发布线程访问的队列

    比如 ConcurrentLinkedQueen ,这个队列可以在 confirm callbacks 与发布线程之间进行消息的传递

    异步确认处理未被确认消息的处理逻辑

    public static void publishMsgSynchBatch() throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        String queenName = UUID.randomUUID().toString();
        channel.queueDeclare(queenName,true,false,false,null);
        channel.confirmSelect(); // 开启发布确认
    
        //方便批量删除 ,通过序号
        //支持多线程并发
        ConcurrentSkipListMap<Long,String> confirmMap = new ConcurrentSkipListMap<>();
    
        long start = System.currentTimeMillis();
        //消息成功发送回调函数  deliveryTag  消息标记  multiple 是否批量确认
        ConfirmCallback ackCallback = (deliveryTag, multiple)->{
            //步骤002 删除掉已经被消费的消息
            if(multiple){
                //可能会造成消息丢失 一般不用
                ConcurrentNavigableMap<Long,String> confirmedMap =  confirmMap.headMap(deliveryTag);
                confirmedMap.clear();
            }else{
                //推荐使用这种方式
                confirmMap.remove(deliveryTag);
            }
            System.out.println("正确确认的消息:"+deliveryTag);
    
        };
    
        //消息失败回调函数  deliveryTag  消息标记  multiple 是否批量确认
        ConfirmCallback nackCallback = (deliveryTag, multiple)->{
            //这里获取到未处理成功的消息
    
            //步骤003 处理 步骤002 操作完成之后违未被确认的消息
            String unConfirmMessage = confirmMap.get(deliveryTag);
            System.out.println("未确认的消息:"+deliveryTag+"内容是:"+unConfirmMessage);
    
        };
    
        //创建监听器在发送消息之前,监听消息的成功和失败
        channel.addConfirmListener(ackCallback,nackCallback);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = "msg"+i;
            channel.basicPublish("",queenName,null,message.getBytes());
            //步骤001 记录下所有的记录
            confirmMap.put(channel.getNextPublishSeqNo(),message);
        }
        long end = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT+"个消息耗时"+(end-start)+"ms");
    }
    

    2.5:交换机 

    当需要消息被多个消费者消费的时候 ,就需要交换机,即 发布订阅模式 

    交换机分为: 直接类型 direct  主题类型 topic  标题类型 header  广播类型 fanout

    队列和交换机之间需要使用绑定 来产生联系 

     

     以 fanout 模式为例演示 交换机 和 队列之间的绑定 关系 ,以及生产者发送消息 ,多个消费者获取消息消费

    分别编写 消息生产者和两个消费者

    package org.rb.day02;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.MessageProperties;
    import org.rb.util.RabbitMqUtils;
    
    import java.util.Scanner;
    
    public class ExProducer {
        private static final String EXCHANGE_NAME="EX";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String message = scanner.next();
                channel.basicPublish(EXCHANGE_NAME,"", null,message.getBytes("UTF-8"));
                System.out.println("消息"+message+"发送完成");
            }
        }
    }
    
    package org.rb.day02;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import org.rb.util.RabbitMqUtils;
    
    public class ExConsumer {
        private static final String EXCHANGE_NAME="EX";
    
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
            //声明一个临时队列,队列名称是随机的,当消费者断开连接自动删除
            String queenName = channel.queueDeclare().getQueue();
            //绑定交换机和队列
            channel.queueBind(queenName,EXCHANGE_NAME,"");
            //接受消息
            DeliverCallback deliverCallback = (consumerTag, message)->{
               System.out.println("ExConsumer01接受到消息"+new String(message.getBody()));
            };
            CancelCallback cancelCallback = (consumerTag)->{
                //未处理
            };
            channel.basicConsume(queenName,true,deliverCallback,cancelCallback);
        }
    }
    
    package org.rb.day02;
    
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import org.rb.util.RabbitMqUtils;
    
    public class ExConsumer02 {
        private static final String EXCHANGE_NAME="EX";
    
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
            //声明一个临时队列,队列名称是随机的,当消费者断开连接自动删除
            String queenName = channel.queueDeclare().getQueue();
            //绑定交换机和队列
            channel.queueBind(queenName,EXCHANGE_NAME,"");
            //接受消息
            DeliverCallback deliverCallback = (consumerTag, message)->{
               System.out.println("ExConsumer02接受到消息"+new String(message.getBody()));
            };
            CancelCallback cancelCallback = (consumerTag)->{
                //未处理
            };
            channel.basicConsume(queenName,true,deliverCallback,cancelCallback);
        }
    }

    启动 生产者和消费者查看 管理控制台中的绑定关系

    说明交换机 队列声明并绑定成功

    在生产端执行 发送消息, 并在 两个消费端的控制台查看打印信息 ,发现两个消费端都消费了 消息

    直接交换机:direct

    如果RoutingKey 在生产者 绑定交换机得时候使用的是一样的,那么其实 和上面的 fanout 是一样的,但是 要发送的 时候 RoutingKey 不一样 ,那么就会发送消息到不同的 队列去

    可以看到 当我们改变生产者中的 RoutingKey 的时候 ,不同Routingkey 对应的consumer 会接受到对应的消息

    package org.rb.day02;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import org.rb.util.RabbitMqUtils;
    
    import java.util.Scanner;
    
    //演示直接交换机生产者
    public class DirectExProducer {
        private static final String DIRECT_EXCHANGE_NAME="DIRECT_EX";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            channel.exchangeDeclare(DIRECT_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String message = scanner.next();
                //根据不同的RoutingKey 将消息发送的不同的队列
                channel.basicPublish(DIRECT_EXCHANGE_NAME,"003", null,message.getBytes("UTF-8"));
                //channel.basicPublish(DIRECT_EXCHANGE_NAME,"002", null,message.getBytes("UTF-8"));
                //channel.basicPublish(DIRECT_EXCHANGE_NAME,"003", null,message.getBytes("UTF-8"));
                System.out.println("消息"+message+"发送完成");
            }
        }
    }
    

    package org.rb.day02;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import org.rb.util.RabbitMqUtils;
    
    //演示直接交换机消费者
    public class DirectExConsumer {
        private static final String DIRECT_EXCHANGE_NAME="DIRECT_EX";
    
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            //声明交换机
            channel.exchangeDeclare(DIRECT_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //声明一个临时队列,队列名称是随机的,当消费者断开连接自动删除
            String queenName = channel.queueDeclare().getQueue();
            //绑定交换机和队列  可以多重绑定
            channel.queueBind(queenName,DIRECT_EXCHANGE_NAME,"001");
            channel.queueBind(queenName,DIRECT_EXCHANGE_NAME,"002");
            //接受消息
            DeliverCallback deliverCallback = (consumerTag, message)->{
                System.out.println("DirectExConsumer01接受到消息"+new String(message.getBody()));
            };
            CancelCallback cancelCallback = (consumerTag)->{
                //未处理
            };
            channel.basicConsume(queenName,true,deliverCallback,cancelCallback);
        }
    }
    
    package org.rb.day02;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import org.rb.util.RabbitMqUtils;
    
    //演示直接交换机消费者
    public class DirectExConsumer2 {
        private static final String DIRECT_EXCHANGE_NAME="DIRECT_EX";
    
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            //声明交换机
            channel.exchangeDeclare(DIRECT_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //声明一个临时队列,队列名称是随机的,当消费者断开连接自动删除
            String queenName = channel.queueDeclare().getQueue();
            //绑定交换机和队列  可以多重绑定
            channel.queueBind(queenName,DIRECT_EXCHANGE_NAME,"003");
            //接受消息
            DeliverCallback deliverCallback = (consumerTag, message)->{
                System.out.println("DirectExConsumer2接受到消息"+new String(message.getBody()));
            };
            CancelCallback cancelCallback = (consumerTag)->{
                //未处理
            };
            channel.basicConsume(queenName,true,deliverCallback,cancelCallback);
        }
    }
    

     

     

     

     Topic 主题交换机:

    Topic: 所有符合routingKey表达式所绑定的队列可以接收消息

    发送到topic类型交换机的消息的routing_key不能随便设置
    它必须是多个用点(.)分割的单词组成,单词可以是任意的
    但它们通常指定连接到该消息的某些功能
    路由关键字可包含任意多的单词但最高限制是255字节。

    绑定的关键字必须有相同的格式。topic交换机和direct交换的逻辑是相似的–拥有特定的路由关键字的消息将被发送到所有匹配关键字的队列,还可以有两种特殊的正则

    (1)* (星号) 可以代替一个完整的单词.
    (2)# (井号) 可以代替零个或多个单词.

    如: *.aaa.*  表示 要发到有三个单词 但中间一个单词必须是 aaa 的

          #.aaa.*  表示 要发到有多个单词, 但倒数第二个单词必须是 aaa 的

          #.aaa.#  表示 要发到有多个单词, 单词包含aaa 的

          *.aaa.#  表示 要发到有多个单词, 第二个单词是aaa 的  等

    代码演示:

    package org.rb.day02;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import org.rb.util.RabbitMqUtils;
    
    import java.util.Scanner;
    
    public class TopicProducer {
        private static final String TOPIC_EXCHANGE_NAME="TOPIC_EX";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String message = scanner.next();
                channel.basicPublish(TOPIC_EXCHANGE_NAME,"*.aaa.*", null,message.getBytes("UTF-8"));
    //            channel.basicPublish(TOPIC_EXCHANGE_NAME,"#.aaa.*", null,message.getBytes("UTF-8"));
    //            channel.basicPublish(TOPIC_EXCHANGE_NAME,"*.aaa.*", null,message.getBytes("UTF-8"));
    //            channel.basicPublish(TOPIC_EXCHANGE_NAME,"*.aaa.#", null,message.getBytes("UTF-8"));
     //            channel.basicPublish(TOPIC_EXCHANGE_NAME,"#.aaa.#", null,message.getBytes("UTF-8"));
                System.out.println("消息"+message+"发送完成");
            }
        }
    }
    
    package org.rb.day02;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import org.rb.util.RabbitMqUtils;
    
    public class TopicConsumer {
        private static final String TOPIC_EXCHANGE_NAME="TOPIC_EX";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            //声明交换机
            channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            //声明一个临时队列,队列名称是随机的,当消费者断开连接自动删除
            String queenName = channel.queueDeclare().getQueue();
            //绑定交换机和队列  可以多重绑定
            channel.queueBind(queenName,TOPIC_EXCHANGE_NAME,"www.aaa.www");
    
            //接受消息
            DeliverCallback deliverCallback = (consumerTag, message)->{
                System.out.println("DirectExConsumer01接受到消息"+new String(message.getBody()));
            };
            CancelCallback cancelCallback = (consumerTag)->{
                //未处理
            };
            channel.basicConsume(queenName,true,deliverCallback,cancelCallback);
        }
    }
    
    package org.rb.day02;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import org.rb.util.RabbitMqUtils;
    
    public class TopicConsumer2 {
        private static final String TOPIC_EXCHANGE_NAME="TOPIC_EX";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            //声明交换机
            channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            //声明一个临时队列,队列名称是随机的,当消费者断开连接自动删除
            String queenName = channel.queueDeclare().getQueue();
            //绑定交换机和队列  可以多重绑定
            channel.queueBind(queenName,TOPIC_EXCHANGE_NAME,"www.SSS.www");
    
            //接受消息
            DeliverCallback deliverCallback = (consumerTag, message)->{
                System.out.println("DirectExConsumer02接受到消息"+new String(message.getBody()));
            };
            CancelCallback cancelCallback = (consumerTag)->{
                //未处理
            };
            channel.basicConsume(queenName,true,deliverCallback,cancelCallback);
        }
    }
    
    package org.rb.day02;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import org.rb.util.RabbitMqUtils;
    
    public class TopicConsumer3 {
        private static final String TOPIC_EXCHANGE_NAME="TOPIC_EX";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            //声明交换机
            channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            //声明一个临时队列,队列名称是随机的,当消费者断开连接自动删除
            String queenName = channel.queueDeclare().getQueue();
            //绑定交换机和队列  可以多重绑定
            channel.queueBind(queenName,TOPIC_EXCHANGE_NAME,"*.aaa.*");
    
            //接受消息
            DeliverCallback deliverCallback = (consumerTag, message)->{
                System.out.println("DirectExConsumer03接受到消息"+new String(message.getBody()));
            };
            CancelCallback cancelCallback = (consumerTag)->{
                //未处理
            };
            channel.basicConsume(queenName,true,deliverCallback,cancelCallback);
        }
    }
    
    

     也可以 在生产端使用一个map 来将参数包装之后 在循环发送消息,并使用不同的消费端规则生产多个消费端来检测这个夏曦会被路由到哪个消费端去消费

    2.6: 死信队列

    死信队列的来源 ,信息在消费过程中没有正确的被消费

    1. 消息被拒绝(Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false
    2. 消息过期了
    3. 队列达到最大的长度

    死信队列&死信交换器:DLX 全称(Dead-Letter-Exchange),称之为死信交换器,
    当消息变成一个死信之后,如果这个消息所在的队列存在x-dead-letter-exchange参数,
    那么它会被发送到x-dead-letter-exchange对应值的交换器上,这个交换器就称之为死信交换器,
    与这个死信交换器绑定的队列就是死信队列。

    时间过期进入死信代码演示

    先编写一个 消费者 里面需要定义如下内容

    /**
     * 需要定义 正常的交换机            死信交换机
     *         正常队列                死信队列
     *         正常交换机的RoutingKey   死信的 RoutingKey
     *         以及正常的交换即和正常队列的绑定    死信交换机和死信队列的绑定
     *         以及 发生异常时 将信息发送到死信的操作
     *
     * **/

    代码如下

    package org.rb.day02;
    
    //正常的消费者
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import org.rb.util.RabbitMqUtils;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 需要定义 正常的交换机            死信交换机
     *         正常队列                死信队列
     *         正常交换机的RoutingKey   死信的 RoutingKey
     *         以及正常的交换即和正常队列的绑定    死信交换机和死信队列的绑定
     *         以及 发生异常时 将信息发送到死信的操作
     *
     * **/
    public class DeadMessageConsumer {
        //正常的交换机
        public static final String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";
        //死信交换机
        public static final String DEAD_EXCHANGE = "DEAD_EXCHANGE";
    
        //正常的交队列
        public static final String NORMAL_QUEUE = "NORMAL_QUEUE";
        //死信交队列
        public static final String DEAD_QUEUE = "DEAD_QUEUE";
    
        //正常交换机的RoutingKey
        public static final String NORMAL_ROUTING_KEY = "NORMAL_ROUTING_KEY";
        //死信的 RoutingKey
        public static final String DEAD_ROUTING_KEY = "DEAD_ROUTING_KEY";
    
    
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            //正常的交换机
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            //死信交换机
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
            //=======================================
            //接受消息
            DeliverCallback deliverCallback = (consumerTag, message)->{
                System.out.println("DeadMessageConsumer接受到消息"+new String(message.getBody()));
            };
            CancelCallback cancelCallback = (consumerTag)->{
                //未处理
            };
            //正常队列
            //準備給正常隊列的參數
            Map<String,Object> arguments = new HashMap<>();
            //设置不能正常消费时的私信交换机的参数
            arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
            //设置死信交换机的RoutingKey
            arguments.put("x-dead-letter-routing-key",DEAD_ROUTING_KEY);
            arguments.put("x-message-ttl",10000); //设置过期时间
            channel.queueDeclare(NORMAL_QUEUE,false,false,false,null);
    
            //死信队列
            channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
    
            //正常的交换即和正常队列的绑定
            channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,NORMAL_ROUTING_KEY);
            //死信交换机和死信队列的绑定
            channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,DEAD_ROUTING_KEY);
            //=======================================
            System.out.println("..........等待接受消息..............");
            channel.basicConsume(NORMAL_QUEUE,true, deliverCallback, cancelCallback);
    
        }
    
    }

    编写完成后 启动这个消费端 ,查看控制台 ,可以看到 相应的绑定关系如下

     再编写一个生产者 

    package org.rb.day02;
    
    import com.rabbitmq.client.Channel;
    import org.rb.util.RabbitMqUtils;
    
    //测试死信队列
    public class DeadMessageProducer {
        //正常的交换机
        public static final String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";
        //正常交换机的RoutingKey
        public static final String NORMAL_ROUTING_KEY = "NORMAL_ROUTING_KEY";
    
    
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            //定义消息被消费的过期时间
            //超过这个时间 还没收到ack 将会把信息 丢入到死信队列
            for (int i = 0; i < 25; i++) {
                String message = "messagewwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwww"+i;
               // channel.basicPublish(NORMAL_EXCHANGE,NORMAL_ROUTING_KEY,properties,message.getBytes());
              //测试超过做大长度
               channel.basicPublish(NORMAL_EXCHANGE,NORMAL_ROUTING_KEY,null,message.getBytes());
            }
    
        }
    }
    

    将上一步中的 消费者线程关闭,启动生产者并发送信息 ,查看10 秒后 正常队列 和死信队列里面的数据变化,会发现 正常队列中 现有数据 ,然后 逐渐减少, 但是重新启动 消费者后 死信队列里面会逐渐增加,而正常队列逐渐减少

     超过10s之后  正常队列里的信息 进入了死信队列

     再编写一个 简单的 消费者去消费 死信队列 里面的消息

    package org.rb.day02;
    
    //正常的消费者
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import org.rb.util.RabbitMqUtils;
    public class DeadMessageConsumer2 {
    
        //死信交队列
        public static final String DEAD_QUEUE = "DEAD_QUEUE";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
    
            DeliverCallback deliverCallback = (consumerTag, message)->{
                System.out.println("DeadMessageConsumer2接受到消息"+new String(message.getBody()));
            };
            CancelCallback cancelCallback = (consumerTag)->{
                //未处理
            };
            //=======================================
            System.out.println("..........等待接受消息..............");
            channel.basicConsume(DEAD_QUEUE,true, deliverCallback, cancelCallback);
    
        }
    
    }
    

    启动 消费 死信的 消费者可以看到消息被消费了 ,再检查 管理控制台 发现上一步中的死信队列里的数据已经 被消费了 

    队列达到最大长度的 演示 ,

    删除之前生成的队列,并重新 启动 正常消费者

    重新生成队列,重新启动生产者 发送消息 ,发现 队列里面 超过最大长度的部分会进入死信队列

    代码:

    package org.rb.day02;
    
    import com.rabbitmq.client.Channel;
    import org.rb.util.RabbitMqUtils;
    
    //测试死信队列
    public class DeadMessageProducer {
        //正常的交换机
        public static final String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";
        //正常交换机的RoutingKey
        public static final String NORMAL_ROUTING_KEY = "NORMAL_ROUTING_KEY";
    
    
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            //定义消息被消费的过期时间
            //超过这个时间 还没收到ack 将会把信息 丢入到死信队列
            for (int i = 0; i < 25; i++) {
                String message = "messagewwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwww"+i;
               // channel.basicPublish(NORMAL_EXCHANGE,NORMAL_ROUTING_KEY,properties,message.getBytes());
              //测试超过做大长度
               channel.basicPublish(NORMAL_EXCHANGE,NORMAL_ROUTING_KEY,null,message.getBytes());
            }
    
        }
    }
    

    package org.rb.day02;
    
    //正常的消费者
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import org.rb.util.RabbitMqUtils;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 需要定义 正常的交换机            死信交换机
     *         正常队列                死信队列
     *         正常交换机的RoutingKey   死信的 RoutingKey
     *         以及正常的交换即和正常队列的绑定    死信交换机和死信队列的绑定
     *         以及 发生异常时 将信息发送到死信的操作
     *
     * **/
    public class DeadMessageConsumer {
        //正常的交换机
        public static final String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";
        //死信交换机
        public static final String DEAD_EXCHANGE = "DEAD_EXCHANGE";
    
        //正常的交队列
        public static final String NORMAL_QUEUE = "NORMAL_QUEUE";
        //死信交队列
        public static final String DEAD_QUEUE = "DEAD_QUEUE";
    
        //正常交换机的RoutingKey
        public static final String NORMAL_ROUTING_KEY = "NORMAL_ROUTING_KEY";
        //死信的 RoutingKey
        public static final String DEAD_ROUTING_KEY = "DEAD_ROUTING_KEY";
    
    
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            //正常的交换机
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            //死信交换机
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
            //=======================================
            //接受消息
            DeliverCallback deliverCallback = (consumerTag, message)->{
                System.out.println("DeadMessageConsumer接受到消息"+new String(message.getBody()));
            };
            CancelCallback cancelCallback = (consumerTag)->{
                //未处理
            };
            //正常队列
            //準備給正常隊列的參數
            Map<String,Object> arguments = new HashMap<>();
            //设置不能正常消费时的私信交换机的参数
            arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
            //设置死信交换机的RoutingKey
            arguments.put("x-dead-letter-routing-key",DEAD_ROUTING_KEY);
            //arguments.put("x-message-ttl",10000); //设置过期时间
            //测试超过最大长度
            arguments.put("x-max-length",20);
            channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
    
            //死信队列
            channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
    
            //正常的交换即和正常队列的绑定
            channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,NORMAL_ROUTING_KEY);
            //死信交换机和死信队列的绑定
            channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,DEAD_ROUTING_KEY);
            //=======================================
            System.out.println("..........等待接受消息..............");
            channel.basicConsume(NORMAL_QUEUE,true, deliverCallback, cancelCallback);
    
        }
    
    }
    

    package org.rb.day02;
    
    //正常的消费者
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import org.rb.util.RabbitMqUtils;
    public class DeadMessageConsumer2 {
    
        //死信交队列
        public static final String DEAD_QUEUE = "DEAD_QUEUE";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
    
            DeliverCallback deliverCallback = (consumerTag, message)->{
                System.out.println("DeadMessageConsumer2接受到消息"+new String(message.getBody()));
            };
            CancelCallback cancelCallback = (consumerTag)->{
                //未处理
            };
            //=======================================
            System.out.println("..........等待接受消息..............");
            channel.basicConsume(DEAD_QUEUE,true, deliverCallback, cancelCallback);
    
        }
    
    }
    
    

     分别启动 正常消费者 和消费死信队列的消费者 ,会发现 管理控台中的消息被消费了 ,并且 代码的控制台中 两个consumer 分别打印20 条 和 5 条 ,这样 超过最大长度 的 案例演示完成

     

     

     消息被拒绝进入死信队列

    重新编写 一个生产者和消费者 ,消费死信的消费者还用原来的

    package org.rb.day02;
    
    import com.rabbitmq.client.Channel;
    import org.rb.util.RabbitMqUtils;
    
    //测试死信队列
    public class DeadMessageProducer {
        //正常的交换机
        public static final String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";
        //正常交换机的RoutingKey
        public static final String NORMAL_ROUTING_KEY = "NORMAL_ROUTING_KEY";
    
    
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            //定义消息被消费的过期时间
            //超过这个时间 还没收到ack 将会把信息 丢入到死信队列
            for (int i = 0; i < 20; i++) {
                String message = "info"+i;
               // channel.basicPublish(NORMAL_EXCHANGE,NORMAL_ROUTING_KEY,properties,message.getBytes());
              //测试超过做大长度
               channel.basicPublish(NORMAL_EXCHANGE,NORMAL_ROUTING_KEY,null,message.getBytes());
            }
    
        }
    }
    

    package org.rb.day02;
    
    //正常的消费者
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import org.rb.util.RabbitMqUtils;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 需要定义 正常的交换机            死信交换机
     *         正常队列                死信队列
     *         正常交换机的RoutingKey   死信的 RoutingKey
     *         以及正常的交换即和正常队列的绑定    死信交换机和死信队列的绑定
     *         以及 发生异常时 将信息发送到死信的操作
     *
     * **/
    public class DeadMessageConsumer3 {
        //正常的交换机
        public static final String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";
        //死信交换机
        public static final String DEAD_EXCHANGE = "DEAD_EXCHANGE";
    
        //正常的交队列
        public static final String NORMAL_QUEUE = "NORMAL_QUEUE";
        //死信交队列
        public static final String DEAD_QUEUE = "DEAD_QUEUE";
    
        //正常交换机的RoutingKey
        public static final String NORMAL_ROUTING_KEY = "NORMAL_ROUTING_KEY";
        //死信的 RoutingKey
        public static final String DEAD_ROUTING_KEY = "DEAD_ROUTING_KEY";
    
    
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            //正常的交换机
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            //死信交换机
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
            //=======================================
            //接受消息
            DeliverCallback deliverCallback = (consumerTag, message)->{
                String msg = new String(message.getBody());
                if(msg.equals("info5")){
                    System.out.println("DeadMessageConsumer拒绝接受"+msg);
                    //requeue 表示是否重新放回队列,拒绝不放回队列 ,使其进入死信队列
                    channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
                }else{
                    System.out.println("DeadMessageConsumer接受到消息"+new String(message.getBody()));
                    //mutiple 设置false 表示不批量应答
                    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                }
            };
            CancelCallback cancelCallback = (consumerTag)->{
                //未处理
            };
            //正常队列
            //準備給正常隊列的參數
            Map<String,Object> arguments = new HashMap<>();
            //设置不能正常消费时的私信交换机的参数
            arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
            //设置死信交换机的RoutingKey
            arguments.put("x-dead-letter-routing-key",DEAD_ROUTING_KEY);
            //arguments.put("x-message-ttl",10000); //设置过期时间
            //测试超过最大长度
            //arguments.put("x-max-length",20);
            channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
    
            //死信队列
            channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
    
            //正常的交换即和正常队列的绑定
            channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,NORMAL_ROUTING_KEY);
            //死信交换机和死信队列的绑定
            channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,DEAD_ROUTING_KEY);
            //=======================================
            System.out.println("..........等待接受消息..............");
            //autoAck 为false 不自动应答
            channel.basicConsume(NORMAL_QUEUE,false, deliverCallback, cancelCallback);
    
        }
    
    }

    删除之前的同名队列 ,不让启动成都的时候会报错,然后再次启动消费者

    生成 交换机和队列等

    然后执行生产者的代码,并查看代码日志 ,发现被拒绝的消息也进入了死信队列

     启动消费死信的 消费者 该条信息被消费

     查看管理控制台 也发现消息被消费了

    3:SpringBoot  和 队列 TTL

    3.1: 使用springboot 来演示一个实际环境中的延迟队列

     途中的 蓝色部分 是死信队列 死信交换机 ,黑色部分之前的是 普通队列和普通的交换机

    需要编写一个配置类 来实现除了生产者和消费者的中间部分的代码逻辑

    普通交换机和普通队列使用不同的RoutingKey 来表示不同的规则 ,死信队列和死信交换机 使用相同的 RoutingKey

    这个架构图中的消息 会经过一个生产者产生 一条信息 分别发送到普通推列之后 ,分别经过10秒 和20 秒言延迟后变成延迟的消息 进入死信队列 ,再由消费端监听到死信队列的消息之后 消费

    创建springboot 项目 

    添加依赖

    <?xml version="1.0" encoding="UTF-8"?>
    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <!-- 必须的依赖 -->
      <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
        <relativePath/>
      </parent>
      <groupId>org.rmq</groupId>
      <artifactId>sptingboot-rabbitmq</artifactId>
      <version>1.0-SNAPSHOT</version>
      <name>sptingboot-rabbitmq</name>
    
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.11</version>
          <scope>test</scope>
        </dependency>
        <!-- 必须的依赖 -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
        <dependency>
          <groupId>commons-io</groupId>
          <artifactId>commons-io</artifactId>
          <version>2.9.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.commons</groupId>
          <artifactId>commons-lang3</artifactId>
        </dependency>
    
        <dependency>
          <groupId>org.apache.commons</groupId>
          <artifactId>commons-pool2</artifactId>
        </dependency>
    
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    
        <dependency>
          <groupId>org.springframework.amqp</groupId>
          <artifactId>spring-amqp</artifactId>
        </dependency>
    
        <dependency>
          <groupId>org.springframework.amqp</groupId>
          <artifactId>spring-rabbit</artifactId>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit-test -->
        <dependency>
          <groupId>org.springframework.amqp</groupId>
          <artifactId>spring-rabbit-test</artifactId>
          <version>2.3.10</version>
          <scope>test</scope>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
          <groupId>org.projectlombok</groupId>
          <artifactId>lombok</artifactId>
          <version>1.18.22</version>
          <scope>provided</scope>
        </dependency>
    
    
        <!--
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        -->
    
        <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-databind</artifactId>
          <version>2.9.6</version>
        </dependency>
    
        <!--swagger图形化接口 開始-->
        <dependency>
          <groupId>io.springfox</groupId>
          <artifactId>springfox-swagger2</artifactId>
          <version>2.9.2</version>
        </dependency>
    
        <dependency>
          <groupId>io.springfox</groupId>
          <artifactId>springfox-swagger-ui</artifactId>
          <version>2.9.2</version>
        </dependency>
    
        <!--swagger图形化接口結束-->
      </dependencies>
    
      <build>
        <plugins>
          <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
          </plugin>
        </plugins>
      </build>
    
    </project>

    配置文件 application.properties

    # 应用名
    spring.application.name=springboot-rabbitmq
    # rabbitmq配置信息
    # ip
    spring.rabbitmq.host=192.168.217.128
    # 端口
    spring.rabbitmq.port=5672
    # 用户名
    spring.rabbitmq.username=developer
    # 密码
    spring.rabbitmq.password=dev123456
    # 配置虚拟机
    spring.rabbitmq.virtual-host=/
    # 消息开启手动确认
    spring.rabbitmq.listener.direct.acknowledge-mode=manual

    编写生产者

    package org.rmq.controller;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.*;
    
    import java.util.Date;
    
    @Slf4j
    @RestController
    @RequestMapping("/ttl")
    public class TestController {
        //普通交换机
        private static final String X_EXCHANGE = "X";
        //普通RoutingKey
        private static final String NOMAL_ROUTINGKEY_A = "XA";
        private static final String NOMAL_ROUTINGKEY_B = "XB";
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @ResponseBody
        @GetMapping("/sendmsg/{message}")
        public void sendMsg(@PathVariable("message") String message){
            log.info("当前时间:{},发送消息{}给两个ttl队列",new Date().toString(),message);
            rabbitTemplate.convertAndSend(X_EXCHANGE,NOMAL_ROUTINGKEY_A,"消息来自ttl为10秒的队列:"+message);
            rabbitTemplate.convertAndSend(X_EXCHANGE,NOMAL_ROUTINGKEY_B,"消息来自ttl为20秒的队列:"+message);
        }
    }
    

    编写 交换机 队列 routingKey 的配置以及绑定配置类

    package org.rmq.config;
    
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class TTLQueueConfig {
        //普通交换机
        private static final String X_EXCHANGE = "X";
        //死信交换机
        private static final String DEAD_LETTER_EXCHANGE = "Y";
    
        //普通队列
        private static final String QUEUE_A = "QA";
        private static final String QUEUE_B = "QB";
        //死信队列
        private static final String DEAD_LETTER_QUEUE = "QD";
    
        //普通RoutingKey
        private static final String NOMAL_ROUTINGKEY_A = "XA";
        private static final String NOMAL_ROUTINGKEY_B = "XB";
        //死信RoutingKey
        private static final String DEAD_ROUTINGKEY= "YD";
    
    
        //声明普通交换机
        @Bean("xExchange")
        public DirectExchange xExchange(){
           return  new DirectExchange(X_EXCHANGE);
        }
    
        //声明死信交换机
        @Bean("yExchange")
        public DirectExchange yExchange(){
            return  new DirectExchange(DEAD_LETTER_EXCHANGE);
        }
    
        //声明两个普通队列
        @Bean("queueA")
        public Queue queueA(){
            //準備給正常隊列的參數
            Map<String,Object> arguments = new HashMap<>(3);
            //设置不能正常消费时的私信交换机的参数
            arguments.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
            //设置死信交换机的RoutingKey
            arguments.put("x-dead-letter-routing-key",DEAD_ROUTINGKEY);
            arguments.put("x-message-ttl",10000); //设置过期时间 单位为ms (毫秒)
            return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
        }
    
        @Bean("queueB")
        public Queue queueB(){
            //準備給正常隊列的參數
            Map<String,Object> arguments = new HashMap<>(3);
            //设置不能正常消费时的私信交换机的参数
            arguments.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
            //设置死信交换机的RoutingKey
            arguments.put("x-dead-letter-routing-key",DEAD_ROUTINGKEY);
            arguments.put("x-message-ttl",20000); //设置过期时间 单位为ms (毫秒)
            return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
        }
    
        //声明死新队列
        @Bean("deadQueueD")
        public Queue deadQueueD(){
            return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
        }
    
    
        //绑定普通交换机
        @Bean
        public Binding queueAbindingToxExchange(@Qualifier("queueA") Queue queueA,
                                                @Qualifier("xExchange") DirectExchange xExchange){
          return BindingBuilder.bind(queueA).to(xExchange).with(NOMAL_ROUTINGKEY_A);
        }
    
        @Bean
        public Binding queueBbindingToxExchange(@Qualifier("queueB") Queue queueB,
                                                @Qualifier("xExchange") DirectExchange xExchange){
            return BindingBuilder.bind(queueB).to(xExchange).with(NOMAL_ROUTINGKEY_B);
        }
    
        //绑定死信交换机
        @Bean
        public Binding deadQueueDbindingToYExchange(@Qualifier("deadQueueD") Queue deadQueueD,
                                                @Qualifier("yExchange") DirectExchange yExchange){
            return BindingBuilder.bind(deadQueueD).to(yExchange).with(DEAD_ROUTINGKEY);
        }
    
    }
    

    编写消费者

    package org.rmq.consumer;
    
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    @Slf4j
    @Component
    public class TTLQueueConsumer {
        //死信队列
        private static final String DEAD_LETTER_QUEUE = "QD";
        @RabbitListener(queues = DEAD_LETTER_QUEUE)
        public void recieveTTLInfo(Message message, Channel channel){
          String msg = new String(message.getBody());
          log.info("当前时间{},收到死信队列的消息{}",new Date().toString(),msg);
        }
    }
    

     启动项目,

    在浏览器访问

     

    至此 使用 死信队列 + ttl 来处理延迟消息的例子完成,  这里实际上就是工作中常用到的延迟队列

    3.2: TTL 队列优化

    在3.1 里面演示的延迟队列是存在问题的 ,当我们还需要定义更多的延迟时间的队列的时候 ,就还需要增加不同的 普通队列 和 RoutingKey  这样就不得不修改代码,我们希望可以写一个通用的来替代

    可以在之前的代码中添加一个新的 交换机和队列 ,不在消费端定义过期时间,而是在生产者端去指指定时间 ,这样就可以可解决问题

    在3.1 的TTLQueueConfig代码中添加 变量

     private static final String QUEUE_C = "QC";  

    private static final String NOMAL_ROUTINGKEY_C = "XC";

    再新增一个队列的定义,并绑定死信队列,不指定过期时间参数

    @Bean("queueC")
    public Queue queueC(){
        //準備給正常隊列的參數
        Map<String,Object> arguments = new HashMap<>(3);
        //设置不能正常消费时的私信交换机的参数
        arguments.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
        //设置死信交换机的RoutingKey
        arguments.put("x-dead-letter-routing-key",DEAD_ROUTINGKEY);
        //arguments.put("x-message-ttl",20000); //设置过期时间 单位为ms (毫秒)
        return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
    }
    
    @Bean
    public Binding queueCBindingToxExchange(@Qualifier("queueC") Queue queueC,
                                            @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueC).to(xExchange).with(NOMAL_ROUTINGKEY_C);
    }

    这样就加好了配置类中的内容

    在生产端 添加一个新的测试代码

    @ResponseBody
    @GetMapping("/sendmsgttl/{message}/{ttltime}")
    public void sendMsg(@PathVariable("message") String message,@PathVariable("ttltime") String ttltime){
        log.info("当前时间:{},发送消息{}给一个ttl队列,过期时间是{}ms",new Date().toString(),message,ttltime);
        rabbitTemplate.convertAndSend(X_EXCHANGE,NOMAL_ROUTINGKEY_C,message,msg->{
            msg.getMessageProperties().setExpiration(ttltime);
            return msg;
        });
    }

    执行结果

     发现当有多条消极被发送时, 发送时间 基于了第一条的时间来执行了,哪怕第二条消息时间比第一条端,还是按第一条的时间来处理了,所以这里还是有问题的,我们还需要更多的优化

    3.3 TTL + 死信队列优化2

    解决上面的问题 我们需要安装一个插件

    官网地址 :Community Plugins — RabbitMQ     点击进去之后会发现,已经被托管到git 上了

    进入  git  https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

    下载对应的版本 注意 Erlang 支持的版本 ,由于我的 符合这个版本要求,就直接下载啦

     

     下载之后 将 该文件 放到 rabbitmq 的 plugins 目录下

    我的机器目录  /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.26

     

     执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange  安装插件

     安装玩插件后需要重启服务

    rabbitmqctl  rabbitmq-server stop  关闭服务
    rabbitmq-server -detached   启动服务
    rabbitmqctl status    查看服务状态

     此时 我我们延迟交换机的 延迟队列 就变更到交换机了

    编写 一个新的 延迟对类的配置类

    package org.rmq.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.CustomExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class DelayedQueueConfig {
        private static final String DELAYED_EXCHANGE = "DELAYED.EXCHANGE";
        private static final String DELAYED_QUEUE = "DELAYED.QUEUE";
        private static final String DELAYED_ROUTING_KEY = "DELAYED.ROUTINGKEY";
        private static final String DELAYED_EXCHANGE_TYPE = "x-delayed-message";
    
        @Bean("delayedExchange")
        public CustomExchange delayedExchange(){
            /**
             * name  交换机名称
             * type  交换机类型
             * durable  是否持久化交换机
             * autoDelete 是否自动删除交换机
             * arguments  其他参数
             */
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-delayed-type","direct"); //延迟类型
            //x-delayed-message  延迟消息
            return new CustomExchange(DELAYED_EXCHANGE,DELAYED_EXCHANGE_TYPE,true,false,arguments);
        }
    
        @Bean("delayedQueue")
        public Queue delayedQueue(){
            return new Queue(DELAYED_QUEUE);
        }
    
        @Bean
        public Binding delayedQueueBindingTodelayedExchange(
                @Qualifier("delayedQueue") Queue delayedQueue,
                @Qualifier("delayedExchange") CustomExchange delayedExchange){
                return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
        }
    
    }
    

    编写生产者 延时时间多少 由生产多发消息的时候 指定

    //############################################
    private static final String DELAYED_EXCHANGE = "DELAYED.EXCHANGE";
    private static final String DELAYED_ROUTING_KEY = "DELAYED.ROUTINGKEY";
    //############################################
    @ResponseBody
    @GetMapping("/sendDelayedmsg/{message}/{delaredTime}")
    public void sendMsg(@PathVariable("message") String message,@PathVariable("delaredTime") Integer delaredTime){
        log.info("当前时间:{},发送消息{}给一个延迟队列,时间是{}ms",new Date().toString(),message,delaredTime);
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE,DELAYED_ROUTING_KEY,message,msg->{
            msg.getMessageProperties().setDelay(delaredTime);
            return msg;
        });
    }

    编写消费者:

    package org.rmq.consumer;
    
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    @Component
    @Slf4j
    public class DelayedQueueConsumer {
        private static final String DELAYED_QUEUE = "DELAYED.QUEUE";
        @RabbitListener(queues = DELAYED_QUEUE)
        public void recieveDelayedInfo(Message message){
            String msg = new String(message.getBody());
            log.info("当前时间{},收到延迟队列的消息{}",new Date().toString(),msg);
        }
    }
    执行结果发现延迟的消息按照我们预想打印消费了,延迟时间长的后打印,延迟时间短的后打印,说明插件可用

    4:发布确认的高级内容

    在之前的内容里,都没有讨论交换机,队列由于某些原因收不到消息的情况,怎么在交换机和队列粗问题的时候 ,生产者能够知道消息发送失败了,这样就可以记录或者重发那些消息

    4.1:交换机 确认回调

    编写配置文件

    package org.rmq.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class ConfirmConfig {
    
        private static final String CONFIRM_EXCHANGE = "CONFIRM.EXCHANGE";
        private static final String CONFIRM_QUEUE = "CONFIRM.QUEUE";
        private static final String CONFIRM_ROUTING_KEY = "CONFIRM.ROUTINGKEY";
    
        @Bean("confirmExchange")
        public DirectExchange confirmExchange(){
            return  new DirectExchange(CONFIRM_EXCHANGE);
        }
    
        @Bean("confirmQueue")
        public Queue confirmQueue(){
            return QueueBuilder.durable(CONFIRM_QUEUE).build();
        }
    
        @Bean
        public Binding confirmBind(@Qualifier("confirmQueue") Queue confirmQueue,
                                   @Qualifier("confirmExchange") DirectExchange confirmExchange){
            return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
        }
    
    }
    
    

    编写生产者

    //############################################
    private static final String CONFIRM_EXCHANGE = "CONFIRM.EXCHANGE";
    private static final String CONFIRM_ROUTING_KEY = "CONFIRM.ROUTINGKEY";
    //############################################
    @ResponseBody
    @GetMapping("/sendConfirmmsg/{message}")
    public void sendConfirmMsg(@PathVariable("message") String message){
        /**
        exchange
        routingKey
        message
        CorrelationData correlationData  放入确认回调的信息
       **/
        String id = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(id);
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE+"1234",CONFIRM_ROUTING_KEY,message,correlationData);
        System.out.println("发送了编号为:"+id+"消息"+"内容是:"+message);
    }

    这个 CONFIRM_EXCHANGE+"1234"  是故意制造错误 用来验证数据发送不到交换机

    编写消费者

    package org.rmq.consumer;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Slf4j
    @Component
    public class ConfirmConsumer {
        private static final String CONFIRM_QUEUE = "CONFIRM.QUEUE";
    
        @RabbitListener(queues = CONFIRM_QUEUE)
        public void recieveConfirmMsg(Message message){
            System.out.println("ConfirmConsumer 接受到消息:"+new String(message.getBody()));
        }
    
    }
    

    测试:故意修改交换机的RoutingKey , 让生产端的消息 不能发送到交换机 ,检查 控制台信息,报错

     如果没有这个 spring.rabbitmq.publisher-confirms=true 配置 会发现我们得 fallbak 函数 不会被调用,发送了编号为:3fec94f3-c023-4653-badd-fbf9505be401消息内容是:www012
    2021-11-26 20:00:12.881 ERROR 31628 --- [68.217.128:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'CONFIRM.EXCHANGE1234' in vhost '/', class-id=60, method-id=40)

    这个错误是因为我们还少了个配置需要配置到 properties 文件

    # 应用名
    spring.application.name=springboot-rabbitmq
    # rabbitmq配置信息
    # ip
    spring.rabbitmq.host=192.168.217.128
    # 端口
    spring.rabbitmq.port=5672
    # 用户名
    spring.rabbitmq.username=developer
    # 密码
    spring.rabbitmq.password=dev123456
    # 配置虚拟机
    spring.rabbitmq.virtual-host=/
    # 消息开启手动确认
    spring.rabbitmq.listener.direct.acknowledge-mode=manual
    # 开启发布确认回调
    spring.rabbitmq.publisher-confirms=true

    spring.rabbitmq.publisher-confirms=true    需要将这个 配置 写入配置文件 有的版本是

    spring.rabbitmq.publisher-confirm-type= simple 或者 none  或者 corrected  请根据自己的 版本来选用

    none : 禁用发布确认模式

    corrected : 会触发消息发布确认,并且不会关闭信道 channel

    simple : 会调用 waitConfirms() 或 waitConfirmsOrDie() 等待broker 返回结果,根据返回结果判断调用下一步逻辑,需要注意的是 调用 waitConfirmsOrDie() 返回false 的时候 ,会关闭channel ,导致 、无法继续发消息到broker 

    4.2:消息回退

    当交换机可达 但是 交换机需要路由到对应的 队列的时候 无法进行路由,就可以通消息回退的方式来获取到那个消息不能发送成功,获取到这些消息之后,就可以进行重新发送

    在配置文件中 添加

    # 开启消息回退
    spring.rabbitmq.publisher-returns=true

    修改4.1 中的callback文件

    package org.rmq.config;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    
    @Slf4j
    @Component
    public class MyCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void init(){
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.setReturnCallback(this);
        }
    
       /**
        * correlationData  消息确认内容
        * ack 是否收到确认
        * cause 收到或未收到确认的原因
        * **/
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if(ack){
                String id = correlationData.getId();
                log.info("确认收到消息,"+"编号:"+id);
            }else{
                String id = correlationData.getId();
                log.info("未能确认收到编号为:"+id+"的消息,失败原因是:"+cause);
            }
        }
    
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            log.info("消息{}无法路由被交换机{}回退,原因是{},路由key:{}",
                    message.getBody(),exchange,replyText,routingKey);
        }
    }
    

    修改生产者

    @ResponseBody
    @GetMapping("/sendConfirmmsg/{message}")
    public void sendConfirmMsg(@PathVariable("message") String message){
        /**
        exchange
        routingKey
        message
        CorrelationData correlationData  放入确认回调的信息
       **/
        String id = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(id);
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE,CONFIRM_ROUTING_KEY,message,correlationData);
        System.out.println("发送了编号为:"+id+"消息"+"内容是:"+message);
    
        String id2 = UUID.randomUUID().toString();
        CorrelationData correlationData2 = new CorrelationData(id2);
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE,CONFIRM_ROUTING_KEY+"1234",message,correlationData2);
        System.out.println("发送了编号为:"+id2+"消息"+"内容是:"+message+"key是:"+CONFIRM_ROUTING_KEY+"1234");
    }
    

     执行结果:

    4.3:备份交换机

    我们还可以对4.2中的 架构进行优化,但发送不到 queue 的时候 可以对交换机做备份,当无法路由时不直接 消息回退,而是通过交换机将消息发送给备份交换机

    修改 配置类

    package org.rmq.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class ConfirmConfig {
    
        private static final String CONFIRM_EXCHANGE = "CONFIRM.EXCHANGE";
        private static final String CONFIRM_QUEUE = "CONFIRM.QUEUE";
        private static final String CONFIRM_ROUTING_KEY = "CONFIRM.ROUTINGKEY";
    
    
        // 备份交换机 备份被队列
        private static final String BACKUP_EXCHANGE = "BACKUP.EXCHANGE";
        private static final String BACKUP_QUEUE = "BACKUP.QUEUE";
        private static final String WARNING_QUEUE = "WARNING.QUEUE";
    
    //    @Bean("confirmExchange")
    //    public DirectExchange confirmExchange(){
    //        return  new DirectExchange(CONFIRM_EXCHANGE);
    //    }
    
        //建立普通交换机在无法消费数据是将消息转发给备份交换机
        @Bean("confirmExchange")
        public DirectExchange confirmExchange(){
            return (DirectExchange) ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true)
                    .withArgument("alternate-exchange",BACKUP_EXCHANGE).build();
        }
    
    
        @Bean("confirmQueue")
        public Queue confirmQueue(){
            return QueueBuilder.durable(CONFIRM_QUEUE).build();
        }
    
        @Bean
        public Binding confirmBind(@Qualifier("confirmQueue") Queue confirmQueue,
                                   @Qualifier("confirmExchange") DirectExchange confirmExchange){
            return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
        }
    
        //声明备份交换机 备份被队列
        @Bean("backupExchange")
        public FanoutExchange backupExchange(){
            return new FanoutExchange(BACKUP_EXCHANGE);
        }
    
        @Bean("backupQueue")
        public Queue backupQueue(){
            return QueueBuilder.durable(BACKUP_QUEUE).build();
        }
    
        @Bean("warningQueue")
        public Queue warningQueue(){
            return QueueBuilder.durable(WARNING_QUEUE).build();
        }
    
        //绑定 备份交换机和备份队列
        @Bean
        public Binding bcackupQueueBindToBackupExchange(@Qualifier("backupQueue") Queue backupQueue,
                                                       @Qualifier("backupExchange") FanoutExchange backupExchange){
            return BindingBuilder.bind(backupQueue).to(backupExchange);
        }
        @Bean
        public Binding warningQueueBindToBackupExchange(@Qualifier("warningQueue") Queue warningQueue,
                                                       @Qualifier("backupExchange") FanoutExchange backupExchange){
            return BindingBuilder.bind(warningQueue).to(backupExchange);
        }
    
    }

    添加一个备份的消费者

    package org.rmq.consumer;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Slf4j
    @Component
    public class WarningConsumer {
        private static final String WARNING_QUEUE = "WARNING.QUEUE";
        @RabbitListener (queues = WARNING_QUEUE)
        public void recieveWarningMsg(Message message){
            log.info("warning get message"+new String(message.getBody()));
        }
    }

    生产者 还是用 4.2中的代码逻辑

    @ResponseBody
    @GetMapping("/sendConfirmmsg/{message}")
    public void sendConfirmMsg(@PathVariable("message") String message){
        /**
        exchange
        routingKey
        message
        CorrelationData correlationData  放入确认回调的信息
       **/
        String id = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(id);
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE,CONFIRM_ROUTING_KEY,message,correlationData);
        System.out.println("发送了编号为:"+id+"消息"+"内容是:"+message);
    
        String id2 = UUID.randomUUID().toString();
        CorrelationData correlationData2 = new CorrelationData(id2);
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE,CONFIRM_ROUTING_KEY+"1234",message,correlationData2);
        System.out.println("发送了编号为:"+id2+"消息"+"内容是:"+message+"key是:"+CONFIRM_ROUTING_KEY+"1234");
    }
    

    执行代码 , 报错,需要在管理控制台删除之前在4.2 中创建的队列和交换机,并重新启动程序,因为之前在普通交换机上并没有设置出现不能转发时的备份交换机

    2021-11-27 10:14:48.997 ERROR 17092 --- [68.217.128:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'alternate-exchange' for exchange 'CONFIRM.EXCHANGE' in vhost '/': received the value 'BACKUP.EXCHANGE' of type 'longstr' but current is none, class-id=40, method-id=10)
    2021-11-27 10:14:50.009 ERROR 17092 --- [68.217.128:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'alternate-exchange' for exchange 'CONFIRM.EXCHANGE' in vhost '/': received the value 'BACKUP.EXCHANGE' of type 'longstr' but current is none, class-id=40, method-id=10)
    2021-11-27 10:14:52.016 ERROR 17092 --- [68.217.128:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'alternate-exchange' for exchange 'CONFIRM.EXCHANGE' in vhost '/': received the value 'BACKUP.EXCHANGE' of type 'longstr' but current is none, class-id=40, method-id=10)
     

    重新 执行发送信息可以看到 不能发送到普通交换机的消息 已经被备份交换机获取到了

     4.4: 优先级队列

    创建优先级消息的生产者

    @ResponseBody
    @GetMapping("/sendprimsg/{message}")
    public void sendPriMsg(@PathVariable("message") String message){
        for(int i=10;i>1;i--){
            int finalI = i;
            rabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,"queue:"+i, msg -> {
                msg.getMessageProperties().setPriority(finalI);  //设置优先级
                return msg;
            });
        }
    }
    

    设置配置类

    package org.rmq.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.stereotype.Component;
    
    import java.util.HashMap;
    import java.util.Map;
    //优先级队列配置类
    @Configuration
    public class priQueueConfig {
        private static final String EXCHANGE = "priority-exchange";
    
        public static final String QUEUE = "priority-queue";
    
        private static final String ROUTING_KEY = "priority.queue";
        @Bean
        DirectExchange priExchange(){
            return new DirectExchange(EXCHANGE);
        }
        @Bean
        Queue priQueue(){
            Map<String,Object> map = new HashMap<>();
            map.put("x-max-priority",10);//设置最大的优先级数量
            return new Queue(QUEUE,true,false,false,map);
        }
        @Bean
        public Binding priQueueBindpriExchange(
                @Qualifier("priQueue") Queue priQueue,
                @Qualifier("priExchange") DirectExchange priExchange
        ){
            return BindingBuilder.bind(priQueue).to(priExchange).with(ROUTING_KEY);
        }
    
    }

    设置消费者

    package org.rmq.consumer;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Slf4j
    @Component
    public class PriConsumer {
        @RabbitListener(queues ="priority-queue")
        public void hand(String msg){
           log.info("接受到了一个消息:"+msg);
        }
    
    }
    
    

    以上几个章节代码完整路径

    GitHub - wanglei111000/RabbitMqDemo

    GitHub - wanglei111000/Springboot-rabbitmq

    5:RabbitMQ 集群

    请参考我的另一个博客 : RabbitMQ HAProxy +Keepalived 高可用集群_Java 码农的博客-CSDN博客

    展开全文
  • RabbitMQ实战

    2022-02-14 18:13:22
    RabbitMQ实战 通过之前的几篇文档,大家应该已经了解了RabbitMQ的基本使用。 但是具体什么场景下使用有些小伙伴可能还是一头雾水,本篇文章我将通过几个小demo来具体讲讲RabbitMQ的使用场景。 希望通过本节内容的...

    RabbitMQ实战

    通过之前的几篇文章,大家应该已经了解了RabbitMQ的基本使用。 但是具体什么场景下使用有些小伙伴可能还是一头雾水,本篇文章我将通过几个小demo来具体讲讲RabbitMQ的使用场景。

    希望通过本节内容的介绍,让大家深入的了解RabbitMQ的使用。


    提示:以下是本篇文章正文内容,下面案例可供参考

    一、异步记录用户操作日志

    @Configuration
    public class RabbitMqConfig {
        public static final String EXCHANGE_NAME = "DEMO_EXCHANGE";
    
        public static final String QUEUE_NAME = "LOG_QUEUE";
    
        public static final String ROUTER_KEY = "log";
    
        @Bean
        Queue queue(){
            return new Queue(QUEUE_NAME,true,false,false);
        }
    
        @Bean
        DirectExchange exchange(){
            return new DirectExchange(EXCHANGE_NAME);
        }
    
        @Bean
        Binding binding(){
            return BindingBuilder.bind(queue()).to(exchange()).with(ROUTER_KEY);
        }
    
    
        @Bean
        public MessageConverter messageConverter(){
            return new Jackson2JsonMessageConverter();
        }
    
        @Bean
        RabbitTemplate rabbitTemplate(ConnectionFactory factory){
            RabbitTemplate template = new RabbitTemplate(factory);
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            return template;
        }
    
    }
    
    
    @RestController
    public class OrderController {
    
        final RabbitTemplate rabbitTemplate;
    
        public OrderController(RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
        }
    
        @GetMapping("placeOrder")
        public void placeOrder(HttpServletRequest request, @RequestBody(required = false) Shop shop){
            // 模拟下单
            placeOrder(shop);
            LogDto logDto = LogDto.builder().ip(request.getRemoteAddr()).module("商品模块").msg("购买了三个苹果").username("小明").build();
            rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,RabbitMqConfig.ROUTER_KEY,logDto);
        }
    
        private void placeOrder(Shop shop) {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    @Repository
    public class LogDao {
    
        final JdbcTemplate jdbcTemplate;
    
        final SimpleJdbcInsert simpleJdbcInsert;
    
        public LogDao(JdbcTemplate jdbcTemplate) {
            this.jdbcTemplate = jdbcTemplate;
            // 此处指定log表
            this.simpleJdbcInsert = new SimpleJdbcInsert(jdbcTemplate).withTableName("log");
        }
    
        /**
         * 记录日志
         * @param logDo
         * @return
         */
        public int saveLog(LogDo logDo){
            Map map = JSON.parseObject(JSON.toJSONString(logDo), Map.class);
            return simpleJdbcInsert.execute(map);
        }
    }
    

    二、并发量配置

      @RabbitListener(queues = RabbitConfig.QUEUE_NAME,concurrency = "10")
        public void listenMsg(String msg){
            logger.info("logger===>{},thread-name:{}",msg,Thread.currentThread().getName());
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    

    三、延迟发送消息

    RabbitMQ延迟队列实现
    应用场景:订单30分钟后超时自动关闭


    展开全文
  • rabbitMQ实战

    2018-10-10 14:56:44
    这是牧码人王老师讲的,里面的rabbitMQ下载、安装、六种消息类型都很全,代码我也都试过了,直接复制粘贴就可以使用,希望可以帮助到大家
  • RabbitMQ实战详解

    千次阅读 2020-11-15 20:40:55
    RabbitMQ使用教程 1. MQ 的基本概念 1.1. MQ概述 MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。 小结 MQ,消息队列,存储消息的中间件 分布式系统...

    RabbitMQ实战详解

    在这里插入图片描述

    1. MQ 的基本概念

    1.1. MQ概述

    • MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信

    在这里插入图片描述

    小结

    1. MQ,消息队列,存储消息的中间件
    2. 分布式系统通信两种方式:直接远程调用借助第三方 完成间接通信
    3. 发送方称为生产者,接收方称为消费者

    1.2 MQ 的优势和劣势

    优势:

    • 应用解耦
    • 异步提速
    • 削峰填古

    劣势:

    • 系统可用性降低
    • 系统复杂性提高
    • 一致性问题

    1.3 MQ 的优势

    1. 应用解耦

    没有使用MQ:
    在这里插入图片描述- 系统的耦合性越高容错性就越低可维护性就越低

    使用MQ:
    在这里插入图片描述

    • 使用 MQ 使得应用间解耦提升容错性和可维护性

    2. 异步提速

    没有使用MQ:
    在这里插入图片描述

    • 一个下单操作耗时:20 + 300 + 300 + 300 = 920ms
    • 用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!

    使用MQ:
    在这里插入图片描述

    • 用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。
    • 提升用户体验和系统吞吐量(单位时间内处理请求的数目)。

    3. 削峰填谷

    没有使用MQ:
    在这里插入图片描述使用MQ:

    在这里插入图片描述在这里插入图片描述

    • 使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ
      ,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做填谷
    • 使用MQ后,可以提高系统稳定性。

    小结:

    • 应用解耦:提高系统容错性和可维护性
    • 异步提速:提升用户体验和系统吞吐量
    • 削峰填谷:提高系统稳定性

    1.4 MQ 的劣势

    在这里插入图片描述

    • 系统可用性降低

    系统引入的外部依赖越多系统稳定性越差一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?

    • 系统复杂度提高

    MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?

    • 一致性问题

    A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

    小结

    既然 MQ 有优势也有劣势,那么使用 MQ 需要满足什么条件呢?

    • 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。
    • 容许短暂的不一致性
    • 确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。

    1.5 常见的 MQ 产品

    • 目前业界有很多的 MQ 产品,例如RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用 Redis充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及 MQ 产品特征,综合考虑。

    在这里插入图片描述

    1.6 RabbitMQ 简介

    • AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP
      规范发布。类比HTTP

    在这里插入图片描述
    2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。

    RabbitMQ 基础架构如下图:

    在这里插入图片描述
    RabbitMQ 中的相关概念:

    • Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
    • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时可以划分出多个vhost每个用户在自己的 vhost创建 exchange/queue
    • Connection:publisher/consumer 和 broker 之间的 TCP 连接
    • Channel如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method包含了channel id 帮助客户端和message broker 识别 channel,所以 channel之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
    • Exchange:message 到达 broker 的第一站**,根据分发规则,匹配查询表中的 routing key,分发消息到queue中去。**常用的类型有:direct (point-to-point), topic (publish-subscribe) and
      fanout (multicast)
    • Queue消息最终被送到这里等待 consumer 取走
    • Bindingexchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding
      信息被保存到 exchange 中的查询表中,用于 message 的分发依据

    RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。

    官网对应模式介绍:https://www.rabbitmq.com/getstarted.html

    在这里插入图片描述

    1.7 JMS

    • JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件的API
    • JMS 是 JavaEE 规范中的一种,类比JDBC
    • 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ 官方没有提供 JMS 的实现包,但是开源社区有

    小结

    • RabbitMQ 是基于 AMQP 协议使用 Erlang 语言开发的一款消息队列产品
    • RabbitMQ提供了6种工作模式,我们学习5种。这是今天的重点。
    • AMQP 是协议,类比HTTP。
    • JMS 是 API 规范接口,类比 JDBC。

    2. RabbitMQ 的安装和配置

    • RabbitMQ 官方地址:http://www.rabbitmq.com/
    • 安装文档:资料/软件/安装 RabbitMQ.md

    上传文件:

    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述

    1. 安装依赖环境

    在线安装依赖环境:

    yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
    

    2. 安装Erlang

    # 安装
    rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
    
    

    如果出现如下错误
    在这里插入图片描述说明gblic 版本太低。我们可以查看当前机器的gblic 版本

    shell strings /lib64/libc.so.6 | grep GLIBC
    

    在这里插入图片描述
    当前最高版本2.12,需要2.15.所以需要升级glibc

    • 使用yum更新安装依赖

      sudo yum install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make -y
      
    • 下载rpm包

    wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-utils-2.17-55.el6.x86_64.rpm &
    wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-static-2.17-55.el6.x86_64.rpm &
    wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-2.17-55.el6.x86_64.rpm &
    wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-common-2.17-55.el6.x86_64.rpm &
    wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-devel-2.17-55.el6.x86_64.rpm &
    wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-headers-2.17-55.el6.x86_64.rpm &
    wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/nscd-2.17-55.el6.x86_64.rpm &
    
    • 安装rpm包

      sudo rpm -Uvh *-2.17-55.el6.x86_64.rpm --force --nodeps
      
    • 安装完毕后再查看glibc版本,发现glibc版本已经到2.17了

      strings /lib64/libc.so.6 | grep GLIBC
      

    3. 安装RabbitMQ

    # 安装
    rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm --force --nodeps
    # 安装
    rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
    
    

    启动rabbitMQ

    在这里插入图片描述

    4. 开启管理界面及配置

    # 开启管理界面
    rabbitmq-plugins enable rabbitmq_management
    # 修改默认配置信息
    vi /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app 
    # 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
    

    5. 启动

    service rabbitmq-server start # 启动服务
    service rabbitmq-server stop # 停止服务
    service rabbitmq-server restart # 重启服务
    

    在这里插入图片描述

    6.访问

    192.168.101.22是本机虚拟机ip地址

    在这里插入图片描述登录
    输入用户名:guest 密码:guest

    安装成功后显示如下
    在这里插入图片描述

    6. 配置虚拟主机及用户

    6.1. 用户角色

    RabbitMQ在安装好后,可以访问http://ip地址:15672 ;其自带了guest/guest的用户名和密码;如果需要创建自定义用户;那么也可以登录管理界面后,如下操作:

    创建用户heima 密码也是heima
    在这里插入图片描述角色说明

    1、 超级管理员(administrator)

    可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

    2、 监控者(monitoring)

    可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

    3、 策略制定者(policymaker)

    可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

    4、 普通管理者(management)

    仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

    5、 其他

    无法登陆管理控制台,通常就是普通的生产者和消费者。

    6.2. Virtual Hosts配置

    像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。RabbitMQ也有类似的权限管理;在RabbitMQ中可以虚拟消息服务器Virtual Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。Virtual Name一般以/开头。

    6.2.1. 创建Virtual Hosts

    在这里插入图片描述

    6.2.2. 设置Virtual Hosts权限

    在这里插入图片描述
    在这里插入图片描述
    没有配置config文件
    在这里插入图片描述- 设置配置文件

    cd /usr/share/doc/rabbitmq-server-3.6.5/
    cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
    service rabbitmq-server restart
    

    在这里插入图片描述
    节点已配置

    在这里插入图片描述
    在这里插入图片描述

    3. RabbitMQ 快速入门

    3.1 入门程序

    需求:使用简单模式完成消息传递

    步骤:
    在这里插入图片描述

    1. 创建工程

    目录如下:
    在这里插入图片描述

    2.分别添加依赖

    pom.xml (这里生产者和消费者的依赖是一样的)

    <!--rabbitmq java客户端依赖-->
        <dependencies>
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.6.0</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    

    3.编写生产者发送消息

    package com.itheima.producer;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    //简单模式
    //发送消息
    public class Producer_HelloWorld {
        public static void main(String[] args) throws IOException, TimeoutException {
          //1.创建连接工厂
          ConnectionFactory factory=new ConnectionFactory();
          //2.设置参数
            factory.setHost("192.168.101.22");//ip地址 默认localhost
            factory.setPort(5672);//端口 默认值5672
            factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机
            factory.setUsername("heima");//用户名 默认guest
            factory.setPassword("heima");//密码 默认guest
           //3.创建连接 connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
            //5.创建队列Queue 暂时不用交换机
            /*
            String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                     Map<String, Object> arguments)
            1.queue 队列名称
            2.durable 是否持久化,当mq重启之后,还在
            3.exclusive:
                 *是否独占:只能有一个消费者监听队列
                 *当connection关闭时,是否删除队列
            4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉
            5.arguments: 参数
             */
            //如果没有一个名字为hello_world的队列,则会创建队列,如果有则不会创建
            channel.queueDeclare("hello_world",true,false,false,null);
          //6.发送消息
            /*
            String exchange, String routingKey, boolean mandatory, byte[] body
            参数:
                1. exchange:交换机名称,简单模式下交换机会使用默认的 ""
                2. routingKey: 路由名称
                3. props: 配置信息
                4. body: 发送消息数据
             */
            String body="hello rabbitmq----";
            channel.basicPublish("","hello_world",null,body.getBytes());
            //7.释放资源
            /*channel.close();
            connection.close();*/
    
        }
    }
    
    

    4.编写消费者接收消息

    package com.itheima.consumer;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer_HelloWorld {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory=new ConnectionFactory();
            //2.设置参数
            factory.setHost("192.168.101.22");//ip地址 默认localhost
            factory.setPort(5672);//端口 默认值5672
            factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机
            factory.setUsername("heima");//用户名 默认guest
            factory.setPassword("heima");//密码 默认guest
            //3.创建连接 connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
            //5.创建队列Queue 暂时不用交换机
            /*
            String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                     Map<String, Object> arguments)
            1.queue 队列名称
            2.durable 是否持久化,当mq重启之后,还在
            3.exclusive:
                 *是否独占:只能有一个消费者监听队列
                 *当connection关闭时,是否删除队列
            4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉
            5.arguments: 参数
             */
            //如果没有一个名字为hello_world的队列,则会创建队列,如果有则不会创建
            channel.queueDeclare("hello_world",true,false,false,null);
            //6.接收消息
            /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck: 是否自动确认
                3. callback: 回调对象
             */
           Consumer consumer=new DefaultConsumer(channel){
               /*
                   回调方法,当收到消息后,会自动执行该方法
                        1.consumerTag:标识
                        2.envelope:获取一些信息。交换机,路由key...
                        3.properties: 配置信息
                        4.body: 数据
                */
               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                   System.out.println("consumerTag:"+consumerTag);
                   System.out.println("exchange:"+envelope.getExchange());
                   System.out.println("RoutingKey:"+envelope.getRoutingKey());
                   System.out.println("properties:"+properties);
                   System.out.println("body:"+new String(body));//byte数组转string字符串
               }
           };
            channel.basicConsume("hello_world",true,consumer);
            //关闭资源?不要,因为消费者要监听
        }
    }
    
    

    效果如下:

    在这里插入图片描述
    消费者控制台成功收到消息

    在这里插入图片描述

    3.2 小结

    上述的入门案例中其实使用的是如下的简单模式:

    在这里插入图片描述
    在上图的模型中,有以下概念:

    • P:生产者,也就是要发送消息的程序
    • C:消费者:消息的接收者,会一直等待消息到来
    • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

    4. RabbitMQ 的工作模式

    4.1 Work queues 工作队列模式

    1. 模式说明

    在这里插入图片描述

    • Work Queues:与入门程序的简单模式相比,多了一个或一些消费端多个消费端共同消费同一个队列中的消息。(队列存在竞争关系,可以理解为抢红包
    • 应用场景:对于任务过重任务较多情况使用工作队列可以提高任务处理的速度。

    2. 代码编写

    Work Queues 与入门程序的简单模式的代码几乎是一样的。可以完全复制,并多复制一个消费者进行多个消费者同时对消费消息的测试。

    Producer_WorkQueues

    package com.itheima.producer;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    //简单模式
    //发送消息
    public class Producer_WorkQueues {
        public static void main(String[] args) throws IOException, TimeoutException {
          //1.创建连接工厂
          ConnectionFactory factory=new ConnectionFactory();
          //2.设置参数
            factory.setHost("192.168.101.22");//ip地址 默认localhost
            factory.setPort(5672);//端口 默认值5672
            factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机
            factory.setUsername("heima");//用户名 默认guest
            factory.setPassword("heima");//密码 默认guest
           //3.创建连接 connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
            //5.创建队列Queue 暂时不用交换机
            /*
            String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                     Map<String, Object> arguments)
            1.queue 队列名称
            2.durable 是否持久化,当mq重启之后,还在
            3.exclusive:
                 *是否独占:只能有一个消费者监听队列
                 *当connection关闭时,是否删除队列
            4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉
            5.arguments: 参数
             */
            //如果没有一个名字为hello_world的队列,则会创建队列,如果有则不会创建
            channel.queueDeclare("work_queues",true,false,false,null);
          //6.发送消息
            /*
            String exchange, String routingKey, boolean mandatory, byte[] body
            参数:
                1. exchange:交换机名称,简单模式下交换机会使用默认的 ""
                2. routingKey: 路由名称
                3. props: 配置信息
                4. body: 发送消息数据
             */
            for (int i = 1; i <=10; i++) {
                String body=i+"hello rabbitmq----";
                channel.basicPublish("","work_queues",null,body.getBytes());
            }
    
            //7.释放资源
            /*channel.close();
            connection.close();*/
    
        }
    }
    
    

    Consumer_WorkQueues1

    package com.itheima.consumer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer_WorkQueues1 {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory=new ConnectionFactory();
            //2.设置参数
            factory.setHost("192.168.101.22");//ip地址 默认localhost
            factory.setPort(5672);//端口 默认值5672
            factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机
            factory.setUsername("heima");//用户名 默认guest
            factory.setPassword("heima");//密码 默认guest
            //3.创建连接 connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
            //5.创建队列Queue 暂时不用交换机
            /*
            String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                     Map<String, Object> arguments)
            1.queue 队列名称
            2.durable 是否持久化,当mq重启之后,还在
            3.exclusive:
                 *是否独占:只能有一个消费者监听队列
                 *当connection关闭时,是否删除队列
            4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉
            5.arguments: 参数
             */
            //如果没有一个名字为hello_world的队列,则会创建队列,如果有则不会创建
            channel.queueDeclare("work_queues",true,false,false,null);
            //6.接收消息
            /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck: 是否自动确认
                3. callback: 回调对象
             */
           Consumer consumer=new DefaultConsumer(channel){
               /*
                   回调方法,当收到消息后,会自动执行该方法
                        1.consumerTag:标识
                        2.envelope:获取一些信息。交换机,路由key...
                        3.properties: 配置信息
                        4.body: 数据
                */
               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  /* System.out.println("consumerTag:"+consumerTag);
                   System.out.println("exchange:"+envelope.getExchange());
                   System.out.println("RoutingKey:"+envelope.getRoutingKey());
                   System.out.println("properties:"+properties);*/
                   System.out.println("body:"+new String(body));//byte数组转string字符串
               }
           };
            channel.basicConsume("work_queues",true,consumer);
            //关闭资源?不要,因为消费者要监听
        }
    }
    
    

    Consumer_WorkQueues2

    package com.itheima.consumer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer_WorkQueues2 {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory=new ConnectionFactory();
            //2.设置参数
            factory.setHost("192.168.101.22");//ip地址 默认localhost
            factory.setPort(5672);//端口 默认值5672
            factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机
            factory.setUsername("heima");//用户名 默认guest
            factory.setPassword("heima");//密码 默认guest
            //3.创建连接 connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
            //5.创建队列Queue 暂时不用交换机
            /*
            String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                     Map<String, Object> arguments)
            1.queue 队列名称
            2.durable 是否持久化,当mq重启之后,还在
            3.exclusive:
                 *是否独占:只能有一个消费者监听队列
                 *当connection关闭时,是否删除队列
            4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉
            5.arguments: 参数
             */
            //如果没有一个名字为hello_world的队列,则会创建队列,如果有则不会创建
            channel.queueDeclare("work_queues",true,false,false,null);
            //6.接收消息
            /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck: 是否自动确认
                3. callback: 回调对象
             */
           Consumer consumer=new DefaultConsumer(channel){
               /*
                   回调方法,当收到消息后,会自动执行该方法
                        1.consumerTag:标识
                        2.envelope:获取一些信息。交换机,路由key...
                        3.properties: 配置信息
                        4.body: 数据
                */
               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  /* System.out.println("consumerTag:"+consumerTag);
                   System.out.println("exchange:"+envelope.getExchange());
                   System.out.println("RoutingKey:"+envelope.getRoutingKey());
                   System.out.println("properties:"+properties);*/
                   System.out.println("body:"+new String(body));//byte数组转string字符串
               }
           };
            channel.basicConsume("work_queues",true,consumer);
            //关闭资源?不要,因为消费者要监听
        }
    }
    
    

    3. 测试

    注意:先运行消费者1和消费者2,再运行生产者

    结果如下:消费者1消费消息序号 1,3,5,7,9
    消费者2消费消息序号 2,4,6,8,10

    在这里插入图片描述
    在这里插入图片描述

    4. 小结

    1. 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
    2. Work Queues 对于任务过重任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可

    4.2 Pub/Sub 订阅模式

    1. 模式说明

    在这里插入图片描述
    在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

    • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
    • C:消费者,消息的接收者,会一直等待消息到来
    • Queue:消息队列,接收消息、缓存消息
    • Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
      1. Fanout:广播,将消息交给所有绑定到交换机的队列
      2. Direct:定向,把消息交给符合指定routing key 的队列
      3. Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

    Exchange(交换机)只负责转发消息不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

    2. 代码编写

    生产者Producer_PubSub

    package com.itheima.producer;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    //发布订阅模式
    //发送消息
    public class Producer_PubSub {
        public static void main(String[] args) throws IOException, TimeoutException {
          //1.创建连接工厂
          ConnectionFactory factory=new ConnectionFactory();
          //2.设置参数
            factory.setHost("192.168.101.22");//ip地址 默认localhost
            factory.setPort(5672);//端口 默认值5672
            factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机
            factory.setUsername("heima");//用户名 默认guest
            factory.setPassword("heima");//密码 默认guest
           //3.创建连接 connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
            //5.创建交换机
            /*
            exchangeDeclare
            (String exchange,  交换机名称
            BuiltinExchangeType type,  交换机类型 枚举类型
                     DIRECT("direct"), :定向
                     FANOUT("fanout"), :扇形(广播),发送消息到每一个与之绑定的队列
                     TOPIC("topic"),:通配符方式
                     HEADERS("headers");参数匹配
            boolean durable, 是否持久化
            boolean autoDelete, 是否自动删除
            boolean internal, 内部使用一般false
            Map<String, Object> arguments) 参数列表
             */
            String exchangeName="test_fanout";
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
            //6.创建两个队列
            String queue1Name="test_fanout_queue1";
            String queue2Name="test_fanout_queue2";
            /*
            String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                     Map<String, Object> arguments)
            1.queue 队列名称
            2.durable 是否持久化,当mq重启之后,还在
            3.exclusive:
                 *是否独占:只能有一个消费者监听队列
                 *当connection关闭时,是否删除队列
            4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉
            5.arguments: 参数
             */
            channel.queueDeclare(queue1Name,true,false,false,null);
            channel.queueDeclare(queue2Name,true,false,false,null);
            //7,绑定队列和交换机
            /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                1.queue: 队列名称
                2.exchange:交换机
                3.routingKey:路由键:绑定规则
                    如果交换机的类型为fanout,routingKey设置为""
             */
            channel.queueBind(queue1Name,exchangeName,"");
            channel.queueBind(queue2Name,exchangeName,"");
            //8.发送消息
            String body="日志信息:张三调用了findAll方法...日志级别:info";
            channel.basicPublish(exchangeName,"",null,body.getBytes());
            //9.释放资源
            channel.close();
            connection.close();
        }
    }
    
    

    Consumer_PubSub1

    package com.itheima.consumer;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer_PubSub1 {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory=new ConnectionFactory();
            //2.设置参数
            factory.setHost("192.168.101.22");//ip地址 默认localhost
            factory.setPort(5672);//端口 默认值5672
            factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机
            factory.setUsername("heima");//用户名 默认guest
            factory.setPassword("heima");//密码 默认guest
            //3.创建连接 connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
            //5.队列已经声明一次了,这里不用再声明队列
            String queue1Name="test_fanout_queue1";
            String queue2Name="test_fanout_queue2";
            //6.接收消息
            /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck: 是否自动确认
                3. callback: 回调对象
             */
           Consumer consumer=new DefaultConsumer(channel){
               /*
                   回调方法,当收到消息后,会自动执行该方法
                        1.consumerTag:标识
                        2.envelope:获取一些信息。交换机,路由key...
                        3.properties: 配置信息
                        4.body: 数据
                */
               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  /* System.out.println("consumerTag:"+consumerTag);
                   System.out.println("exchange:"+envelope.getExchange());
                   System.out.println("RoutingKey:"+envelope.getRoutingKey());
                   System.out.println("properties:"+properties);*/
                   System.out.println("body:"+new String(body));//byte数组转string字符串
                   System.out.println("将日志信息打印到控制台");
               }
           };
            channel.basicConsume(queue1Name,true,consumer);
            //关闭资源?不要,因为消费者要监听
        }
    }
    
    

    Consumer_PubSub2

    package com.itheima.consumer;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer_PubSub2 {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory=new ConnectionFactory();
            //2.设置参数
            factory.setHost("192.168.101.22");//ip地址 默认localhost
            factory.setPort(5672);//端口 默认值5672
            factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机
            factory.setUsername("heima");//用户名 默认guest
            factory.setPassword("heima");//密码 默认guest
            //3.创建连接 connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
            //5.队列已经声明一次了,这里不用再声明队列
            String queue1Name="test_fanout_queue1";
            String queue2Name="test_fanout_queue2";
            //6.接收消息
            /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck: 是否自动确认
                3. callback: 回调对象
             */
           Consumer consumer=new DefaultConsumer(channel){
               /*
                   回调方法,当收到消息后,会自动执行该方法
                        1.consumerTag:标识
                        2.envelope:获取一些信息。交换机,路由key...
                        3.properties: 配置信息
                        4.body: 数据
                */
               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  /* System.out.println("consumerTag:"+consumerTag);
                   System.out.println("exchange:"+envelope.getExchange());
                   System.out.println("RoutingKey:"+envelope.getRoutingKey());
                   System.out.println("properties:"+properties);*/
                   System.out.println("body:"+new String(body));//byte数组转string字符串
                   System.out.println("将日志信息保存数据库");
               }
           };
            channel.basicConsume(queue2Name,true,consumer);
            //关闭资源?不要,因为消费者要监听
        }
    }
    
    

    运行结果:

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    3. 小结

    1. 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到

    2. 发布订阅模式与工作队列模式的区别:

      工作队列模式不用定义交换机而发布/订阅模式需要定义交换机

      发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)

      发布/订阅模式需要设置队列和交换机的绑定工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机

    4.3 Routing 路由模式

    1. 模式说明:

    • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
    • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
    • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息

    图解:
    在这里插入图片描述

    • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
    • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
    • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
    • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息

    2. 代码编写

    Producer_Routing

    package com.itheima.producer;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    //Routing
    //发送消息
    public class Producer_Routing {
        public static void main(String[] args) throws IOException, TimeoutException {
          //1.创建连接工厂
          ConnectionFactory factory=new ConnectionFactory();
          //2.设置参数
            factory.setHost("192.168.101.22");//ip地址 默认localhost
            factory.setPort(5672);//端口 默认值5672
            factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机
            factory.setUsername("heima");//用户名 默认guest
            factory.setPassword("heima");//密码 默认guest
           //3.创建连接 connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
            //5.创建交换机
            /*
            exchangeDeclare
            (String exchange,  交换机名称
            BuiltinExchangeType type,  交换机类型 枚举类型
                     DIRECT("direct"), :定向
                     FANOUT("fanout"), :扇形(广播),发送消息到每一个与之绑定的队列
                     TOPIC("topic"),:通配符方式
                     HEADERS("headers");参数匹配
            boolean durable, 是否持久化
            boolean autoDelete, 是否自动删除
            boolean internal, 内部使用一般false
            Map<String, Object> arguments) 参数列表
             */
            String exchangeName="test_direct";
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
            //6.创建两个队列
            String queue1Name="test_direct_queue1";
            String queue2Name="test_direct_queue2";
            /*
            String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                     Map<String, Object> arguments)
            1.queue 队列名称
            2.durable 是否持久化,当mq重启之后,还在
            3.exclusive:
                 *是否独占:只能有一个消费者监听队列
                 *当connection关闭时,是否删除队列
            4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉
            5.arguments: 参数
             */
            channel.queueDeclare(queue1Name,true,false,false,null);
            channel.queueDeclare(queue2Name,true,false,false,null);
            //7,绑定队列和交换机
            /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                1.queue: 队列名称
                2.exchange:交换机
                3.routingKey:路由键:绑定规则
                    如果交换机的类型为fanout,routingKey设置为""
             */
            //队列1的绑定 error
            channel.queueBind(queue1Name,exchangeName,"error");
            //队列2的绑定 info,error,warning
            channel.queueBind(queue2Name,exchangeName,"info");
            channel.queueBind(queue2Name,exchangeName,"error");
            channel.queueBind(queue2Name,exchangeName,"warning");
            //8.发送消息
            String body="日志信息:张三调用了delete方法出错误了...日志级别:error";
            channel.basicPublish(exchangeName,"error",null,body.getBytes());
            //9.释放资源
            channel.close();
            connection.close();
    
    
        }
    }
    
    

    Consumer_Routing1

    package com.itheima.consumer;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer_Routing1 {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory=new ConnectionFactory();
            //2.设置参数
            factory.setHost("192.168.101.22");//ip地址 默认localhost
            factory.setPort(5672);//端口 默认值5672
            factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机
            factory.setUsername("heima");//用户名 默认guest
            factory.setPassword("heima");//密码 默认guest
            //3.创建连接 connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
            //5.队列已经声明一次了,这里不用再声明队列
            String queue1Name="test_direct_queue1";
            String queue2Name="test_direct_queue2";
            //6.接收消息
            /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck: 是否自动确认
                3. callback: 回调对象
             */
           Consumer consumer=new DefaultConsumer(channel){
               /*
                   回调方法,当收到消息后,会自动执行该方法
                        1.consumerTag:标识
                        2.envelope:获取一些信息。交换机,路由key...
                        3.properties: 配置信息
                        4.body: 数据
                */
               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  /* System.out.println("consumerTag:"+consumerTag);
                   System.out.println("exchange:"+envelope.getExchange());
                   System.out.println("RoutingKey:"+envelope.getRoutingKey());
                   System.out.println("properties:"+properties);*/
                   System.out.println("body:"+new String(body));//byte数组转string字符串
                   System.out.println("将日志信息打印到控制台");
               }
           };
            channel.basicConsume(queue2Name,true,consumer);
            //关闭资源?不要,因为消费者要监听
        }
    }
    
    

    Consumer_Routing2

    package com.itheima.consumer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer_Routing2 {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory=new ConnectionFactory();
            //2.设置参数
            factory.setHost("192.168.101.22");//ip地址 默认localhost
            factory.setPort(5672);//端口 默认值5672
            factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机
            factory.setUsername("heima");//用户名 默认guest
            factory.setPassword("heima");//密码 默认guest
            //3.创建连接 connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
            //5.队列已经声明一次了,这里不用再声明队列
            String queue1Name="test_direct_queue1";
            String queue2Name="test_direct_queue2";
            //6.接收消息
            /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck: 是否自动确认
                3. callback: 回调对象
             */
           Consumer consumer=new DefaultConsumer(channel){
               /*
                   回调方法,当收到消息后,会自动执行该方法
                        1.consumerTag:标识
                        2.envelope:获取一些信息。交换机,路由key...
                        3.properties: 配置信息
                        4.body: 数据
                */
               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  /* System.out.println("consumerTag:"+consumerTag);
                   System.out.println("exchange:"+envelope.getExchange());
                   System.out.println("RoutingKey:"+envelope.getRoutingKey());
                   System.out.println("properties:"+properties);*/
                   System.out.println("body:"+new String(body));//byte数组转string字符串
                   System.out.println("将日志信息存储到数据库");
               }
           };
            channel.basicConsume(queue1Name,true,consumer);
            //关闭资源?不要,因为消费者要监听
        }
    }
    
    

    运行结果

    在这里插入图片描述

    在这里插入图片描述

    3. 小结

    • Routing 模式:要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。

    4.4 Topics 通配符模式

    1. 模式说明

    • Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic
      类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
    • Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
    • 通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者
    • item.insert,item.* 只能匹配 item.insert

    在这里插入图片描述图解:

    • 红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
    • 黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配
    • *代表一个单词,#代表0个或者多个单词

    2.代码实现

    Producer_Topics

    package com.itheima.producer;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    
    //发送消息
    public class Producer_Topics {
        public static void main(String[] args) throws IOException, TimeoutException {
          //1.创建连接工厂
          ConnectionFactory factory=new ConnectionFactory();
          //2.设置参数
            factory.setHost("192.168.101.22");//ip地址 默认localhost
            factory.setPort(5672);//端口 默认值5672
            factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机
            factory.setUsername("heima");//用户名 默认guest
            factory.setPassword("heima");//密码 默认guest
           //3.创建连接 connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
            //5.创建交换机
            /*
            exchangeDeclare
            (String exchange,  交换机名称
            BuiltinExchangeType type,  交换机类型 枚举类型
                     DIRECT("direct"), :定向
                     FANOUT("fanout"), :扇形(广播),发送消息到每一个与之绑定的队列
                     TOPIC("topic"),:通配符方式
                     HEADERS("headers");参数匹配
            boolean durable, 是否持久化
            boolean autoDelete, 是否自动删除
            boolean internal, 内部使用一般false
            Map<String, Object> arguments) 参数列表
             */
            String exchangeName="test_topic";
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
            //6.创建两个队列
            String queue1Name="test_topic_queue1";
            String queue2Name="test_topic_queue2";
            /*
            String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                     Map<String, Object> arguments)
            1.queue 队列名称
            2.durable 是否持久化,当mq重启之后,还在
            3.exclusive:
                 *是否独占:只能有一个消费者监听队列
                 *当connection关闭时,是否删除队列
            4.autoDelete: 是否自动删除,当没有consumer时,自动删除掉
            5.arguments: 参数
             */
            channel.queueDeclare(queue1Name,true,false,false,null);
            channel.queueDeclare(queue2Name,true,false,false,null);
            //7,绑定队列和交换机
            /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                1.queue: 队列名称
                2.exchange:交换机
                3.routingKey:路由键:绑定规则
                    如果交换机的类型为fanout,routingKey设置为""
             */
            //routing key:系统的名称,日志级别
            //--需求:所有error级别的日志存入数据库,所有order系统的日志存入数据库
            channel.queueBind(queue1Name,exchangeName,"#.error");
            channel.queueBind(queue1Name,exchangeName,"order.*");
            channel.queueBind(queue2Name,exchangeName,"*.*");
            //8.发送消息
            String body="日志信息:张三调用了findAll方法...日志级别:info";
            channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
            //9.释放资源
            channel.close();
            connection.close();
    
    
        }
    }
    
    

    Consumer_Topic1

    package com.itheima.consumer;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer_Topic1 {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory=new ConnectionFactory();
            //2.设置参数
            factory.setHost("192.168.101.22");//ip地址 默认localhost
            factory.setPort(5672);//端口 默认值5672
            factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机
            factory.setUsername("heima");//用户名 默认guest
            factory.setPassword("heima");//密码 默认guest
            //3.创建连接 connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
            //5.队列已经声明一次了,这里不用再声明队列
            String queue1Name="test_topic_queue1";
            String queue2Name="test_topic_queue2";
            //6.接收消息
            /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck: 是否自动确认
                3. callback: 回调对象
             */
           Consumer consumer=new DefaultConsumer(channel){
               /*
                   回调方法,当收到消息后,会自动执行该方法
                        1.consumerTag:标识
                        2.envelope:获取一些信息。交换机,路由key...
                        3.properties: 配置信息
                        4.body: 数据
                */
               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  /* System.out.println("consumerTag:"+consumerTag);
                   System.out.println("exchange:"+envelope.getExchange());
                   System.out.println("RoutingKey:"+envelope.getRoutingKey());
                   System.out.println("properties:"+properties);*/
                   System.out.println("body:"+new String(body));//byte数组转string字符串
                   System.out.println("将日志信息存入数据库");
               }
           };
            channel.basicConsume(queue1Name,true,consumer);
            //关闭资源?不要,因为消费者要监听
        }
    }
    
    

    Consumer_Topic2

    package com.itheima.consumer;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer_Topic2 {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory=new ConnectionFactory();
            //2.设置参数
            factory.setHost("192.168.101.22");//ip地址 默认localhost
            factory.setPort(5672);//端口 默认值5672
            factory.setVirtualHost("/itcast");//虚拟机默认值 /虚拟机
            factory.setUsername("heima");//用户名 默认guest
            factory.setPassword("heima");//密码 默认guest
            //3.创建连接 connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
            //5.队列已经声明一次了,这里不用再声明队列
            String queue1Name="test_topic_queue1";
            String queue2Name="test_topic_queue2";
            //6.接收消息
            /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck: 是否自动确认
                3. callback: 回调对象
             */
           Consumer consumer=new DefaultConsumer(channel){
               /*
                   回调方法,当收到消息后,会自动执行该方法
                        1.consumerTag:标识
                        2.envelope:获取一些信息。交换机,路由key...
                        3.properties: 配置信息
                        4.body: 数据
                */
               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  /* System.out.println("consumerTag:"+consumerTag);
                   System.out.println("exchange:"+envelope.getExchange());
                   System.out.println("RoutingKey:"+envelope.getRoutingKey());
                   System.out.println("properties:"+properties);*/
                   System.out.println("body:"+new String(body));//byte数组转string字符串
                   System.out.println("将日志信息打印到控制台");
               }
           };
            channel.basicConsume(queue2Name,true,consumer);
            //关闭资源?不要,因为消费者要监听
        }
    }
    
    

    3.测试

    在这里插入图片描述在这里插入图片描述

    4. 小结

    • Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。

    4.5 工作模式总结

    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述

    5. Spring 整合RabbitMQ

    5.1 Spring 整合 RabbitMQ

    需求:使用 Spring 整合 RabbitMQ

    在这里插入图片描述

    生产者

    在这里插入图片描述
    pom.xml 生产者跟消费者的pom依赖是一样的

     <dependencies>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
                <version>5.1.7.RELEASE</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>2.1.8.RELEASE</version>
            </dependency>
    
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-test</artifactId>
                <version>5.1.7.RELEASE</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    

    rabbitmq.properties

    rabbitmq.host=192.168.101.22
    rabbitmq.port=5672
    rabbitmq.username=heima
    rabbitmq.password=heima
    rabbitmq.virtual-host=/itcast
    

    spring-rabbitmq-producer.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           https://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
        <!--加载配置文件-->
        <context:property-placeholder location="classpath:rabbitmq.properties"/>
    
        <!-- 定义rabbitmq connectionFactory -->
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                                   port="${rabbitmq.port}"
                                   username="${rabbitmq.username}"
                                   password="${rabbitmq.password}"
                                   virtual-host="${rabbitmq.virtual-host}"/>
        <!--定义管理交换机、队列-->
        <rabbit:admin connection-factory="connectionFactory"/>
    
        <!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机
        默认交换机类型为direct,名字为:"",路由键为队列的名称
        -->
        <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
    
        <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~广播;所有队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
        <!--定义广播交换机中的持久化队列,不存在则自动创建-->
        <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
    
        <!--定义广播交换机中的持久化队列,不存在则自动创建-->
        <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
    
        <!--定义广播类型交换机;并绑定上述两个队列-->
        <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
            <rabbit:bindings>
                <rabbit:binding queue="spring_fanout_queue_1"/>
                <rabbit:binding queue="spring_fanout_queue_2"/>
            </rabbit:bindings>
        </rabbit:fanout-exchange>
    
        <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符;*匹配一个单词,#匹配多个单词 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
        <!--定义广播交换机中的持久化队列,不存在则自动创建-->
        <rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
        <!--定义广播交换机中的持久化队列,不存在则自动创建-->
        <rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
        <!--定义广播交换机中的持久化队列,不存在则自动创建-->
        <rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>
    
        <rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
            <rabbit:bindings>
                <rabbit:binding pattern="heima.*" queue="spring_topic_queue_star"/>
                <rabbit:binding pattern="heima.#" queue="spring_topic_queue_well"/>
                <rabbit:binding pattern="itcast.#" queue="spring_topic_queue_well2"/>
            </rabbit:bindings>
        </rabbit:topic-exchange>
    
        <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
        <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
    </beans>
    

    ProducersTest

    package com.itheima;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
    public class ProducersTest {
        //1.注入rabbitTemplate
        @Autowired
        private RabbitTemplate rabbitTemplate;
        //简单模式
        @Test
        public void testHelloWorld(){
          //2.发送消息
            rabbitTemplate.convertAndSend("spring_queue","hello world spring...");
        }
        //发送fanout消息
        @Test
        public void testFanout(){
            //2.发送消息
            rabbitTemplate.convertAndSend("spring_fanout_exchange","","spring fanout...");
        }
    
        //发送topics消息
        @Test
        public void testTopic(){
            //2.发送消息
            rabbitTemplate.convertAndSend("spring_topic_exchange","heima.hehe.haha","spring topic...");
        }
    }
    
    

    测试:分别运行测试方法,结果如下图

    在这里插入图片描述

    消费者

    在这里插入图片描述pom.xml与生产者一致

    rabbitmq.properties

    rabbitmq.host=192.168.101.22
    rabbitmq.port=5672
    rabbitmq.username=heima
    rabbitmq.password=heima
    rabbitmq.virtual-host=/itcast
    

    spring-rabbitmq-consumer.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           https://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
        <!--加载配置文件-->
        <context:property-placeholder location="classpath:rabbitmq.properties"/>
    
        <!-- 定义rabbitmq connectionFactory -->
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                                   port="${rabbitmq.port}"
                                   username="${rabbitmq.username}"
                                   password="${rabbitmq.password}"
                                   virtual-host="${rabbitmq.virtual-host}"/>
    
        <bean id="springQueueListener" class="com.itheima.rabbitmq.listener.SpringQueueListener"/>
    <!--    <bean id="fanoutListener1" class="com.itheima.rabbitmq.listener.FanoutListener1"/>
        <bean id="fanoutListener2" class="com.itheima.rabbitmq.listener.FanoutListener2"/>
        <bean id="topicListenerStar" class="com.itheima.rabbitmq.listener.TopicListenerStar"/>
        <bean id="topicListenerWell" class="com.itheima.rabbitmq.listener.TopicListenerWell"/>
        <bean id="topicListenerWell2" class="com.itheima.rabbitmq.listener.TopicListenerWell2"/>-->
    
        <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
            <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
          <!--  <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>
            <rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>
            <rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>
            <rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>
            <rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>-->
        </rabbit:listener-container>
    </beans>
    

    SpringQueueListener

    package com.itheima.rabbitmq.listener;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    
    public class SpringQueueListener implements MessageListener {
        @Override
        public void onMessage(Message message) {
            //打印消息
            System.out.println(new String(message.getBody()));
        }
    }
    
    

    ConsumerTest

    package com.itheima;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
    public class ConsumerTest {
    
        @Test
        public void test(){
          boolean flag=true;
          while(true){
    
          }
        }
    }
    
    

    测试:

    在这里插入图片描述
    队列参数简介

    <!--
            id: bean的名称
            name:quque名称
            auto-declare:自动创建
            auto-delete: 自动删除,最后一个消费者和该队列断开连接后,自动删除队列
            durable:是否持久化
            exclusive:是否独占
        -->
        <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
    

    direct exchange补充

    在这里插入图片描述

    5.2 小结

    • 使用 Spring 整合 RabbitMQ 将组件全部使用配置方式实现,简化编码
    • Spring 提供 RabbitTemplate 简化发送消息 API
    • 使用监听机制简化消费者编码

    6. SpringBoot 整合 RabbitMQ

    生产端

    1. 创建生产者SpringBoot工程
    2. 引入start,依赖坐标
    <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    1. 编写yml配置,基本信息配置
    2. 定义交换机,队列以及绑定关系的配置类
    3. 注入RabbitTemplate,调用方法,完成消息发送

    在这里插入图片描述pom.xml

    <!--
            父工程依赖
            -->
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.4.RELEASE</version>
        </parent>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
            </dependency>
        </dependencies>
    

    RabbitMQConfig

    package com.itheima.rabbitmq.config;
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitMQConfig {
    
        public static final String EXCHANGE_NAME="boot_topic_exchange";
        public static final String QUEUE_NAME="boot_quque";
    
        //1.交换机
        @Bean("bootExchange")
        public Exchange bootExchange(){
            return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
        }
        //2.队列 Queue
        @Bean("bootQueue")
        public Queue bootQueue(){
            return QueueBuilder.durable(QUEUE_NAME).build();
        }
        //3.队列和交换机的绑定关系 binding
        /*
          1.知道哪个队列
          2.知道哪个交换机
          3.routing key
         */
        @Bean
        public Binding bindQueueExange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
        }
    }
    
    

    ProducerApplication

    package com.itheima;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class ProducerApplication {
        public static void main(String[] args) {
            SpringApplication.run(ProducerApplication.class);
        }
    }
    
    

    application.yml

    # 配置rabbitmq的基本信息 IP端口 username passwordspring:
    spring:
      rabbitmq:
        host: 192.168.101.22
        port: 5672
        virtual-host: /itcast
        username: heima
        password: heima
    
    

    ProducerTest

    package com.itheima.test;
    import com.itheima.rabbitmq.config.RabbitMQConfig;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @SpringBootTest
    @RunWith(SpringRunner.class)
    public class ProducerTest {
       //1.注入rabbitTemplate
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testSend(){
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","boot mq hello---");
        }
    
    }
    
    

    测试:运行Junit Test

    在这里插入图片描述
    在这里插入图片描述

    消费端

    1. 创建消费者SpringBoot工程
    2. 引入start,依赖坐标
            <dependency>
                		<groupId>org.springframework.boot</groupId>
                		<artifactId>spring-boot-starter-amqp</artifactId>
         	</dependency>
    
    
    1. 编写yml配置,基本信息配置
    2. 定义监听类,使用@RabbitListener注解完成队列监听。

    在这里插入图片描述ConsumerSpringbootApplication

    package com.itheima;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class ConsumerSpringbootApplication {
    
    	public static void main(String[] args) {
    		SpringApplication.run(ConsumerSpringbootApplication.class, args);
    	}
    
    }
    
    

    RabbitMQListener

    package com.itheima;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class RabbitMQListener {
        @RabbitListener(queues = "boot_quque")
        public void ListenerQueue(Message message){
            System.out.println(new String(message.getBody()));
        }
    }
    
    

    application.yml

    # 配置rabbitmq的基本信息 IP端口 username passwordspring:
    spring:
      rabbitmq:
        host: 192.168.101.22
        port: 5672
        virtual-host: /itcast
        username: heima
        password: heima
    
    
    

    pom.xml

      <!--
            父工程依赖
            -->
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.4.RELEASE</version>
        </parent>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
            </dependency>
        </dependencies>
    

    运行ConsumerSpringbootApplication, 消费者拿到消息

    在这里插入图片描述

    小结

    • SpringBoot提供了快速整合RabbitMQ的方式
    • 基本信息在yml中配置,队列交互机以及绑定关系在配置类中使用Bean的方式配置
    • 生产端直接注入RabbitTemplate完成消息发送
    • 消费端直接使用@RabbitListener完成消息接收
    展开全文
  • 前言 Spring如何解决的循环依赖,是近两年流行起来的一道Java面试题。其实笔者本人对这类框架源码题还是持一定的怀疑态度的。如果笔者作为面试官,可能会问一些诸如“如果注入的属性为null,你会从哪几个方向去排查...
  • RabbitMQ实战指南.pdf

    2021-02-26 10:04:45
    JA ( 工业技术->自动化技术、计算机技术->计算技术、计算机技术->计算机软件 ) 内容提要:《RabbitMQ实战指南》从消息中间件的概念和RabbitMQ的历史切入,主要阐述RabbitMQ的安装、使用、配置、管理、运维、原理、...
  • RabbitMQ实战指南

    2021-04-07 20:56:37
    RabbitMQ实战指南 ISBN: 978-7-121-32991-3 推荐指数: ★★★★★ 作者:朱忠华 阅读时间: 2021-04-07 页数: 335 从入门到深入, 简单易懂又直观, 实例很多,动手性很强, 值得推荐. 从安装,到基本知识讲解, 原理性...
  • RabbitMQ介绍 RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ主要是为了实现系统之间的...
  • RabbitMQ实战经验分享

    2020-12-30 20:51:32
    下面分享下RabbitMQ实战经验,希望对大家有所帮助:一、生产消息关于RabbitMQ的基础使用,这里不再介绍了,项目中使用的是Exchange中的topic模式。先上发消息的代码private bool MarkErrorSend(string[] lstMsg){try...
  • RabbitMQ 实战教程

    2021-08-27 22:30:59
    RabbitMQ 实战教程 1.MQ引言 1.1 什么是MQ MQ(Message Quene) : 翻译为 消息队列,通过典型的 生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,...
  • Spring Boot整合RabbitMQ实战 本篇文章将带你了解Rabbitmq,work模型,发布订阅模型,topic模型,生产者confirm消息确认机制,消费者确认机制,return消息机制,TTL队列,死信队列等相关操作 在springboot 中引入...
  • RabbitMQ
  • rabbitmq实战指南

    2018-12-09 16:37:00
    rabbitmq-plugins enable rabbitmq_federation_management rabbitmq-plugins enable rabbitmq_shovel rabbitmq-plugins enable rabbitmq_shovel_management 集群种erlang和rabbitmq的版本要一致 shovel和...
  • 在go-micro中异步消息的收发是通过Broker这个组件来完成的,底层实现有RabbitMQ、Kafka、Redis等等很多种方式,这篇文章主要介绍go-micro使用RabbitMQ收发数据的方法和原理。 Broker的核心功能 Broker的核心功能是...
  • RabbitMQ实战教程

    千次阅读 2020-07-27 19:11:44
    RabbitMQ实战教程 1. MQ引言 1.1 什么是MQ MQ(Message Quene):翻译为消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,...
  • RabbitMq(3)之实战

    万次阅读 2022-02-17 15:41:36
    实战场景 说明:订单服务进行创建订单后,进行状态变更为对应的下单状态,向延时队列中发送数据在5分钟之内没有进行付款的订单进行超时操作;订单状态入库之后再调用商品库存服务进行对应的商品扣减,商品扣减完成...
  • RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成∶ 当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上,RabbitMQ 就好比由邮局、...
  • PHP操作rabbitmq实战

    2020-05-28 18:11:24
    PHP操作rabbitmq实战     随着近几年来消息队列越来越多的被各大企业应用到业务场景中,rabbitmq也成为了一款被广大码农所喜爱的消息中间件产品。rabbitmq是基于amqp协议实现的,且与redis内置的队列相比在消费...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 21,989
精华内容 8,795
关键字:

rabbitmq实战