精华内容
下载资源
问答
  • rabbitmq几种工作模式
    2022-05-28 16:57:31

    rabbitmq工作模式

    rabbitmq工作模式图

    1. 无exchange交换机:
      a. hello world简单模式:1个生产者,1个队列,一个消费者
      b. work queues工作队列模式:1个生产者,1个队列,多个消费者;
      当消息生产者发送消息过多时,多个消费者接收可以缓解消息堆积。
      分为:
      ① 轮询工作队列:消息被轮流发送到各个消费者
      ②公平(能者多劳)工作队列:消费消息能力强的消费者,优先接收更多的消息

    2. 有交换机:
      2.1 无路由键key:
      c. pub/sub发布订阅模式:1个生产者,1个交换机,多个队列,多个消费者;
      同一条消息,可以被全部的消费者接收消费
      2.2 有路由键key:
      d. routing路由模式:1个生产者,1个交换机,多个队列,多个消费者;
      同一条消息,可以根据路由键key,被发送到不同的队列,进而被不同消费者消费;
      路由键key是具体的字符,没有通配符或#
      e. topic主题模式:1个生产者,1个交换机,多个队列,多个消费者;
      同一条消息,根据路由键key,被发送到不同队列,进而被不同消费者消费;
      路由键key可以使用通配符: 和#、
      *。

    更多相关内容
  • RabbitMQ官方地址:http://www.rabbitmq.com/RabbitMQ提供了6种模式:官网对应模式介绍:https://www.rabbitmq.com/getstarted.htmlAMQP 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放...

    RabbitMQ六种工作模式

    RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。

    RabbitMQ官方地址:http://www.rabbitmq.com/

    RabbitMQ提供了6种模式

    • 简单模式
    • work模式
    • Publish/Subscribe发布与订阅模式
    • Routing路由模式
    • Topics主题模式
    • RPC远程调用模式(远程调用,不太算MQ;暂不作介绍)

    官网对应模式介绍:https://www.rabbitmq.com/getstarted.html

    相关概念介绍

    AMQP 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

    RabbitMQ是AMQP协议的Erlang的实现。

    概念说明
    连接Connection一个网络连接,比如TCP/IP套接字连接。
    信道Channel多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。
    客户端ClientAMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。
    服务节点Broker消息中间件的服务节点;一般情况下可以将一个RabbitMQ Broker看作一台RabbitMQ 服务器。
    端点AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)。
    消费者Consumer一个从消息队列里请求消息的客户端程序。
    生产者Producer一个向交换机发布消息的客户端应用程序。

    环境搭建

    创建工程

    首先创建一个空工程,作为父工程。

    image-20220531155654075

    在父工程下创建一个生产者工程项目mq_simple_publisher

    image-20220531160627889

    在父工程下创建一个生产者工程项目mq_simple_consumer

    image-20220531160647948

    添加依赖

    往两个项目的pom.xml文件中添加依赖:

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.14.2</version>
    </dependency>
    

    1️⃣ 创建工程(生产者、消费者)

    2️⃣ 添加依赖

    3️⃣ 编写生产者发送信息

    4️⃣ 编写消费者接收信息

    1)Hello World简单消息模式

    模式说明

    image-20220531155900300

    在上图的模型中,有以下概念:

    • P:生产者,也就是要发送消息的程序
    • C:消费者:消息的接受者,会一直等待消息到来。
    • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

    编写代码

    生产者

    package com.soberw.simple;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * @author soberw
     * @Classname Publisher
     * @Description 生产者:向消息中间件上发送消息数据的应用程序
     * @Date 2022-05-31 15:19
     */
    public class Publisher {
    
        public static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws Exception {
    
            //1、创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
    
            //设置mq服务器连接信息
            //设置虚拟主机名称   默认是  /
            factory.setVirtualHost("/");
            //设置mq服务器连接地址  默认是 localhost
            factory.setHost("192.168.6.200");
            //设置连接用户名  默认为  guest
            factory.setUsername("soberw");
            //设置密码  默认为 guest
            factory.setPassword("123456");
            //设置连接端口  默认为 5672
            factory.setPort(5672);
    
            //2、创建连接
            Connection connection = factory.newConnection();
    
            //3、在连接上创建频道(信道),信道相当于一个逻辑上的连接。
            // 为了提高系统性能,不是每次都连接或者关闭connection
            // 而是每次都打开或者关闭信道
            Channel channel = connection.createChannel();
    
            //4、声明(创建)队列
    
            /*
             * queue 参数1:队列名称
             * durable 参数2:是否定义持久化队列,当mq重启之后,是否还在
             * exclusive 参数3:是否独占本次连接(是否为排他队列),这个队列是否只限于当前连接使用。如果连接关闭,那么队列自动被删除
             * ① 是否独占,只能有一个消费者监听这个队列
             * ② 当connection关闭时,是否删除队列
             * autoDelete 参数4:是否在不使用的时候自动删除队列,当没有consumer时,自动删除。队列长时间没有使用,服务会自动删除队列
             * arguments 参数5:队列其它参数,队列的一些属性。例如:队列超时时间、队列连接长度等...
             */
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
            //5、发送信息
            String message = "Hello,RabbitMQ!";
            /*
             * 参数1:交换机名称,如果没有指定则使用默认Default Exchange,简单模式下默认为“”即可
             * 参数2:路由key,简单模式可以传递队列名称
             * 参数3:配置信息,消息属性信息
             * 参数4:消息内容
             */
            AMQP.BasicProperties props = new AMQP.BasicProperties();
            //构建之后一定要赋值给一个新的对象,要不然无效
            AMQP.BasicProperties build = props.builder().appId("app01").userId("soberw").messageId("msg01").build();
            channel.basicPublish("", QUEUE_NAME, build, message.getBytes());
            System.out.println("已经发送的信息:" + message);
    
            //6、关闭资源
            channel.close();
            connection.close();
        }
    }
    

    运行程序:http://192.168.6.200:15672

    在执行上述的消息发送之后;可以登录rabbitMQ的管理控制台,可以发现队列和其消息:

    image-20220531161632553

    image-20220531161701052

    image-20220531161732176

    消费者

    package com.soberw.simple;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author soberw
     * @Classname Consumer
     * @Description 从消息中间件上获取消息,并处理消息的应用程序
     * @Date 2022-05-31 16:18
     */
    public class Consumer {
    
        public static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //1、创建连接
            ConnectionFactory factory = new ConnectionFactory();
    
            //设置mq服务器连接信息
            //设置虚拟主机名称   默认是  /
            factory.setVirtualHost("/");
            //设置mq服务器连接地址  默认是 localhost
            factory.setHost("192.168.6.200");
            //设置连接用户名  默认为  guest
            factory.setUsername("soberw");
            //设置密码  默认为 guest
            factory.setPassword("123456");
            //设置连接端口  默认为 5672
            factory.setPort(5672);
    
            //2、创建连接
            Connection connection = factory.newConnection();
    
            //3、在连接上创建频道(信道)
            Channel channel = connection.createChannel();
    
            //4、声明(创建)队列(可以省略)
            // 原则上消费者是可以省略不写的,但是考虑到容错性的问题,建议写上
            // 因为如果生产者还没有进入队列的时候,如果消费者先进入队列,则会报错
            // 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
            //5、获取并处理消息
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                 * 回调方法,当收到消息后,会自动执行该方法
                 * @param consumerTag  标识
                 * @param envelope  获取一些信息,交换机,路由key。。。
                 * @param properties 配置信息
                 * @param body 数据体
                 * @throws IOException  io异常
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("consumerTag = " + consumerTag);
                    System.out.println("DeliveryTag = " + envelope.getDeliveryTag()); //消息唯一标识。类似于ID
                    System.out.println("Exchange = " + envelope.getExchange());  //获取消息是从那个交换机过来的
                    System.out.println("RoutingKey = " + envelope.getRoutingKey());  //获取队列绑定交换机的路由 key
                    System.out.println("properties = " + properties);
                    System.out.println("body = " + new String(body));
                    //下面就可以进行一些操作,例如:将信息保存到数据库,给用户发消息,发短息,记录日志等
                }
            };
            /*
             * 消费者类似一个监听程序,主要是用来监听消息
             * 参数:
             * 1、queue: 队列名称
             * 2、autoAck:是否自动确认,类似于发短息的时候,发送成功手机会收到一个确认信息
             * 3、callback:回调对象
             */
            channel.basicConsume(QUEUE_NAME, true, consumer);
    
            //6、关闭资源
    //        channel.close();
    //        connection.close();
        }
    }
    

    image-20220531191643536

    取出消息并显示,注意此时并未关闭连接,表示消费者亦然处于开启状态,此时如果生产者再次发送消息,依然可以接受到:

    image-20220531191904233

    image-20220531191925106

    当消息被从消息队列中取出后,不管连接是否关闭,消息队列中对应的消息就不存在了:

    image-20220531193120217

    image-20220531193141422

    RabbitMQ执行流程总结

    在上面案例中:

    • 生产者发送消息
    1. 生产者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker;
    2. 声明队列并设置属性;如是否排它,是否持久化,是否自动删除;
    3. 将路由键(空字符串)与队列绑定起来;
    4. 发送消息至RabbitMQ Broker;
    5. 关闭信道;
    6. 关闭连接;
    • 消费者接收消息
    1. 消费者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker
    2. 向Broker 请求消费相应队列中的消息,设置相应的回调函数;
    3. 等待Broker投递响应队列中的消息,消费者接收消息;
    4. 确认(ack,自动确认)接收到的消息;
    5. RabbitMQ从队列中删除相应已经被确认的消息;
    6. 关闭信道;
    7. 关闭连接;

    抽取工具类

    由于下面对每个模式的测试都大致要遵循此流程,只是涉及到具体的操作方法会有所不同,因此这里可以对建立连接进行一个抽取,抽取出来一个工具类:

    package com.soberw.util;
    
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author soberw
     * @Classname ConnectionUtil
     * @Description 连接抽取出来的工具类
     * @Date 2022-05-31 20:45
     */
    public class ConnectionUtil {
        public static Connection getConnection() throws IOException, TimeoutException {
            //定义连接工厂
    
            ConnectionFactory factory = new ConnectionFactory();
    
            //设置服务地址
            factory.setHost("192.168.6.200");
            //端口
            factory.setPort(5672);
            //设置账号信息,用户名、密码、vhost
            factory.setVirtualHost("/");
            factory.setUsername("soberw");
            factory.setPassword("123456");
    
            // 通过工程获取连接
            return factory.newConnection();
        }
    
        //测试一下
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = getConnection();
            //connection = amqp://soberw@192.168.6.200:5672/
            System.out.println("connection = " + connection);
        }
    }
    

    image-20220531204953633

    2)Work queues工作队列模式

    模式说明

    image-20220531203935070

    Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

    工作队列模式实际上是一种竞争关系的模式,多个消费者之间是竞争关系,即一条消息如果被某个消费者消费了,那么其他的消费者就获取不到了。

    应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度

    编写代码

    因为是多个消费者共同消费同一个生产者的消息,因此创建多一个消费者。

    生产者

    package com.soberw.work;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.soberw.util.ConnectionUtil;
    
    /**
     * @author soberw
     * @Classname Publisher
     * @Description  work消息模型:多个消费者共同消费一个队列的消息。目的:提高消息处理速度
     * @Date 2022-05-31 20:57
     */
    public class Publisher {
        /**
         * 模拟生产者生产速度快
         */
        static final String QUEUE_NAME = "work_queue";
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            for (int i = 1; i <= 10; i++) {
                String body = i+"hello rabbitmq~~~";
                channel.basicPublish("",QUEUE_NAME,null,body.getBytes());
            }
            channel.close();
            connection.close();
        }
    
    }
    
    

    image-20220531210105102

    公平分发

    默认测试

    按照上面的入门案例,编写两个消费者代码:

    消费者1
    package com.soberw.work;
    
    import com.rabbitmq.client.*;
    import com.soberw.util.ConnectionUtil;
    
    import java.io.IOException;
    
    /**
     * @author soberw
     * @Classname consumer1
     * @Description
     * @Date 2022-05-31 21:29
     */
    public class Consumer1 {
        static final String QUEUE_NAME = "work_queue";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("body1:" + new String(body));
                }
            };
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    
    }
    
    消费者2
    package com.soberw.work;
    
    import com.rabbitmq.client.*;
    import com.soberw.util.ConnectionUtil;
    
    import java.io.IOException;
    
    /**
     * @author soberw
     * @Classname Consumer2
     * @Description
     * @Date 2022-05-31 21:30
     */
    public class Consumer2 {
        static final String QUEUE_NAME = "work_queue";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("body2:" + new String(body));
                }
            };
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    
    测试

    先开启两个消费者程序,然后运行生产者程序:

    image-20220531213708438

    image-20220531213723348

    image-20220531213751334

    看似是公平竞争的关系?

    两个消费者都抢占到了5个消息。

    添加不公平场景

    下面我们设置一些不公平的场景,通过给两个消费者添加不同的休眠时间来设置。

    让消费者1在获取到消息后,休眠1秒钟;让消费者2在获取到消息后,休眠2秒钟。

    消费者1
    package com.soberw.work;
    
    import com.rabbitmq.client.*;
    import com.soberw.util.ConnectionUtil;
    
    import java.io.IOException;
    
    /**
     * @author soberw
     * @Classname consumer1
     * @Description
     * @Date 2022-05-31 21:29
     */
    public class Consumer1 {
        static final String QUEUE_NAME = "work_queue";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("body1:" + new String(body));
                    //添加休眠时间
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
    
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    
    }
    
    消费者2

    代码同消费者1,将休眠时间改为2秒。

    测试

    image-20220531214745464

    image-20220531214805242

    发现此时依然是公平的竞争关系,虽然设置了不同的消息处理时间,但是最后不同消费者还是会拿到同等的消息。

    手动确认消息

    上面的消息处理方式都是设置的自动确认

    image-20220531215538646

    这里需要改为手动确认

    因为是手动确认,所以需要我们在程序中添加确认接收的代码,即只有将消息处理无误后,才确认。

    消费者1
    package com.soberw.work;
    
    import com.rabbitmq.client.*;
    import com.soberw.util.ConnectionUtil;
    
    import java.io.IOException;
    
    /**
     * @author soberw
     * @Classname consumer1
     * @Description
     * @Date 2022-05-31 21:29
     */
    public class Consumer1 {
        static final String QUEUE_NAME = "work_queue";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("body1:" + new String(body));
                    //添加休眠时间
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //在获取到消息后,可以进行一些操作,例如数据库操作、发送邮件、记录日志等...
                    /*
                     * 确认消息一般放在最后,等程序执行完无异常后,在进行确认消息
                     *
                     * 第一个参数:添加手动确认消息的唯一标识
                     * 第二个参数:确认方式。是否一次抓取并确认多个消息
                     *   true:设置为true,即表示可以批量确认消息,这样可以挺高执行速度,当消息量大时,避免繁琐的处理,
                     *         但是队列会一次性将多个消息同时从队列中删除,也会导致系统的不稳定性,存在安全隐患
                     *   false:设置为false,即表示无论多少个消息,都需要一次一次的确认,此设置会可能影响处理的性能
                     *   实际中可根据不同场景进行设置...
                     */
                    channel.basicAck(envelope.getDeliveryTag(), false); //手动确认,业务逻辑没有问题后才确认
                }
            };
            //初始的自动确认
            //channel.basicConsume(QUEUE_NAME, true, consumer);
            //改为手动确认
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    
    }
    
    消费者2

    与消费者1同理,改为手动确认。

    测试

    image-20220601092743479

    image-20220601092759741

    发现还是公平竞争的关系。

    不公平分发

    但这显然不符合所想,我们预期的结果是,处理消息快的消费者,应当分配多一点的消息,即按劳分配,能者多劳。

    概念

    如果采用上面的默认消息分发策略,消息是轮询发送的。但是消费者之间存在处理快慢问题,如果A处理慢,B处理快,他们接受同样数量的消息显然是不合理的。就是在这样情况下,不公平分发出现了,简而言之就是能者多劳,处理快的多处理,处理慢的少处理。

    如何实现不公平分发?

    这里涉及到一个概念:

    QoS

    当网络发生拥塞的时候,所有的数据流都有可能被丢弃;为满足用户对不同应用不同服务质量的要求,就需要网络能根据用户的要求分配和调度资源,对不同的数据流提供不同的服务质量:对实时性强且重要的数据报文优先处理;对于实时性不强的普通数据报文,提供较低的处理优先级,网络拥塞时甚至丢弃。QoS应运而生。支持QoS功能的设备,能够提供传输品质服务;针对某种类别的数据流,可以为它赋予某个级别的传输优先级,来标识它的相对重要性,并使用设备所提供的各种优先级转发策略、拥塞避免等机制为这些数据流提供特殊的传输服务。配置了QoS的网络环境,增加了网络性能的可预知性,并能够有效地分配网络带宽,更加合理地利用网络资源。

    预先抓取值

    在RabbitMQ中,我们可以设置预先抓取值来实现不公平分发:

    消费者1

    先设置抓取数量为 1

    package com.soberw.work;
    
    import com.rabbitmq.client.*;
    import com.soberw.util.ConnectionUtil;
    
    import java.io.IOException;
    
    /**
     * @author soberw
     * @Classname consumer1
     * @Description
     * @Date 2022-05-31 21:29
     */
    public class Consumer1 {
        static final String QUEUE_NAME = "work_queue";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
            //设置预先抓取消息的数量,消费完成抓取数量后的消息后再来继续抓取,注意此时必须改为手动确认
            channel.basicQos(1);
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("body1:" + new String(body));
                    //添加休眠时间
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //在获取到消息后,可以进行一些操作,例如数据库操作、发送邮件、记录日志等...
                    /*
                     * 确认消息一般放在最后,等程序执行完无异常后,在进行确认消息
                     *
                     * 第一个参数:添加手动确认消息的唯一标识
                     * 第二个参数:确认方式。是否一次抓取并确认多个消息
                     *   true:设置为true,即表示可以批量确认消息,这样可以挺高执行速度,当消息量大时,避免繁琐的处理,
                     *         但是队列会一次性将多个消息同时从队列中删除,也会导致系统的不稳定性,存在安全隐患
                     *   false:设置为false,即表示无论多少个消息,都需要一次一次的确认,此设置会可能影响处理的性能
                     *   实际中可根据不同场景进行设置...
                     */
                    channel.basicAck(envelope.getDeliveryTag(), true); //手动确认,业务逻辑没有问题后才确认
                }
            };
            //初始的自动确认
            //channel.basicConsume(QUEUE_NAME, true, consumer);
            //改为手动确认
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    
    }
    
    消费者2

    同消费者1,设置抓取数量为 1 。

    断点测试

    为了直观的看到程序的流程,我们加上断点测试一下,加在确认消息的位置:

    image-20220601094533796

    生产者发送10条消息,消费者1和2都预先抓取了一条消息,并执行到确认消息处暂停:

    image-20220601094830199

    此时查看消息队列:

    image-20220601095233051

    存在两条为确认的消息,分别对应着两个消费者抓取的消息。

    此时先放行消费者1,即让消费者1确认消息:

    image-20220601095849398

    发现队列中的消息减少了一个,观察控制台:

    image-20220601095951441

    成功取出。

    此时删除断点,让两个消费者自然执行:

    image-20220601100409460

    image-20220601100423211

    我们发现,消费者1因为执行速度快,就得到了更多的消息。

    抓取值机制

    上面我们通过设置预先抓取值,可以实现按劳分配的不公平分发效果,设置的值为1,如果设置为2呢?

    image-20220601102000157

    依然断点启动,方便查看效果:

    image-20220601102130684

    两个消费者各自抓取了两个。

    此时放行消费者2:

    image-20220601102558939

    image-20220601102832493

    我们发现,消费者2成功取出一个消息,队列中删除了一个,但是显示未确认的消息数量还是4个,这与其执行机制有关,我们先放行:

    image-20220601103015069

    image-20220601103025160

    依然是消费者1执行的多。

    抓取值机制:

    • 我们给消费程序设置的预先抓取值,更像是一个分配阀值,当队列发现程序中设置的抓取值与实际数量不等时,如果队列中还有消息,就给其分配消息,直到与预先抓取值相同为止。
    • 这也就是为什么队列中始终有4个未确认消息的原因
    • 可以认为,队列分配消息的方式是渐进式的,而不是隔断式的
    • 因此,在同等执行速度下,预先抓取值设置的越大,则抓取的消息越多

    下面验证一下:

    将消费者1设置为2,消费者2设置为1,将他们的休眠时间都设置为1

    image-20220601103752009

    image-20220601103803279

    对比上面两组测试,得出结论:

    • 在同等执行效率的情况下,设置的预先抓取值越大,则抓取的消息越多
    • 在设置同等预先抓取值的情况下,执行效率越快,则抓取的消息越多

    小结

    1. 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
    2. Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

    订阅模式类型

    订阅模式示例图:

    前面2个案例中,只有3个角色:

    • P:生产者,也就是要发送消息的程序
    • C:消费者:消息的接受者,会一直等待消息到来。
    • queue:消息队列,图中红色部分

    而在订阅模型中,多了一个exchange角色,而且过程略有变化:

    • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
    • C:消费者,消息的接受者,会一直等待消息到来。
    • Queue:消息队列,接收消息、缓存消息。
    • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型
      • Fanout:广播,将消息交给所有绑定到交换机的队列
      • Direct:定向,把消息交给符合指定routing key 的队列
      • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

    Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

    3)Publish/Subscribe发布与订阅模式

    模式说明

    image-20220601105754929

    发布订阅模式:

    • 1、每个消费者监听自己的队列。
    • 2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

    编写代码

    生产者

    需要创建交换机以进行消息转发:

    package com.soberw.fanout;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.soberw.util.ConnectionUtil;
    
    /**
     * @author soberw
     * @Classname Publisher
     * @Description Publish/Subscribe发布与订阅模式的生产者应用程序
     * @Date 2022-06-01 11:00
     */
    public class Publisher {
        public static void main(String[] args) throws Exception {
    
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            /*
             *exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
             *参数:
             * 1. exchange:交换机名称
             * 2. type:交换机类型
             *     DIRECT("direct"),:定向
             *     FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定的队列。
             *     TOPIC("topic"),通配符的方式,发送给符合通配符条件的队列
             *     HEADERS("headers");参数匹配
             * 3. durable:是否持久化
             * 4. autoDelete:自动删除
             * 5. internal:内部使用。 一般false
             * 6. arguments:参数
             */
            String exchangeName = "test_fanout";
            //5. 创建交换机
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
            //6. 创建队列
            String queue1Name = "test_fanout_queue1";
            String queue2Name = "test_fanout_queue2";
            channel.queueDeclare(queue1Name, true, false, false, null);
            channel.queueDeclare(queue2Name, true, false, false, null);
            //7. 绑定队列和交换机
            /*
             *queueBind(String queue, String exchange, String routingKey)
             *参数:
             *    1. queue:队列名称
             *    2. exchange:交换机名称
             *    3. routingKey:路由键,绑定规则
             *        如果交换机的类型为fanout ,routingKey设置为""
             */
            channel.queueBind(queue1Name, exchangeName, "");
            channel.queueBind(queue2Name, exchangeName, "");
    
            String body = "日志信息:张三调用了findAll方法...日志级别:info...";
            //8. 发送消息
            channel.basicPublish(exchangeName, "", null, body.getBytes());
    
            //9. 释放资源
            channel.close();
            connection.close();
        }
    
    }
    

    image-20220601112241433

    消费者1

    package com.soberw.fanout;
    
    import com.rabbitmq.client.*;
    import com.soberw.util.ConnectionUtil;
    
    import java.io.IOException;
    
    /**
     * @author soberw
     * @Classname Consumer
     * @Description Publish/Subscribe发布与订阅模式的消费者应用程序
     * @Date 2022-06-01 11:00
     */
    public class Consumer1 {
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            String queue1Name = "test_fanout_queue1";
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("body:" + new String(body));
                    System.out.println("将日志信息打印到控制台.....");
                }
            };
            channel.basicConsume(queue1Name, true, consumer);
    
        }
    }
    

    消费者2

    将队列名称改为2号队列,其他一样。

    测试

    启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。

    image-20220601112354826

    image-20220601112344215

    发现,两个消费者都成功取到了消息,并且情空了各自的队列:

    image-20220601112440948

    在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击 fanout_exchange 的交换机,可以查看到如下的绑定:

    image-20220601112644055

    小结

    交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。、

    发布订阅模式与工作队列模式的区别

    • 1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
    • 2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
    • 3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机 。

    4)Routing路由模式

    模式说明

    路由模式特点:

    • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
    • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
    • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key 完全一致,才会接收到消息

    图解:

    • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
    • X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
    • C1:消费者,其所在队列指定了需要routing key 为 error 的消息
    • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

    编写代码

    在编码上与 Publish/Subscribe发布与订阅模式 的区别是交换机的类型为:Direct,还有队列绑定交换机的时候需要指定routing key

    生产者

    package com.soberw.routing;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.soberw.util.ConnectionUtil;
    
    /**
     * @author soberw
     * @Classname Publisher
     * @Description Routing路由模式的生产者应用程序
     * @Date 2022-06-01 12:55
     */
    public class Publisher {
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            //定义交换机名字
            String exchangeName = "test_direct";
            // 创建交换机
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
            // 创建队列
            String queue1Name = "test_direct_queue1";
            String queue2Name = "test_direct_queue2";
            // 声明(创建)队列
            channel.queueDeclare(queue1Name, true, false, false, null);
            channel.queueDeclare(queue2Name, true, false, false, null);
            // 队列绑定交换机
            // 队列1绑定error
            channel.queueBind(queue1Name, exchangeName, "error");
            // 队列2绑定info warning
            channel.queueBind(queue2Name, exchangeName, "info");
            channel.queueBind(queue2Name, exchangeName, "warning");
    
            String errorMessage = "日志信息:张三调用了delete方法.错误了,日志级别error...";
            String warningMessage = "日志信息:张三调用了delete方法.错误了,日志级别warning...";
            String infoMessage = "日志信息:张三调用了delete方法.错误了,日志级别info...";
            // 发送消息
            channel.basicPublish(exchangeName, "error", null, errorMessage.getBytes());
            channel.basicPublish(exchangeName, "warning", null, warningMessage.getBytes());
            channel.basicPublish(exchangeName, "info", null, infoMessage.getBytes());
    
            channel.close();
            connection.close();
        }
    
    }
    

    image-20220601141102579

    image-20220601141016150

    消费者1

    package com.soberw.routing;
    
    import com.rabbitmq.client.*;
    import com.soberw.util.ConnectionUtil;
    
    import java.io.IOException;
    
    /**
     * @author soberw
     * @Classname Consumer1
     * @Description Routing路由模式的消费者应用程序
     * @Date 2022-06-01 12:55
     */
    public class Consumer1 {
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            String queue1Name = "test_direct_queue1";
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("body:" + new String(body));
                    System.out.println("将日志信息打印到控制台.....");
                }
            };
            channel.basicConsume(queue1Name, true, consumer);
        }
    
    }
    

    消费者2

    消费者2从队列2中取消息。

    测试

    启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果。

    image-20220601141321050

    image-20220601141330752

    小结

    Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。

    5)Topics主题模式

    模式说明

    Topic主题模式也叫通配符模式。Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

    Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

    通配符规则:

    • #:匹配零个或多个词

    • *:匹配不多不少恰好1个词

    举例:

    item.#:能够匹配item.insert.abc 或者 item.insert

    item.*:只能匹配item.insert

    图解:

    • 红色Queue:绑定的是usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到
    • 黄色Queue:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配

    编写代码

    使用topic类型的Exchange,发送不同消息的routing key。

    生产者

    package com.soberw.topic;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.soberw.util.ConnectionUtil;
    
    /**
     * @author soberw
     * @Classname Publisher
     * @Description Topic通配符模式的生产者程序
     * @Date 2022-06-01 14:51
     */
    public class Publisher {
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            String exchangeName = "test_topic";
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
            String queue1Name = "test_topic_queue1";
            String queue2Name = "test_topic_queue2";
            channel.queueDeclare(queue1Name, true, false, false, null);
            channel.queueDeclare(queue2Name, true, false, false, null);
            // 绑定队列和交换机
            /*
             *  参数:
             *  1. queue:队列名称
             *  2. exchange:交换机名称
             *  3. routingKey:路由键,绑定规则
             *  如果交换机的类型为fanout ,routingKey设置为""
             */
            // routing key  系统的名称.日志的级别。
            //需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
            channel.queueBind(queue1Name, exchangeName, "#.error");   //匹配零个或者多个以.error结束的词
            channel.queueBind(queue1Name, exchangeName, "order.*");   //匹配一个以order.开头的词
            channel.queueBind(queue2Name, exchangeName, "*.*");     //匹配两个以 . 分割的词
    
            //定义不同的routing key
            String key1 = "order.info.error";  //可被队列 1 匹配到
            String key2 = "order.info";   //可被队列 1  2 匹配到
            String key3 = "goods.info";   //可被队列  2 匹配到
            String key4 = "goods.error";  //可被队列  1  2  匹配到
            String body1 = "日志信息:张三通过" + key1 + "调用了方法...";
            String body2 = "日志信息:张三通过" + key2 + "调用了方法...";
            String body3 = "日志信息:张三通过" + key3 + "调用了方法...";
            String body4 = "日志信息:张三通过" + key4 + "调用了方法...";
            //发送消息
            channel.basicPublish(exchangeName, key1, null, body1.getBytes());
            channel.basicPublish(exchangeName, key2, null, body2.getBytes());
            channel.basicPublish(exchangeName, key3, null, body3.getBytes());
            channel.basicPublish(exchangeName, key4, null, body4.getBytes());
    
            channel.close();
            connection.close();
        }
    
    }
    

    image-20220601150756028

    image-20220601151130545

    消费者1

    package com.soberw.topic;
    
    import com.rabbitmq.client.*;
    import com.soberw.util.ConnectionUtil;
    
    import java.io.IOException;
    
    /**
     * @author soberw
     * @Classname Consumer1
     * @Description Topic通配符模式的消费者程序
     * @Date 2022-06-01 14:50
     */
    public class Consumer1 {
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            String queue1Name = "test_topic_queue1";
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("body:" + new String(body));
                }
            };
            channel.basicConsume(queue1Name, true, consumer);
        }
    
    }
    

    消费者2

    同消费者1 一样,接收来自队列2 的消息。

    测试

    启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果;并且这些routing key可以使用通配符。

    image-20220601151457025

    image-20220601151509922

    小结

    Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 和 Routing路由模式 的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。

    模式总结

    RabbitMQ工作模式:

    1、简单模式 HelloWorld

    一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)

    2、工作队列模式 Work Queue

    一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)

    3、发布订阅模式 Publish/subscribe

    需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列

    4、路由模式 Routing

    需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

    5、通配符模式 Topic

    需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

    展开全文
  • 04. RabbitMQ几种工作模式

    千次阅读 2022-03-20 11:18:39
    04. RabbitMQ几种工作模式 姓名:程序员阿红 喜欢:Java编程 重要的事情说三遍!!! 欢迎大家关注哦,互相学习 欢迎大家访问哦,互相学习 欢迎大家收藏哦,互相学习 ✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨...

    04. RabbitMQ的几种工作模式

    🍎姓名:程序员阿红🍎
    🍊喜欢:Java编程🍊
    🍉重要的事情说三遍!!!🍉
    🍓欢迎大家关注哦,互相学习🍓
    🍋欢迎大家访问哦,互相学习🍋
    🍑欢迎大家收藏哦,互相学习🍑

    ✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨

    🍟🍟🍟RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此我们只学习前5种

    🍗🍗🍗在线手册:https://www.rabbitmq.com/getstarted.htm

    img

    5种消息模型,大体分为两类:

    • 1和2属于点对点
    • 3、4、5属于发布订阅模式(一对多)

    1. 模式概括

    1.1 点对点模式:

    P2P(point to point)模式包含三个角色:

    • 消息队列(queue),生产者(sender),消费者(receiver)

    • 每个消息发送到一个特定的队列中,消费者从中获得消息

    • 队列中保留这些消息,直到他们被消费或超时

    • 特点:

      1. 每个消息只有一个消费者,一旦消费,消息就不在队列中了。
      2. 生产者和消费者之间没有依赖性,生产者只管生产消息,消费者只能消费消息。
      1. 接收者成功接收消息之后需向对象应答成功(确认|ACK机制)
    • 如果希望发送的每个消息都会被成功处理,那需要P2P

    1.2 发布订阅模式:

    publish(Pub)/subscribe(Sub):

    • pub/sub模式包含三个角色:交换机(exchange),发布者(publisher),订阅者
      (subcriber)(没有队列是因为,队列只需要在消费者绑定路由即可)

    • 多个发布者将消息发送交换机,系统将这些消息传递给多个订阅者

    • 特点:

      1. 每个消息可以有多个订阅者
      2. 发布者和订阅者之间在时间上有依赖,对于某个交换机的订阅者,必须创建一个订阅
        后,才能消费发布者的消息
      1. 为了消费消息,订阅者必须保持运行状态;
      2. 如果希望发送的消息被多个消费者处理,可采用本模式

    2. 模式分类

    2.1点对点模式-简单模式

    下面引用官网的一段介绍:
    Introduction
    RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as
    a post office: when you put the mail that you want posting in a post box, you can be sure
    that Mr. or Ms. Mailperson will eventually deliver the mail to your recipient. In this analogy,
    RabbitMQ is a post box, a post office and a postman.
    译文:RabbitMQ是一个消息代理:它接收和转发消息。你可以把它想象成一个邮局:当你把你想要
    寄的邮件放到一个邮箱里,你可以确定邮递员先生或女士最终会把邮件送到你的收件人那里。在
    这个类比中,RabbitMQ是一个邮箱、一个邮局和一个邮递员。
    RabbitMQ本身只是接收,存储和转发消息,并不会对信息进行处理!
    类似邮局,处理信件的应该是收件人而不是邮局!

    • RabbitMQ本身只是接收,存储和转发消息,并不会对信息进行处理!
    • 类似邮局,处理信件的应该是收件人而不是邮局!

    img

    生产者代码:

    package simplemode;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import util.ConnectionUtil;
    
    /**
     * @author WeiHong
     * @date 2021 -  09 - 06 16:39
     * 简单模式
     * 消息生产者
     */
    public class Sender {
        public static void main(String[] args) throws Exception {
            String msg ="学习rabbitMQ,ACK手动确认!";
            //1.获取连接
            Connection connection = ConnectionUtil.getConnection();
            //2.在连接中创建通道
            Channel channel = connection.createChannel();
            //3.创建消息队列(1、2、3、4、5)
            /**
             * 参数1:队列的名称
             * 参数2:队列中的数据是否持久化
             * 参数3:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用)
             * 参数4:是否自动删除(当队列连接数为0时,队列会销毁,不管队列是否存有保存数据)
             * 参数5:队列参数(没有参数为null)
             */
            channel.queueDeclare("queue1",false,false,false,null);
            //4.向指定的队列发送消息
            /**
             * 参数1:交换机名称,当前为简单模式,也就是p2p模式,没有交换机,所以名称为“”;
             * 参数2:目标队列的名称
             * 参数3:设置消息的属性(没有属性则为null)
             * 参数4:消息的内容(只接收字节数组)
             */
            channel.basicPublish("","queue1",null,msg.getBytes());
            System.out.println("发送:"+msg);
            //5.释放资源
            channel.close();
            connection.close();
    
        }
    }
    

    启动生产者,即可前往管理端查看队列中的信息,会有一条信息没有处理和确认

    img

    消费者代码:

    package simplemode;
    
    import com.rabbitmq.client.*;
    import util.ConnectionUtil;
    
    import java.io.IOException;
    
    /**
     * @author WeiHong
     * @date 2021 -  09 - 06 17:00
     * 消息接收者
     */
    public class Recer {
        public static void main(String[] args)throws Exception{
            // 1.获取连接
            Connection connection = ConnectionUtil.getConnection();
            // 2.在连接中创建通道(信道)
            Channel channel =  connection.createChannel();
            // 3.从信道中获得信息
            DefaultConsumer consumer =  new DefaultConsumer(channel){
                @Override//交付处理(收件人信息,信封(包裹上的快递标签),协议的配置,消息)
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //body就是从队列中获取的消息
                    String s= new String(body);
                    System.out.println("接收="+s);
                }
             };
            // 4.监听队列 true:自动消息确认
            channel.basicConsume("queue1",true,consumer);
    
    
        }
    }
    

    启动消费者,前往管理端查看队列中的信息,所有信息都已经处理和确认,显示0

    img

    消息确认机制ACK:

    • 通过刚才的案例可以看出,消息一旦被消费,消息就会立刻从队列中移除
    • RabbitMQ如何得知消息被消费者接收?
      1. 如果消费者接收消息后,还没执行操作就抛异常宕机导致消费失败,但是RabbitMQ无从得知,这样消息就丢失了
      2. 因此,RabbitMQ有一个ACK机制,当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收
      1. ACK:(Acknowledge character)即是确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收无误我们在使用http请求时,http的状态码200就是告诉我们服务器执行成功
      2. 整个过程就想快递员将包裹送到你手里,并且需要你的签字,并拍照回执
      1. 不过这种回执ACK分为两种情况:
        1. 自动ACK:消息接收后,消费者立刻自动发送ACK(快递放在快递柜)
        2. 手动ACK:消息接收后,不会发送ACK,需要手动调用(快递必须本人签收)
      1. 两种情况如何选择,需要看消息的重要性:
        1. 如果消息不太重要,丢失也没有影响,自动ACK会比较方便
        2. 如果消息非常重要,最好消费完成手动ACK,如果自动ACK消费后,RabbitMQ就会把
        1. 消息从队列中删除,如果此时消费者抛异常宕机,那么消息就永久丢失了
    • 修改手动消息确认
            // 4.监听队列 false:手动消息确认
            channel.basicConsume("queue1", false,consumer);
    
    • 结果如下

    img

    确认ACK代码如下:

    package simplemode;
    
    import com.rabbitmq.client.*;
    import util.ConnectionUtil;
    
    import java.io.IOException;
    
    /**
     * @BelongsProject: lagou-rabbitmq
     * @Author: GuoAn.Sun
     * @CreateTime: 2020-08-10 15:08
     * @Description: 消息接收者,加入ACK确认机制
     */
    public class RecerByACK {
        public static void main(String[] args) throws  Exception {
            // 1.获得连接
            Connection connection = ConnectionUtil.getConnection();
            // 2.获得通道(信道)
            final Channel channel = connection.createChannel();
            // 3.从信道中获得消息
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override //交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // body就是从队列中获取的消息
                    String s = new String(body);
                    System.out.println("接收 = " + s);
                    // 手动确认(收件人信息,是否同时确认多个消息)
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            // 4.监听队列 false:手动消息确认
            channel.basicConsume("queue1", false,consumer);
        }
    }
    

    ACK确认模式:

    *// 4.监听队列 false:手动消息确认,必须为手动消息确认,ACK确认机制才生效。
    *channel.basicConsume(“queue1”,fasle,consumer);

    img

    没执行channel.basicAck(envelope.getDeliveryTag(),false);该语句时消息是未被确认的。

    2.2 工作队列模式

    img

    • 之前我们学习的简单模式,一个消费者来处理消息,如果生产者生产消息过快过多,而消费者的能力有限,就会产生消息在队列中堆积(生活中的滞销)

    • 一个烧烤师傅,一次烤50支羊肉串,就一个人吃的话,烤好的肉串会越来越多,怎么处理?

    • 多招揽客人进行消费即可。当我们运行许多消费者程序时,消息队列中的任务会被众多消费者共享,但其中某一个消息只会被一个消费者获取(100支肉串20个人吃,但是其中的某支肉串只能被一个人吃)

    生产者代码:

    package workmode;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import util.ConnectionUtil;
    
    /**
     * @author WeiHong
     * @date 2021 -  09 - 06 19:23
     * 工作模型
     */
    public class Sender {
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare("work_queue",false,false,false,null);
            for(int i = 1;i<=100;i++) {
                String msg = "羊肉串 --> " + i;
                channel.basicPublish("", "work_queue", null, msg.getBytes());
                System.out.println("新鲜出炉:" + msg);
            }
            channel.close();
            connection.close();
        }
    }
    

    消费者1代码:

    package workmode;
    
    import com.rabbitmq.client.*;
    import util.ConnectionUtil;
    
    import java.io.IOException;
    
    /**
     * @author WeiHong
     * @date 2021 -  09 - 06 21:22
     */
    public class Recer1 {
        static  int i =1;
        public static void main(String[] args)throws Exception{
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            //声明队列、双重含义
            // 如果队列不存在,就创建;如果队列存在,则获取
            channel.queueDeclare("work_queue",false,false,false,null);
            channel.basicQos(1);//系统不会给消费者发送超过1个消息以上,只有确认ACK后,才回继续发下一个消息
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //body就是从队列中获取的消息
                    String s= new String(body);
                    System.out.println("【顾客1】吃掉 " + s+" ! 总共吃【"+i+++"】串!"+"信封收件人:"+envelope.getDeliveryTag());
                    //模拟网络延迟
                    try {
                        Thread.sleep(200);
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                    //手动确认(收件人信息,是否确认多个消息)
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            // 4.监听队列 false:手动消息确认
            channel.basicConsume("work_queue", false,consumer);
        }
    }
    

    消费者2代码:

    package workmode;
    
    import com.rabbitmq.client.*;
    import util.ConnectionUtil;
    
    import java.io.IOException;
    
    /**
     * @BelongsProject: lagou-rabbitmq
     * @Author: GuoAn.Sun
     * @CreateTime: 2020-08-10 15:08
     * @Description: 消息接收者,加入ACK确认机制
     */
    public class Recer2 {
        static  int i =1;
        public static void main(String[] args) throws  Exception {
            // 1.获得连接
            Connection connection = ConnectionUtil.getConnection();
            // 2.获得通道(信道)
            final Channel channel = connection.createChannel();
            channel.queueDeclare("work_queue",false,false,false,null);
            channel.basicQos(1);
            // 3.从信道中获得消息
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override //交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String s = new String(body);
                    System.out.println("【顾客2】吃掉 " + s+" ! 总共吃【"+i+++"】串!");
                    // 模拟网络延迟
                    try{
                        Thread.sleep(900);
                    }catch (Exception e){
    
                    }
                    // 手动确认(收件人信息,是否同时确认多个消息)
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            // 4.监听队列 false:手动消息确认
            channel.basicConsume("work_queue", false,consumer);
        }
    }
    

    结果:

    生产者产生100个羊肉串

    img

    消费者1消费了81条消息

    img

    消费者2消费了19条消息

    img

    channel.basicQos:系统不会给消费者发送超过1个消息以上,只有确认ACK后,才回继续发下一个消息

    1. 不加channel.basicQos(1);
    • 不加channel.basicQos(1),消费者是相互竞争的关系,但是会把消息进行平分。
    • 例:1个生产者;2个消费者。生产者发送100条信息,消费者尽管消费速度不一样,这两个消费者还是会将这100条数据各分50条。
    1. 加channel.basicQos(1);
    • 遵循能者多劳机制,消费者是相互竞争的关系,但是不会把消息进行平分。消费速度快的会继续消费信息,直至所有消息被消费完。

    能者多劳必须要配合手动的ACK机制才生效:

    • 面试题:避免消息堆积?
    1. workqueue,多个消费者监听同一个队列

    2. 接收到消息后,通过线程池,异步消费

    2.3 发布订阅模式

    发布-订阅

    在上一篇教程中,我们创建了一个工作队列。工作队列背后的假设是,每个任务都被准确地交

    付给一个工作者。在这一部分中,我们将做一些完全不同的事情——将消息传递给多个消费者。

    此模式称为“发布/订阅”。

    为了演示这个模式,我们将构建一个简单的日志记录系统。它将由两个程序组成——第一个将

    发送日志消息,第二个将接收和打印它们。

    在我们的日志系统中,接收程序的每一个正在运行的副本都将获得消息。这样我们就可以运行

    一个接收器并将日志指向磁盘;与此同时,我们可以运行另一个接收器并在屏幕上看到日志。

    基本上,发布的日志消息将广播到所有接收方

    生活中的案例:就是玩抖音快手,众多粉丝关注一个视频主,视频主发布视频,所有粉丝都可以得到视

    频通知

    img

    上图中,X就是视频主,红色的队列就是粉丝。binding是绑定的意思(关注)

    P生产者发送信息给X路由,X将信息转发给绑定X的队列

    img

    X队列将信息通过信道发送给消费者,从而进行消费

    整个过程,必须先创建路由

    路由在生产者程序中创建

    因为路由没有存储消息的能力,当生产者将信息发送给路由后,消费者还没有运行,所以没

    有队列,路由并不知道将信息发送给谁

    运行程序的顺序:

    1.先运行生产者sender;创建路由

    2在运行消费者Recer1、Recer2。

    生产者代码

    package publishSubscribeMode;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import util.ConnectionUtil;
    
    /**
     * @author WeiHong
     * @date 2021 -  09 - 07 14:11
     * 发布订阅模式
     */
    public class Sender {
        public static void main(String[] args) throws Exception {
            // 1.获取连接
            Connection connection = ConnectionUtil.getConnection();
            // 2.在连接中创建信道
            Channel channel = connection.createChannel();
            // 3.声明路由(4种路由)
            // fanout:不处理路由键(只需要将队列绑定到路由上,发送到路由的消息都会被转发到与该路由绑定的所有队列上)
            channel.exchangeDeclare("test_exchange_fanout","fanout");
            // 4.发布消息
            String msg = "这是发布订阅模式的fanout路由模型的消息";
            channel.basicPublish("test_exchange_fanout","",null,msg.getBytes());
            System.out.println("发送:"+msg);
            // 5.释放资源
            channel.close();
            connection.close();
        }
    }
    

    消费者1代码

    package publishSubscribeMode;
    
    import com.rabbitmq.client.*;
    import util.ConnectionUtil;
    
    import java.io.IOException;
    
    /**
     * @author WeiHong
     * @date 2021 -  09 - 07 15:29
     * 发布订阅模式-消费者1
     */
    public class Recer1 {
        public static void main(String[] args)throws Exception{
            // 1.获取连接
            Connection connection = ConnectionUtil.getConnection();
            // 2.在链接中创建信道
            Channel channel = connection.createChannel();
            //2.1声明队列
            channel.queueDeclare("test_exchange_fanout_queue1",false,false,false,null);
            //2.2绑定路由
            channel.queueBind("test_exchange_fanout_queue1","test_exchange_fanout","");
            // 3.定义内部类重写方法接收消息\从信道中获得信息
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String s = new String(body);
                    System.out.println("[消费者1] = "+ s);
                }
            };
            // 4.监听队列 true:自动消息确认
            channel.basicConsume("test_exchange_fanout_queue1",true,consumer);
    
        }
    }
    

    消费者2代码

    package publishSubscribeMode;
    
    import com.rabbitmq.client.*;
    import util.ConnectionUtil;
    
    import java.io.IOException;
    
    /**
     * @author WeiHong
     * @date 2021 -  09 - 07 15:29
     * 发布订阅模式-消费者2
     */
    public class Recer2 {
        public static void main(String[] args)throws Exception{
            // 1.获取连接
            Connection connection = ConnectionUtil.getConnection();
            // 2.在链接中创建信道
            Channel channel = connection.createChannel();
            //2.1声明队列
            channel.queueDeclare("test_exchange_fanout_queue2",false,false,false,null);
            //2.2绑定路由
            channel.queueBind("test_exchange_fanout_queue2","test_exchange_fanout","");
            // 3.定义内部类重写方法接收消息\从信道中获得信息
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String s = new String(body);
                    System.out.println("[消费者2] = "+ s);
                }
            };
            // 4.监听队列 true:自动消息确认
            channel.basicConsume("test_exchange_fanout_queue2",true,consumer);
    
        }
    }
    

    2.4 路由模式

    img

    • 路由会根据类型进行定向分发消息给不同的队列,如图所示

    生产者代码

    package routerMode;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import util.ConnectionUtil;
    
    /**
     * @author WeiHong
     * @date 2021 -  09 - 09 14:38
     * 路由模式-生产者
     * 路由会根据类型进行定向分发消息给不同的队列
     */
    public class Sender {
        public static void main(String[] args) throws Exception {
            //1.获取连接
            Connection connection = ConnectionUtil.getConnection();
            //2.在连接中创建信道
            Channel channel = connection.createChannel();
            //3.路由声明(路由名,路由类型)
            //direct:会根据路由键进行定向分发消息
            channel.exchangeDeclare("router_exchange_direct","direct");
            //4.向队列发送信息
            for(int i = 0; i < 10 ; i++){
                String msg = "查询的数据第"+i+"条消息!";
                //selecte:此时该参数为路由键不是队列名
                channel.basicPublish("router_exchange_direct","selecte",null,msg.getBytes());
                System.out.println("生产者发送="+msg);
            }
            //5.释放连接
            channel.close();
            connection.close();
        }
    }
    

    消费者1代码

    package routerMode;
    
    import com.rabbitmq.client.*;
    import util.ConnectionUtil;
    import java.io.IOException;
    
    /**
     * @author WeiHong
     * @date 2021 -  09 - 09 15:05
     */
    public class Recer1 {
        public static void main(String[] args) throws Exception {
            //1.获取连接
            Connection connection = ConnectionUtil.getConnection();
            //2.在链接中创建信道
            Channel channel = connection.createChannel();
            //2.1声明队列
            channel.queueDeclare("exchange_direct_queue1",false,false,false,null);
            //2.2绑定路由(如果路由键的类型是插入、修改绑定到这个队列1上)
            channel.queueBind("exchange_direct_queue1","router_exchange_direct","insert");
            channel.queueBind("exchange_direct_queue1","router_exchange_direct","update");
            //3.定义内部类获取信息
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String res = new String(body);
                    System.out.println("【消费者1】="+ res);
                }
    
            };
            //4.监听队列 true:自动消息确认
            channel.basicConsume("exchange_direct_queue1",true,consumer);
    
        }
    }
    

    消费者2代码

    package routerMode;
    
    import com.rabbitmq.client.*;
    import util.ConnectionUtil;
    import java.io.IOException;
    
    /**
     * @author WeiHong
     * @date 2021 -  09 - 09 15:05
     */
    public class Recer2 {
        public static void main(String[] args) throws Exception {
            //1.获取连接
            Connection connection = ConnectionUtil.getConnection();
            //2.在链接中创建信道
            Channel channel = connection.createChannel();
            //2.1声明队列
            channel.queueDeclare("exchange_direct_queue2",false,false,false,null);
            //2.2绑定路由(如果路由键的类型是查询绑定到这个队列2上)
            channel.queueBind("exchange_direct_queue2","router_exchange_direct","selecte");
            //3.定义内部类获取信息
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String res = new String(body);
                    System.out.println("【消费者2】="+ res);
                }
    
            };
            //4.监听队列 true:自动消息确认
            channel.basicConsume("exchange_direct_queue2",true,consumer);
    
        }
    }
    

    运行顺序

    1. 记住运行程序的顺序,先运行一次sender(创建路由器),

    2. 有了路由器之后,在创建两个Recer1和Recer2,进行队列绑定

    3. 再次运行sender,发出消息

    消费者接收信息由路由键确定,当生产者发送信息到路由上时,因为指定了路由键,所以消息会被指派到对应的消息队列里。消费者通过队列绑定路由接收到不同路由键的消息。

    //selecte:此时该参数为路由键不是队列名 
    channel.basicPublish("router_exchange_direct","selecte",null,msg.getBytes());
     //2.2绑定路由(如果路由键的类型是查询绑定到这个队列2上)
     channel.queueBind("exchange_direct_queue2","router_exchange_direct","selecte");
    

    2.5 通配符模式

    img

    和路由模式90%是一样的。

    唯独的区别就是路由键支持模糊匹配

    匹配符号

    *:只能匹配一个词(正好一个词,多一个不行,少一个也不行)

    #:匹配0个或更多个词

    看一下官网案例:

    Q1绑定了路由键 .orange. Q2绑定了路由键 ..rabbit 和 lazy.#

    下面生产者的消息会被发送给哪个队列?

    quick.orange.rabbit 	# Q1 Q2
    lazy.orange.elephant 	# Q1 Q2
    quick.orange.fox 		# Q1
    lazy.brown.fox 			# Q2
    lazy.pink.rabbit 		# Q2
    quick.brown.fox			# 无
    orange 					# 无
    quick.orange.male.rabbit # 无
    

    生产者代码

    package topic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import util.ConnectionUtil;
    
    /**
     * @author WeiHong
     * @date 2021 -  09 - 14 14:03
     */
    public class Sender {
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            // 声明路由(路由名,路由类型)
            // topic:模糊匹配的定向分发
            channel.exchangeDeclare("test_exchange_topic", "topic");
            String msg = "商品降价";
            channel.basicPublish("test_exchange_topic", "produce.price.test", null,
                    msg.getBytes());
            System.out.println("[用户系统]:" + msg);
            channel.close();
            connection.close();
        }
    }
    

    消费者1代码

    package topic;
    
    import com.rabbitmq.client.*;
    import util.ConnectionUtil;
    
    import java.io.IOException;
    
    /**
     * @author WeiHong
     * @date 2021 -  09 - 14 14:10
     */
    public class Recer1 {
        public static void main(String[] args) throws Exception {
            //1.建立连接
            Connection connection = ConnectionUtil.getConnection();
            //2.在链接中建立信道
            Channel channel = connection.createChannel();
            //3.声明队列
            channel.queueDeclare("test_exchange_topic_queue1",false,false,false,null);
            //4.绑定路由
            channel.queueBind("test_exchange_topic_queue1","test_exchange_topic","user.*");
            //5.定义内部类接收消息
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body);
                    System.out.println("[消费者1]接收到的信息:"+msg);
                }
    
            };
            //6.监听队列 true:消息自动确认
            channel.basicConsume("test_exchange_topic_queue1", true,consumer);
        }
    }
    

    消费者2代码

    package topic;
    
    import com.rabbitmq.client.*;
    import util.ConnectionUtil;
    
    import java.io.IOException;
    
    /**
     * @author WeiHong
     * @date 2021 -  09 - 14 14:10
     */
    public class Recer2 {
        public static void main(String[] args) throws Exception {
            //1.建立连接
            Connection connection = ConnectionUtil.getConnection();
            //2.在链接中建立信道
            Channel channel = connection.createChannel();
            //3.声明队列
            channel.queueDeclare("test_exchange_topic_queue2",false,false,false,null);
            //4.绑定路由
            channel.queueBind("test_exchange_topic_queue2","test_exchange_topic","produce.#");
            //5.定义内部类接收消息
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body);
                    System.out.println("[消费者2]接收到的信息:"+msg);
                }
    
            };
            //6.监听队列 true:消息自动确认
            channel.basicConsume("test_exchange_topic_queue2", true,consumer);
        }
    }
    

    几种常见路由件说明:

    1. 发布/订阅模式

    *// fanout:不处理路由键(只需要将队列绑定到路由上,发送到路由的消息都会被转发到与该路由绑定的所有队列上)
    *channel.exchangeDeclare(“test_exchange_fanout”,“fanout”);

    1. 路由模式

    *//direct:会根据路由键进行定向分发消息
    *channel.exchangeDeclare(“router_exchange_direct”,“direct”);

    1. 通配符模式

    *// topic:模糊匹配的定向分发
    *channel.exchangeDeclare(“test_exchange_topic”, “topic”);

    🍎姓名:程序员阿红🍎
    🍊喜欢:Java编程🍊
    🍉重要的事情说三遍!!!🍉
    🍓欢迎大家关注哦,互相学习🍓
    🍋欢迎大家访问哦,互相学习🍋
    🍑欢迎大家收藏哦,互相学习🍑
    ✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨
    展开全文
  • RabbitMQ种工作模式

    千次阅读 2021-05-21 17:12:30
    RabbitMQ种工作模式 1、简单队列 一个生产者对应一个消费者!! 2、work 模式 一个生产者对应多个消费者,但是一条消息只能有一个消费者获得消息!!! 轮询分发就是将消息队列中的消息,依次发送给所有消费者。...

    RabbitMQ五种工作模式

    1、简单队列

    在这里插入图片描述
    一个生产者对应一个消费者!!

    2、work 模式

    在这里插入图片描述
    一个生产者对应多个消费者,但是一条消息只能有一个消费者获得消息!!!
    轮询分发就是将消息队列中的消息,依次发送给所有消费者。一个消息只能被一个消费者获取。

    3、发布/订阅模式

    在这里插入图片描述
    一个消费者将消息首先发送到交换器,交换器绑定到多个队列,然后被监听该队列的消费者所接收并消费。

    ps:X表示交换器,在RabbitMQ中,交换器主要有四种类型:direct、fanout、topic、headers,这里的交换器是 fanout。下面我们会详细介绍这几种交换器。
      两个消费者获得了同一条消息。即就是,一个消息从交换机同时发送给了两个队列中,监听这两个队列的消费者消费了这个消息;
    如果没有队列绑定交换机,则消息将丢失。因为交换机没有存储能力,消息只能存储在队列中。

    4、路由模式

    在这里插入图片描述
    生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。

    也就是让消费者有选择性的接收消息。
    路由模式,是以路由规则为导向,引导消息存入符合规则的队列中。再由队列的消费者进行消费的。

    5、主题模式

    在这里插入图片描述
      上面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配。

    符号“#”表示匹配一个或多个词,符号“*”表示匹配一个词。
      与路由模式相似,但是,主题模式是一种模糊的匹配方式。

    6.工作模式总结

    ​ 这五种工作模式,可以归为三类:

    生产者,消息队列,一个消费者;
    生产者,消息队列,多个消费者;
    生产者,交换机,多个消息队列,多个消费者;

    7、四种交换器

    1、direct 如果路由键完全匹配的话,消息才会被投放到相应的队列。

    2、fanout 当发送一条消息到fanout交换器上时,它会把消息投放到所有附加在此交换器上的队列。

    3、topic 设置模糊的绑定方式,“*”操作符将“.”视为分隔符,匹配单个字符;“#”操作符没有分块的概念,它将任意“.”均视为关键字的匹配部分,能够匹配多个字符。

    ​ 4、header headers 交换器允许匹配 AMQP 消息的 header 而非路由键,除此之外,header 交换器和 direct 交换器完全一致,但是性能却差很多,因此基本上不会用到该交换器

    展开全文
  • rabbitmq种工作模式

    千次阅读 2022-03-05 08:05:32
    2、工作模式(资源争抢) 1 生产者将消息交个交换机 2 交换机交给绑定的队列 3 队列由多个消费者同时监听,只有其中一个能够获取这一条消息,形成了资源的争抢,谁的资源空闲大,争抢到的可能越大; 应用场景:抢红包,大型...
  • RabbitMQ几种工作模式

    2021-04-11 14:34:55
    在RabbitMQ入门中我们提到RabbitMQ几种不同的通讯方式,通过Springboot实际操作一下。 pom <!-- com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <...
  • RabbitMQ 的六种工作模式

    千次阅读 2022-04-11 00:36:27
    RabbitMQ提供了6消息模型,但是第6其实是RPC,并不是MQ,因此不予学习。那么也就剩下5。但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。1.1 simple简单模式消息产生者将消息放入队列消息的...
  • RabbitMQ的六种工作模式一、simple简单模式二、work工作模式(资源的竞争)三、publish/subscribe发布订阅(共享资源)四、routing路由模式五、topic 主题模式(路由模式的一种)六、远程过程调用(RPC)总结 基于erlang...
  • RabbitMQ几种工作模式

    万次阅读 2017-04-29 22:12:23
    maven: <!-- RabbitMQ的客户端 --> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client <version>3.4.1</vers
  • RabbitMQ的6种工作模式详解

    千次阅读 2020-10-28 11:42:29
    目录RabbitMQ几种工作模式1.Work queues代码实例1.生产者1.application.yml2.RabbitMqConfig3.TestSend2.消费者1.配置相同,config相同2.RabbitMqListen监听获取消息3.运行1.Publish发布订阅模式代码实例1.生产者1....
  • 下面小编就为大家带来一篇基于RabbitMQ几种Exchange 模式详解。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
  • RabbitMQ几种工作模式介绍

    千次阅读 2019-01-10 14:43:07
    1.RabbitMQ MQ:全称为message queue,即消息队列。它是由erlang语言开发,基于AMQP(Advanced Message Queue ...2.工作原理 RabbitMQ的基本结构 组成部分说明: Producer 消息生产者,即生产方客户端,将...
  • Rabbitmq几种工作模式介绍

    千次阅读 2020-10-28 18:09:46
    上一篇文章对mq做了简介,并且说了rabbitmq的六种模式,这篇文章主要记录一下这几种模式是什么样的,以及怎么用的。第一种不再说,跟简单。 一下几种模式均有对应的案列代码供参考,请看...
  • 简单模式 特点:无交换机转发数据(使用默认交换机名""),生产者直接将数据发送至队列,单个消费者消费指定队列的消息。 开发步骤 1.创建连接MQ的连接工厂对象 ConnectionFactory connectionFactory = new ...
  • 一、Maven依赖添加 com.rabbitmq amqp-client 3.0.4 二、七种工作模式的java实例 1、简单模式 最简单的一个消费者和一个生产者模式,生产者生成消息,消费者监听消息,若是消费者监听到它所需要的消息,就会消费该...
  • beego - rabbitmq简单模式和工作模式 demo
  • RabbitMQ系列之三 RabbitMQ几种典型模式

    万次阅读 多人点赞 2018-11-15 22:40:55
    本文详细介绍简单模式Simple、工作模式Work、发布订阅模式Publish/Subscribe、路由模式Routing、通配符模式Topics、远程调用模式RPC(区别于普通的消息队列的范畴了,所以这里暂不对该队列模式进行详解) 模式1:简单...
  • RabbitMQ的五种工作模式超详解

    千次阅读 2022-05-31 10:48:22
    一、简单模式。二、Work。三、Routing模式。... 简单模式中没有交换机exchange,所以不用创建(RabbitMQ会使用默认的交换机!)6. 创建队列queue7. 设置发送内容,使用channal.basicPublish()发送8. 释放资源
  • 文章目录.net RabbitMQ几种工作模式"Hello World"demo"Work queues"demo"Publish/Subscribe"demo"Routing"demo"Topics"demo .net RabbitMQ几种工作模式 查看官网https://www.rabbitmq.com/getstarted.html可以...
  • RabbitMQ几种工作模式(转)

    千次阅读 2018-11-29 18:17:28
    RabbitMQ有以下几种工作模式 : Work queues Publish/Subscribe Routing Topics Header RPC Work queues work queues两个消费端共同消费同一个队列中的消息。 应用场景:对于 任务过重或任务较多情况使用工作队列...
  • rabbitmq官方的六种工作模式

    万次阅读 多人点赞 2018-09-04 14:58:21
    1.RabbitMq 1.1介绍 RabbitMQ是一个消息代理:它接受并转发消息。你可以把它当成一个邮局:当你想邮寄信件的时候,你会把信件放在投递箱中,并确信邮递员最终会将信件送到收件人的手里。在这个例子中,RabbitMQ就...
  • RabbitMQ的五种工作模式及实例使用场景 1. 简单模式 1)、做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B 应用场景:将发送的电子邮件放到消息队列,然后邮件服务在...
  • RabbitMQ几种工作模式介绍 目录一、工作队列模式——work queue消息确认公平派遣消息持久化二、订阅模式——Publish/Subscribe交换器与队列的绑定三、路由模式——Routing直接交换多重绑定四、通配符模式——Topic五...
  • RabbitMQ六大工作模式

    2022-07-10 13:02:01
    rabbitmq工作模式
  • 聊一聊RabbitMQ种工作模式与应用场景

    万次阅读 多人点赞 2021-09-21 11:20:30
    今天我们来聊一聊 RabbitMQ工作模式与其对于的应用场景有哪些。 你可能会疑惑,作为 MQ 不就是生产者将消息发送到 MQ ,再讲消息发送到消费者哪里,任务不就完成了吗? 其实,不是这样的,MQ 的使用会根据业务...
  • 2.Fanout 这种交换机类型并不能给我们带来很大的灵活性,它只能进行无意识的广播,在这里我们将使用 direct 这种交换机类型来进行替换,这种交换机类型的工作方式是消息只去到它绑定的 routingKey 队列中去 ...
  • RabbitMQ 工作模式介绍
  • 简介 今天我们来聊一聊...在说六中工作模式前,需要先了解一下 RabbitMQ 的基本组件与概念,这样才能更好的学习 RabbitMQ工作模式。 Producer 生产者,消息的提供者 Consumer 消费者,消息的使用者 Br..

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 28,976
精华内容 11,590
关键字:

rabbitmq几种工作模式