精华内容
下载资源
问答
  • RabbitMq订阅模式

    2019-08-14 01:11:47
    RabbitMQ 工作原理 对于 RabbitMQ 来说, 除了这三个基本模块以外, 还添加了一个模块, 即交换机(Exchange). 它使得生产者和消息队列之间产生了隔离, 生产者将消息发送给交换机,而交换机则根据调度策略把相应的消息...

    RabbitMQ 工作原理

    对于 RabbitMQ 来说, 除了这三个基本模块以外, 还添加了一个模块, 即交换机(Exchange). 它使得生产者和消息队列之间产生了隔离, 生产者将消息发送给交换机,而交换机则根据调度策略把相应的消息转发给对应的消息队列. 那么 RabitMQ 的工作流程如下所示:

    说一下交换机: 交换机的主要作用是接收相应的消息并且绑定到指定的队列. 交换机有四种类型, 分别为Direct, topic, headers, Fanout.

    Direct 是 RabbitMQ 默认的交换机模式,也是最简单的模式.即创建消息队列的时候,指定一个 BindingKey. 当发送者发送消息的时候, 指定对应的 Key. 当 Key 和消息队列的 BindingKey 一致的时候,消息将会被发送到该消息队列中.

    topic 转发信息主要是依据通配符, 队列和交换机的绑定主要是依据一种模式(通配符+字符串), 而当发送消息的时候, 只有指定的 Key 和该模式相匹配的时候, 消息才会被发送到该消息队列中.

    headers 也是根据一个规则进行匹配, 在消息队列和交换机绑定的时候会指定一组键值对规则, 而发送消息的时候也会指定一组键值对规则, 当两组键值对规则相匹配的时候, 消息会被发送到匹配的消息队列中.

    Fanout 是路由广播的形式, 将会把消息发给绑定它的全部队列, 即便设置了 key, 也会被

     

     

    1.生产者

    package cn.zhm.util.simple;
    
    import cn.zhm.util.GetRabbitConnectUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    
    public class SendWork {
        /**
         * @Description:    java类作用描述
    
         * @Author:         zhaohaiming
    
         * @CreateDate:     2019/8/14 0:18
    
         * @Version:        1.0
    
         */
        private static final String QUEUE_NAME = "test_work_name";
        public static void main(String[] args) throws Exception {
            // TODO Auto-generated method stub
            Connection connection = null;
            Channel channel = null;
        try {
            //获取连接
            connection = GetRabbitConnectUtil.getMQConnection();
            //从连接中获取一个通道
            channel =  connection.createChannel();
    
            //创建队列声明
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            for (int  i = 0 ;i <50;i++){
                //发送信息内容
                String msg = "hello work!!"+i;
                System.out.println("SendWork send msg信息:"+msg);
                channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
    
                Thread.sleep(300);
    
            }
    
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            channel.close();
            connection.close();
        }
        }
    }
    

    消费者

    package cn.zhm.util.simple;
    
    import cn.zhm.util.GetRabbitConnectUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class ReceptionWork {
        /**
         * @Description:    消费者接收信息
    
         * @Author:         zhaohaiming
    
         * @CreateDate:     2019/8/11 21:58
    
         * @Version:        1.0
    
         */
        private static final String QUEUE_NAME = "test_work_name";
        public static void main(String[] args) throws Exception {
            // TODO Auto-generated method stub
            Connection connection = null;
            Channel channel = null;
            try {
                //获取一个连接
                connection =  GetRabbitConnectUtil.getMQConnection();
                //从连接中获取一个通道
                channel =  connection.createChannel();
    
               //创建队列声明
                channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
                //获取消息
    
                DefaultConsumer consumer = new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      //  super.handleDelivery(consumerTag, envelope, properties, body);
                        String smg = new String(body,"utf-8");
                        System.out.println("消费者接收消息:"+smg);
    
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }finally {
                            System.out.println("is down");
                        }
                    }
                };
    
                //监听队列
                channel.basicConsume(QUEUE_NAME,true,consumer);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
               // channel.close();
               // connection.close();
            }
    
        }
    }
    
    //监听队列
    channel.basicConsume(QUEUE_NAME,true,consumer);

    ture :(自动确认模式) 一旦rabbitmq将消息分发给消费者,就会从内容中删除,

    这种情况下,如果杀死正在执行的消费者,就会丢失正在处理的消息

     

    flase :(手动模式),如果 有一个消费者挂掉,就会交付给其他消费者。rabbitMq支持消应答,消费者发送一个消息应答,告诉rabitMq这个消息我已经处理完,你可以删除,,然后rabbitmq就删除内存中的消息

    默认是flase

    消息的持久

    //创建队列声明
      boolean durable = false;
      channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
    已经设置durable=false 不能再修改成为ture,尽管代码是正确的,他也不会运行成功,因为我们已经定义了一个 test_work_name这个queue是未持久化rabbitmq不准许重新定义(不同参数)一个已经存在的队列。

    订阅模型

    解读

    1.一个生产者,多个消费者

    2.每一个消费者都的自己的队列

    3.生产者没有直接把消息发送到队列,而是发到了交换机转发器到exchanger

    4.每个队列都是要在绑定到交换器上

    5.生产者发送的消息经过交换机,到达队列  一个消息对应多个消费者消费

    注册  -》 邮件 ——》短信

    生产者

    package cn.zhm.util.su;
    
    import cn.zhm.util.GetRabbitConnectUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Send {
        /**
         * @Description:    订阅 生产者
    
         * @Author:         zhaohaiming
    
         * @CreateDate:     2019/8/15 0:42
    
         * @Version:        1.0
    
         */
        private static final String EXCHANGE_NAME = "exchange_test";
        public static void main(String[] args) throws IOException, TimeoutException {
            // TODO Auto-generated method stub
            Connection connection = null;
            Channel channel = null;
            try {
                //获取一个连接
                connection =  GetRabbitConnectUtil.getMQConnection();
                //从连接中获取一个通道
                channel =  connection.createChannel();
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//fanout 分发 广播形式
                String message = "Hello wordle exchang....";
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                channel.close();
                connection.close();
            }
    
        }
    }
    

    消息到那去了 ?? 丢失了 因为交换机没有存储的能力,在rabbitmq里面只有队列有存储能力。

    因为这个时候还没有队列绑定到这个交换机所以数据丢失了

     

    消费者1

    package cn.zhm.util.su;
    
    import cn.zhm.util.GetRabbitConnectUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Reception {
        /**
         * @Description:    订阅 消费者
    
         * @Author:         zhaohaiming
    
         * @CreateDate:     2019/8/15 0:59
    
         * @Version:        1.0
    
         */
        private static final String QUEUE_NAME = "test_sub_name_info";
        private static final String EXCHANGE_NAME = "exchange_test";
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            Connection connection = null;
            Channel channel = null;
            try {
                //获取一个连接
                connection =  GetRabbitConnectUtil.getMQConnection();
                //从连接中获取一个通道
                channel =  connection.createChannel();
    
                //创建队列声明
                boolean durable = false;
                channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
    
                //绑定交换机
                String queueName = channel.queueDeclare().getQueue();
                channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
                //获取消息
    
                DefaultConsumer consumer = new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        //  super.handleDelivery(consumerTag, envelope, properties, body);
                        String smg = new String(body,"utf-8");
                        System.out.println("test_sub_name_info_消费者接收消息:"+smg);
    
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }finally {
                            System.out.println("test_sub_name_info is down");
                        }
                    }
                };
    
                //监听队列
                channel.basicConsume(QUEUE_NAME,true,consumer);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                // channel.close();
                // connection.close();
            }
        }
    }
    

    运行结果:

    test_sub_name_info_消费者接收消息:Hello wordle exchang....
    test_sub_name_info is down
    

    消费者2

    package cn.zhm.util.su;
    
    import cn.zhm.util.GetRabbitConnectUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class ReceptionEmail {
        /**
         * @Description:    订阅 消费者
    
         * @Author:         zhaohaiming
    
         * @CreateDate:     2019/8/15 0:59
    
         * @Version:        1.0
    
         */
        private static final String QUEUE_NAME = "test_sub_name_email";
        private static final String EXCHANGE_NAME = "exchange_test";
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            Connection connection = null;
            Channel channel = null;
            try {
                //获取一个连接
                connection =  GetRabbitConnectUtil.getMQConnection();
                //从连接中获取一个通道
                channel =  connection.createChannel();
    
                //创建队列声明
                boolean durable = false;
                channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
    
                //绑定交换机
                String queueName = channel.queueDeclare().getQueue();
                channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
                //获取消息
    
                DefaultConsumer consumer = new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        //  super.handleDelivery(consumerTag, envelope, properties, body);
                        String smg = new String(body,"utf-8");
                        System.out.println("test_sub_name_email_消费者接收消息:"+smg);
    
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }finally {
                            System.out.println("test_sub_name_email is down");
                        }
                    }
                };
    
                //监听队列
                channel.basicConsume(QUEUE_NAME,true,consumer);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                // channel.close();
                // connection.close();
            }
        }
    }
    

    运行结果:

    test_sub_name_email_消费者接收消息:Hello wordle exchang....
    test_sub_name_email is down
    

    6.Exchange(交换机,转发器)

    一方面是接收生产的消息,另一方面是向队列推送消息

    匿名转发: “”

    Fanout(不处理路由键)

    Direct (处理路由键)

    路由模式

    模型

    生产者:

    package routing;
    
    import cn.zhm.util.GetRabbitConnectUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    
    /**
     * @description: 路由模式发送者
     * @author: zhaohaiming
     * @Version V1.0.0
     * @create: 2019-08-17 00:11
     **/
    public class Send {
        /**
         * @Description: java类作用描述
         * @Author: zhaohaiming
         * @CreateDate: 2019/8/17 0:13
         * @Version: 1.0
         */
        private static final String QUEUE_NAME = "test_rout_name";
        private static final String EXCHANF_NAME = "exchan_name";
    
        public static void main(String[] args) throws Exception {
            // TODO Auto-generated method stub
            Connection connection = null;
            Channel channel = null;
            try {
                //获取一个连接
                connection = GetRabbitConnectUtil.getMQConnection();
                //从连接中获取一个通道
                channel = connection.createChannel();
    
                //Exchang
                channel.exchangeDeclare(EXCHANF_NAME, "direct");//处理路由键
                String smg = "send diret megges!";
                String rountKey = "error";//路由key
    
                //发送信息
                channel.basicPublish(EXCHANF_NAME,rountKey,null,smg.getBytes());
    
                System.out.println("send===>"+smg);
    
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                channel.close();
                connection.close();
            }
    
        }
    }

    消费者1

    package routing;
    
    import cn.zhm.util.GetRabbitConnectUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * @description: 路由模式消费者
     * @author: zhaohaiming
     * @Version V1.0.0
     * @create: 2019-08-17 00:48
     **/
    public class Reception {
        /**
         * @Description:    java类作用描述
         * @Author:         zhaohaiming
         * @CreateDate:     2019/8/17 0:49
         * @Version:        1.0
    
         */
        private static final String QUEUE_NAME = "queue_rout_name";
        private static final String EXCHANF_NAME = "exchan_name";
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            Connection connection = null;
            Channel channel = null;
            try {
                //获取一个连接
                connection =  GetRabbitConnectUtil.getMQConnection();
                //从连接中获取一个通道
                channel =  connection.createChannel();
    
                //创建队列声明
                channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
                //交换机与路由绑定
                channel.queueBind(QUEUE_NAME,EXCHANF_NAME,"error");
    
                channel.basicQos(1);
    
                //获取消息
                final Channel finalChannel = channel;
                DefaultConsumer consumer = new DefaultConsumer(finalChannel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        //  super.handleDelivery(consumerTag, envelope, properties, body);
                        String smg = new String(body,"utf-8");
                        System.out.println("消费者接收消息:"+smg);
    
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }finally {
                            System.out.println("1111is down");
                            finalChannel.basicAck(envelope.getDeliveryTag(),false);
                        }
                    }
                };
    
                //监听队列
                boolean auteAck = false ; //自动回答消息
                channel.basicConsume(QUEUE_NAME,auteAck,consumer);
    
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    消费者2

    package routing;
    
    import cn.zhm.util.GetRabbitConnectUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * @description: 路由模式消费者
     * @author: zhaohaiming
     * @Version V1.0.0
     * @create: 2019-08-17 00:48
     **/
    public class Reception2 {
        /**
         * @Description:    java类作用描述
         * @Author:         zhaohaiming
         * @CreateDate:     2019/8/17 0:49
         * @Version:        1.0
    
         */
        private static final String QUEUE_NAME = "queue_rout_name2";
        private static final String EXCHANF_NAME = "exchan_name";
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            Connection connection = null;
            Channel channel = null;
            try {
                //获取一个连接
                connection =  GetRabbitConnectUtil.getMQConnection();
                //从连接中获取一个通道
                channel =  connection.createChannel();
    
                //创建队列声明
                channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
                //交换机与路由绑定
                channel.queueBind(QUEUE_NAME,EXCHANF_NAME,"error");
                channel.queueBind(QUEUE_NAME,EXCHANF_NAME,"info");
                channel.queueBind(QUEUE_NAME,EXCHANF_NAME,"warning");
    
                channel.basicQos(1);
    
                //获取消息
                final Channel finalChannel = channel;
                DefaultConsumer consumer = new DefaultConsumer(finalChannel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        //  super.handleDelivery(consumerTag, envelope, properties, body);
                        String smg = new String(body,"utf-8");
                        System.out.println("消费者接收消息:"+smg);
    
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }finally {
                            System.out.println("22222is down");
                            finalChannel.basicAck(envelope.getDeliveryTag(),false);
                        }
                    }
                };
    
                //监听队列
                boolean auteAck = false ; //自动回答消息
                channel.basicConsume(QUEUE_NAME,auteAck,consumer);
    
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    运行结果

    当发送是 error 时 消费1 和消费2都能接收信息

    发送者发送信息:

    send===>send diret megges!

    消费1 收到消息:
    消费者接收消息:send diret megges!
    1111is down

    消费2 收到消息:

    消费者接收消息:send diret megges!
    22222is down

     

    当发送是 info 时 只有消费2能接收信息,消费1收不消息

     

    展开全文
  • RabbitMQ 订阅模式

    2019-01-08 14:52:29
    1.订阅模式模型 a) 一个生产者 多个消费者 b) 每个消费者都有自己的队列 c) 生产者没有直接把消息发送给队列,而是先发送给交换机exchange d) 每个队列都要绑定到交换机上 e) 生产者发送的消息是经过交换机的...

    1.订阅模式模型

    这里写图片描述

    a) 一个生产者 多个消费者
    b) 每个消费者都有自己的队列
    c) 生产者没有直接把消息发送给队列,而是先发送给交换机exchange
    d) 每个队列都要绑定到交换机上
    e) 生产者发送的消息是经过交换机的,然后到达队列,就能实现一个消息被多个消费者消费

    应用场景:
    比如 注册之后需要发送邮件 同时需要发送短信
    a) 一个生产者 多个消费者
    b) 每个消费者都有自己的队列
    c) 生产者没有直接把消息发送给队列,而是先发送给交换机exchange
    d) 每个队列都要绑定到交换机上
    e) 生产者发送的消息是经过交换机的,然后到达队列,就能实现一个消息被多个消费者消费 应用场景: 比如 注册之后需要发送邮件 同时需要发送短信

    生产者

    
    package com.ithzk.rabbitmq.ps;
    
    import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author hzk
     * @date 2018/3/10
     */
    public class Send {
    
        private final static String EXCHANGER_NAME = "test_exchange_fanout";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            Connection connection = RabbitMQConnectionUtils.getConnection();
    
            Channel channel = connection.createChannel();
    
            // 分发
            channel.exchangeDeclare(EXCHANGER_NAME,"fanout");
    
            String msg = "hello exchange";
    
            channel.basicPublish(EXCHANGER_NAME,"",null,msg.getBytes());
    
            System.out.println("Send msg"+msg);
    
            channel.close();
            connection.close();
        }
    
    }
    

    1消息丢失了,因为交换机没有存储的能力,rabbitMQ中只有队列有存储能力,此时还没有队列绑定到该交换机上,所以数据丢失了。
    

    消费者1

    package com.ithzk.rabbitmq.ps;
    
    import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
    import com.rabbitmq.client.*;
    
    import javax.sound.midi.Soundbank;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author hzk
     * @date 2018/3/10
     */
    public class Recv1 {
    
        private static final String QUEUE_NAME="test_queue_fanout_email";
        private final static String EXCHANGER_NAME = "test_exchange_fanout";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //获取连接
            Connection connection = RabbitMQConnectionUtils.getConnection();
    
            //从连接中获取频道
            final Channel channel = connection.createChannel();
    
            //声明队列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            //绑定队列到交换机 转发器
            channel.queueBind(QUEUE_NAME,EXCHANGER_NAME,"");
    
            //保证一次只发一个
            channel.basicQos(1);
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("[1] Recv msg:" + msg);
    
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[1] done");
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
    
           boolean autoAck = false;
    
           channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    
           System.out.println("[Consumer 1 start]");
    
    
        }
    
    
    }
    

    消费者2

    package com.ithzk.rabbitmq.ps;
    
    import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author hzk
     * @date 2018/3/10
     */
    public class Recv2 {
    
        private static final String QUEUE_NAME="test_queue_fanout_sms";
        private final static String EXCHANGER_NAME = "test_exchange_fanout";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //获取连接
            Connection connection = RabbitMQConnectionUtils.getConnection();
    
            //从连接中获取频道
            final Channel channel = connection.createChannel();
    
            //声明队列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            //绑定队列到交换机 转发器
            channel.queueBind(QUEUE_NAME,EXCHANGER_NAME,"");
    
            //保证一次只发一个
            channel.basicQos(1);
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("[2] Recv msg:" + msg);
    
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[2] done");
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
    
           boolean autoAck = false;
    
           channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    
           System.out.println("[Consumer 2 start]");
    
    
        }
    
    
    }
    

    在这里插入图片描述
    和交换机绑定的队列都会收到消息

    转载至:https://blog.csdn.net/u013985664/article/details/79512747

    展开全文
  • rabbitmq订阅模式

    2019-06-24 12:46:00
    https://www.cnblogs.com/wy697495/p/9614005.html 转载于:https://www.cnblogs.com/hshy/p/11076435.html

    https://www.cnblogs.com/wy697495/p/9614005.html

    转载于:https://www.cnblogs.com/hshy/p/11076435.html

    展开全文
  • 7_rabbitmq订阅模式 PublishSubscribe

    千次阅读 2018-03-19 07:37:19
    rabbitmq订阅模式 PublishSubscribe更多干货分布式实战(干货)spring cloud 实战(干货)mybatis 实战(干货)spring boot 实战(干货)React 入门实战(干货)构建中小型互联网企业架构(干货)python 学习持续...

    rabbitmq订阅模式 PublishSubscribe

    更多干货

    例子代码地址

    模型图

    我们之前学习的都是一个消息只能被一个消费者消费,那么如果我想发一个消息 能被多个消费者消费,这时候怎么办? 这时候我们就得用到了消息中的发布订阅模型

    image

    • 在前面的教程中,我们创建了一个工作队列,都是一个任务只交给一个消费者。
    • 这次我们做 将消息发送给多个消费者。这种模式叫做“发布/订阅”。

    举列:

    类似微信订阅号 发布文章消息 就可以广播给所有的接收者。(订阅者)

    那么咱们来看一下图,我们学过前两种有一些不一样,work 模式 是不是同一个队列 多个消费者,而 ps 这种模式呢,是一个队列对应一个消费者,pb 模式还多了一个 X(交换机 转发器) ,这时候我们要获取消息 就需要队列绑定到交换机上,交换机把消息发送到队列 , 消费者才能获取队列的消息

    解读:

    • 1、1 个生产者,多个消费者
    • 2、每一个消费者都有自己的一个队列
    • 3、生产者没有将消息直接发送到队列,而是发送到了交换机(转发器)
    • 4、每个队列都要绑定到交换机
    • 5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的

    注册完 发短信 发邮件

    生产者

    后台注册 ->邮件->短信

    public class Send {
    	 private final static String EXCHANGE_NAME = "test_exchange_fanout";
    	 public static void main(String[] argv) throws Exception {
    		 // 获取到连接以及mq通道
    		 Connection connection = ConnectionUtils.getConnection();
    		 Channel channel = connection.createChannel();
    		 // 声明exchange 交换机 转发器
    		 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //fanout 分裂
    		 // 消息内容
    		 String message = "Hello PB";
    		 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    		 System.out.println(" [x] Sent '" + message + "'");
    		 channel.close();
    		 connection.close();
    	 }
    }
    

    那么先看一下控制台 是不是有这个交换机

    image

    但是这个发送的消息到哪了呢? 消息丢失了!!!因为交换机没有存储消息的能力,在 rabbitmq 中只有队列存储消息的能力.因为这时还没有队列,所以就会丢失;

    小结:消息发送到了一个没有绑定队列的交换机时,消息就会丢失!

    那么我们再来写消费者

    消费者 1

    邮件发送系统

    public class Recv {
    	private final static String QUEUE_NAME = "test_queue_fanout_email";
    	private final static String EXCHANGE_NAME = "test_exchange_fanout";
    	public static void main(String[] argv) throws Exception {
    		// 获取到连接以及mq通道
    		Connection connection = ConnectionUtils.getConnection();
    		final Channel channel = connection.createChannel();
    		// 声明队列
    		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    		// 绑定队列到交换机
    		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    		//------------下面逻辑和work模式一样-----
    		// 同一时刻服务器只会发一条消息给消费者
    		channel.basicQos(1);
    		// 定义一个消费者
    		Consumer consumer = new DefaultConsumer(channel) {
    			// 消息到达 触发这个方法
    			@Override
    			public void handleDelivery(String consumerTag, Envelope envelope,
    			BasicProperties properties, byte[] body) throws IOException {
    				String msg = new String(body, "utf-8");
    				System.out.println("[1] Recv msg:" + msg);
    				try {
    					Thread.sleep(1000);
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				} finally {
    					System.out.println("[1] done ");
    					// 手动回执
    					channel.basicAck(envelope.getDeliveryTag(), false);
    				}
    			}
    		};
    		boolean autoAck = false;
    		channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    	}
    }
    

    消费者 2

    类似短信发送系统

    public class Recv2 {
    
    	private final static String QUEUE_NAME = "test_queue_fanout_2";
    	private final static String EXCHANGE_NAME = "test_exchange_fanout";
    	
    	public static void main(String[] argv) throws Exception {
    		// 获取到连接以及mq通道
    		Connection connection = ConnectionUtils.getConnection();
    		final Channel channel = connection.createChannel();
    		// 声明队列
    		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    		// 绑定队列到交换机
    		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    		// 同一时刻服务器只会发一条消息给消费者
    		// 定义一个消费者
    		Consumer consumer = new DefaultConsumer(channel) {
    			// 消息到达 触发这个方法
    			@Override
    			public void handleDelivery(String consumerTag, Envelope envelope,
    			BasicProperties properties, byte[] body) throws IOException {
    				String msg = new String(body, "utf-8");
    				System.out.println("[2] Recv msg:" + msg);
    				try {
    					Thread.sleep(1000);
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				} finally {
    					System.out.println("[2] done ");
    					// 手动回执
    					channel.basicAck(envelope.getDeliveryTag(), false);
    				}
    			}
    		};
    		boolean autoAck = false;
    		channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    	}
    }

    测试

    一个消息 可以被多个消费者获取 image

    展开全文
  • 背景: 为了减轻服务器的压力,现在原有项目的基础上集成消息队列来异步处理消息! 此项目是企业真实需求,项目的代码属于线上生产代码,直接用于...文章目录一、RabbitMQ 订阅模式快速入门1. RabbitMQ 订阅模式简述...
  • 订阅模式 1.生产者代码示例 public class publish_subscribe_producer { // 创建一个名为QUEUE_NAME的队列,防止队列不存在 private final static String QUEUE_NAME = "csl"; // 创建一个交换机 private final...
  • 接上一篇:(企业内部需求实战_进阶_06)SSM集成RabbitMQ 订阅模式 关键代码讲解、开发、测试 https://gblfy.blog.csdn.net/article/details/104219096 此项目采用MQ发送消息模式为:订阅模式,如果对RabbitMQ不...
  •  rabbitMQ中生产者声明的交换机中绑定了两个消费者的对列,当生产者向交换机中发送消息的时候,交换机会将消息分别发往两个队列,那么两个消费者就能消费相同数据。 源码地址:https://github.com/Carlutf8/rabbitMQ...
  • 这个项目采用的RabbitMQ订阅模式Topic,生产者发送消息到交换机中,消费者进行队列声明或者创建,然后,在交换机中进行队列绑定交换机操作。 项目源码(企业实战): https://github.com/gb-heima/order 切换到...
  • RabbitMQ订阅模式

    2018-08-15 21:38:40
    订阅模式 一个生产者,多个消费者 每一个消费者,都有一个独立的队列 生产者没有将消息直接发送到队列,而是发送到了交换机 每个队列都要绑定到交换机 生产者发送的消息,经过交换机,到达队列 实现,一个...
  • RabbitMQ订阅模式

    2019-06-05 10:53:19
    RabbitMQ订阅模式简介 一个生产者,多个消费者 每一个消费者都有自己的队列 生产者没有直接将消息发送到队列中去,而是发送到了交换机或者转发器(exchange)中 每个队列都要绑定到交换机上 生产者发送的消息,...
  • 发布订阅模式下创建RabbitMq实例 发布订阅模式queueName必须为空,要传入交换机exChangeName的名称,routingkey为空 rabbitmq服务封装 package rabbitmq import ( "fmt" "github.com/pkg/errors" "github....
  • RabbitMQ-订阅模式

    2018-09-28 21:37:38
    学习完《RabbitMQ-简单队列》和《RabbitMQ-Work模式》,今天我们一起学习订阅模式。如下图所示: 解读: 1、1个生产者,多个消费者。 2、每一个消费者都有自己的一个队列。 3、生产者没有将消息直接发送到队列...
  • RabbitMQ发布订阅模式

    2021-05-05 20:49:34
    X代表交换机rabbitMQ内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费 相关场景:邮件群发,群...
  • rabbitMQ订阅收发.zip

    2021-01-26 16:26:58
    里面有两个程序,主要为了测试rabbitmq订阅模式的收发性能,还有订单和xsd文件
  • rabbitmq 发布订阅订阅模式 尝试使用fanout模式进行发送。 具体操作步骤如下 发送端 准备10条消息 ConnectionFactory connectionFactory = connectionFactory(); Connection connection = connectionFactory.new...
  • 发布订阅模式即向多个消费者传递同一条信息1).Exchanges 交换机RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何消息直接发送到队列。相反,生产者只能向交换机(Exchange)发送消息。交换机是一个非常简单的...
  • RabbitMq 消息订阅模式

    2021-03-31 11:36:50
    文章目录 Rabbit 概念 名词解释 发送方 订阅方 总结 Rabbit 概念 名词解释 名词解释系小米自己的理解。和书籍、网站等有所出入。仅代表本人理解的 生产者 是发送消息的 交换器 是负责转发消息的 队列 是存储消息的 ...
  • 发布订阅模式常用于一个通知通知给不同的消费者,消费者去队列里面进行不同的处理 这个生产者负责在mq上生成1个交换机和2个队列, 下面这句是对交换机进行声明并设置 一、channel.exchangeDeclare(EXCHANGE_...
  • Publish/Subscribe模式,也叫订阅模式,在该模式中,使用到了Exchanger(交换机),消息的传递过程发生了变化: Publisher(生产者):消息现在不直接发送到Queue中了,而是把消息发送给Exchanger Consumer(消费...
  • 2. 在 application.yml 中配置rabbitmq的 连接信息: 3.创建生产者 4.创建两个消费者 6.测试类中测试 结果 :控制台打印出消费者的log日志: 可见,两个消费者都收到了生产者发的5条消息 1.先加入maven依赖 ...
  • 发布订阅模式下创建RabbitMq实例 发布订阅模式queueName必须为空,要传入交换机exChangeName的名称,routingkey为空 func NewRabbitMqPubSub(exchangeName string) *RabbitMq { //创建mq实例 rabbitmq := ...
  • C#RabbitMQ发布订阅模式配置

    千次阅读 2018-11-08 16:59:02
    首先我们需要搭建rabbitmq服务器来中转我们的消息。 1.环境搭建 1.1由于RabbitMQ使用Erlang语言编写,所有我们需要先安装Erlang环境(安装没什么难度直接双击运行就可以了) 1.2安装RabbitMQ服务端程序 以上...
  • RabbitMQ工作模式之Pub/Sub发布订阅模式 4.3.1 简介 在发布订阅模式中,Producer发送消息到指定的交换机(Exchange)中,由Exchange绑定不同的Queues,消费者依旧监听这些队列进行消费(work模式使用的是默认的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,575
精华内容 630
关键字:

rabbitmq订阅模式